Source code for apem.order_book_based_model.euphemia.master_problem.master_problem

import json
import logging
import os
import random
import re
import time
import csv
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
from uuid import uuid4

import gurobipy as gp
import pandas as pd
from gurobipy import GRB

import apem.order_book_based_model.euphemia.cutting_strategies.price_based as price_based_cutting
import apem.order_book_based_model.euphemia.cutting_strategies.combinatorial_benders as combinatorial_benders_cutting
import apem.order_book_based_model.euphemia.cutting_strategies.no_good as no_good_cutting
from apem.order_book_based_model.euphemia.enums.cut_types import CutTypes
from apem.order_book_based_model.euphemia.euphemia_config import EuphemiaConfig
from apem.order_book_based_model.euphemia.model.setup_model import add_objective, add_market_constraints, \
    add_network_constraints
from apem.order_book_based_model.euphemia.pricing.price_determination_subproblem import PriceSubproblem
from apem.order_book_based_model.euphemia.reinsertions.prmic_prb_reinsertion import PRMIC_PRB_reinsertion
from apem.order_book_based_model.euphemia.utils.calculations import calculate_flexible_order_active_period, \
    calculate_block_demand_surplus
from apem.order_book_based_model.euphemia.utils.extraction import get, parse_step_order_ids


def _slugify(value: str) -> str:
    """Convert free-text labels to folder-safe identifiers."""
    return re.sub(r"[^a-z0-9]+", "_", value.lower()).strip("_")


def _new_run_id() -> str:
    """Create a unique run id with UTC timestamp and random suffix."""
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return f"{timestamp}_{uuid4().hex[:8]}"


def _normalize_zone(zone: object, default_zone: str) -> str:
    """Return a canonical zone label and strip accidental quote characters."""
    if pd.isna(zone):
        return default_zone
    zone_str = str(zone).strip().strip("'\"").strip()
    return zone_str if zone_str else default_zone


