import json
import logging
import os
import random
import re
import time
import csv
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
from uuid import uuid4
import gurobipy as gp
import pandas as pd
from gurobipy import GRB
import apem.order_book_based_model.euphemia.cutting_strategies.price_based as price_based_cutting
import apem.order_book_based_model.euphemia.cutting_strategies.combinatorial_benders as combinatorial_benders_cutting
import apem.order_book_based_model.euphemia.cutting_strategies.no_good as no_good_cutting
from apem.order_book_based_model.euphemia.enums.cut_types import CutTypes
from apem.order_book_based_model.euphemia.euphemia_config import EuphemiaConfig
from apem.order_book_based_model.euphemia.model.setup_model import add_objective, add_market_constraints, \
add_network_constraints
from apem.order_book_based_model.euphemia.pricing.price_determination_subproblem import PriceSubproblem
from apem.order_book_based_model.euphemia.reinsertions.prmic_prb_reinsertion import PRMIC_PRB_reinsertion
from apem.order_book_based_model.euphemia.utils.calculations import calculate_flexible_order_active_period, \
calculate_block_demand_surplus
from apem.order_book_based_model.euphemia.utils.extraction import get, parse_step_order_ids
def _slugify(value: str) -> str:
"""Convert free-text labels to folder-safe identifiers."""
return re.sub(r"[^a-z0-9]+", "_", value.lower()).strip("_")
def _new_run_id() -> str:
"""Create a unique run id with UTC timestamp and random suffix."""
timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
return f"{timestamp}_{uuid4().hex[:8]}"
def _normalize_zone(zone: object, default_zone: str) -> str:
"""Return a canonical zone label and strip accidental quote characters."""
if pd.isna(zone):
return default_zone
zone_str = str(zone).strip().strip("'\"").strip()
return zone_str if zone_str else default_zone
[docs]
class MasterProblem:
"""
Formulate and solve the Euphemia master problem.
"""
def __init__(self, config: EuphemiaConfig) -> None:
    """Build the Gurobi master problem for the configured scenario.

    Loads the scenario's order tables, normalizes zone labels, ingests the
    network data (ATC capacities or flow-based RAM/PTDF tables), creates all
    acceptance and network variables, applies solver parameters, and prepares
    the per-run result folders, log file and metadata record.

    :param config: Euphemia run configuration; a fresh default config is
        created when a falsy value is passed.
    """
    # NOTE(review): `not config` also replaces a falsy-but-valid config object;
    # presumably only None is expected here — confirm against callers.
    if not config:
        config = EuphemiaConfig()
    self.config = config
    self.model = gp.Model('Euphemia Master Problem')
    # Scenario order books, split by order family.
    self.scenario = config.scenario
    self.periods = self.scenario.periods
    self.step_orders = self.scenario.step_orders
    self.block_orders = self.scenario.block_orders
    self.complex_orders = self.scenario.complex_orders
    self.complex_step_orders = self.scenario.complex_step_orders
    self.scalable_complex_orders = self.scenario.scalable_complex_orders
    self.scalable_step_orders = self.scenario.scalable_step_orders
    self.piecewise_linear_orders = self.scenario.piecewise_linear_orders
    self.default_zone = "Z1"
    # Normalize zone labels across all zonal order tables.
    for df in (
        self.step_orders,
        self.block_orders,
        self.complex_step_orders,
        self.scalable_step_orders,
        self.piecewise_linear_orders,
    ):
        if "zone" not in df.columns:
            df["zone"] = self.default_zone
        df["zone"] = df["zone"].apply(lambda zone: _normalize_zone(zone, self.default_zone))
    # Zone universe: prefer an explicit scenario zone list, otherwise collect
    # the zones that actually occur in the order tables.
    scenario_zones = getattr(self.scenario, "zones", None)
    if scenario_zones:
        self.zones = sorted({_normalize_zone(z, self.default_zone) for z in scenario_zones})
    else:
        self.zones = sorted(
            set(self.step_orders["zone"])
            .union(self.block_orders["zone"])
            .union(self.complex_step_orders["zone"])
            .union(self.scalable_step_orders["zone"])
            .union(self.piecewise_linear_orders["zone"])
        )
    if not self.zones:
        self.zones = [self.default_zone]
    # Re-anchor the default zone on the first (sorted) known zone label.
    self.default_zone = self.zones[0]
    self.network_model = str(getattr(config, "network_model", "ATC")).upper()
    if self.network_model not in {"ATC", "FBMC"}:
        raise ValueError(f"Unsupported network_model '{self.network_model}'. Use 'ATC' or 'FBMC'.")
    # ATC data: per-period directed capacities plus optional ramp limits.
    self.atc = getattr(self.scenario, "atc", pd.DataFrame(columns=["from_zone", "to_zone", "t", "cap"]))
    self.atc_cap = {}
    self.atc_ramp_up = {}
    self.atc_ramp_down = {}
    if isinstance(self.atc, pd.DataFrame) and not self.atc.empty:
        for _, row in self.atc.iterrows():
            t = int(row["t"])
            # Skip capacities outside the scenario horizon.
            if t not in self.periods:
                continue
            from_zone = _normalize_zone(row["from_zone"], self.default_zone)
            to_zone = _normalize_zone(row["to_zone"], self.default_zone)
            cap = float(row["cap"])
            key = (from_zone, to_zone, t)
            # Duplicate rows keep the most restrictive capacity.
            if key in self.atc_cap:
                self.atc_cap[key] = min(self.atc_cap[key], cap)
            else:
                self.atc_cap[key] = cap
            if "ramp_up" in row and pd.notna(row["ramp_up"]):
                self.atc_ramp_up[(from_zone, to_zone)] = float(row["ramp_up"])
            if "ramp_down" in row and pd.notna(row["ramp_down"]):
                self.atc_ramp_down[(from_zone, to_zone)] = float(row["ramp_down"])
    if self.atc_cap:
        # Zones referenced only by ATC borders are added to the zone universe.
        self.zones = sorted(
            set(self.zones).union({i for (i, _, _) in self.atc_cap}).union({j for (_, j, _) in self.atc_cap})
        )
    # Flow-based data: RAM (and optional lower bound) per CNEC/period plus zonal PTDFs.
    self.fb_constraints = getattr(
        self.scenario, "fb_constraints", pd.DataFrame(columns=["cnec_id", "t", "ram"])
    )
    self.fb_ptdf = getattr(self.scenario, "fb_ptdf", pd.DataFrame(columns=["cnec_id", "t", "zone", "ptdf"]))
    self.fb_ram = {}
    self.fb_lb = {}
    self.fb_ptdf_map = {}
    if isinstance(self.fb_constraints, pd.DataFrame) and not self.fb_constraints.empty:
        for _, row in self.fb_constraints.iterrows():
            t = int(row["t"])
            if t not in self.periods:
                continue
            cnec_id = str(row["cnec_id"])
            key = (cnec_id, t)
            ram = float(row["ram"])
            # Duplicates keep the tightest RAM and the tightest lower bound.
            self.fb_ram[key] = min(self.fb_ram[key], ram) if key in self.fb_ram else ram
            if "lb" in row and pd.notna(row["lb"]):
                lb = float(row["lb"])
                self.fb_lb[key] = max(self.fb_lb[key], lb) if key in self.fb_lb else lb
    if isinstance(self.fb_ptdf, pd.DataFrame) and not self.fb_ptdf.empty:
        for _, row in self.fb_ptdf.iterrows():
            t = int(row["t"])
            if t not in self.periods:
                continue
            cnec_id = str(row["cnec_id"])
            zone = _normalize_zone(row["zone"], self.default_zone)
            # Duplicate PTDF rows for the same CNEC/period/zone are summed.
            self.fb_ptdf_map[(cnec_id, t, zone)] = self.fb_ptdf_map.get((cnec_id, t, zone), 0.0) + float(row["ptdf"])
    if self.fb_ptdf_map:
        self.zones = sorted(set(self.zones).union({z for (_, _, z) in self.fb_ptdf_map}))
    self.atc_index = sorted(self.atc_cap.keys())
    # Only keep flow-based constraints that have at least one PTDF entry.
    self.fb_index = sorted((c, t) for (c, t) in self.fb_ram if any((c, t, z) in self.fb_ptdf_map for z in self.zones))
    # Network constraints only matter with data present and more than one zone.
    if self.network_model == "ATC":
        self.network_constraints_enabled = bool(self.atc_index) and len(self.zones) > 1
    else:
        self.network_constraints_enabled = bool(self.fb_index) and len(self.zones) > 1
    self.zonal_pricing_enabled = self.network_constraints_enabled
    # Acceptance variables per order family; continuous ones are fractional in [0, 1].
    self.accept_step = self.model.addVars(list(self.step_orders['id']), vtype=GRB.CONTINUOUS, lb=0, ub=1,
                                          name='accept_step')
    self.accept_block = self.model.addVars(list(self.block_orders['id']), vtype=GRB.CONTINUOUS, lb=0, ub=1,
                                           name='accept_block')
    # required for the big-M constraint to satisfy the MAR condition of block orders
    self.MAR_aux = self.model.addVars(list(self.block_orders['id']), vtype=GRB.BINARY, name='y')
    # required for flexible orders - decide in which period the order is accepted
    self.flex_period = self.model.addVars(
        list(self.block_orders[self.block_orders['block_type'] == 'flexible']['id']), self.periods,
        vtype=GRB.BINARY, name='flex_period')
    self.accept_complex = self.model.addVars(list(self.complex_orders['id']), vtype=GRB.BINARY, lb=0, ub=1,
                                             name='accept_complex')
    self.accept_complex_step = self.model.addVars(list(self.complex_step_orders['id']), vtype=GRB.CONTINUOUS,
                                                  lb=0, ub=1, name='accept_complex_step')
    self.accept_scalable = self.model.addVars(list(self.scalable_complex_orders['id']), vtype=GRB.BINARY,
                                              lb=0, ub=1, name='accept_scalable_complex')
    self.accept_scalable_step = self.model.addVars(list(self.scalable_step_orders['id']), vtype=GRB.CONTINUOUS,
                                                   lb=0, ub=1, name='accept_scalable_step')
    self.accept_piecewise_linear = self.model.addVars(list(self.piecewise_linear_orders['id']),
                                                      vtype=GRB.CONTINUOUS, lb=0, ub=1,
                                                      name='accept_piecewise_linear')
    # Network variables: directed ATC flows, or zonal net positions for FBMC.
    if self.network_model == "ATC":
        self.f_atc = self.model.addVars(self.atc_index, vtype=GRB.CONTINUOUS, lb=0, name="f_atc")
        self.net_position = gp.tupledict()
    else:
        self.f_atc = gp.tupledict()
        self.net_position = self.model.addVars(
            self.zones, self.periods, vtype=GRB.CONTINUOUS, lb=-GRB.INFINITY, name="net_position"
        )
    self.add_acceptance_variables_to_dataframe()
    # Compute overlapping block orders for price-based cuts
    self.block_overlap = self.compute_block_overlaps()
    self.block_orders['overlap_set'] = self.block_orders['id'].map(self.block_overlap)
    # Solve-loop state (updated by the callback and reinsertion passes).
    self.current_alloc_solution = {}
    self.found_solution = False
    self.current_best_objective = -1
    self.reinsertion_run = False
    self.iteration = 0
    self.start_time = 0
    # Tolerances and algorithm parameters from the config.
    self.M = config.big_m
    self.prices = {}
    self.prices_reinsertion = {}
    self.price_lower_bound = config.price_lower_bound
    self.price_upper_bound = config.price_upper_bound
    self.delta_PAB = config.delta_PAB
    self.beta_MIC = config.beta_MIC
    self.delta_load_gradient = config.delta_load_gradient
    self.epsilon = config.epsilon
    self.max_iterations = config.max_iterations
    self.reinsertion_max_iterations = config.reinsertion_max_iterations
    self.max_prb_reinsertion_attempts = config.max_prb_reinsertion_attempts
    self.cutting_strategy = config.cutting_strategy
    self.disable_reinsertion = config.disable_reinsertion
    self.calculate_corrected_welfare = config.calculate_corrected_welfare
    self.output_flag = config.output_flag
    self.time_limit = config.time_limit
    self.mip_gap = config.mip_gap
    self.threads = config.threads
    self.seed = config.seed
    self.lazy_constraints = config.lazy_constraints
    # Apply Gurobi solver parameters; optional ones only when configured.
    self.model.Params.LazyConstraints = int(self.lazy_constraints)
    self.model.setParam("OutputFlag", int(self.output_flag))
    if self.time_limit is not None:
        self.model.setParam("TimeLimit", float(self.time_limit))
    if self.mip_gap is not None:
        self.model.setParam("MIPGap", float(self.mip_gap))
    if self.threads is not None:
        self.model.setParam("Threads", int(self.threads))
    if self.seed is not None:
        self.model.setParam("Seed", int(self.seed))
    # Per-run result folders, metadata file and log file.
    self.run_id = _new_run_id()
    self.cut_type_key = _slugify(self.cutting_strategy.value)
    self.created_at_utc = datetime.now(timezone.utc)
    self.started_at_utc = None
    project_root = Path(__file__).resolve().parents[4]
    results_root = project_root / "results" / "order_book_based_model" / "euphemia"
    self.run_root = results_root / self.config.dataset / self.run_id / self.cut_type_key
    self.paths = {
        "alloc": self.run_root / "allocation",
        "prices": self.run_root / "prices",
        "pab": self.run_root / "pab",
        "block_inm_threshold": self.run_root / "block_inm_threshold",
        "complex_mic": self.run_root / "complex_mic",
        "complex_mic_inm_threshold": self.run_root / "complex_mic_inm_threshold",
        "scalable_mic": self.run_root / "scalable_mic",
        "scalable_mic_inm_threshold": self.run_root / "scalable_mic_inm_threshold",
        "debug": self.run_root / "debug",
        "evaluation": self.run_root / "evaluation",
    }
    # Expose each result folder as an attribute and make sure it exists on disk.
    for attr, path in self.paths.items():
        setattr(self, attr, path)
        os.makedirs(path, exist_ok=True)
    self.run_metadata_path = self.run_root / "run.json"
    self.run_logger = logging.getLogger(f"apem.euphemia.{self.config.dataset}.{self.cut_type_key}.{self.run_id}")
    self._setup_run_logger()
    # Initial metadata snapshot; refreshed throughout the run lifecycle.
    self.run_metadata = {
        "run_id": self.run_id,
        "dataset": self.config.dataset,
        "network_model": self.network_model,
        "scenario_name": self.scenario.name,
        "cut_type": self.cutting_strategy.value,
        "cut_type_key": self.cut_type_key,
        "created_at_utc": self.created_at_utc.isoformat().replace("+00:00", "Z"),
        "started_at_utc": None,
        "ended_at_utc": None,
        "status": "initialized",
        "reinsertion_run": self.reinsertion_run,
        "iteration": 0,
        "found_solution": False,
        "best_objective": None,
        "model_status": None,
        "paths": {key: str(path) for key, path in self.paths.items()},
        "run_log": str(self.run_root / "run.log"),
    }
    self._write_run_metadata()
