Source code for apem.execution_chain

import logging
import os
from datetime import datetime, timezone
from typing import Optional, Union
from uuid import uuid4

from apem.order_book_based_model.euphemia.enums.cut_types import CutTypes
from apem.order_book_based_model.euphemia.enums.datasets import OrderBookBased_Datasets
from apem.order_book_based_model.euphemia.runner import solve_euphemia
from apem.unit_based_model.allocation.algorithms.nodal_clearing.dcopf import DCOPF
from apem.unit_based_model.allocation.algorithms.nodal_clearing.nodal_fbmc_included import NodalFBMC
from apem.unit_based_model.allocation.algorithms.zonal_clearing.zonal_fbmc_included import Zonal_FBMC
from apem.unit_based_model.allocation.algorithms.zonal_clearing.zonal_ntc_aggregated import Zonal_NTC_aggregated
from apem.unit_based_model.allocation.algorithms.zonal_clearing.zonal_ntc_multiedge import Zonal_NTC_multiedge
from apem.unit_based_model.allocation.allocation import SellersAllocation, Allocation
from apem.unit_based_model.solver_configuration import SolverConfiguration
from apem.unit_based_model.error import Error
from apem.unit_based_model.data.parsing.scenario import Scenario
from apem.unit_based_model.pricing.analysis.price_analysis import PriceAnalysis
from apem.unit_based_model.pricing.analysis.pricing import Pricing
from apem.config_loader import ConfigLoader
from apem.core import MarketModels
from apem.unit_based_model.enums import PowerFlowModels, PricingAlgorithms, RedispatchAlgorithms, UnitBased_Datasets
from apem.unit_based_model.allocation.power_flow_model import PowerFlowModel

logger = logging.getLogger(__name__)
if not logger.handlers:
    logging.basicConfig(level=logging.INFO, format="%(message)s")


def _announce_results_path(results_root: Optional[str]) -> None:
    """Print the run output directory to the console."""
    if not results_root:
        return
    logger.info("Results written to: %s", os.path.abspath(results_root))


def _new_run_id() -> str:
    """Create a unique run id with UTC timestamp and random suffix."""
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return f"{timestamp}_{uuid4().hex[:8]}"


def _retrieve_data(dataset: UnitBased_Datasets) -> Scenario:
    """Load and parse a unit-based dataset into a Scenario object."""
    return dataset.value.parse_data()


def _create_configuration() -> SolverConfiguration:
    """Create a SolverConfiguration instance using the current configuration."""
    config = ConfigLoader().get_unit_based_solver_congiruation()
    return SolverConfiguration(
        MIP_gap=config.get("MIP_gap", 1e-4),
        optimality_tol=config.get("optimality_tol", 1e-6),
        time_limit=config.get("time_limit", 3600),
        work_limit=config.get("work_limit", 3600),
        threads=config.get("threads", 0),
        presparsify=config.get("presparsify", -1),
        strict_supply_demand_eq=config.get("strict_supply_demand_eq", True),
        relaxation=config.get("relaxation", False),
        output_flag=config.get("output_flag", 0),
        verbosity=config.get("verbosity", True),
        slack_penalty=config.get("slack_penalty", 1e15),
    )


def _zonal_part(power_flow_model: PowerFlowModel) -> str:
    """Encodes zonal configuration (and influencing params) into the result path."""
    if isinstance(power_flow_model, Zonal_FBMC):
        base_case = getattr(power_flow_model, "base_case_type", "")
        suffix = f"{power_flow_model.zonal_configuration}"
        if base_case:
            suffix += f"_{base_case}"
        return f"{suffix}/"
    if isinstance(power_flow_model, (Zonal_NTC_aggregated, Zonal_NTC_multiedge)):
        factor = getattr(power_flow_model, "factor", None)
        factor_str = f"_f{factor}" if factor is not None else ""
        return f"{power_flow_model.zonal_configuration}{factor_str}/"
    return ""