[docs] class MasterProblem: """ Formulate and solve the Euphemia master problem. """ def __init__(self, config: EuphemiaConfig) -> None: if not config: config = EuphemiaConfig() self.config = config self.model = gp.Model('Euphemia Master Problem') self.scenario = config.scenario self.periods = self.scenario.periods self.step_orders = self.scenario.step_orders self.block_orders = self.scenario.block_orders self.complex_orders = self.scenario.complex_orders self.complex_step_orders = self.scenario.complex_step_orders self.scalable_complex_orders = self.scenario.scalable_complex_orders self.scalable_step_orders = self.scenario.scalable_step_orders self.piecewise_linear_orders = self.scenario.piecewise_linear_orders self.default_zone = "Z1" # Normalize zone labels across all zonal order tables. for df in ( self.step_orders, self.block_orders, self.complex_step_orders, self.scalable_step_orders, self.piecewise_linear_orders, ): if "zone" not in df.columns: df["zone"] = self.default_zone df["zone"] = df["zone"].apply(lambda zone: _normalize_zone(zone, self.default_zone)) scenario_zones = getattr(self.scenario, "zones", None) if scenario_zones: self.zones = sorted({_normalize_zone(z, self.default_zone) for z in scenario_zones}) else: self.zones = sorted( set(self.step_orders["zone"]) .union(self.block_orders["zone"]) .union(self.complex_step_orders["zone"]) .union(self.scalable_step_orders["zone"]) .union(self.piecewise_linear_orders["zone"]) ) if not self.zones: self.zones = [self.default_zone] self.default_zone = self.zones[0] self.network_model = str(getattr(config, "network_model", "ATC")).upper() if self.network_model not in {"ATC", "FBMC"}: raise ValueError(f"Unsupported network_model '{self.network_model}'. Use 'ATC' or 'FBMC'.") self.atc = getattr(self.scenario, "atc", pd.DataFrame(columns=["from_zone", "to_zone", "t", "cap"])) self.atc_cap = {} self.atc_ramp_up = {} self.atc_ramp_down = {} if isinstance(self.atc, pd.DataFrame) and not self.atc.empty: for _, row in self.atc.iterrows(): t = int(row["t"]) if t not in self.periods: continue from_zone = _normalize_zone(row["from_zone"], self.default_zone) to_zone = _normalize_zone(row["to_zone"], self.default_zone) cap = float(row["cap"]) key = (from_zone, to_zone, t) if key in self.atc_cap: self.atc_cap[key] = min(self.atc_cap[key], cap) else: self.atc_cap[key] = cap if "ramp_up" in row and pd.notna(row["ramp_up"]): self.atc_ramp_up[(from_zone, to_zone)] = float(row["ramp_up"]) if "ramp_down" in row and pd.notna(row["ramp_down"]): self.atc_ramp_down[(from_zone, to_zone)] = float(row["ramp_down"]) if self.atc_cap: self.zones = sorted( set(self.zones).union({i for (i, _, _) in self.atc_cap}).union({j for (_, j, _) in self.atc_cap}) ) self.fb_constraints = getattr( self.scenario, "fb_constraints", pd.DataFrame(columns=["cnec_id", "t", "ram"]) ) self.fb_ptdf = getattr(self.scenario, "fb_ptdf", pd.DataFrame(columns=["cnec_id", "t", "zone", "ptdf"])) self.fb_ram = {} self.fb_lb = {} self.fb_ptdf_map = {} if isinstance(self.fb_constraints, pd.DataFrame) and not self.fb_constraints.empty: for _, row in self.fb_constraints.iterrows(): t = int(row["t"]) if t not in self.periods: continue cnec_id = str(row["cnec_id"]) key = (cnec_id, t) ram = float(row["ram"]) self.fb_ram[key] = min(self.fb_ram[key], ram) if key in self.fb_ram else ram if "lb" in row and pd.notna(row["lb"]): lb = float(row["lb"]) self.fb_lb[key] = max(self.fb_lb[key], lb) if key in self.fb_lb else lb if isinstance(self.fb_ptdf, pd.DataFrame) and not self.fb_ptdf.empty: for _, row in self.fb_ptdf.iterrows(): t = int(row["t"]) if t not in self.periods: continue cnec_id = str(row["cnec_id"]) zone = _normalize_zone(row["zone"], self.default_zone) self.fb_ptdf_map[(cnec_id, t, zone)] = self.fb_ptdf_map.get((cnec_id, t, zone), 0.0) + float(row["ptdf"]) if self.fb_ptdf_map: self.zones = sorted(set(self.zones).union({z for (_, _, z) in self.fb_ptdf_map})) self.atc_index = sorted(self.atc_cap.keys()) self.fb_index = sorted((c, t) for (c, t) in self.fb_ram if any((c, t, z) in self.fb_ptdf_map for z in self.zones)) if self.network_model == "ATC": self.network_constraints_enabled = bool(self.atc_index) and len(self.zones) > 1 else: self.network_constraints_enabled = bool(self.fb_index) and len(self.zones) > 1 self.zonal_pricing_enabled = self.network_constraints_enabled self.accept_step = self.model.addVars(list(self.step_orders['id']), vtype=GRB.CONTINUOUS, lb=0, ub=1, name='accept_step') self.accept_block = self.model.addVars(list(self.block_orders['id']), vtype=GRB.CONTINUOUS, lb=0, ub=1, name='accept_block') # required for the big-M constraint to satisfy the MAR condition of block orders self.MAR_aux = self.model.addVars(list(self.block_orders['id']), vtype=GRB.BINARY, name='y') # required for flexible orders - decide in which period the order is accepted self.flex_period = self.model.addVars( list(self.block_orders[self.block_orders['block_type'] == 'flexible']['id']), self.periods, vtype=GRB.BINARY, name='flex_period') self.accept_complex = self.model.addVars(list(self.complex_orders['id']), vtype=GRB.BINARY, lb=0, ub=1, name='accept_complex') self.accept_complex_step = self.model.addVars(list(self.complex_step_orders['id']), vtype=GRB.CONTINUOUS, lb=0, ub=1, name='accept_complex_step') self.accept_scalable = self.model.addVars(list(self.scalable_complex_orders['id']), vtype=GRB.BINARY, lb=0, ub=1, name='accept_scalable_complex') self.accept_scalable_step = self.model.addVars(list(self.scalable_step_orders['id']), vtype=GRB.CONTINUOUS, lb=0, ub=1, name='accept_scalable_step') self.accept_piecewise_linear = self.model.addVars(list(self.piecewise_linear_orders['id']), vtype=GRB.CONTINUOUS, lb=0, ub=1, name='accept_piecewise_linear') if self.network_model == "ATC": self.f_atc = self.model.addVars(self.atc_index, vtype=GRB.CONTINUOUS, lb=0, name="f_atc") self.net_position = gp.tupledict() else: self.f_atc = gp.tupledict() self.net_position = self.model.addVars( self.zones, self.periods, vtype=GRB.CONTINUOUS, lb=-GRB.INFINITY, name="net_position" ) self.add_acceptance_variables_to_dataframe() # Compute overlapping block orders for price-based cuts self.block_overlap = self.compute_block_overlaps() self.block_orders['overlap_set'] = self.block_orders['id'].map(self.block_overlap) self.current_alloc_solution = {} self.found_solution = False self.current_best_objective = -1 self.reinsertion_run = False self.iteration = 0 self.start_time = 0 self.M = config.big_m self.prices = {} self.prices_reinsertion = {} self.price_lower_bound = config.price_lower_bound self.price_upper_bound = config.price_upper_bound self.delta_PAB = config.delta_PAB self.beta_MIC = config.beta_MIC self.delta_load_gradient = config.delta_load_gradient self.epsilon = config.epsilon self.max_iterations = config.max_iterations self.reinsertion_max_iterations = config.reinsertion_max_iterations self.max_prb_reinsertion_attempts = config.max_prb_reinsertion_attempts self.cutting_strategy = config.cutting_strategy self.disable_reinsertion = config.disable_reinsertion self.calculate_corrected_welfare = config.calculate_corrected_welfare self.output_flag = config.output_flag self.time_limit = config.time_limit self.mip_gap = config.mip_gap self.threads = config.threads self.seed = config.seed self.lazy_constraints = config.lazy_constraints self.model.Params.LazyConstraints = int(self.lazy_constraints) self.model.setParam("OutputFlag", int(self.output_flag)) if self.time_limit is not None: self.model.setParam("TimeLimit", float(self.time_limit)) if self.mip_gap is not None: self.model.setParam("MIPGap", float(self.mip_gap)) if self.threads is not None: self.model.setParam("Threads", int(self.threads)) if self.seed is not None: self.model.setParam("Seed", int(self.seed)) self.run_id = _new_run_id() self.cut_type_key = _slugify(self.cutting_strategy.value) self.created_at_utc = datetime.now(timezone.utc) self.started_at_utc = None project_root = Path(__file__).resolve().parents[4] results_root = project_root / "results" / "order_book_based_model" / "euphemia" self.run_root = results_root / self.config.dataset / self.run_id / self.cut_type_key self.paths = { "alloc": self.run_root / "allocation", "prices": self.run_root / "prices", "pab": self.run_root / "pab", "block_inm_threshold": self.run_root / "block_inm_threshold", "complex_mic": self.run_root / "complex_mic", "complex_mic_inm_threshold": self.run_root / "complex_mic_inm_threshold", "scalable_mic": self.run_root / "scalable_mic", "scalable_mic_inm_threshold": self.run_root / "scalable_mic_inm_threshold", "debug": self.run_root / "debug", "evaluation": self.run_root / "evaluation", } for attr, path in self.paths.items(): setattr(self, attr, path) os.makedirs(path, exist_ok=True) self.run_metadata_path = self.run_root / "run.json" self.run_logger = logging.getLogger(f"apem.euphemia.{self.config.dataset}.{self.cut_type_key}.{self.run_id}") self._setup_run_logger() self.run_metadata = { "run_id": self.run_id, "dataset": self.config.dataset, "network_model": self.network_model, "scenario_name": self.scenario.name, "cut_type": self.cutting_strategy.value, "cut_type_key": self.cut_type_key, "created_at_utc": self.created_at_utc.isoformat().replace("+00:00", "Z"), "started_at_utc": None, "ended_at_utc": None, "status": "initialized", "reinsertion_run": self.reinsertion_run, "iteration": 0, "found_solution": False, "best_objective": None, "model_status": None, "paths": {key: str(path) for key, path in self.paths.items()}, "run_log": str(self.run_root / "run.log"), } self._write_run_metadata() def _setup_run_logger(self) -> None: """Configure a per-run file logger.""" self.run_logger.setLevel(logging.INFO) self.run_logger.propagate = False for handler in list(self.run_logger.handlers): self.run_logger.removeHandler(handler) handler.close() log_file = self.run_root / "run.log" handler = logging.FileHandler(log_file, mode="a", encoding="utf-8") handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s")) self.run_logger.addHandler(handler) def _emit(self, message: str, level: int = logging.INFO) -> None: """Write a message to stdout and the per-run log file.""" print(message) self.run_logger.log(level, message) def _safe_model_status(self) -> Optional[int]: """Return the current Gurobi status code when available.""" try: return int(self.model.Status) except Exception: # noqa: BLE001 return None def _write_run_metadata(self) -> None: """Persist run metadata as JSON for downstream tracking.""" with open(self.run_metadata_path, "w", encoding="utf-8") as metadata_file: json.dump(self.run_metadata, metadata_file, indent=2, sort_keys=True) def _finalize_run_metadata(self, status: str, error: Optional[str] = None) -> None: """Persist final run metadata after optimization or failure.""" elapsed_seconds = None if self.start_time: elapsed_seconds = time.time() - self.start_time self.run_metadata.update( { "status": status, "reinsertion_run": self.reinsertion_run, "iteration": self.iteration, "found_solution": self.found_solution, "best_objective": self.current_best_objective if self.current_best_objective >= 0 else None, "model_status": self._safe_model_status(), "ended_at_utc": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), "elapsed_seconds": elapsed_seconds, } ) if error: self.run_metadata["error"] = error self._write_run_metadata()
[docs] def resolve_zone(self, zone: object) -> str: """Normalize a raw zone label and fall back to the default zone when needed.""" return _normalize_zone(zone, self.default_zone)
[docs] def get_order_zone(self, df: pd.DataFrame, order_id: object) -> str: """Return the normalized zone assigned to a specific order row.""" if "zone" not in df.columns: return self.default_zone return self.resolve_zone(get(df, "zone", order_id))
[docs] def get_price_value(self, t: int, zone: Optional[str] = None, reinsertion: Optional[bool] = False) -> float: """ Return the clearing price for a period. :param t: Market period. :param zone: Optional zone label for zonal pricing runs. :param reinsertion: Whether to use the reinsertion price map. :return: Clearing price for the requested period and zone. """ prices = self.prices_reinsertion if reinsertion else self.prices if self.zonal_pricing_enabled: resolved_zone = self.resolve_zone(zone) if (resolved_zone, t) in prices: return prices[(resolved_zone, t)] if (self.default_zone, t) in prices: return prices[(self.default_zone, t)] return prices[t]
def run(self) -> None: """ Run the full Euphemia solve loop for the configured scenario. The method formulates the master problem, solves it with lazy cuts, checks each incumbent through the pricing subproblem, stores accepted allocations and prices, and optionally launches reinsertion passes once a feasible market outcome is found. """ self.start_time = time.time() self.started_at_utc = datetime.now(timezone.utc) self.run_metadata.update( { "started_at_utc": self.started_at_utc.isoformat().replace("+00:00", "Z"), "status": "running", "reinsertion_run": self.reinsertion_run, "error": None, } ) self._write_run_metadata() run_status = "failed" run_error = None try: self._emit("Formulating master problem (objective + constraints)...") formulation_start = time.perf_counter() add_objective(self) add_market_constraints(self) add_network_constraints(self) self.model.update() formulation_elapsed = time.perf_counter() - formulation_start self._emit( f"Master problem formulated in {formulation_elapsed:.3f}s: vars={self.model.NumVars}, constrs={self.model.NumConstrs}" ) self.max_iterations = self.max_iterations if not self.reinsertion_run else self.reinsertion_max_iterations self._emit("Starting master problem optimization...") optimization_start = time.perf_counter() self.solve_master_problem() optimization_elapsed = time.perf_counter() - optimization_start self._emit(f"Master problem optimization finished in {optimization_elapsed:.3f}s.") self.model.write(str(self.paths["debug"] / "master_problem.lp")) self._emit(f"Master problem status: {self.model.Status}") if self.model.Status == GRB.Status.INFEASIBLE: self._emit("Master problem is infeasible") run_status = "infeasible" else: run_status = "completed_no_solution" if self.found_solution: self._emit( f"------- Surplus maximization and price problem successfully finished after {self.iteration} iterations -------" ) self._emit( f'Final economic surplus{" of reinsertion run" if self.reinsertion_run else ""}: {self.current_best_objective}' ) self._emit(f"Found prices: {self.prices}") run_status = "success" # Log metrics for evaluation if not self.reinsertion_run: elapsed = time.time() - self.start_time file_path = self.paths["evaluation"] / "evaluation.txt" with open(file_path, "a", buffering=1) as file: file.write(f"--- Evaluation: {self.cutting_strategy} on {self.scenario.name} ---\n") if self.cutting_strategy == CutTypes.PB: file.write( f"- beta_MIC: {self.beta_MIC} ; delta_load_gradient: {self.delta_load_gradient} - \n" ) file.write(f"Iterations: {self.iteration}\n") file.write(f"Final welfare: {self.current_best_objective}\n") file.write(f"Time passed: {elapsed:.3f} seconds\n") file.write(f"Clearing prices {self.prices}\n") if self.calculate_corrected_welfare: inelastic_surplus = calculate_block_demand_surplus(self) file.write(f"Corrected welfare: {self.current_best_objective - inelastic_surplus}\n") file.write("\n") file.flush() os.fsync(file.fileno()) if not self.reinsertion_run and not self.disable_reinsertion: PRMIC_PRB_reinsertion(self, is_prmic_reinsertion=True) PRMIC_PRB_reinsertion(self, is_prmic_reinsertion=False) except Exception as exc: # noqa: BLE001 run_error = str(exc) self._emit(f"Run failed: {exc}", level=logging.ERROR) self.run_logger.exception("Unhandled exception during run") raise finally: self._finalize_run_metadata(status=run_status, error=run_error)
[docs] def solve_master_problem(self) -> None: """ Optimize the master problem and evaluate incumbents through the callback. """ self.model.optimize(callback=self.master_problem_callback)
[docs] def master_problem_callback(self, callback_model: gp.Model, where: int) -> None: """ Process incumbent integer solutions produced by the master problem. For each MIP solution, the callback updates the in-memory allocation tables, solves the pricing subproblem, stores a new incumbent when prices are feasible, or adds the configured lazy cut when the pricing problem is infeasible. """ # when a MIP solution was found if where == GRB.Callback.MIPSOL: # Check iteration limit if self.iteration >= self.max_iterations: self._emit(f"Maximum iterations ({self.max_iterations}) reached. Terminating.") callback_model.terminate() # terminate optimization early if not self.reinsertion_run: elapsed = time.time() - self.start_time # Log if no solution could be found file_path = self.paths["evaluation"] / "evaluation.txt" with open(file_path, "a", buffering=1) as file: file.write(f"--- Evaluation: {self.cutting_strategy} on {self.scenario.name} ---\n") if self.cutting_strategy == CutTypes.PB: file.write( f"- beta_MIC: {self.beta_MIC} ; delta_load_gradient: {self.delta_load_gradient} - \n") file.write(f"No solution in iteration limit of {self.max_iterations}\n") file.write(f"Time passed: {elapsed:.3f} seconds\n\n") file.flush() os.fsync(file.fileno()) return # get current solution objective_value = callback_model.cbGet(GRB.Callback.MIPSOL_OBJ) vars = callback_model.getVars() solution = callback_model.cbGetSolution(vars) if solution is not None: self._emit("Found integer solution") self._emit(f"Objective value: {objective_value}") self.iteration += 1 # match variables with value in current solution self.current_alloc_solution = {v.VarName: [val] for v, val in zip(vars, solution)} self.update_order_dataframes() # Write current allocation solution to file file_path = self.paths["alloc"] / "results.csv" with open(file_path, "w", newline="", buffering=1, encoding="utf-8") as f: writer = csv.writer(f) writer.writerow(["variable", "value"]) for var in callback_model.getVars(): writer.writerow([var.VarName, callback_model.cbGetSolution(var)]) f.flush() os.fsync(f.fileno()) self._emit("Solving price determination subproblem...") price_subproblem = PriceSubproblem(master_problem=self) price_subproblem.solve_price_determination_subproblem() # If price subproblem optimal check if new incumbent was found if price_subproblem.pricing_model.Status == GRB.OPTIMAL: self._emit("Found market clearing prices") # Write MCPs to file file_path = self.paths["prices"] / "prices.csv" file_exists = file_path.exists() and file_path.stat().st_size > 0 with open(file_path, "a", buffering=1, encoding="utf-8") as file: # 'a' = append if not file_exists: file.write("variable,value\n") for v in price_subproblem.pricing_model.getVars(): file.write(f"{v.varName},{v.X}\n") # self._emit(f"{v.varName}: {v.X}") # for console output and run log file.flush() os.fsync(file.fileno()) if objective_value > self.current_best_objective: self.set_prices(price_subproblem.extract_prices(), reinsertion=False) self.current_best_objective = objective_value self.found_solution = True # if price subproblem is infeasible, add cut to master problem if price_subproblem.pricing_model.Status == GRB.INFEASIBLE: self._emit("Price subproblem is infeasible") if self.cutting_strategy == CutTypes.CB: combinatorial_benders_cutting.add_combinatorial_benders_cut(self=self, callback_model=callback_model, price_subproblem=price_subproblem) elif self.cutting_strategy == CutTypes.NG: no_good_cutting.add_no_good_cut(self=self, callback_model=callback_model) elif self.cutting_strategy == CutTypes.PB: price_based_cutting.handle_price_based_cutting(self=self, callback_model=callback_model)
[docs] def add_acceptance_variables_to_dataframe(self) -> None: """ Attach each Gurobi acceptance variable to the corresponding order dataframe row. """ self.step_orders['acceptance_var'] = self.step_orders['id'].map(self.accept_step) self.piecewise_linear_orders['acceptance_var'] = self.piecewise_linear_orders['id'].map( self.accept_piecewise_linear) self.block_orders['acceptance_var'] = self.block_orders['id'].map(self.accept_block) self.complex_orders['acceptance_var'] = self.complex_orders['id'].map(self.accept_complex) self.scalable_complex_orders['acceptance_var'] = self.scalable_complex_orders['id'].map(self.accept_scalable)
[docs] def update_order_dataframes(self) -> None: """ Populate the order dataframes with acceptance values from the current incumbent. """ solution_df = pd.DataFrame(self.current_alloc_solution) # step orders accept_step_order_columns = [col for col in solution_df.columns if 'accept_step' in col] accept_step_values = solution_df[accept_step_order_columns].values.flatten() self.step_orders['acceptance'] = accept_step_values # piecewise linear orders accept_piecewise_linear_order_columns = [col for col in solution_df.columns if 'accept_piecewise_linear' in col] accept_piecewise_linear_order_values = solution_df[ accept_piecewise_linear_order_columns].values.flatten() self.piecewise_linear_orders['acceptance'] = accept_piecewise_linear_order_values # block orders accept_block_columns = [col for col in solution_df.columns if 'accept_block' in col] accept_block_values = solution_df[accept_block_columns].values.flatten() self.block_orders['acceptance'] = accept_block_values # complex orders accept_complex_columns = [col for col in solution_df.columns if 'accept_complex[' in col] accept_complex_step_columns = [col for col in solution_df.columns if 'accept_complex_step[' in col] accept_complex_values = solution_df[accept_complex_columns].values.flatten() accept_complex_step_values = solution_df[accept_complex_step_columns].values.flatten() self.complex_orders['acceptance'] = accept_complex_values self.complex_step_orders['acceptance'] = accept_complex_step_values # scalable complex orders accept_scalable_columns = [col for col in solution_df.columns if 'accept_scalable_complex[' in col] accept_scalable_step_columns = [col for col in solution_df.columns if 'accept_scalable_step[' in col] accept_scalable_values = solution_df[accept_scalable_columns].values.flatten() accept_scalable_step_values = solution_df[accept_scalable_step_columns].values.flatten() self.scalable_complex_orders['acceptance'] = accept_scalable_values self.scalable_step_orders['acceptance'] = accept_scalable_step_values
[docs] def compute_block_overlaps(self) -> dict[int, set[int]]: """ Compute block-order overlap sets for price-based cut generation. Two block orders overlap when they both inject or withdraw non-zero volume in at least one common period. The returned mapping is keyed by block order id. """ period_cols = [f"q{t}" for t in self.periods] # Extract only the 'id' column and the period quantity columns df = self.block_orders[['id'] + period_cols].copy() # Boolean mask: True if quantity is non-zero in that period mask = df[period_cols].ne(0) # Extract the list of IDs (ensured to be unique) ids = df['id'].tolist() # Initialize overlap dictionary with order IDs as keys overlap = {i: set() for i in ids} # Compare each pair of orders only once (i < j) for idx1 in range(len(ids)): i = ids[idx1] for idx2 in range(idx1 + 1, len(ids)): j = ids[idx2] # Check for overlapping periods with non-zero quantities if (mask.iloc[idx1] & mask.iloc[idx2]).any(): overlap[i].add(j) overlap[j].add(i) return overlap
[docs] def get_block_bids(self, threshold: bool, reinsertion: Optional[bool] = False) -> list[int]: """ Return accepted block orders that violate or nearly violate the PAB condition. :param threshold: When ``True``, return in-the-money block orders within ``delta_PAB`` of becoming paradoxically accepted. When ``False``, return currently paradoxically accepted block orders. :param reinsertion: Whether to evaluate the reinsertion price map. :return: Block order ids that satisfy the requested filter. """ res = [] for i in list(self.block_orders['id']): accepted = get(self.block_orders, 'acceptance', i) > self.epsilon if not accepted: continue zone = self.get_order_zone(self.block_orders, i) p = get(self.block_orders, 'p', i) q = {t: get(self.block_orders, f'q{t}', i) for t in self.periods if get(self.block_orders, f'q{t}', i) != 0} sale = True if sum(q.values()) > 0 else False type = get(self.block_orders, 'block_type', i) # Check if this is a linked parent order is_linked_parent = any( other_order['block_type'] == 'linked' and i == other_order['code_prm'] for _, other_order in self.block_orders.iterrows() ) if is_linked_parent: # For linked parent orders, calculate family surplus (parent + all children) family_surplus = 0 # Parent surplus: acceptance * q_t * (MCP_t - p) parent_surplus = 0 for t in self.periods: q_t = get(self.block_orders, f'q{t}', i) if q_t != 0: parent_surplus += get(self.block_orders, 'acceptance', i) * q_t * ( self.get_price_value(t, zone, reinsertion) - p ) family_surplus += parent_surplus # Children surplus: Find children where code_prm == parent_id children_df = self.block_orders[ (self.block_orders['code_prm'] == i) & (self.block_orders['block_type'] == 'linked') ] for _, child in children_df.iterrows(): child_id = child['id'] child_zone = self.get_order_zone(self.block_orders, child_id) child_p = child['p'] child_accepted = get(self.block_orders, 'acceptance', child_id) > self.epsilon if child_accepted: child_surplus = 0 for t in self.periods: child_q_t = get(self.block_orders, f'q{t}', child_id) if child_q_t != 0: child_surplus += get(self.block_orders, 'acceptance', child_id) * child_q_t * ( self.get_price_value(t, child_zone, reinsertion) - child_p ) family_surplus += child_surplus # Check if family has negative surplus (PAB condition for linked parent) if not threshold: if family_surplus < 0: # Family has negative surplus -> PAB res.append(i) else: pass else: # Normal block order logic (non-linked parent) total_quantity = sum(abs(q_t) for q_t in q.values()) weighted_mcp = sum( self.get_price_value(t, zone, reinsertion) * abs(q_t) / total_quantity for t, q_t in q.items() ) # set right weighted_mcp in case of flexible block order if type == 'flexible': # overwrite weighted MCP with correct value considering flex_period variable active_period = calculate_flexible_order_active_period(master_problem=self, block_id=i) weighted_mcp = self.get_price_value(active_period, zone, reinsertion) if threshold: if sale and weighted_mcp - self.delta_PAB < p < weighted_mcp or not sale and weighted_mcp < p < weighted_mcp - self.delta_PAB: res.append(i) else: if sale and p > weighted_mcp or not sale and weighted_mcp > p: res.append(i) path_key = 'pab' if not threshold else 'block_inm_threshold' file_path = self.paths[path_key] / f"iteration_{self.iteration}.txt" with open(file_path, "w") as file: file.writelines(f"{bid}\n" for bid in res) return res
[docs] def add_block_cut(self, single: Optional[bool] = False) -> bool: """ Add a cut that rejects one or more near-PAB block orders. :param single: When ``True``, reject one randomly chosen candidate. When ``False``, reject every candidate returned by :meth:`get_block_bids`. :return: ``True`` when at least one cut was added, otherwise ``False``. """ in_the_money_blocks = self.get_block_bids(threshold=True) if len(in_the_money_blocks) == 0: self._emit("No INM block orders left to reject.") return False if not single: self.model.addConstrs(self.accept_block[i] == 0 for i in in_the_money_blocks) else: random_block = random.choice(in_the_money_blocks) self.model.addConstr(self.accept_block[random_block] == 0) self._emit("Block cut successfully added.") return True
[docs] def get_MIC_complex_orders( self, threshold: Optional[bool] = False, reinsertion: Optional[bool] = False ) -> list[int]: """ Return accepted complex orders that fail or nearly fail their MIC or MP condition. :param threshold: When ``True``, apply the ``beta_MIC`` tolerance and return marginally infeasible orders. When ``False``, return orders whose MIC or MP condition is already violated at the current prices. :param reinsertion: Whether to evaluate the reinsertion price map. :return: Complex order ids that match the requested filter. """ mic_complex_order_ids = self.complex_orders.loc[self.complex_orders['condition'] == 'MIC', 'id'].tolist() mp_complex_order_ids = self.complex_orders.loc[self.complex_orders['condition'] == 'MP', 'id'].tolist() res = [] for i in mic_complex_order_ids + mp_complex_order_ids: accepted = get(self.complex_orders, 'acceptance', i) > self.epsilon if not accepted: continue fixed_term = get(self.complex_orders, 'fixed_term', i) variable_term = get(self.complex_orders, 'variable_term', i) step_orders_str = get(self.complex_orders, 'step_orders', i) step_orders = parse_step_order_ids(step_orders_str, self.complex_step_orders) expected = sum( variable_term * abs(get(self.complex_step_orders, 'q', j)) * get(self.complex_step_orders, 'acceptance', j) for j in step_orders) + fixed_term actual = 0 for t in self.periods: step_orders_t = self.complex_step_orders[ (self.complex_step_orders['id'].isin(step_orders)) & (self.complex_step_orders['t'] == t)][ 'id'].tolist() actual += sum( self.get_price_value(t, self.get_order_zone(self.complex_step_orders, j), reinsertion) * abs(get(self.complex_step_orders, 'q', j)) * get(self.complex_step_orders, 'acceptance', j) for j in step_orders_t) if not threshold: if i in mic_complex_order_ids and expected > actual: res.append(i) elif i in mp_complex_order_ids and expected < actual: res.append(i) else: if i in mic_complex_order_ids and expected * (1 - self.beta_MIC) > actual: res.append(i) elif i in mp_complex_order_ids and actual > expected * (1 + self.beta_MIC): res.append(i) path_key = 'complex_mic_inm_threshold' if threshold else 'complex_mic' file_path = self.paths[path_key] / f"iteration_{self.iteration}.txt" with open(file_path, "w") as file: file.writelines(f"{bid}\n" for bid in res) return res
[docs] def get_MIC_scalable_orders( self, threshold: Optional[bool] = False, reinsertion: Optional[bool] = False ) -> list[int]: """ Return accepted scalable complex orders that fail or nearly fail MIC or MP. :param threshold: When ``True``, apply the ``beta_MIC`` tolerance and return marginally infeasible orders. When ``False``, return orders whose MIC or MP condition is already violated at the current prices. :param reinsertion: Whether to evaluate the reinsertion price map. :return: Scalable complex order ids that match the requested filter. """ mic_scalable_order_ids = self.scalable_complex_orders.loc[ self.scalable_complex_orders['condition'] == 'MIC', 'id'].tolist() mp_scalable_order_ids = self.scalable_complex_orders.loc[ self.scalable_complex_orders['condition'] == 'MP', 'id'].tolist() res = [] for i in mic_scalable_order_ids + mp_scalable_order_ids: accepted = get(self.scalable_complex_orders, 'acceptance', i) > self.epsilon if not accepted: continue fixed_term = get(self.scalable_complex_orders, 'fixed_term', i) step_orders_str = get(self.scalable_complex_orders, 'step_orders', i) step_orders = parse_step_order_ids(step_orders_str, self.scalable_step_orders) expected, actual = 0, 0 for t in self.periods: step_orders_t = self.scalable_step_orders[ (self.scalable_step_orders['id'].isin(step_orders)) & (self.scalable_step_orders['t'] == t)][ 'id'].tolist() actual += sum( self.get_price_value(t, self.get_order_zone(self.scalable_step_orders, j), reinsertion) * abs(get(self.scalable_step_orders, 'q', j)) * get(self.scalable_step_orders, 'acceptance', j) for j in step_orders_t) expected += sum(get(self.scalable_step_orders, 'p', j) * abs(get(self.scalable_step_orders, 'q', j)) * get(self.scalable_step_orders, 'acceptance', j) for j in step_orders_t) expected += fixed_term if not threshold: if i in mic_scalable_order_ids and expected > actual: res.append(i) elif i in mp_scalable_order_ids and expected < actual: res.append(i) else: if i in mic_scalable_order_ids and expected * (1 - self.beta_MIC) > actual: res.append(i) elif i in mp_scalable_order_ids and actual > (1 + self.beta_MIC): res.append(i) path_key = 'scalable_mic_inm_threshold' if threshold else 'scalable_mic' file_path = self.paths[path_key] / f"iteration_{self.iteration}.txt" with open(file_path, "w") as file: file.writelines(f"{bid}\n" for bid in res) return res
[docs] def get_load_gradient_orders( self, threshold: Optional[bool] = False, reinsertion: Optional[bool] = False, complex: bool = True, ) -> list[int]: """ Return accepted load-gradient orders with negative surplus. :param threshold: When ``True``, only return orders whose surplus is below ``-delta_load_gradient``. When ``False``, return all orders with negative surplus. :param reinsertion: Whether to evaluate the reinsertion price map. :param complex: When ``True``, inspect complex orders. When ``False``, inspect scalable complex orders. :return: Order ids whose surplus violates the requested threshold. """ # Select order and step dataframes depending on `complex` flag if complex: orders_df = self.complex_orders[ (self.complex_orders['condition'] == 'load gradient') & (self.complex_orders['acceptance'] > self.epsilon) ] step_orders_df = self.complex_step_orders step_parent_col = 'complex_order_id' else: orders_df = self.scalable_complex_orders[ (self.scalable_complex_orders['condition'] == 'load gradient') & (self.scalable_complex_orders['acceptance'] > self.epsilon) ] step_orders_df = self.scalable_step_orders step_parent_col = 'scalable_order_id' res = [] for _, order in orders_df.iterrows(): surplus = 0.0 order_id = order['id'] for _, step in step_orders_df[step_orders_df[step_parent_col] == order_id].iterrows(): t = step['t'] q = step['q'] p = step['p'] accept = step['acceptance'] step_zone = self.resolve_zone(step.get("zone", self.default_zone)) surplus += accept * q * (self.get_price_value(t, step_zone, reinsertion) - p) if (not threshold and surplus < 0) or (threshold and surplus < -self.delta_load_gradient): res.append(order_id) return res
[docs] def add_MIC_complex_cut(self, single: Optional[bool] = False) -> bool: """ Add a cut that rejects one or more near-MIC complex orders. :param single: When ``True``, reject one randomly chosen candidate. When ``False``, reject every candidate returned by :meth:`get_MIC_complex_orders`. :return: ``True`` when at least one cut was added, otherwise ``False``. """ in_the_money_MIC_complex_orders = self.get_MIC_complex_orders(threshold=True) if len(in_the_money_MIC_complex_orders) == 0: self._emit("No INM complex MIC orders left to reject.") return False else: if not single: self.model.addConstrs(self.accept_complex[i] == 0 for i in in_the_money_MIC_complex_orders) else: random_order = random.choice(in_the_money_MIC_complex_orders) self.model.addConstr(self.accept_complex[random_order] == 0) self._emit("MIC complex cut successfully added.") return True
[docs] def add_MIC_scalable_cut(self, single: Optional[bool] = False) -> bool: """ Add a cut that rejects one or more near-MIC scalable complex orders. :param single: When ``True``, reject one randomly chosen candidate. When ``False``, reject every candidate returned by :meth:`get_MIC_scalable_orders`. :return: ``True`` when at least one cut was added, otherwise ``False``. """ in_the_money_MIC_scalable_orders = self.get_MIC_scalable_orders(threshold=True) if len(in_the_money_MIC_scalable_orders) == 0: self._emit("No INM scalable complex MIC orders left to reject.") return False else: if not single: self.model.addConstrs(self.accept_scalable[i] == 0 for i in in_the_money_MIC_scalable_orders) else: random_order = random.choice(in_the_money_MIC_scalable_orders) self.model.addConstr(self.accept_scalable[random_order] == 0) self._emit("MIC scalable complex cut successfully added.") return True
[docs] def volume_indeterminacy_subproblem(self) -> None: """Placeholder for a future volume-indeterminacy subproblem implementation.""" pass
[docs] def set_prices(self, prices: dict[Any, float], reinsertion: Optional[bool] = False) -> None: """ Store clearing prices from the pricing subproblem. :param prices: Mapping keyed either by period ``t`` or by ``(zone, t)`` for zonal pricing runs. :param reinsertion: Whether the prices belong to the reinsertion pass. """ if not reinsertion: self.prices = prices else: self.prices_reinsertion = prices
[docs] def get_objective(self) -> float: """Return the current master-problem objective value.""" return self.model.getObjective().getValue()
def __str__(self) -> str: """Return a short human-readable model name.""" return 'Euphemia'