def _setup_run_logger(self) -> None:
"""Configure a per-run file logger."""
self.run_logger.setLevel(logging.INFO)
self.run_logger.propagate = False
for handler in list(self.run_logger.handlers):
self.run_logger.removeHandler(handler)
handler.close()
log_file = self.run_root / "run.log"
handler = logging.FileHandler(log_file, mode="a", encoding="utf-8")
handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
self.run_logger.addHandler(handler)
def _emit(self, message: str, level: int = logging.INFO) -> None:
"""Write a message to stdout and the per-run log file."""
print(message)
self.run_logger.log(level, message)
def _safe_model_status(self) -> Optional[int]:
"""Return the current Gurobi status code when available."""
try:
return int(self.model.Status)
except Exception: # noqa: BLE001
return None
def _write_run_metadata(self) -> None:
"""Persist run metadata as JSON for downstream tracking."""
with open(self.run_metadata_path, "w", encoding="utf-8") as metadata_file:
json.dump(self.run_metadata, metadata_file, indent=2, sort_keys=True)
def _finalize_run_metadata(self, status: str, error: Optional[str] = None) -> None:
"""Persist final run metadata after optimization or failure."""
elapsed_seconds = None
if self.start_time:
elapsed_seconds = time.time() - self.start_time
self.run_metadata.update(
{
"status": status,
"reinsertion_run": self.reinsertion_run,
"iteration": self.iteration,
"found_solution": self.found_solution,
"best_objective": self.current_best_objective if self.current_best_objective >= 0 else None,
"model_status": self._safe_model_status(),
"ended_at_utc": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
"elapsed_seconds": elapsed_seconds,
}
)
if error:
self.run_metadata["error"] = error
self._write_run_metadata()
[docs]
def resolve_zone(self, zone: object) -> str:
    """Map a raw zone label onto its canonical form, falling back to the default zone."""
    return _normalize_zone(zone, self.default_zone)
