"""Helpers for reusing or loading the latest matching US-model runs from the results directory."""
from __future__ import annotations
from pathlib import Path
import pandas as pd
from apem.execution_chain import (
analyse_results,
solve_unit_based_allocation_and_redispatch_only,
solve_unit_based_allocation_only,
solve_unit_based_scenario,
)
from apem.unit_based_model.allocation.algorithms.zonal_clearing.zonal_fbmc_included import Zonal_FBMC
from apem.unit_based_model.allocation.algorithms.zonal_clearing.zonal_ntc_aggregated import Zonal_NTC_aggregated
from apem.unit_based_model.allocation.algorithms.zonal_clearing.zonal_ntc_multiedge import Zonal_NTC_multiedge
from apem.unit_based_model.evaluation.lost_opp_cost_analysis import load_lost_opp_cost_table
from apem.unit_based_model.evaluation.redispatch_analysis import load_redispatch_metric_file
from apem.unit_based_model.evaluation.welfare_analysis import load_welfare_table
from apem.unit_based_model.enums import PricingAlgorithms, RedispatchAlgorithms, UnitBased_Datasets
def normalize_run_dir(path: Path | str, repo_root: Path) -> Path:
    """
    Resolve a run directory path, using ``repo_root`` for relative paths.

    :param path: absolute or relative run-directory path
    :param repo_root: repository root used to resolve relative paths
    :return: normalized absolute-like path rooted at ``repo_root`` when needed
    """
    candidate = Path(path)
    # Absolute paths are taken as-is; everything else is anchored at the repo root.
    return candidate if candidate.is_absolute() else repo_root / candidate
def parse_run_config(run_config_path: Path) -> dict[str, str]:
    """
    Parse key-value run metadata stored in ``run_config.txt``.

    Lines without an ``=`` separator are skipped.  Whitespace around the key
    and the value is stripped, so ``key = value`` and ``key=value`` parse
    identically (the previous behavior only stripped the line ends, leaving
    stray spaces inside keys/values for hand-edited files).

    :param run_config_path: path to a run configuration file
    :return: dictionary with parsed metadata entries
    """
    metadata: dict[str, str] = {}
    with open(run_config_path, "r", encoding="utf-8") as handle:
        for line in handle:
            if "=" not in line:
                continue
            key, value = line.split("=", 1)
            metadata[key.strip()] = value.strip()
    return metadata
def expected_zonal_path(power_flow_model) -> str:
    """Return the zonal-path metadata expected for one power-flow model."""
    if isinstance(power_flow_model, Zonal_FBMC):
        configuration = power_flow_model.zonal_configuration
        base_case = getattr(power_flow_model, "base_case_type", "")
        # FBMC runs append the base-case type to the zonal configuration when set.
        if base_case:
            return f"{configuration}_{base_case}"
        return configuration
    if isinstance(power_flow_model, (Zonal_NTC_aggregated, Zonal_NTC_multiedge)):
        factor = getattr(power_flow_model, "factor", None)
        suffix = "" if factor is None else f"_f{factor}"
        return f"{power_flow_model.zonal_configuration}{suffix}"
    # Any other model (e.g. nodal) carries no zonal path component.
    return ""
def find_latest_matching_run(
    results_root: Path,
    dataset: UnitBased_Datasets,
    pricing_algorithm: PricingAlgorithms,
    power_flow_model_name: str,
    zonal_path: str = "",
) -> Path | None:
    """
    Return the newest run folder for a dataset, pricing algorithm, and model.

    :param results_root: root directory containing run folders
    :param dataset: dataset enum expected in run metadata
    :param pricing_algorithm: pricing algorithm enum expected in run metadata
    :param power_flow_model_name: selected power-flow model name
    :param zonal_path: expected zonal path metadata value (empty for non-zonal)
    :return: latest matching run directory, or ``None`` if no match is found
    """
    if not results_root.exists():
        return None
    algo = pricing_algorithm.name
    matches: list[tuple[str, float, Path]] = []
    for config_file in results_root.glob("*/run_config.txt"):
        run_dir = config_file.parent
        metadata = parse_run_config(config_file)
        run_zonal_path = metadata.get("zonal_path", "")
        price_file = run_dir.joinpath(
            power_flow_model_name,
            run_zonal_path,
            f"{algo}_results",
            f"{algo}_prices.csv",
        )
        is_match = (
            price_file.exists()
            and metadata.get("dataset") == dataset.name
            and metadata.get("power_flow_model") == power_flow_model_name
            and metadata.get("pricing_algorithm") == algo
            and run_zonal_path == zonal_path
        )
        if is_match:
            matches.append(
                (metadata.get("created_at_utc", ""), run_dir.stat().st_mtime, run_dir)
            )
    if not matches:
        return None
    # Newest wins: metadata timestamp first, folder mtime as the tie-breaker.
    return max(matches, key=lambda item: (item[0], item[1]))[2]