def _write_run_metadata(dataset: UnitBased_Datasets, scenario: Scenario, power_flow_model: PowerFlowModel,
                        pricing_algorithm: Optional[PricingAlgorithms], redispatch_algorithm: Optional[RedispatchAlgorithms],
                        zonal_part: str, run_root: str, run_id: str) -> None:
    """Persist a quick summary of the run configuration alongside results."""
    base_dir = run_root
    os.makedirs(base_dir, exist_ok=True)
    meta_path = os.path.join(base_dir, "run_config.txt")
    with open(meta_path, "w") as f:
        f.write(f"run_id={run_id}\n")
        f.write(f"created_at_utc={datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')}\n")
        f.write(f"dataset={dataset.name}\n")
        f.write(f"power_flow_model={power_flow_model}\n")
        if zonal_part:
            f.write(f"zonal_path={zonal_part.rstrip('/')}\n")
        if isinstance(power_flow_model, Zonal_FBMC):
            f.write(f"base_case={getattr(power_flow_model, 'base_case_type', '')}\n")
        if isinstance(power_flow_model, (Zonal_NTC_aggregated, Zonal_NTC_multiedge)):
            f.write(f"factor={getattr(power_flow_model, 'factor', '')}\n")
        if pricing_algorithm is not None:
            f.write(f"pricing_algorithm={pricing_algorithm.name}\n")
        if redispatch_algorithm is not None:
            f.write(f"redispatch_algorithm={redispatch_algorithm.name}\n")


def _solve_unit_based_allocation_problem(
    scenario: Scenario,
    power_flow_model: PowerFlowModel,
    configuration: SolverConfiguration,
    run_root: str,
    u_fixed: Optional[dict] = None,
):
    if configuration.verbosity:
        extra = ""
        if isinstance(power_flow_model, Zonal_FBMC):
            extra = f" base_case={getattr(power_flow_model, 'base_case_type', '')}"
        if isinstance(power_flow_model, (Zonal_NTC_aggregated, Zonal_NTC_multiedge)):
            extra = f" factor={getattr(power_flow_model, 'factor', '')}"
        logger.info("allocation start dataset=%s model=%s%s", scenario, power_flow_model, extra)

    zonal_part = _zonal_part(power_flow_model)
    base_path = f"{run_root}/{power_flow_model}"
    path = base_path + "/" + zonal_part + "allocation_results"
    os.makedirs(path, exist_ok=True)

    return power_flow_model.solve(
        scenario,
        configuration,
        results_file=path + f"/{power_flow_model}.csv",
        stats_file=path + f"/{power_flow_model}_stats.txt",
        u_fixed=u_fixed,
    )


def _solve_unit_based_redispatch_problem(
    scenario: Scenario,
    power_flow_model: PowerFlowModel,
    redispatch_algorithm: RedispatchAlgorithms,
    nodal_scenario: Scenario,
    zonal_allocation: SellersAllocation,
    configuration: SolverConfiguration,
    redispatch_constraint_units: bool,
    redispatch_threshold: float,
    run_root: str,
) -> Union[Allocation, Error]:
    if configuration.verbosity:
        logger.info("redispatch start algo=%s model=%s dataset=%s", redispatch_algorithm, power_flow_model, scenario)
    redispatch_algorithm = redispatch_algorithm.value

    zonal_part = _zonal_part(power_flow_model)
    base_path = f"{run_root}/{power_flow_model}"
    path = base_path + "/" + zonal_part + "allocation_results/redispatch"
    os.makedirs(path, exist_ok=True)

    return redispatch_algorithm.compute_redispatch(
        nodal_scenario,
        zonal_allocation,
        configuration,
        path,
        redispatch_constraint_units,
        redispatch_threshold,
    )