[docs]
def get_order_zone(self, df: pd.DataFrame, order_id: object) -> str:
    """Return the normalized zone assigned to a specific order row.

    :param df: Order table to look the zone up in.
    :param order_id: Id of the order row.
    :return: Canonical zone label, or the default zone for zone-less tables.
    """
    if "zone" in df.columns:
        return self.resolve_zone(get(df, "zone", order_id))
    return self.default_zone
[docs]
def get_price_value(self, t: int, zone: Optional[str] = None, reinsertion: Optional[bool] = False) -> float:
"""
Return the clearing price for a period.
:param t: Market period.
:param zone: Optional zone label for zonal pricing runs.
:param reinsertion: Whether to use the reinsertion price map.
:return: Clearing price for the requested period and zone.
"""
prices = self.prices_reinsertion if reinsertion else self.prices
if self.zonal_pricing_enabled:
resolved_zone = self.resolve_zone(zone)
if (resolved_zone, t) in prices:
return prices[(resolved_zone, t)]
if (self.default_zone, t) in prices:
return prices[(self.default_zone, t)]
return prices[t]
def run(self) -> None:
    """
    Run the full Euphemia solve loop for the configured scenario.

    The method formulates the master problem, solves it with lazy cuts, checks each
    incumbent through the pricing subproblem, stores accepted allocations and prices,
    and optionally launches reinsertion passes once a feasible market outcome is found.
    """
    self.start_time = time.time()
    self.started_at_utc = datetime.now(timezone.utc)
    self.run_metadata.update(
        {
            "started_at_utc": self.started_at_utc.isoformat().replace("+00:00", "Z"),
            "status": "running",
            "reinsertion_run": self.reinsertion_run,
            "error": None,
        }
    )
    self._write_run_metadata()
    # Pessimistic defaults; refined below and persisted in the finally block.
    run_status = "failed"
    run_error = None
    try:
        self._emit("Formulating master problem (objective + constraints)...")
        formulation_start = time.perf_counter()
        add_objective(self)
        add_market_constraints(self)
        add_network_constraints(self)
        self.model.update()
        formulation_elapsed = time.perf_counter() - formulation_start
        self._emit(
            f"Master problem formulated in {formulation_elapsed:.3f}s: vars={self.model.NumVars}, constrs={self.model.NumConstrs}"
        )
        # Reinsertion passes run with their own (separate) iteration budget.
        self.max_iterations = self.max_iterations if not self.reinsertion_run else self.reinsertion_max_iterations
        self._emit("Starting master problem optimization...")
        optimization_start = time.perf_counter()
        self.solve_master_problem()
        optimization_elapsed = time.perf_counter() - optimization_start
        self._emit(f"Master problem optimization finished in {optimization_elapsed:.3f}s.")
        # Dump the LP formulation for offline debugging.
        self.model.write(str(self.paths["debug"] / "master_problem.lp"))
        self._emit(f"Master problem status: {self.model.Status}")
        if self.model.Status == GRB.Status.INFEASIBLE:
            self._emit("Master problem is infeasible")
            run_status = "infeasible"
        else:
            run_status = "completed_no_solution"
        if self.found_solution:
            self._emit(
                f"------- Surplus maximization and price problem successfully finished after {self.iteration} iterations -------"
            )
            self._emit(
                f'Final economic surplus{" of reinsertion run" if self.reinsertion_run else ""}: {self.current_best_objective}'
            )
            self._emit(f"Found prices: {self.prices}")
            run_status = "success"
            # Log metrics for evaluation
            if not self.reinsertion_run:
                elapsed = time.time() - self.start_time
                file_path = self.paths["evaluation"] / "evaluation.txt"
                # Line-buffered append plus fsync so results survive a crash.
                with open(file_path, "a", buffering=1) as file:
                    file.write(f"--- Evaluation: {self.cutting_strategy} on {self.scenario.name} ---\n")
                    if self.cutting_strategy == CutTypes.PB:
                        file.write(
                            f"- beta_MIC: {self.beta_MIC} ; delta_load_gradient: {self.delta_load_gradient} - \n"
                        )
                    file.write(f"Iterations: {self.iteration}\n")
                    file.write(f"Final welfare: {self.current_best_objective}\n")
                    file.write(f"Time passed: {elapsed:.3f} seconds\n")
                    file.write(f"Clearing prices {self.prices}\n")
                    if self.calculate_corrected_welfare:
                        inelastic_surplus = calculate_block_demand_surplus(self)
                        file.write(f"Corrected welfare: {self.current_best_objective - inelastic_surplus}\n")
                    file.write("\n")
                    file.flush()
                    os.fsync(file.fileno())
            # Launch the PRMIC and PRB reinsertion passes once, from the
            # primary run only (reinsertion runs must not recurse).
            if not self.reinsertion_run and not self.disable_reinsertion:
                PRMIC_PRB_reinsertion(self, is_prmic_reinsertion=True)
                PRMIC_PRB_reinsertion(self, is_prmic_reinsertion=False)
    except Exception as exc:  # noqa: BLE001
        run_error = str(exc)
        self._emit(f"Run failed: {exc}", level=logging.ERROR)
        self.run_logger.exception("Unhandled exception during run")
        raise
    finally:
        # Always persist the terminal status, even on failure.
        self._finalize_run_metadata(status=run_status, error=run_error)