def find_latest_matching_lost_opp_cost_run(
    results_root: Path,
    dataset: UnitBased_Datasets,
    pricing_algorithm: PricingAlgorithms,
    power_flow_model_name: str,
    zonal_path: str = "",
) -> Path | None:
    """
    Return the newest matching run folder with lost-opportunity-cost stats.

    :param results_root: root directory containing run folders
    :param dataset: dataset enum expected in run metadata
    :param pricing_algorithm: pricing algorithm enum expected in run metadata
    :param power_flow_model_name: selected power-flow model name
    :param zonal_path: expected zonal path metadata value
    :return: latest matching run directory with stats file, or ``None``
    """
    if not results_root.exists():
        return None
    algo = pricing_algorithm.name
    matches: list[tuple[str, float, Path]] = []
    for config_file in results_root.glob("*/run_config.txt"):
        run_dir = config_file.parent
        metadata = parse_run_config(config_file)
        run_zonal_path = metadata.get("zonal_path", "")
        stats_file = run_dir.joinpath(
            power_flow_model_name,
            run_zonal_path,
            f"{algo}_results",
            f"{algo}_stats.txt",
        )
        is_match = (
            stats_file.exists()
            and metadata.get("dataset") == dataset.name
            and metadata.get("power_flow_model") == power_flow_model_name
            and metadata.get("pricing_algorithm") == algo
            and run_zonal_path == zonal_path
        )
        if is_match:
            matches.append(
                (metadata.get("created_at_utc", ""), run_dir.stat().st_mtime, run_dir)
            )
    if not matches:
        return None
    # Newest wins: metadata timestamp first, folder mtime as the tie-breaker.
    return max(matches, key=lambda item: (item[0], item[1]))[2]
def ensure_run_for_configuration(
    results_root: Path,
    repo_root: Path,
    dataset: UnitBased_Datasets,
    pricing_algorithm: PricingAlgorithms,
    power_flow_model,
    power_flow_model_name: str,
) -> tuple[Path, str]:
    """
    Reuse or compute a run for one dataset/pricing/model configuration.

    :param results_root: root directory containing run folders
    :param repo_root: repository root used to normalize computed paths
    :param dataset: dataset enum to solve
    :param pricing_algorithm: pricing algorithm enum to solve
    :param power_flow_model: instantiated power-flow model object
    :param power_flow_model_name: model name used in run folder structure
    :return: tuple ``(run_dir, status)`` where status is ``"reused"`` or
        ``"computed"``
    """
    # Prefer reusing the newest run that already has matching price outputs.
    existing_run = find_latest_matching_run(
        results_root,
        dataset,
        pricing_algorithm,
        power_flow_model_name=power_flow_model_name,
        zonal_path=expected_zonal_path(power_flow_model),
    )
    if existing_run is not None:
        return existing_run, "reused"
    # No reusable run: solve the scenario from scratch.
    analysis = solve_unit_based_scenario(
        dataset=dataset,
        power_flow_model=power_flow_model,
        pricing_algorithm=pricing_algorithm,
    )
    return normalize_run_dir(analysis.results_root, repo_root), "computed"