def _solve_unit_based_pricing_problem(
    scenario: Scenario,
    allocation: Allocation,
    pricing_algorithm: PricingAlgorithms,
    power_flow_model: PowerFlowModel,
    configuration: SolverConfiguration,
    run_root: str,
    prices=None,
) -> Pricing:
    if configuration.verbosity:
        logger.info("pricing start algo=%s model=%s dataset=%s", pricing_algorithm, power_flow_model, scenario)

    pricing_algorithm = pricing_algorithm.value

    zonal_part = _zonal_part(power_flow_model)
    path = f"{run_root}/{power_flow_model}/{zonal_part}{pricing_algorithm}_results"
    os.makedirs(path, exist_ok=True)

    pricing = pricing_algorithm.compute_prices(
        allocation,
        scenario,
        configuration,
        file_prices=path + f"/{pricing_algorithm}_prices.csv",
        fixed_prices=prices,
    )
    return pricing


[docs] def analyse_results( scenario: Scenario, allocation: Allocation, pricing: Pricing, configuration: SolverConfiguration, pf_model_value, base_scenario: Optional[Scenario] = None, results_root: Optional[str] = None, ) -> PriceAnalysis: """Run post-pricing analysis and write stats/plot data to the run folder. This validates that pricing succeeded, builds a ``PriceAnalysis`` object, computes all configured analysis outputs, and returns the populated analysis instance. """ if isinstance(pricing, Error) or getattr(pricing, "status", 0) != 1: raise RuntimeError( f"Cannot analyse results because pricing failed with status {getattr(pricing, 'status', 'unknown')}." ) path = results_root or f"results/unit_based_model/{scenario}_results" os.makedirs(path, exist_ok=True) analysis = PriceAnalysis(scenario, allocation, pricing, configuration, base_scenario) analysis.compute_all_stats_and_plot_data(path, pf_model_value) return analysis
[docs] def solve_unit_based_allocation_only( dataset: UnitBased_Datasets, power_flow_model: PowerFlowModel, ) -> str: """Compute only the allocation stage and return the run root for welfare-style analyses.""" scenario = _retrieve_data(dataset) configuration = _create_configuration() run_id = _new_run_id() run_root = f"results/unit_based_model/{scenario}_results/{run_id}" zonal_part = _zonal_part(power_flow_model) _write_run_metadata( dataset, scenario, power_flow_model, pricing_algorithm=None, redispatch_algorithm=None, zonal_part=zonal_part, run_root=run_root, run_id=run_id, ) if isinstance(power_flow_model, DCOPF): allocation = _solve_unit_based_allocation_problem( scenario, power_flow_model, configuration, run_root=run_root, ) if isinstance(allocation, Error): raise RuntimeError(f"{power_flow_model} allocation failed with status {allocation.status}.") return run_root if dataset not in [UnitBased_Datasets.PyPSAEurLarge, UnitBased_Datasets.PyPSAEurSmall]: raise ValueError( f"The dataset {dataset.name} cannot be used in combination with the power flow model " f"{power_flow_model}. Zonal prices can only be computed for the PyPSA datasets." ) zonal_scenario, allocation = _solve_unit_based_allocation_problem( scenario, power_flow_model, configuration, run_root=run_root, ) if isinstance(allocation, Error): raise RuntimeError(f"{power_flow_model} allocation failed with status {allocation.status}.") return run_root
[docs] def solve_unit_based_allocation_and_redispatch_only( dataset: UnitBased_Datasets, power_flow_model: PowerFlowModel, redispatch_algorithm: RedispatchAlgorithms = RedispatchAlgorithms.MinCostRD, redispatch_constraint_units: bool = False, redispatch_threshold: float = 0, ) -> str: """Compute allocation plus redispatch only and return the run root for redispatch analyses.""" scenario = _retrieve_data(dataset) configuration = _create_configuration() run_id = _new_run_id() run_root = f"results/unit_based_model/{scenario}_results/{run_id}" zonal_part = _zonal_part(power_flow_model) _write_run_metadata( dataset, scenario, power_flow_model, pricing_algorithm=None, redispatch_algorithm=redispatch_algorithm, zonal_part=zonal_part, run_root=run_root, run_id=run_id, ) if isinstance(power_flow_model, DCOPF): raise ValueError("Redispatch requires a zonal power-flow model.") if dataset not in [UnitBased_Datasets.PyPSAEurLarge, UnitBased_Datasets.PyPSAEurSmall]: raise ValueError( f"The dataset {dataset.name} cannot be used in combination with the power flow model " f"{power_flow_model}. Zonal redispatch can only be computed for the PyPSA datasets." ) zonal_scenario, allocation = _solve_unit_based_allocation_problem( scenario, power_flow_model, configuration, run_root=run_root, ) if isinstance(allocation, Error): raise RuntimeError(f"{power_flow_model} allocation failed with status {allocation.status}.") redispatch_result = _solve_unit_based_redispatch_problem( zonal_scenario, power_flow_model, redispatch_algorithm=redispatch_algorithm, nodal_scenario=scenario, configuration=configuration, zonal_allocation=allocation.SellersAllocation, redispatch_constraint_units=redispatch_constraint_units, redispatch_threshold=redispatch_threshold, run_root=run_root, ) if isinstance(redispatch_result, Error): raise RuntimeError( f"{power_flow_model} redispatch failed with status {redispatch_result.status}." ) return run_root
[docs] def solve_unit_based_scenario( dataset: UnitBased_Datasets, power_flow_model: PowerFlowModel, pricing_algorithm: PricingAlgorithms, redispatch_algorithm: RedispatchAlgorithms = RedispatchAlgorithms.MinCostRD, redispatch_constraint_units: bool = False, redispatch_threshold: float = 0, ) -> PriceAnalysis: """Run allocation and pricing for one unit-based scenario.""" scenario = _retrieve_data(dataset) configuration = _create_configuration() run_id = _new_run_id() run_root = f"results/unit_based_model/{scenario}_results/{run_id}" zonal_part = _zonal_part(power_flow_model) _write_run_metadata( dataset, scenario, power_flow_model, pricing_algorithm, redispatch_algorithm, zonal_part, run_root=run_root, run_id=run_id, ) if isinstance(power_flow_model, DCOPF): allocation = _solve_unit_based_allocation_problem( scenario, power_flow_model, configuration, run_root=run_root ) if isinstance(allocation, Error): raise RuntimeError(f"{power_flow_model} allocation failed with status {allocation.status}.") pricing = _solve_unit_based_pricing_problem( scenario, allocation, pricing_algorithm, power_flow_model, configuration, run_root=run_root, ) if isinstance(pricing, Error) or getattr(pricing, "status", 0) != 1: raise RuntimeError(f"{power_flow_model} pricing failed with status {getattr(pricing, 'status', 'unknown')}.") analysis = PriceAnalysis(scenario, allocation, pricing, configuration) analysis.results_root = run_root return analysis if dataset not in [UnitBased_Datasets.PyPSAEurLarge, UnitBased_Datasets.PyPSAEurSmall]: raise ValueError( f"The dataset {dataset.name} cannot be used in combination with the power flow model " f"{power_flow_model}. Zonal prices can only be computed for the PyPSA datasets." ) zonal_scenario, allocation = _solve_unit_based_allocation_problem( scenario, power_flow_model, configuration, run_root=run_root, ) if isinstance(allocation, Error): raise RuntimeError(f"{power_flow_model} allocation failed with status {allocation.status}.") redispatch_result = _solve_unit_based_redispatch_problem( zonal_scenario, power_flow_model, redispatch_algorithm=redispatch_algorithm, nodal_scenario=scenario, configuration=configuration, zonal_allocation=allocation.SellersAllocation, redispatch_constraint_units=redispatch_constraint_units, redispatch_threshold=redispatch_threshold, run_root=run_root, ) if isinstance(redispatch_result, Error): raise RuntimeError( f"{power_flow_model} redispatch failed with status {redispatch_result.status}." ) pricing = _solve_unit_based_pricing_problem( zonal_scenario, allocation, pricing_algorithm, power_flow_model, configuration, run_root=run_root, ) if isinstance(pricing, Error) or getattr(pricing, "status", 0) != 1: raise RuntimeError(f"{power_flow_model} pricing failed with status {getattr(pricing, 'status', 'unknown')}.") analysis = PriceAnalysis(zonal_scenario, allocation, pricing, configuration, scenario) analysis.results_root = run_root return analysis
[docs] def solve_and_analyse_scenario( unit_based_dataset: UnitBased_Datasets, order_book_based_dataset: OrderBookBased_Datasets, market_model: MarketModels, power_flow_model: PowerFlowModel, cut_type: CutTypes, pricing_algorithm: PricingAlgorithms, redispatch_algorithm: RedispatchAlgorithms = RedispatchAlgorithms.MinCostRD, redispatch_constraint_units: bool = False, redispatch_threshold: float = 0, alpha: float = 0, ): """Run the selected market-model workflow and produce analysis outputs. For ``order_book_based_model``, this executes the Euphemia pipeline. For ``unit_based_model``, this runs allocation/pricing (and redispatch when applicable) and then computes analysis artifacts. """ if market_model == MarketModels.order_book_based_model: euphemia_config = ConfigLoader().get_euphemia_configuration() results_root = solve_euphemia(order_book_based_dataset, cut_type, euphemia_config) _announce_results_path(results_root) return if market_model != MarketModels.unit_based_model: raise ValueError(f"Unsupported market model: {market_model}") if pricing_algorithm != PricingAlgorithms.Markup: price_analysis = solve_unit_based_scenario( unit_based_dataset, power_flow_model, pricing_algorithm, redispatch_algorithm, redispatch_constraint_units, redispatch_threshold, ) is_pypsa_dataset = unit_based_dataset in [UnitBased_Datasets.PyPSAEurLarge, UnitBased_Datasets.PyPSAEurSmall] base_scenario = None if is_pypsa_dataset: is_dcopf_like = isinstance(power_flow_model, (DCOPF, NodalFBMC)) scenario_to_analyse = price_analysis.scenario if is_dcopf_like else price_analysis.base_scenario base_scenario = None if is_dcopf_like else price_analysis.base_scenario zonal_config = _zonal_part(power_flow_model).rstrip("/") if isinstance(power_flow_model, (Zonal_NTC_aggregated, Zonal_NTC_multiedge, Zonal_FBMC)) else "" run_root = getattr(price_analysis, "results_root", None) scenario_to_analyse.analyse_scenario(results_root=run_root) # analyse base scenario scenario_to_analyse.plot_network(power_flow_model, zonal_config, results_root=run_root) # plot underlying network analysis = analyse_results( price_analysis.scenario, price_analysis.allocation, price_analysis.pricing, price_analysis.configuration, power_flow_model, base_scenario, results_root=getattr(price_analysis, "results_root", None), ) _announce_results_path(getattr(price_analysis, "results_root", None)) return analysis scenario = _retrieve_data(unit_based_dataset) configuration = _create_configuration() run_id = _new_run_id() run_root = f"results/unit_based_model/{scenario}_results/{run_id}" zonal_part = _zonal_part(power_flow_model) os.makedirs(run_root, exist_ok=True) _write_run_metadata( unit_based_dataset, scenario, power_flow_model, pricing_algorithm, redispatch_algorithm, zonal_part, run_root=run_root, run_id=run_id, ) allowed_markup = {UnitBased_Datasets.IEEE_RTS, UnitBased_Datasets.PJM, UnitBased_Datasets.ARPA} if unit_based_dataset not in allowed_markup: raise ValueError(f"Markup pricing is only supported for datasets {', '.join([d.name for d in allowed_markup])}.") base_path = f"{run_root}/{power_flow_model}" path = base_path + "/" + zonal_part os.makedirs(path, exist_ok=True) allocation, pricing = pricing_algorithm.Markup.value.compute_prices( scenario, configuration, file_prices=path + f"/{pricing_algorithm.value}_results.csv", alpha=alpha, ) price_analysis = PriceAnalysis( scenario=scenario, allocation=allocation, pricing=pricing, configuration=configuration, base_scenario=scenario, ) price_analysis.results_root = run_root analysis = analyse_results( price_analysis.scenario, price_analysis.allocation, price_analysis.pricing, price_analysis.configuration, power_flow_model, results_root=run_root, ) _announce_results_path(run_root) return analysis