[docs]
def solve_master_problem(self) -> None:
    """Run the Gurobi optimization, routing every incumbent through the lazy-cut callback."""
    self.model.optimize(callback=self.master_problem_callback)
[docs]
def master_problem_callback(self, callback_model: gp.Model, where: int) -> None:
    """
    Process incumbent integer solutions produced by the master problem.

    For each MIP solution, the callback updates the in-memory allocation tables,
    solves the pricing subproblem, stores a new incumbent when prices are feasible,
    or adds the configured lazy cut when the pricing problem is infeasible.

    :param callback_model: The model object Gurobi passes into the callback.
    :param where: Gurobi callback code identifying the current solver event.
    """
    # when a MIP solution was found
    if where == GRB.Callback.MIPSOL:
        # Check iteration limit
        if self.iteration >= self.max_iterations:
            self._emit(f"Maximum iterations ({self.max_iterations}) reached. Terminating.")
            callback_model.terminate()  # terminate optimization early
            if not self.reinsertion_run:
                elapsed = time.time() - self.start_time
                # Log if no solution could be found
                file_path = self.paths["evaluation"] / "evaluation.txt"
                with open(file_path, "a", buffering=1) as file:
                    file.write(f"--- Evaluation: {self.cutting_strategy} on {self.scenario.name} ---\n")
                    if self.cutting_strategy == CutTypes.PB:
                        file.write(
                            f"- beta_MIC: {self.beta_MIC} ; delta_load_gradient: {self.delta_load_gradient} - \n")
                    file.write(f"No solution in iteration limit of {self.max_iterations}\n")
                    file.write(f"Time passed: {elapsed:.3f} seconds\n\n")
                    file.flush()
                    os.fsync(file.fileno())
            return
        # get current solution
        objective_value = callback_model.cbGet(GRB.Callback.MIPSOL_OBJ)
        vars = callback_model.getVars()
        solution = callback_model.cbGetSolution(vars)
        if solution is not None:
            self._emit("Found integer solution")
            self._emit(f"Objective value: {objective_value}")
            self.iteration += 1
            # match variables with value in current solution
            self.current_alloc_solution = {v.VarName: [val] for v, val in zip(vars, solution)}
            self.update_order_dataframes()
            # Write current allocation solution to file
            file_path = self.paths["alloc"] / "results.csv"
            with open(file_path, "w", newline="", buffering=1, encoding="utf-8") as f:
                writer = csv.writer(f)
                writer.writerow(["variable", "value"])
                for var in callback_model.getVars():
                    writer.writerow([var.VarName, callback_model.cbGetSolution(var)])
                f.flush()
                os.fsync(f.fileno())
            self._emit("Solving price determination subproblem...")
            price_subproblem = PriceSubproblem(master_problem=self)
            price_subproblem.solve_price_determination_subproblem()
            # If price subproblem optimal check if new incumbent was found
            if price_subproblem.pricing_model.Status == GRB.OPTIMAL:
                self._emit("Found market clearing prices")
                # Write MCPs to file
                file_path = self.paths["prices"] / "prices.csv"
                file_exists = file_path.exists() and file_path.stat().st_size > 0
                with open(file_path, "a", buffering=1, encoding="utf-8") as file:  # 'a' = append
                    if not file_exists:
                        file.write("variable,value\n")
                    for v in price_subproblem.pricing_model.getVars():
                        file.write(f"{v.varName},{v.X}\n")
                        # self._emit(f"{v.varName}: {v.X}")  # for console output and run log
                    file.flush()
                    os.fsync(file.fileno())
                # Only keep the allocation/prices when welfare improved.
                if objective_value > self.current_best_objective:
                    self.set_prices(price_subproblem.extract_prices(), reinsertion=False)
                    self.current_best_objective = objective_value
                    self.found_solution = True
            # if price subproblem is infeasible, add cut to master problem
            if price_subproblem.pricing_model.Status == GRB.INFEASIBLE:
                self._emit("Price subproblem is infeasible")
                # Dispatch on the configured cutting strategy.
                if self.cutting_strategy == CutTypes.CB:
                    combinatorial_benders_cutting.add_combinatorial_benders_cut(self=self,
                                                                                callback_model=callback_model,
                                                                                price_subproblem=price_subproblem)
                elif self.cutting_strategy == CutTypes.NG:
                    no_good_cutting.add_no_good_cut(self=self, callback_model=callback_model)
                elif self.cutting_strategy == CutTypes.PB:
                    price_based_cutting.handle_price_based_cutting(self=self, callback_model=callback_model)