def ensure_lost_opp_cost_run_for_configuration(
    results_root: Path,
    repo_root: Path,
    dataset: UnitBased_Datasets,
    pricing_algorithm: PricingAlgorithms,
    power_flow_model,
    power_flow_model_name: str,
) -> tuple[Path, str]:
    """
    Reuse or compute a run that includes lost-opportunity-cost analysis outputs.

    :param results_root: root directory containing run folders
    :param repo_root: repository root used to normalize computed paths
    :param dataset: dataset enum to solve
    :param pricing_algorithm: pricing algorithm enum to solve
    :param power_flow_model: instantiated power-flow model object
    :param power_flow_model_name: model name used in run folder structure
    :return: tuple ``(run_dir, status)`` where status is ``"reused"`` or
        ``"computed"``
    """
    existing_run = find_latest_matching_lost_opp_cost_run(
        results_root,
        dataset,
        pricing_algorithm,
        power_flow_model_name=power_flow_model_name,
        zonal_path=expected_zonal_path(power_flow_model),
    )
    if existing_run is not None:
        return existing_run, "reused"
    # Solve from scratch, then run the analysis step that writes the stats file.
    analysis = solve_unit_based_scenario(
        dataset=dataset,
        power_flow_model=power_flow_model,
        pricing_algorithm=pricing_algorithm,
    )
    run_root = normalize_run_dir(analysis.results_root, repo_root)
    analyse_results(
        analysis.scenario,
        analysis.allocation,
        analysis.pricing,
        analysis.configuration,
        power_flow_model,
        base_scenario=getattr(analysis, "base_scenario", None),
        results_root=str(run_root),
    )
    return run_root, "computed"
def load_prices_from_run(
    run_dir: Path,
    scenario_name: str,
    pricing_algorithm: PricingAlgorithms,
    power_flow_model_name: str,
) -> pd.DataFrame:
    """
    Load one pricing algorithm's node-period prices from a selected run folder.

    :param run_dir: run directory containing ``run_config.txt``
    :param scenario_name: dataset/scenario label added to the output table
    :param pricing_algorithm: pricing algorithm enum
    :param power_flow_model_name: model name used in run folder structure
    :return: normalized price table with ``dataset``, ``algorithm``, ``node``,
        ``period``, and ``price``
    """
    # The zonal path segment is recorded in the run's own metadata file.
    zonal_path = parse_run_config(run_dir / "run_config.txt").get("zonal_path", "")
    algo = pricing_algorithm.name
    price_file = run_dir.joinpath(
        power_flow_model_name,
        zonal_path,
        f"{algo}_results",
        f"{algo}_prices.csv",
    )
    prices = pd.read_csv(price_file)
    prices["dataset"] = scenario_name
    prices["algorithm"] = algo
    return prices[["dataset", "algorithm", "node", "period", "price"]]
def load_lost_opp_costs_from_run(
    run_dir: Path,
    scenario_name: str,
    pricing_algorithm: PricingAlgorithms,
    power_flow_model_name: str,
) -> pd.DataFrame:
    """
    Load one pricing algorithm's lost-opportunity-cost components from a run.

    :param run_dir: run directory containing ``run_config.txt``
    :param scenario_name: dataset/scenario label added to the output table
    :param pricing_algorithm: pricing algorithm enum
    :param power_flow_model_name: model name used in run folder structure
    :return: normalized table with ``dataset``, ``algorithm``,
        ``lost_opp_cost``, ``component``, ``value``
    """
    zonal_path = parse_run_config(run_dir / "run_config.txt").get("zonal_path", "")
    algo = pricing_algorithm.name
    stats_file = run_dir.joinpath(
        power_flow_model_name,
        zonal_path,
        f"{algo}_results",
        f"{algo}_stats.txt",
    )
    table = load_lost_opp_cost_table(stats_file)
    table["dataset"] = scenario_name
    table["algorithm"] = algo
    return table[["dataset", "algorithm", "lost_opp_cost", "component", "value"]]
def find_latest_matching_welfare_run(
    results_root: Path,
    dataset: UnitBased_Datasets,
    power_flow_model_name: str,
    zonal_path: str = "",
) -> Path | None:
    """
    Return the newest matching run folder with allocation welfare stats.

    :param results_root: root directory containing run folders
    :param dataset: dataset enum expected in run metadata
    :param power_flow_model_name: selected power-flow model name
    :param zonal_path: expected zonal path metadata value
    :return: latest matching run directory with allocation stats, or ``None``
    """
    if not results_root.exists():
        return None
    matches: list[tuple[str, float, Path]] = []
    for config_file in results_root.glob("*/run_config.txt"):
        run_dir = config_file.parent
        metadata = parse_run_config(config_file)
        run_zonal_path = metadata.get("zonal_path", "")
        stats_file = run_dir.joinpath(
            power_flow_model_name,
            run_zonal_path,
            "allocation_results",
            f"{power_flow_model_name}_stats.txt",
        )
        is_match = (
            stats_file.exists()
            and metadata.get("dataset") == dataset.name
            and metadata.get("power_flow_model") == power_flow_model_name
            and run_zonal_path == zonal_path
        )
        if is_match:
            matches.append(
                (metadata.get("created_at_utc", ""), run_dir.stat().st_mtime, run_dir)
            )
    if not matches:
        return None
    # Newest wins: metadata timestamp first, folder mtime as the tie-breaker.
    return max(matches, key=lambda item: (item[0], item[1]))[2]
