Source code for apem.config_loader

import json
from numbers import Integral, Real
from typing import Any, Dict

from apem.order_book_based_model.euphemia.enums.cut_types import CutTypes
from apem.unit_based_model.allocation.algorithms.nodal_clearing.dcopf import DCOPF
from apem.unit_based_model.allocation.algorithms.zonal_clearing.zonal_fbmc_included import Zonal_FBMC
from apem.unit_based_model.allocation.algorithms.zonal_clearing.zonal_ntc_multiedge import Zonal_NTC_multiedge
from apem.unit_based_model.allocation.algorithms.zonal_clearing.zonal_ntc_aggregated import Zonal_NTC_aggregated
from apem.order_book_based_model.euphemia.enums.datasets import OrderBookBased_Datasets
from apem.core import MarketModels
from apem.unit_based_model.enums import FBMCBaseCases, PricingAlgorithms, RedispatchAlgorithms, UnitBased_Datasets


[docs] class ConfigLoader: """ Entry point for reading `config.json`. It normalizes the user-facing schema, validates allowed combinations (datasets, models, pricing, redispatch, solver options), and exposes typed getters used by the execution layer. """ def __init__(self, config_path: str = "config.json"): """Initialize a configuration from disk. :param config_path: Path to a JSON configuration file. """ self.config_path = config_path self.raw_config = self._normalize_config_format(self._load_raw_config()) self._validate_config() self.config = self._filter_documentation_fields() def _load_raw_config(self) -> Dict[str, Any]: with open(self.config_path, "r", encoding="utf-8-sig") as f: return json.load(f) def _normalize_config_format(self, raw: Dict[str, Any]) -> Dict[str, Any]: """ Accept model-scoped config format with canonical keys ('run', 'unit_based_model', 'order_book_based_model'). """ if "run" not in raw: raise ValueError( "Invalid config format: expected model-scoped format " "with a top-level 'run' section." ) return self._normalize_model_scoped_config(raw) def _normalize_model_scoped_config(self, raw: Dict[str, Any]) -> Dict[str, Any]: run_cfg = raw.get("run", {}) unit_cfg = raw.get("unit_based_model", {}) order_book_cfg = raw.get("order_book_based_model", {}) if not isinstance(run_cfg, dict): raise ValueError("Invalid run: expected an object.") if unit_cfg is None: unit_cfg = {} if order_book_cfg is None: order_book_cfg = {} if not isinstance(unit_cfg, dict): raise ValueError("Invalid unit_based_model: expected an object.") if not isinstance(order_book_cfg, dict): raise ValueError("Invalid order_book_based_model: expected an object.") redispatch_cfg = unit_cfg.get("redispatch", {}) if redispatch_cfg is None: redispatch_cfg = {} if not isinstance(redispatch_cfg, dict): raise ValueError("Invalid unit_based_model.redispatch: expected an object.") unit_solver_cfg = unit_cfg.get("solver_configuration") market_model = run_cfg.get("market_model", "unit_based_model") normalized = { "verbosity": run_cfg.get("verbosity", raw.get("verbosity", True)), "scenario": { "market_model": market_model, "unit_based_dataset": unit_cfg.get("dataset", "IEEE_RTS"), "order_book_based_dataset": order_book_cfg.get("dataset", "GENERATED_SMALL"), "power_flow_model": unit_cfg.get("power_flow_model", {"type": "DCOPF"}), "cut_type": order_book_cfg.get("cut_type", CutTypes.PB.value), "pricing_algorithm": unit_cfg.get("pricing_algorithm", "IP"), "redispatch_algorithm": redispatch_cfg.get("algorithm", "MinCostRD"), "redispatch_constraint_units": redispatch_cfg.get("constraint_units", False), "redispatch_threshold": redispatch_cfg.get("threshold", 0.0), "alpha": redispatch_cfg.get("alpha", 0.0), }, "euphemia_configuration": order_book_cfg.get("euphemia_configuration", raw.get("euphemia_configuration", {})), "zonal_configuration": unit_cfg.get( "zonal_configuration", raw.get("zonal_configuration", {"type": "zonal_DE2-s", "factor": 0.8, "base_case": "BC4"}), ), } if unit_solver_cfg is not None: normalized["unit_based_solver_configuration"] = unit_solver_cfg # Preserve optional documentation/helper fields. for key, value in raw.items(): if key.startswith("_"): normalized[key] = value return normalized def _filter_documentation_fields(self) -> Dict[str, Any]: """Remove documentation fields (those starting with _) from the config.""" return {k: v for k, v in self.raw_config.items() if not k.startswith("_")} def _validate_config(self): # Validate datasets if self.raw_config["scenario"]["unit_based_dataset"] not in [d.name for d in UnitBased_Datasets]: raise ValueError(f"Invalid unit_based_dataset: {self.raw_config['scenario']['unit_based_dataset']}") if self.raw_config["scenario"]["order_book_based_dataset"] not in [d.name for d in OrderBookBased_Datasets]: raise ValueError( f"Invalid order_book_based_dataset: {self.raw_config['scenario']['order_book_based_dataset']}" ) # Validate market model if self.raw_config["scenario"]["market_model"] not in ["unit_based_model", "order_book_based_model"]: raise ValueError(f"Invalid market model: {self.raw_config['scenario']['market_model']}") # Validate power flow model if self.raw_config["scenario"]["power_flow_model"]["type"] not in [ "DCOPF", "Zonal_NTC_aggregated", "Zonal_NTC_multiedge", "Zonal_FBMC", ]: raise ValueError( f"Invalid power flow model: {self.raw_config['scenario']['power_flow_model']['type']}" ) # Validate cut type if self.raw_config["scenario"]["cut_type"] not in [c.value for c in CutTypes]: raise ValueError(f"Invalid cut type: {self.raw_config['scenario']['cut_type']}") # Validate pricing algorithm if self.raw_config["scenario"]["pricing_algorithm"] not in [p.name for p in PricingAlgorithms]: raise ValueError(f"Invalid pricing algorithm: {self.raw_config['scenario']['pricing_algorithm']}") # Validate redispatch algorithm if self.raw_config["scenario"]["redispatch_algorithm"] not in [r.name for r in RedispatchAlgorithms]: raise ValueError(f"Invalid redispatch algorithm: {self.raw_config['scenario']['redispatch_algorithm']}") # Validate redispatch constraint units if self.raw_config["scenario"]["redispatch_constraint_units"] not in [True, False]: raise ValueError( f"Invalid redispatch constraint: {self.raw_config['scenario']['redispatch_constraint_units']}" ) # Validate redispatch threshold if self.raw_config["scenario"]["redispatch_threshold"] < 0: raise ValueError(f"Invalid redispatch threshold: {self.raw_config['scenario']['redispatch_threshold']}") # Validate alpha for markup pricing if not 0 <= self.raw_config["scenario"]["alpha"] < 1: raise ValueError(f"Invalid alpha: {self.raw_config['scenario']['alpha']}") # Validate zonal configuration when a zonal model is selected pf_type = self.raw_config["scenario"]["power_flow_model"]["type"] if pf_type in ["Zonal_NTC_aggregated", "Zonal_NTC_multiedge", "Zonal_FBMC"]: zonal_config = self.raw_config["zonal_configuration"] available_configs = self.raw_config.get("_available_zonal_configurations", []) if available_configs and zonal_config["type"] not in available_configs: raise ValueError(f"Invalid zonal configuration type: {zonal_config['type']}") if pf_type == "Zonal_NTC_aggregated": if not 0 <= zonal_config["factor"] <= 1: raise ValueError(f"Invalid zonal factor: {zonal_config['factor']}. Must be between 0 and 1.") if pf_type == "Zonal_NTC_multiedge": if not 0 <= zonal_config["factor"] <= 1: raise ValueError(f"Invalid zonal factor: {zonal_config['factor']}. Must be between 0 and 1.") if pf_type == "Zonal_FBMC": available_base_cases = self.raw_config.get("_available_base_cases", ["BC1"]) base_case = zonal_config.get("base_case", available_base_cases[0]) self.raw_config["zonal_configuration"]["base_case"] = base_case if base_case not in [c.value for c in FBMCBaseCases]: raise ValueError(f"Invalid FBMC base case: {base_case}.") self._validate_unit_based_solver_configuration() self._validate_euphemia_configuration() def _validate_unit_based_solver_configuration(self) -> None: if self.raw_config["scenario"]["market_model"] != "unit_based_model": return if "unit_based_solver_configuration" not in self.raw_config: raise ValueError( "Missing solver configuration: provide 'unit_based_model.solver_configuration'." ) if not isinstance(self.raw_config["unit_based_solver_configuration"], dict): raise ValueError("Invalid unit_based_solver_configuration: expected an object.") cfg = self.raw_config["unit_based_solver_configuration"] if "slack_penalty" in cfg: if not self._is_number(cfg["slack_penalty"]): raise ValueError("Invalid unit_based_solver_configuration.slack_penalty: must be numeric.") if cfg["slack_penalty"] <= 0: raise ValueError("Invalid unit_based_solver_configuration.slack_penalty: must be > 0.") @staticmethod def _is_number(value: Any) -> bool: return isinstance(value, Real) and not isinstance(value, bool) @staticmethod def _is_int(value: Any) -> bool: return isinstance(value, Integral) and not isinstance(value, bool) def _validate_euphemia_configuration(self) -> None: cfg = self.raw_config.get("euphemia_configuration", {}) if cfg is None: return if not isinstance(cfg, dict): raise ValueError("Invalid euphemia_configuration: expected an object.") allowed = { "disable_reinsertion", "calculate_corrected_welfare", "price_lower_bound", "price_upper_bound", "beta_MIC", "delta_load_gradient", "delta_PAB", "epsilon", "max_iterations", "reinsertion_max_iterations", "max_prb_reinsertion_attempts", "big_m", "network_model", "lazy_constraints", "output_flag", "time_limit", "mip_gap", "threads", "seed", } unknown = sorted(set(cfg) - allowed) if unknown: raise ValueError(f"Invalid euphemia_configuration key(s): {', '.join(unknown)}") bool_fields = {"disable_reinsertion", "calculate_corrected_welfare"} int_fields = { "max_iterations", "reinsertion_max_iterations", "max_prb_reinsertion_attempts", "threads", "seed", "output_flag", "lazy_constraints", } number_fields = {"price_lower_bound", "price_upper_bound", "beta_MIC", "delta_load_gradient", "delta_PAB", "epsilon", "big_m", "time_limit", "mip_gap"} for field in bool_fields: if field in cfg and not isinstance(cfg[field], bool): raise ValueError(f"Invalid euphemia_configuration.{field}: must be boolean.") for field in int_fields: if field in cfg and cfg[field] is not None and not self._is_int(cfg[field]): raise ValueError(f"Invalid euphemia_configuration.{field}: must be integer.") for field in number_fields: if field in cfg and not self._is_number(cfg[field]): raise ValueError(f"Invalid euphemia_configuration.{field}: must be numeric.") if "network_model" in cfg: if not isinstance(cfg["network_model"], str): raise ValueError("Invalid euphemia_configuration.network_model: must be a string.") network_model = cfg["network_model"].upper() if network_model not in {"ATC", "FBMC"}: raise ValueError("Invalid euphemia_configuration.network_model: must be 'ATC' or 'FBMC'.") cfg["network_model"] = network_model if "beta_MIC" in cfg and not 0 <= cfg["beta_MIC"] <= 1: raise ValueError("Invalid euphemia_configuration.beta_MIC: must be in [0, 1].") if "delta_load_gradient" in cfg and cfg["delta_load_gradient"] < 0: raise ValueError("Invalid euphemia_configuration.delta_load_gradient: must be >= 0.") if "delta_PAB" in cfg and cfg["delta_PAB"] < 0: raise ValueError("Invalid euphemia_configuration.delta_PAB: must be >= 0.") if "epsilon" in cfg and cfg["epsilon"] <= 0: raise ValueError("Invalid euphemia_configuration.epsilon: must be > 0.") if "max_iterations" in cfg and cfg["max_iterations"] <= 0: raise ValueError("Invalid euphemia_configuration.max_iterations: must be > 0.") if "reinsertion_max_iterations" in cfg and cfg["reinsertion_max_iterations"] <= 0: raise ValueError("Invalid euphemia_configuration.reinsertion_max_iterations: must be > 0.") if ( "max_prb_reinsertion_attempts" in cfg and cfg["max_prb_reinsertion_attempts"] is not None and cfg["max_prb_reinsertion_attempts"] <= 0 ): raise ValueError("Invalid euphemia_configuration.max_prb_reinsertion_attempts: must be > 0 or null.") if "big_m" in cfg and cfg["big_m"] <= 0: raise ValueError("Invalid euphemia_configuration.big_m: must be > 0.") if "threads" in cfg and cfg["threads"] < 0: raise ValueError("Invalid euphemia_configuration.threads: must be >= 0.") if "seed" in cfg and cfg["seed"] < 0: raise ValueError("Invalid euphemia_configuration.seed: must be >= 0.") if "time_limit" in cfg and cfg["time_limit"] <= 0: raise ValueError("Invalid euphemia_configuration.time_limit: must be > 0.") if "mip_gap" in cfg and cfg["mip_gap"] < 0: raise ValueError("Invalid euphemia_configuration.mip_gap: must be >= 0.") if "output_flag" in cfg and cfg["output_flag"] not in [0, 1]: raise ValueError("Invalid euphemia_configuration.output_flag: must be 0 or 1.") if "lazy_constraints" in cfg and cfg["lazy_constraints"] not in [0, 1]: raise ValueError("Invalid euphemia_configuration.lazy_constraints: must be 0 or 1.") price_lower = cfg.get("price_lower_bound", -500) price_upper = cfg.get("price_upper_bound", 4000) if price_lower >= price_upper: raise ValueError("Invalid euphemia_configuration: price_lower_bound must be < price_upper_bound.") def get_unit_based_dataset(self) -> UnitBased_Datasets: return UnitBased_Datasets[self.config["scenario"]["unit_based_dataset"]] def get_order_book_based_dataset(self) -> OrderBookBased_Datasets: return OrderBookBased_Datasets[self.config["scenario"]["order_book_based_dataset"]]
[docs] def get_market_model(self) -> MarketModels: """Return the selected high-level market model enum.""" return MarketModels[self.config["scenario"]["market_model"]]
[docs] def get_power_flow_model(self): """Instantiate the configured power-flow model object.""" model_type = self.config["scenario"]["power_flow_model"]["type"] if model_type == "Zonal_NTC_aggregated": zonal_config = self.config["zonal_configuration"] return Zonal_NTC_aggregated(zonal_configuration=zonal_config["type"], factor=zonal_config["factor"]) if model_type == "Zonal_NTC_multiedge": zonal_config = self.config["zonal_configuration"] return Zonal_NTC_multiedge(zonal_configuration=zonal_config["type"], factor=zonal_config["factor"]) if model_type == "Zonal_FBMC": zonal_config = self.config["zonal_configuration"] base_case = zonal_config["base_case"] if base_case not in [c.value for c in FBMCBaseCases]: raise ValueError(f"Invalid FBMC base case: {base_case}") return Zonal_FBMC(zonal_configuration=zonal_config["type"], base_case_type=base_case) if model_type == "DCOPF": return DCOPF() raise ValueError(f"Invalid power flow model: {model_type}")
def get_cut_type(self) -> CutTypes: return CutTypes(self.config["scenario"]["cut_type"]) def get_pricing_algorithm(self) -> PricingAlgorithms: return PricingAlgorithms[self.config["scenario"]["pricing_algorithm"]] def get_redispatch_algorithm(self) -> RedispatchAlgorithms: return RedispatchAlgorithms[self.config["scenario"]["redispatch_algorithm"]] def get_redispatch_constraint_units(self) -> bool: return self.config["scenario"]["redispatch_constraint_units"] def get_redispatch_threshold(self) -> float: return self.config["scenario"]["redispatch_threshold"] def get_alpha(self) -> float: return self.config["scenario"]["alpha"]
[docs] def get_unit_based_solver_congiruation(self) -> Dict[str, Any]: """Return the normalized unit-based solver configuration object.""" if "unit_based_solver_configuration" in self.config: return self.config["unit_based_solver_configuration"] raise ValueError("Missing unit-based solver configuration.")
[docs] def get_euphemia_configuration(self) -> Dict[str, Any]: """Return Euphemia-specific options for order-book runs.""" return self.config.get("euphemia_configuration", {})