[docs]
def add_acceptance_variables_to_dataframe(self) -> None:
"""
Attach each Gurobi acceptance variable to the corresponding order dataframe row.
"""
self.step_orders['acceptance_var'] = self.step_orders['id'].map(self.accept_step)
self.piecewise_linear_orders['acceptance_var'] = self.piecewise_linear_orders['id'].map(
self.accept_piecewise_linear)
self.block_orders['acceptance_var'] = self.block_orders['id'].map(self.accept_block)
self.complex_orders['acceptance_var'] = self.complex_orders['id'].map(self.accept_complex)
self.scalable_complex_orders['acceptance_var'] = self.scalable_complex_orders['id'].map(self.accept_scalable)
[docs]
def update_order_dataframes(self) -> None:
"""
Populate the order dataframes with acceptance values from the current incumbent.
"""
solution_df = pd.DataFrame(self.current_alloc_solution)
# step orders
accept_step_order_columns = [col for col in solution_df.columns if 'accept_step' in col]
accept_step_values = solution_df[accept_step_order_columns].values.flatten()
self.step_orders['acceptance'] = accept_step_values
# piecewise linear orders
accept_piecewise_linear_order_columns = [col for col in solution_df.columns if
'accept_piecewise_linear' in col]
accept_piecewise_linear_order_values = solution_df[
accept_piecewise_linear_order_columns].values.flatten()
self.piecewise_linear_orders['acceptance'] = accept_piecewise_linear_order_values
# block orders
accept_block_columns = [col for col in solution_df.columns if 'accept_block' in col]
accept_block_values = solution_df[accept_block_columns].values.flatten()
self.block_orders['acceptance'] = accept_block_values
# complex orders
accept_complex_columns = [col for col in solution_df.columns if 'accept_complex[' in col]
accept_complex_step_columns = [col for col in solution_df.columns if 'accept_complex_step[' in col]
accept_complex_values = solution_df[accept_complex_columns].values.flatten()
accept_complex_step_values = solution_df[accept_complex_step_columns].values.flatten()
self.complex_orders['acceptance'] = accept_complex_values
self.complex_step_orders['acceptance'] = accept_complex_step_values
# scalable complex orders
accept_scalable_columns = [col for col in solution_df.columns if 'accept_scalable_complex[' in col]
accept_scalable_step_columns = [col for col in solution_df.columns if 'accept_scalable_step[' in col]
accept_scalable_values = solution_df[accept_scalable_columns].values.flatten()
accept_scalable_step_values = solution_df[accept_scalable_step_columns].values.flatten()
self.scalable_complex_orders['acceptance'] = accept_scalable_values
self.scalable_step_orders['acceptance'] = accept_scalable_step_values
[docs]
def compute_block_overlaps(self) -> dict[int, set[int]]:
"""
Compute block-order overlap sets for price-based cut generation.
Two block orders overlap when they both inject or withdraw non-zero volume in at
least one common period. The returned mapping is keyed by block order id.
"""
period_cols = [f"q{t}" for t in self.periods]
# Extract only the 'id' column and the period quantity columns
df = self.block_orders[['id'] + period_cols].copy()
# Boolean mask: True if quantity is non-zero in that period
mask = df[period_cols].ne(0)
# Extract the list of IDs (ensured to be unique)
ids = df['id'].tolist()
# Initialize overlap dictionary with order IDs as keys
overlap = {i: set() for i in ids}
# Compare each pair of orders only once (i < j)
for idx1 in range(len(ids)):
i = ids[idx1]
for idx2 in range(idx1 + 1, len(ids)):
j = ids[idx2]
# Check for overlapping periods with non-zero quantities
if (mask.iloc[idx1] & mask.iloc[idx2]).any():
overlap[i].add(j)
overlap[j].add(i)
return overlap
[docs]
def get_block_bids(self, threshold: bool, reinsertion: Optional[bool] = False) -> list[int]:
    """
    Return accepted block orders that violate or nearly violate the PAB condition.

    :param threshold: When ``True``, return in-the-money block orders within
        ``delta_PAB`` of becoming paradoxically accepted. When ``False``, return
        currently paradoxically accepted block orders.
    :param reinsertion: Whether to evaluate the reinsertion price map.
    :return: Block order ids that satisfy the requested filter.
    """
    res = []
    for i in list(self.block_orders['id']):
        accepted = get(self.block_orders, 'acceptance', i) > self.epsilon
        if not accepted:
            continue
        zone = self.get_order_zone(self.block_orders, i)
        p = get(self.block_orders, 'p', i)
        # Non-zero quantities per period; sign convention: positive sum = sale.
        q = {t: get(self.block_orders, f'q{t}', i) for t in self.periods if get(self.block_orders, f'q{t}', i) != 0}
        sale = sum(q.values()) > 0
        block_type = get(self.block_orders, 'block_type', i)
        # Check if this is a linked parent order
        is_linked_parent = any(
            other_order['block_type'] == 'linked' and i == other_order['code_prm']
            for _, other_order in self.block_orders.iterrows()
        )
        if is_linked_parent:
            # For linked parent orders, calculate family surplus (parent + all children)
            family_surplus = 0
            # Parent surplus: acceptance * q_t * (MCP_t - p)
            parent_surplus = 0
            for t in self.periods:
                q_t = get(self.block_orders, f'q{t}', i)
                if q_t != 0:
                    parent_surplus += get(self.block_orders, 'acceptance', i) * q_t * (
                        self.get_price_value(t, zone, reinsertion) - p
                    )
            family_surplus += parent_surplus
            # Children surplus: orders whose code_prm points at this parent.
            children_df = self.block_orders[
                (self.block_orders['code_prm'] == i) & (self.block_orders['block_type'] == 'linked')
            ]
            for _, child in children_df.iterrows():
                child_id = child['id']
                child_zone = self.get_order_zone(self.block_orders, child_id)
                child_p = child['p']
                child_accepted = get(self.block_orders, 'acceptance', child_id) > self.epsilon
                if child_accepted:
                    child_surplus = 0
                    for t in self.periods:
                        child_q_t = get(self.block_orders, f'q{t}', child_id)
                        if child_q_t != 0:
                            child_surplus += get(self.block_orders, 'acceptance', child_id) * child_q_t * (
                                self.get_price_value(t, child_zone, reinsertion) - child_p
                            )
                    family_surplus += child_surplus
            # Check if family has negative surplus (PAB condition for linked parent)
            if not threshold:
                if family_surplus < 0:  # Family has negative surplus -> PAB
                    res.append(i)
            # NOTE: no INM-threshold logic exists for linked parents yet.
        else:
            # Normal block order logic (non-linked parent): compare the limit
            # price against the volume-weighted average clearing price.
            total_quantity = sum(abs(q_t) for q_t in q.values())
            weighted_mcp = sum(
                self.get_price_value(t, zone, reinsertion) * abs(q_t) / total_quantity
                for t, q_t in q.items()
            )
            # set right weighted_mcp in case of flexible block order
            if block_type == 'flexible':
                # overwrite weighted MCP with correct value considering flex_period variable
                active_period = calculate_flexible_order_active_period(master_problem=self,
                                                                       block_id=i)
                weighted_mcp = self.get_price_value(active_period, zone, reinsertion)
            if threshold:
                # In-the-money orders within delta_PAB of the PAB boundary.
                # BUGFIX: the buy-side band used to read
                # "weighted_mcp < p < weighted_mcp - self.delta_PAB", which is an
                # empty interval for any positive delta_PAB, so buy orders were
                # never reported. The symmetric band for purchases is
                # weighted_mcp < p < weighted_mcp + delta_PAB.
                if (sale and weighted_mcp - self.delta_PAB < p < weighted_mcp
                        or not sale and weighted_mcp < p < weighted_mcp + self.delta_PAB):
                    res.append(i)
            else:
                # PAB: a sale whose limit price lies above the clearing price
                # (or a purchase whose limit price lies below it) runs at a loss.
                if sale and p > weighted_mcp or not sale and weighted_mcp > p:
                    res.append(i)
    # Persist the filtered ids for this iteration for debugging/evaluation.
    path_key = 'pab' if not threshold else 'block_inm_threshold'
    file_path = self.paths[path_key] / f"iteration_{self.iteration}.txt"
    with open(file_path, "w") as file:
        file.writelines(f"{bid}\n" for bid in res)
    return res