def ensure_welfare_run_for_configuration(
    results_root: Path,
    repo_root: Path,
    dataset: UnitBased_Datasets,
    power_flow_model,
    power_flow_model_name: str,
) -> tuple[Path, str]:
    """
    Reuse or compute a run that includes allocation welfare stats.

    :param results_root: root directory containing run folders
    :param repo_root: repository root used to normalize computed paths
    :param dataset: dataset enum to solve
    :param power_flow_model: instantiated power-flow model object
    :param power_flow_model_name: model name used in run folder structure
    :return: tuple ``(run_dir, status)`` where status is ``"reused"`` or
        ``"computed"``
    """
    existing_run = find_latest_matching_welfare_run(
        results_root,
        dataset,
        power_flow_model_name=power_flow_model_name,
        zonal_path=expected_zonal_path(power_flow_model),
    )
    if existing_run is not None:
        return existing_run, "reused"
    # Welfare stats only need the allocation step, not pricing.
    run_root = solve_unit_based_allocation_only(
        dataset=dataset,
        power_flow_model=power_flow_model,
    )
    return normalize_run_dir(run_root, repo_root), "computed"
def load_welfare_from_run(
    run_dir: Path,
    scenario_name: str,
    power_flow_model_name: str,
) -> pd.DataFrame:
    """
    Load welfare values from a selected run folder.

    :param run_dir: run directory containing ``run_config.txt``
    :param scenario_name: dataset/scenario label added to the output table
    :param power_flow_model_name: model name used in run folder structure
    :return: normalized welfare table with ``dataset``, ``power_flow_model``,
        ``welfare_scope``, ``period``, and ``welfare``
    """
    zonal_path = parse_run_config(run_dir / "run_config.txt").get("zonal_path", "")
    stats_file = run_dir.joinpath(
        power_flow_model_name,
        zonal_path,
        "allocation_results",
        f"{power_flow_model_name}_stats.txt",
    )
    welfare = load_welfare_table(stats_file, power_flow_model_name=power_flow_model_name)
    welfare["dataset"] = scenario_name
    return welfare[["dataset", "power_flow_model", "welfare_scope", "period", "welfare"]]
def find_latest_matching_redispatch_run(
    results_root: Path,
    dataset: UnitBased_Datasets,
    power_flow_model_name: str,
    redispatch_algorithm: RedispatchAlgorithms,
    redispatch_constraint_units: bool = False,
    redispatch_threshold: float = 0,
    zonal_path: str = "",
) -> Path | None:
    """
    Return the newest matching run folder with redispatch metric outputs.

    :param results_root: root directory containing run folders
    :param dataset: dataset enum expected in run metadata
    :param power_flow_model_name: selected power-flow model name
    :param redispatch_algorithm: redispatch algorithm enum
    :param redispatch_constraint_units: redispatch option expected in run output
    :param redispatch_threshold: threshold option expected in run output
    :param zonal_path: expected zonal path metadata value
    :return: latest matching run directory with redispatch files, or ``None``
    """
    if not results_root.exists():
        return None
    # The redispatch options are encoded into the output file name, so matching
    # on this file name also matches the options.
    redispatch_suffix = (
        f"{redispatch_algorithm.name}_{redispatch_constraint_units}_{redispatch_threshold}_redispatch_costs.csv"
    )
    matches: list[tuple[str, float, Path]] = []
    for config_file in results_root.glob("*/run_config.txt"):
        run_dir = config_file.parent
        metadata = parse_run_config(config_file)
        run_zonal_path = metadata.get("zonal_path", "")
        stats_file = run_dir.joinpath(
            power_flow_model_name,
            run_zonal_path,
            "allocation_results",
            "redispatch",
            redispatch_suffix,
        )
        is_match = (
            stats_file.exists()
            and metadata.get("dataset") == dataset.name
            and metadata.get("power_flow_model") == power_flow_model_name
            and metadata.get("redispatch_algorithm") == redispatch_algorithm.name
            and run_zonal_path == zonal_path
        )
        if is_match:
            matches.append(
                (metadata.get("created_at_utc", ""), run_dir.stat().st_mtime, run_dir)
            )
    if not matches:
        return None
    # Newest wins: metadata timestamp first, folder mtime as the tie-breaker.
    return max(matches, key=lambda item: (item[0], item[1]))[2]