[docs]
def add_block_cut(self, single: Optional[bool] = False) -> bool:
"""
Add a cut that rejects one or more near-PAB block orders.
:param single: When ``True``, reject one randomly chosen candidate. When
``False``, reject every candidate returned by :meth:`get_block_bids`.
:return: ``True`` when at least one cut was added, otherwise ``False``.
"""
in_the_money_blocks = self.get_block_bids(threshold=True)
if len(in_the_money_blocks) == 0:
self._emit("No INM block orders left to reject.")
return False
if not single:
self.model.addConstrs(self.accept_block[i] == 0 for i in in_the_money_blocks)
else:
random_block = random.choice(in_the_money_blocks)
self.model.addConstr(self.accept_block[random_block] == 0)
self._emit("Block cut successfully added.")
return True
[docs]
def get_MIC_complex_orders(
        self, threshold: Optional[bool] = False, reinsertion: Optional[bool] = False
) -> list[int]:
    """
    Return accepted complex orders that fail or nearly fail their MIC or MP condition.

    :param threshold: When ``True``, apply the ``beta_MIC`` tolerance and return
        marginally infeasible orders. When ``False``, return orders whose MIC or MP
        condition is already violated at the current prices.
    :param reinsertion: Whether to evaluate the reinsertion price map.
    :return: Complex order ids that match the requested filter.
    """
    mic_complex_order_ids = self.complex_orders.loc[self.complex_orders['condition'] == 'MIC', 'id'].tolist()
    mp_complex_order_ids = self.complex_orders.loc[self.complex_orders['condition'] == 'MP', 'id'].tolist()
    res = []
    for i in mic_complex_order_ids + mp_complex_order_ids:
        # Only accepted orders can violate their income/payment condition.
        accepted = get(self.complex_orders, 'acceptance', i) > self.epsilon
        if not accepted:
            continue
        fixed_term = get(self.complex_orders, 'fixed_term', i)
        variable_term = get(self.complex_orders, 'variable_term', i)
        step_orders_str = get(self.complex_orders, 'step_orders', i)
        step_orders = parse_step_order_ids(step_orders_str, self.complex_step_orders)
        # Condition target: fixed term plus variable term over all accepted volume.
        expected = sum(
            variable_term * abs(get(self.complex_step_orders, 'q', j)) * get(self.complex_step_orders, 'acceptance', j)
            for j in step_orders) + fixed_term
        # Market side: clearing price times accepted volume, summed per period
        # over the child step orders active in that period.
        actual = 0
        for t in self.periods:
            step_orders_t = self.complex_step_orders[
                (self.complex_step_orders['id'].isin(step_orders)) & (self.complex_step_orders['t'] == t)][
                'id'].tolist()
            actual += sum(
                self.get_price_value(t, self.get_order_zone(self.complex_step_orders, j), reinsertion) *
                abs(get(self.complex_step_orders, 'q', j)) * get(self.complex_step_orders, 'acceptance', j)
                for j in step_orders_t)
        if not threshold:
            # MIC violated when the market side falls short of the target;
            # MP violated when the market side exceeds it.
            if i in mic_complex_order_ids and expected > actual:
                res.append(i)
            elif i in mp_complex_order_ids and expected < actual:
                res.append(i)
        else:
            # Same checks with the beta_MIC tolerance band applied to the target.
            if i in mic_complex_order_ids and expected * (1 - self.beta_MIC) > actual:
                res.append(i)
            elif i in mp_complex_order_ids and actual > expected * (1 + self.beta_MIC):
                res.append(i)
    # Persist the filtered ids for this iteration for debugging/evaluation.
    path_key = 'complex_mic_inm_threshold' if threshold else 'complex_mic'
    file_path = self.paths[path_key] / f"iteration_{self.iteration}.txt"
    with open(file_path, "w") as file:
        file.writelines(f"{bid}\n" for bid in res)
    return res
[docs]
def get_MIC_scalable_orders(
        self, threshold: Optional[bool] = False, reinsertion: Optional[bool] = False
) -> list[int]:
    """
    Return accepted scalable complex orders that fail or nearly fail MIC or MP.

    For each accepted scalable order, ``actual`` is the money earned at the current
    clearing prices and ``expected`` is the order's own price terms plus its fixed
    term; MIC orders violate when ``expected > actual``, MP orders when
    ``expected < actual``. The resulting ids are also written to a per-iteration
    text file under ``self.paths``.

    :param threshold: When ``True``, apply the ``beta_MIC`` tolerance and return
        marginally infeasible orders. When ``False``, return orders whose MIC or MP
        condition is already violated at the current prices.
    :param reinsertion: Whether to evaluate the reinsertion price map.
    :return: Scalable complex order ids that match the requested filter.
    """
    mic_scalable_order_ids = self.scalable_complex_orders.loc[
        self.scalable_complex_orders['condition'] == 'MIC', 'id'].tolist()
    mp_scalable_order_ids = self.scalable_complex_orders.loc[
        self.scalable_complex_orders['condition'] == 'MP', 'id'].tolist()
    res = []
    for i in mic_scalable_order_ids + mp_scalable_order_ids:
        # Only orders with non-negligible acceptance are checked.
        accepted = get(self.scalable_complex_orders, 'acceptance', i) > self.epsilon
        if not accepted:
            continue
        fixed_term = get(self.scalable_complex_orders, 'fixed_term', i)
        step_orders_str = get(self.scalable_complex_orders, 'step_orders', i)
        step_orders = parse_step_order_ids(step_orders_str, self.scalable_step_orders)
        expected, actual = 0, 0
        for t in self.periods:
            step_orders_t = self.scalable_step_orders[
                (self.scalable_step_orders['id'].isin(step_orders)) &
                (self.scalable_step_orders['t'] == t)]['id'].tolist()
            # Revenue at the (possibly zonal) clearing price for period t.
            actual += sum(
                self.get_price_value(t, self.get_order_zone(self.scalable_step_orders, j), reinsertion) *
                abs(get(self.scalable_step_orders, 'q', j)) * get(self.scalable_step_orders, 'acceptance', j)
                for j in step_orders_t)
            # Revenue required by the step orders' own limit prices.
            expected += sum(
                get(self.scalable_step_orders, 'p', j) * abs(get(self.scalable_step_orders, 'q', j)) *
                get(self.scalable_step_orders, 'acceptance', j) for j in step_orders_t)
        expected += fixed_term
        if not threshold:
            if i in mic_scalable_order_ids and expected > actual:
                res.append(i)
            elif i in mp_scalable_order_ids and expected < actual:
                res.append(i)
        else:
            if i in mic_scalable_order_ids and expected * (1 - self.beta_MIC) > actual:
                res.append(i)
            # BUG FIX: the MP threshold previously compared ``actual`` against the
            # bare scalar ``(1 + self.beta_MIC)``, dropping the ``expected`` factor.
            # Mirrors the complex-order variant: actual > expected * (1 + beta_MIC).
            elif i in mp_scalable_order_ids and actual > expected * (1 + self.beta_MIC):
                res.append(i)
    path_key = 'scalable_mic_inm_threshold' if threshold else 'scalable_mic'
    file_path = self.paths[path_key] / f"iteration_{self.iteration}.txt"
    with open(file_path, "w") as file:
        file.writelines(f"{bid}\n" for bid in res)
    return res