def ensure_redispatch_run_for_configuration(
    results_root: Path,
    repo_root: Path,
    dataset: UnitBased_Datasets,
    power_flow_model,
    power_flow_model_name: str,
    redispatch_algorithm: RedispatchAlgorithms,
    redispatch_constraint_units: bool = False,
    redispatch_threshold: float = 0,
) -> tuple[Path, str]:
    """
    Reuse or compute a run that includes redispatch metrics.

    :param results_root: root directory containing run folders
    :param repo_root: repository root used to normalize computed paths
    :param dataset: dataset enum to solve
    :param power_flow_model: instantiated power-flow model object
    :param power_flow_model_name: model name used in run folder structure
    :param redispatch_algorithm: redispatch algorithm enum
    :param redispatch_constraint_units: redispatch option forwarded to solver
    :param redispatch_threshold: threshold option forwarded to solver
    :return: tuple ``(run_dir, status)`` where status is ``"reused"`` or
        ``"computed"``
    """
    existing_run = find_latest_matching_redispatch_run(
        results_root,
        dataset,
        power_flow_model_name=power_flow_model_name,
        redispatch_algorithm=redispatch_algorithm,
        redispatch_constraint_units=redispatch_constraint_units,
        redispatch_threshold=redispatch_threshold,
        zonal_path=expected_zonal_path(power_flow_model),
    )
    if existing_run is not None:
        return existing_run, "reused"
    # Redispatch metrics only need allocation + redispatch, not pricing.
    run_root = solve_unit_based_allocation_and_redispatch_only(
        dataset=dataset,
        power_flow_model=power_flow_model,
        redispatch_algorithm=redispatch_algorithm,
        redispatch_constraint_units=redispatch_constraint_units,
        redispatch_threshold=redispatch_threshold,
    )
    return normalize_run_dir(run_root, repo_root), "computed"
def load_redispatch_metrics_from_run(
    run_dir: Path,
    scenario_name: str,
    power_flow_model_name: str,
    redispatch_algorithm: RedispatchAlgorithms,
    redispatch_constraint_units: bool = False,
    redispatch_threshold: float = 0,
) -> pd.DataFrame:
    """
    Load redispatch costs/volumes from a selected run folder.

    :param run_dir: run directory containing ``run_config.txt``
    :param scenario_name: dataset/scenario label added to the output table
    :param power_flow_model_name: model name used in run folder structure
    :param redispatch_algorithm: redispatch algorithm enum
    :param redispatch_constraint_units: redispatch option used to build file
        names and output metadata
    :param redispatch_threshold: threshold used to build file names and output
        metadata
    :return: normalized table with ``dataset``, ``power_flow_model``,
        ``redispatch_algorithm``, redispatch options, ``metric``, and
        ``value``
    """
    zonal_path = parse_run_config(run_dir / "run_config.txt").get("zonal_path", "")
    redispatch_root = run_dir.joinpath(
        power_flow_model_name,
        zonal_path,
        "allocation_results",
        "redispatch",
    )
    # The redispatch options are encoded in the file stem.
    stem = f"{redispatch_algorithm.name}_{redispatch_constraint_units}_{redispatch_threshold}"
    frames = []
    for metric, file_name in (
        ("costs", f"{stem}_redispatch_costs.csv"),
        ("volumes", f"{stem}_redispatch_vols.csv"),
    ):
        frames.append(
            load_redispatch_metric_file(
                redispatch_root / file_name,
                redispatch_algorithm=redispatch_algorithm.name,
                metric=metric,
            )
        )
    combined = pd.concat(frames, ignore_index=True)
    combined["dataset"] = scenario_name
    combined["power_flow_model"] = power_flow_model_name
    combined["redispatch_constraint_units"] = redispatch_constraint_units
    combined["redispatch_threshold"] = redispatch_threshold
    return combined[
        [
            "dataset",
            "power_flow_model",
            "redispatch_algorithm",
            "redispatch_constraint_units",
            "redispatch_threshold",
            "metric",
            "value",
        ]
    ]