[docs]
def get_load_gradient_orders(
    self,
    threshold: Optional[bool] = False,
    reinsertion: Optional[bool] = False,
    complex: bool = True,
) -> list[int]:
    """
    Return accepted load-gradient orders with negative surplus.

    :param threshold: When ``True``, only return orders whose surplus is below
        ``-delta_load_gradient``. When ``False``, return all orders with negative
        surplus.
    :param reinsertion: Whether to evaluate the reinsertion price map.
    :param complex: When ``True``, inspect complex orders. When ``False``, inspect
        scalable complex orders.
    :return: Order ids whose surplus violates the requested threshold.
    """
    # Pick the order table, step table, and parent-id column per the `complex` flag.
    if complex:
        candidates = self.complex_orders[
            (self.complex_orders['condition'] == 'load gradient') &
            (self.complex_orders['acceptance'] > self.epsilon)
        ]
        steps = self.complex_step_orders
        parent_col = 'complex_order_id'
    else:
        candidates = self.scalable_complex_orders[
            (self.scalable_complex_orders['condition'] == 'load gradient') &
            (self.scalable_complex_orders['acceptance'] > self.epsilon)
        ]
        steps = self.scalable_step_orders
        parent_col = 'scalable_order_id'
    # A single cutoff covers both modes: 0 without threshold, -delta with it.
    cutoff = -self.delta_load_gradient if threshold else 0.0
    violating = []
    for order_id in candidates['id']:
        surplus = 0.0
        for _, row in steps[steps[parent_col] == order_id].iterrows():
            zone = self.resolve_zone(row.get("zone", self.default_zone))
            price = self.get_price_value(row['t'], zone, reinsertion)
            surplus += row['acceptance'] * row['q'] * (price - row['p'])
        if surplus < cutoff:
            violating.append(order_id)
    return violating
[docs]
def add_MIC_complex_cut(self, single: Optional[bool] = False) -> bool:
    """
    Add a cut that rejects one or more near-MIC complex orders.

    :param single: When ``True``, reject one randomly chosen candidate. When
        ``False``, reject every candidate returned by :meth:`get_MIC_complex_orders`.
    :return: ``True`` when at least one cut was added, otherwise ``False``.
    """
    candidates = self.get_MIC_complex_orders(threshold=True)
    if not candidates:
        self._emit("No INM complex MIC orders left to reject.")
        return False
    if single:
        chosen = random.choice(candidates)
        self.model.addConstr(self.accept_complex[chosen] == 0)
    else:
        self.model.addConstrs(self.accept_complex[i] == 0 for i in candidates)
    self._emit("MIC complex cut successfully added.")
    return True
[docs]
def add_MIC_scalable_cut(self, single: Optional[bool] = False) -> bool:
    """
    Add a cut that rejects one or more near-MIC scalable complex orders.

    :param single: When ``True``, reject one randomly chosen candidate. When
        ``False``, reject every candidate returned by
        :meth:`get_MIC_scalable_orders`.
    :return: ``True`` when at least one cut was added, otherwise ``False``.
    """
    candidates = self.get_MIC_scalable_orders(threshold=True)
    if not candidates:
        self._emit("No INM scalable complex MIC orders left to reject.")
        return False
    if single:
        chosen = random.choice(candidates)
        self.model.addConstr(self.accept_scalable[chosen] == 0)
    else:
        self.model.addConstrs(self.accept_scalable[i] == 0 for i in candidates)
    self._emit("MIC scalable complex cut successfully added.")
    return True
[docs]
def volume_indeterminacy_subproblem(self) -> None:
    """Placeholder: a volume-indeterminacy subproblem may be implemented here later."""
[docs]
def set_prices(self, prices: dict[Any, float], reinsertion: Optional[bool] = False) -> None:
    """
    Store clearing prices from the pricing subproblem.

    :param prices: Mapping keyed either by period ``t`` or by ``(zone, t)`` for
        zonal pricing runs.
    :param reinsertion: Whether the prices belong to the reinsertion pass.
    """
    if reinsertion:
        self.prices_reinsertion = prices
    else:
        self.prices = prices
[docs]
def get_objective(self) -> float:
    """Return the objective value of the current master-problem solution."""
    objective_expr = self.model.getObjective()
    return objective_expr.getValue()
def __str__(self) -> str:
    """Return the short display name of this model."""
    return "Euphemia"