Source code for power_flow_relaxations.models.nodal_base_model

from typing import Optional
from abc import abstractmethod
import pandas as pd
import numpy as np
import sys

from mosek.fusion import Matrix, Model, Domain, ObjectiveSense, Expr, SolutionError, ProblemStatus, SolutionStatus
import mosek.fusion.pythonic # Don't remove this import, it is needed for the Mosek Fusion API to work properly
from types import SimpleNamespace

from apem.unit_based_model.error import Error
from apem.unit_based_model.allocation.allocation import SellersAllocation, Allocation
from apem.unit_based_model.solver_configuration import SolverConfiguration as Configuration
from apem.unit_based_model.allocation.power_flow_model import PowerFlowModel
from apem.unit_based_model.allocation.analysis.stats import compute_stats
from apem.unit_based_model.data.parsing.scenario import Scenario
from apem.unit_based_model.utils.extraction import preprocess_as_dict


[docs] class NodalBaseModel(PowerFlowModel): """ Shared optimization scaffold for nodal power-flow relaxations. This base class handles: - scenario normalization/parsing into indexed sets and tensors, - variable creation and common market/network constraints, - objective assembly for welfare or zonal-tracking modes, - solving, optional integrality forcing, and allocation extraction. Subclasses provide relaxation-specific network equations by implementing `power_constraints`, `reference_constraints`, and `get_V_vt_values`. """
[docs] def read_scenario(self, scenario): """ Normalize and ingest a scenario into model-ready structures. Harmonizes expected buyer/seller columns, creates index mappings for entities and periods, builds node-neighbor and node-agent mappings, and precomputes dictionary lookups for bids, capacities, and costs. """ # Normalize required columns for this model if "max_real_dem" not in scenario.df_buyers.columns and "max_dem" in scenario.df_buyers.columns: scenario.df_buyers = scenario.df_buyers.copy() scenario.df_buyers["max_real_dem"] = scenario.df_buyers["max_dem"] for col in ["min_real_dem", "min_reactive_dem", "max_reactive_dem"]: if col not in scenario.df_buyers.columns: scenario.df_buyers[col] = 0 if "max_real_prod" not in scenario.df_sellers.columns and "max_prod" in scenario.df_sellers.columns: scenario.df_sellers = scenario.df_sellers.copy() scenario.df_sellers["max_real_prod"] = scenario.df_sellers["max_prod"] if "min_real_prod" not in scenario.df_sellers.columns and "min_prod" in scenario.df_sellers.columns: scenario.df_sellers["min_real_prod"] = scenario.df_sellers["min_prod"] for col in ["min_reactive_prod", "max_reactive_prod"]: if col not in scenario.df_sellers.columns: scenario.df_sellers[col] = 0 buyers_list = scenario.df_buyers["buyer"].unique().tolist() sellers_list = scenario.df_sellers["seller"].unique().tolist() periods_list = scenario.periods if hasattr(scenario, "periods") else scenario.df_buyers["period"].unique().tolist() self.buyers = list(enumerate(buyers_list)) self.buyer_indices = {buyer: i for i, buyer in self.buyers} self.sellers = list(enumerate(sellers_list)) self.seller_indices = {seller: i for i, seller in self.sellers} self.periods = list(enumerate(periods_list)) self.blocks_buyers = list(enumerate(scenario.blocks_buyers)) self.blocks_sellers = list(enumerate(scenario.blocks_sellers)) self.network = scenario.network self.nodes = list(enumerate(self.network)) self.node_indices = {node: i for i, node in self.nodes} self.reference_bus = self.node_indices[scenario.r_star], scenario.r_star self.neighbours = { node: [(self.node_indices[w], w) for w in self.network[node]] for node in self.network } self.nodes_agents = scenario.nodes_agents self.node_buyers = { node: [ self.buyer_indices[buyer] for buyer in self.nodes_agents[node]["buyers"] ] for node in self.network } self.node_sellers = { node: [ self.seller_indices[seller] for seller in self.nodes_agents[node]["sellers"] ] for node in self.network } self.buyer_val_dict, self.buyer_size_dict = {}, {} self.seller_cost_dict, self.seller_size_dict = {}, {} self.min_real_dem = preprocess_as_dict( scenario.df_buyers, ["buyer", "period"], "min_real_dem" ) self.max_real_dem = preprocess_as_dict( scenario.df_buyers, ["buyer", "period"], "max_real_dem" ) self.min_reactive_dem = preprocess_as_dict( scenario.df_buyers, ["buyer", "period"], "min_reactive_dem" ) self.max_reactive_dem = preprocess_as_dict( scenario.df_buyers, ["buyer", "period"], "max_reactive_dem" ) self.min_real_prod = preprocess_as_dict( scenario.df_sellers, ["seller", "period"], "min_real_prod" ) self.max_real_prod = preprocess_as_dict( scenario.df_sellers, ["seller", "period"], "max_real_prod" ) self.min_reactive_prod = preprocess_as_dict( scenario.df_sellers, ["seller", "period"], "min_reactive_prod" ) self.max_reactive_prod = preprocess_as_dict( scenario.df_sellers, ["seller", "period"], "max_reactive_prod" ) self.no_load_cost = preprocess_as_dict( scenario.df_sellers, ["seller", "period"], "no_load_cost" ) self.min_uptime = preprocess_as_dict( scenario.df_sellers, ["seller", "period"], "min_uptime" ) for block in scenario.blocks_buyers: self.buyer_val_dict[block] = preprocess_as_dict( scenario.df_buyers, ["buyer", "period"], "val", block ) self.buyer_size_dict[block] = preprocess_as_dict( scenario.df_buyers, ["buyer", "period"], "size", block ) for block in scenario.blocks_sellers: self.seller_cost_dict[block] = preprocess_as_dict( scenario.df_sellers, ["seller", "period"], "cost", block ) self.seller_size_dict[block] = preprocess_as_dict( scenario.df_sellers, ["seller", "period"], "size", block )
[docs] def initialize_parameters(self): """ Convert dictionary-based inputs into dense NumPy tensors. Produces tensor/matrix representations for buyer values/sizes, seller costs/sizes, no-load costs, and real/reactive demand/supply bounds, all aligned with internal `(entity, period, block)` indexing. """ self.buyer_val_tensor = np.zeros((len(self.buyers), len(self.periods), len(self.blocks_buyers))) for lb, block in self.blocks_buyers: for b, buyer in self.buyers: for t, period in self.periods: self.buyer_val_tensor[b, t, lb] = self.buyer_val_dict[block][ buyer, period ] self.buyer_size_tensor = np.zeros((len(self.buyers), len(self.periods), len(self.blocks_buyers))) for lb, block in self.blocks_buyers: for b, buyer in self.buyers: for t, period in self.periods: self.buyer_size_tensor[b, t, lb] = self.buyer_size_dict[block][ buyer, period ] self.seller_cost_tensor = np.zeros((len(self.sellers), len(self.periods), len(self.blocks_sellers))) for ls, block in self.blocks_sellers: for s, seller in self.sellers: for t, period in self.periods: self.seller_cost_tensor[s, t, ls] = self.seller_cost_dict[block][ seller, period ] self.seller_size_tensor = np.zeros((len(self.sellers), len(self.periods), len(self.blocks_sellers))) for ls, block in self.blocks_sellers: for s, seller in self.sellers: for t, period in self.periods: self.seller_size_tensor[s, t, ls] = self.seller_size_dict[block][ seller, period ] self.no_load_cost_matrix = np.zeros((len(self.sellers), len(self.periods))) for s, seller in self.sellers: for t, period in self.periods: self.no_load_cost_matrix[s, t] = self.no_load_cost[seller, period] # Buyer Parameters self.min_real_dem_tensor = np.zeros((len(self.buyers), len(self.periods))) for b, buyer in self.buyers: for t, period in self.periods: self.min_real_dem_tensor[b, t] = self.min_real_dem[buyer, period] self.max_real_dem_tensor = np.zeros((len(self.buyers), len(self.periods))) for b, buyer in self.buyers: for t, period in self.periods: self.max_real_dem_tensor[b, t] = self.max_real_dem[buyer, period] self.min_reactive_dem_tensor = np.zeros((len(self.buyers), len(self.periods))) for b, buyer in self.buyers: for t, period in self.periods: self.min_reactive_dem_tensor[b, t] = self.min_reactive_dem[buyer, period] self.max_reactive_dem_tensor = np.zeros((len(self.buyers), len(self.periods))) for b, buyer in self.buyers: for t, period in self.periods: self.max_reactive_dem_tensor[b, t] = self.max_reactive_dem[ buyer, period ] # Seller Parameters self.min_real_prod_tensor = np.zeros((len(self.sellers), len(self.periods))) for s, seller in self.sellers: for t, period in self.periods: self.min_real_prod_tensor[s, t] = self.min_real_prod[seller, period] self.max_real_prod_tensor = np.zeros((len(self.sellers), len(self.periods))) for s, seller in self.sellers: for t, period in self.periods: self.max_real_prod_tensor[s, t] = self.max_real_prod[seller, period] self.min_reactive_prod_tensor = np.zeros((len(self.sellers), len(self.periods))) for s, seller in self.sellers: for t, period in self.periods: self.min_reactive_prod_tensor[s, t] = self.min_reactive_prod[ seller, period ] self.max_reactive_prod_tensor = np.zeros((len(self.sellers), len(self.periods))) for s, seller in self.sellers: for t, period in self.periods: self.max_reactive_prod_tensor[s, t] = self.max_reactive_prod[ seller, period ]
[docs] def initialize_model(self, configuration): """ Create the MOSEK model and declare common decision variables. Includes buyer and seller block-dispatch variables, commitment/start-up variables, branch active/reactive flows, nodal imbalance slacks, and current-limit violation variables. Commitment is relaxed or binary based on `configuration.relaxation`. """ self.model = Model() self.problem_status = None self.solution_status = None self.p_bt = self.model.variable("p_bt", [len(self.buyers), len(self.periods)], Domain.greaterThan(0.0)) self.p_btl = self.model.variable("p_btl", [len(self.buyers), len(self.periods), len(self.blocks_buyers)], Domain.greaterThan(0.0)) self.q_bt = self.model.variable("q_bt", [len(self.buyers), len(self.periods)]) self.p_st = self.model.variable("p_st", [len(self.sellers), len(self.periods)], Domain.greaterThan(0.0)) self.p_stl = self.model.variable("p_stl", [len(self.sellers), len(self.periods), len(self.blocks_sellers)], Domain.greaterThan(0.0)) self.q_st = self.model.variable("q_st", [len(self.sellers), len(self.periods)]) if configuration is not None and configuration.relaxation: self.u_st = self.model.variable("u_st", [len(self.sellers), len(self.periods)], Domain.greaterThan(0.0)) else: self.u_st = self.model.variable("u_st", [len(self.sellers), len(self.periods)], Domain.binary()) self.phi_st = self.model.variable("phi_st", [len(self.sellers), len(self.periods)], Domain.greaterThan(0.0)) self.p_vwt = self.model.variable("p_vwt", [len(self.nodes), len(self.nodes), len(self.periods)]) self.q_vwt = self.model.variable("q_vwt", [len(self.nodes), len(self.nodes), len(self.periods)]) self.p_imb = self.model.variable("p_imb", [len(self.nodes), len(self.periods)], Domain.greaterThan(0.0)) self.q_imb = self.model.variable("q_imb", [len(self.nodes), len(self.periods)], Domain.greaterThan(0.0)) self.I_viol = self.model.variable("I_viol", [len(self.nodes), len(self.nodes), len(self.periods)], Domain.greaterThan(0.0))
[docs] def initialize_network_arrays(self, enforce_sparse=False): """ Build electrical parameter arrays from the scenario network. Parameters ---------- enforce_sparse: If `True`, store matrices as MOSEK sparse `Matrix`; otherwise NumPy arrays are used. Generates `V_min`, `V_max`, conductance `G`, susceptance `B`, branch ratings `F_max`, and derived apparent limits `S_max`. """ self.V_min = np.array([self.network.nodes[v].get("V_min", 1.) for _, v in self.nodes]) self.V_max = np.array([self.network.nodes[v].get("V_max", 1.) for _, v in self.nodes]) G = np.zeros((len(self.nodes), len(self.nodes))) B = np.zeros((len(self.nodes), len(self.nodes))) F_max = np.zeros((len(self.nodes), len(self.nodes))) for i_v, v in self.nodes: for i_w, w in self.neighbours[v]: G[i_v, i_w] = self.network[v][w].get("G", 0.) B[i_v, i_w] = self.network[v][w].get("B", 0.) F_max[i_v, i_w] = self.network[v][w].get("F_max", 0.) S_max = abs(F_max * self.V_max[:, None]) if enforce_sparse: self.G = Matrix.sparse(G) self.B = Matrix.sparse(B) self.F_max = Matrix.sparse(F_max) self.S_max = Matrix.sparse(S_max) else: self.G = G self.B = B self.F_max = F_max self.S_max = S_max
[docs] def set_tolerances(self, p_vwt_line_tol=5e-4, q_vwt_line_tol=5e-4, I_viol_weight=3e-1, p_imb_weight=3e-1, q_imb_weight=3e-1): """ Configure tolerance and penalty coefficients used across constraints. Parameters control line-equation residual bands and scaling of slack variables for current-limit and nodal-balance violations. """ self.p_vwt_line_tol = p_vwt_line_tol self.q_vwt_line_tol = q_vwt_line_tol self.I_viol_weight = I_viol_weight self.p_imb_weight = p_imb_weight self.q_imb_weight = q_imb_weight
def __init__( self, scenario: Scenario, configuration: Configuration, tolerances: Optional[dict[str, float]] = None, ): """ Construct a nodal relaxation model from a scenario. Parameters ---------- scenario: Input market/network scenario. configuration: Solver configuration used for variable domains and runtime settings. tolerances: Optional overrides for line/balance tolerance and penalty weights. Initialization order is: scenario parsing -> parameter tensors -> variable creation -> network arrays -> tolerance setup. """ # PowerFlowModel has no __init__; avoid passing args to object.__init__ super().__init__() # Keep references for downstream use self.scenario = scenario self.configuration = configuration self.read_scenario(scenario) self.initialize_parameters() self.initialize_model(configuration) self.initialize_network_arrays() self.tolerances = tolerances self.set_tolerances(**(tolerances or {}))
[docs] def get_thermal_limit_objective(self, welfare_scale = 1): """ Return normalized penalty term for current-limit violations. The term is scaled by `welfare_scale` and averaged over the size of `I_viol` to keep magnitudes comparable across instances. """ violation = welfare_scale / (np.prod(self.I_viol.shape)) * Expr.sum(self.I_viol) return violation
[docs] def get_imbalance_objective(self, welfare_scale = 1): """ Return normalized penalty term for nodal real/reactive imbalance slack. Uses aggregated `p_imb` and `q_imb` variables with scaling by `welfare_scale` and tensor sizes. """ imbalance = welfare_scale / (np.prod(self.p_imb.shape)) * Expr.sum(self.p_imb) + welfare_scale / (np.prod(self.q_imb.shape)) * Expr.sum(self.q_imb) return imbalance
[docs] def get_objective( self, zonal_allocation: Optional[SellersAllocation] = None, min_vol: Optional[bool] = False, ) -> tuple: """ Build the objective expression and objective sense. If `zonal_allocation` is not provided, returns a welfare-maximization objective with imbalance and thermal-violation penalties. If `zonal_allocation` is provided, returns a deviation-minimization objective: either minimum-volume deviation (`min_vol=True`) or cost-weighted deviation plus commitment mismatch (`min_vol=False`). """ if isinstance(zonal_allocation, SellersAllocation): if len(self.blocks_sellers) > 0: diff_stl = self.model.variable("diff_y_stl", [ len(self.sellers), len(self.periods), len(self.blocks_sellers) ], Domain.greater_than(0.0), ) u_diff_st = self.model.variable("diff_u_st", [ len(self.sellers), len(self.periods) ], Domain.greater_than(0.0), ) for s, seller in self.sellers: for t, period in self.periods: for ls, block in self.blocks_sellers: self.model.constraint( zonal_allocation.p_stl[seller, period, block] - self.p_stl[s, t, ls] <= diff_stl[s, t, ls] ) self.model.constraint( self.p_stl[s, t, ls] - zonal_allocation.p_stl[seller, period, block] <= diff_stl[s, t, ls] ) if min_vol: welfare_scale = np.sum(np.abs(self.seller_size_tensor)) # type: ignore return (ObjectiveSense.Minimize, Expr.sum(diff_stl) + self.get_imbalance_objective(welfare_scale) + self.get_thermal_limit_objective(welfare_scale) ) else: for s, seller in self.sellers: for t, period in self.periods: self.model.constraint( zonal_allocation.u_st[seller, period] - self.u_st[s, t] <= u_diff_st[s, t] ) self.model.constraint( self.u_st[s, t] - zonal_allocation.u_st[seller, period] <= u_diff_st[s, t] ) welfare_scale = ( np.sum(np.abs(self.seller_cost_tensor)) # type: ignore + np.sum(np.abs(self.no_load_cost_matrix)) # type: ignore ) return (ObjectiveSense.Minimize, Expr.sum(Expr.vstack([ Expr.mulElm(self.seller_cost_tensor[:, t, :], Expr.reshape(diff_stl[:, t, :], [len(self.sellers), len(self.blocks_sellers)])) for t, _ in self.periods])) + Expr.sum(Expr.mulElm(self.no_load_cost_matrix, u_diff_st)) + self.get_imbalance_objective(welfare_scale) + self.get_thermal_limit_objective(welfare_scale) ) else: raise ValueError( "Zonal allocation is not compatible with the current scenario. blocks_sellers can't be empty!" ) else: welfare_scale = ( np.sum(np.abs(self.buyer_val_tensor)) # type: ignore + np.sum(np.abs(self.seller_cost_tensor)) # type: ignore + np.sum(np.abs(self.no_load_cost_matrix)) # type: ignore ) return (ObjectiveSense.Maximize, Expr.sum(Expr.vstack([ Expr.mulElm(self.buyer_val_tensor[:, t, :], Expr.reshape(self.p_btl[:, t, :], [len(self.buyers), len(self.blocks_buyers)])) for t, _ in self.periods])) - Expr.sum(Expr.vstack([ Expr.mulElm(self.seller_cost_tensor[:, t, :], Expr.reshape(self.p_stl[:, t, :], [len(self.sellers), len(self.blocks_sellers)])) for t, _ in self.periods])) - Expr.sum(Expr.mulElm(self.no_load_cost_matrix, self.u_st)) - self.get_imbalance_objective(welfare_scale) - self.get_thermal_limit_objective(welfare_scale) )
[docs] def bid_constraints(self, u_fixed: Optional[dict] = None): """ Add buyer/seller bid, capacity, and unit-commitment constraints. Enforces block limits, aggregate dispatch identities, real/reactive bounds, and minimum-uptime relations. If `u_fixed` is provided, commitment values are fixed per seller and period before constraint assembly. """ if isinstance(u_fixed, dict): for t, period in self.periods: for s, seller in self.sellers: if (seller, period) in u_fixed: self.model.constraint(self.u_st[s, t] == u_fixed[seller, period]) for t, _ in self.periods: for b, _ in self.buyers: for lb, _ in self.blocks_buyers: self.model.constraint(self.p_btl[b, t, lb] <= self.buyer_size_tensor[b, t, lb]) self.model.constraint(self.p_bt[b, t] == Expr.sum(self.p_btl[b, t, :])) self.model.constraint(self.p_bt[b, t] >= self.min_real_dem_tensor[b, t]) self.model.constraint(self.p_bt[b, t] <= self.max_real_dem_tensor[b, t]) self.model.constraint(self.q_bt[b, t] >= self.min_reactive_dem_tensor[b, t]) self.model.constraint(self.q_bt[b, t] <= self.max_reactive_dem_tensor[b, t]) for s, _ in self.sellers: for ls, _ in self.blocks_sellers: self.model.constraint(self.p_stl[s, t, ls] <= self.seller_size_tensor[s, t, ls] * self.u_st[s, t]) self.model.constraint(self.p_st[s, t] == Expr.sum(self.p_stl[s, t, :])) self.model.constraint(self.p_st[s, t] >= (self.min_real_prod_tensor[s, t] * self.u_st[s, t])) self.model.constraint(self.p_st[s, t] <= (self.max_real_prod_tensor[s, t] * self.u_st[s, t])) self.model.constraint(self.q_st[s, t] >= (self.min_reactive_prod_tensor[s, t] * self.u_st[s, t])) self.model.constraint(self.q_st[s, t] <= (self.max_reactive_prod_tensor[s, t] * self.u_st[s, t])) if len(self.periods) > 1: # Uptime constraints (hard to vectorize) for s, seller in self.sellers: for t, period in self.periods: if t > 0: self.model.constraint(self.phi_st[s, t] - self.u_st[s, t] + self.u_st[s, t - 1] >= 0) if t >= self.min_uptime[seller, period]: phi_sum = 0 for i in range(t - self.min_uptime[seller, period], t): phi_sum += self.phi_st[s, i] self.model.constraint( phi_sum - self.u_st[s, t] <= 0 ) self.model.constraint(self.u_st == Domain.inRange(0, 1))
[docs] def current_rating_constraints(self): """ Impose conic apparent-power/current rating limits on each branch. For each directed edge and period, constrains `(p, q)` flow magnitudes via a quadratic cone, with multiplicative slack through `I_viol`. """ for t, _ in self.periods: for i_v, v in self.nodes: for i_w, w in self.neighbours[v]: self.model.constraint( Expr.vstack(self.V_min[i_v] * self.F_max[i_v, i_w] * (1 + self.I_viol[i_v, i_w, t] * self.I_viol_weight), self.p_vwt[i_v, i_w, t], self.q_vwt[i_v, i_w, t]) == Domain.inQCone() )
[docs] @abstractmethod def power_constraints(self): """ Add power flow constraints to the model. These constraints typically represent the power flow equations in the network. :return: set of power flow constraints """ raise NotImplementedError("This method should be implemented in subclasses.")
[docs] @abstractmethod def reference_constraints(self): """ Add reference constraints to the model. These constraints typically set the voltage or power flow at a reference node to a specific value. To access the reference node use self.scenario.r_star :return: set of reference constraints """ raise NotImplementedError("This method should be implemented in subclasses.")
[docs] def bus_constraints(self): """ Add nodal real/reactive balance constraints with imbalance slack. For each node and period, enforces supply - demand - outgoing flow = 0 within weighted slack variables `p_imb` and `q_imb`. """ for t, _ in self.periods: for i_v, v in self.nodes: neighbours = [(i_v, i_w, t) for i_w, _ in self.neighbours[v]] sellers = [(s, t) for s in self.node_sellers[v]] buyers = [(b, t) for b in self.node_buyers[v]] # === Real Power Balance === seller_real = Expr.sum(self.p_st[sellers]) if sellers else 0 buyer_real = Expr.sum(self.p_bt[buyers]) if buyers else 0 flow_out_real = Expr.sum(self.p_vwt[neighbours]) if neighbours else 0 self.model.constraint((seller_real - buyer_real - flow_out_real) <= self.p_imb[i_v, t] * self.p_imb_weight) self.model.constraint((seller_real - buyer_real - flow_out_real) >= - self.p_imb[i_v, t] * self.p_imb_weight) # === Reactive Power Balance === seller_reactive = Expr.sum(self.q_st[sellers]) if sellers else 0 buyer_reactive = Expr.sum(self.q_bt[buyers]) if buyers else 0 flow_out_reactive = Expr.sum(self.q_vwt[neighbours]) if neighbours else 0 self.model.constraint((seller_reactive - buyer_reactive - flow_out_reactive) <= self.q_imb[i_v, t] * self.q_imb_weight) self.model.constraint((seller_reactive - buyer_reactive - flow_out_reactive) >= - self.q_imb[i_v, t] * self.q_imb_weight)
[docs] def get_p_bt_values(self) -> dict: """ Get the values of the p_bt variable. :return: Dictionary with (buyer, period) as keys and p_bt values as values. """ value = self.p_bt.level().reshape((len(self.buyers), len(self.periods))) return { (buyer, period): value[b, t] for b, buyer in self.buyers for t, period in self.periods if value is not None }
[docs] def get_p_btl_values(self) -> dict: """ Get the values of the p_btl variable. :return: Dictionary with (buyer, period, block) as keys and p_btl values as values. """ value = self.p_btl.level().reshape((len(self.buyers), len(self.periods), len(self.blocks_buyers))) return { (buyer, period, block): value[b, t, lb] for b, buyer in self.buyers for t, period in self.periods for lb, block in self.blocks_buyers if value is not None }
[docs] def get_q_bt_values(self) -> dict: """ Get the values of the q_bt variable. :return: Dictionary with (buyer, period) as keys and q_bt values as values. """ value = self.q_bt.level().reshape((len(self.buyers), len(self.periods))) return { (buyer, period): value[b, t] for b, buyer in self.buyers for t, period in self.periods if value is not None }
[docs] def get_p_st_values(self) -> dict: """ Get the values of the p_st variable. :return: Dictionary with (seller, period) as keys and p_st values as values. """ value = self.p_st.level().reshape((len(self.sellers), len(self.periods))) return { (seller, period): value[s, t] for s, seller in self.sellers for t, period in self.periods if value is not None }
[docs] def get_p_stl_values(self) -> dict: """ Get the values of the p_stl variable. :return: Dictionary with (seller, period, block) as keys and p_stl values as values. """ value = self.p_stl.level().reshape((len(self.sellers), len(self.periods), len(self.blocks_sellers))) return { (seller, period, block): value[s, t, ls] for s, seller in self.sellers for t, period in self.periods for ls, block in self.blocks_sellers if value is not None }
[docs] def get_q_st_values(self) -> dict: """ Get the values of the q_st variable. :return: Dictionary with (seller, period) as keys and q_st values as values. """ value = self.q_st.level().reshape((len(self.sellers), len(self.periods))) return { (seller, period): value[s, t] for s, seller in self.sellers for t, period in self.periods if value is not None }
[docs] def get_u_st_values(self, binary=True) -> dict: """ Get the values of the u_st variable. :return: Dictionary with (seller, period) as keys and u_st values as values. """ value = self.u_st.level().reshape((len(self.sellers), len(self.periods))) if binary: return { (seller, period): np.round(value[s, t]) # type: ignore for s, seller in self.sellers for t, period in self.periods if value is not None } else: return { (seller, period): value[s, t] for s, seller in self.sellers for t, period in self.periods if value is not None }
[docs] def get_phi_st_values(self) -> dict: """ Get the values of the phi_st variable. :return: Dictionary with (seller, period) as keys and phi_st values as values. """ value = self.phi_st.level().reshape((len(self.sellers), len(self.periods))) return { (seller, period): value[s, t] for s, seller in self.sellers for t, period in self.periods if value is not None }
[docs] def get_p_vwt_values(self) -> dict: """ Get the values of the p_vwt variable. :return: Dictionary with (node, neighbor, period) as keys and f_vwt values as values. """ value = self.p_vwt.level().reshape((len(self.nodes), len(self.nodes), len(self.periods))) return { (v, w, period): value[i_v, i_w, t] for i_v, v in self.nodes for i_w, w in self.neighbours[v] for t, period in self.periods if value is not None }
[docs] def get_q_vwt_values(self) -> dict: """ Get the values of the q_vwt variable. :return: Dictionary with (node, neighbor, period) as keys and q_vwt values as values. """ value = self.q_vwt.level().reshape((len(self.nodes), len(self.nodes), len(self.periods))) return { (v, w, period): value[i_v, i_w, t] for i_v, v in self.nodes for i_w, w in self.neighbours[v] for t, period in self.periods if value is not None }
[docs] @abstractmethod def get_V_vt_values(self) -> dict[tuple[int, int], tuple[float, float]]: """ Get the values of the V_vt variable. :return: Dictionary with (node, period) as keys and V_vt values as values. """ raise NotImplementedError( "The get_V_vt_values method should be implemented in subclasses to return the voltage values." )
[docs] def collect_constraints(self, u_fixed: Optional[dict] = None, verbose=False): """ Assemble the full model constraint set in canonical order. Parameters ---------- u_fixed: Optional commitment fixes passed to :meth:`bid_constraints`. verbose: If `True`, prints progress while adding constraint groups. """ if verbose: print("Collecting power constraints...") self.power_constraints() if verbose: print("Collecting reference constraints...") self.reference_constraints() if verbose: print("Collecting bus constraints...") self.bus_constraints() if verbose: print("Collecting bid constraints...") self.bid_constraints(u_fixed)
[docs] def solve( self, results_file: Optional[str] = None, stats_file: Optional[str] = None, u_fixed: Optional[dict] = None, min_vol: Optional[bool] = False, zonal_allocation: Optional[SellersAllocation] = None, verbose: bool = False, force_integrality: bool = True, **kwargs ) -> Allocation | Error: """ Solve the relaxation and return an allocation-like result object. Parameters ---------- results_file: Optional CSV path for raw variable dumps (or failure status). stats_file: Optional path for computed run statistics. u_fixed: Optional fixed commitments to enforce during solve. min_vol: Objective mode flag for zonal-tracking formulations. zonal_allocation: Optional target allocation for redispatch-style tracking objective. verbose: Enable MOSEK solver logs and build-step prints. force_integrality: If relaxed commitments produce fractional values, rerun once with rounded binary commitments fixed. **kwargs: Reserved for compatibility. Returns ------- Allocation | Error Allocation-like object on success; lightweight error object otherwise. """ self.collect_constraints(u_fixed, verbose) # Solve the optimization problem self.model.objective(*self.get_objective(zonal_allocation, min_vol)) if verbose: self.model.setLogHandler(sys.stdout) try: self.model.solve() except SolutionError as e: return Error(str(e)) self.problem_status = self.model.getProblemStatus() self.solution_status = self.model.getPrimalSolutionStatus() self.solve_time = self.model.getSolverDoubleInfo("optimizerTime") # Check if the optimization was successful if self.solution_status in [SolutionStatus.Optimal, SolutionStatus.Feasible]: if self.solution_status == SolutionStatus.Feasible: print("The solution is feasible but not optimal.") u_st_values = self.get_u_st_values(binary=False) if force_integrality and any(not (np.isclose(val, 0, atol=1e-5) or np.isclose(val, 1, atol=1e-5)) for val in u_st_values.values()): print("Solving again with fixed u_st values to ensure integrality.") u_binary = self.get_u_st_values(binary=True) # Reinitialize the model to reset all constraints self.__init__(self.scenario, self.configuration, tolerances=self.tolerances) self.collect_constraints(u_fixed=u_binary, verbose=verbose) self.model.objective(*self.get_objective(zonal_allocation, min_vol)) try: self.model.solve() except SolutionError as e: return Error(str(e)) self.problem_status = self.model.getProblemStatus() self.solution_status = self.model.getPrimalSolutionStatus() if self.solution_status not in [SolutionStatus.Optimal, SolutionStatus.Feasible]: return Error("The solution is not optimal or feasible after forcing integrality: " + str(self.problem_status)) allocation = self.get_allocation() if stats_file: compute_stats( stats_file, self.scenario, self.configuration, allocation, SimpleNamespace(**allocation.stats) ) if results_file: results = [ {"variable": var._ModelVariable__name, "value": var.level(), "size": var.getSize()} for var in [self.p_bt, self.p_btl, self.q_bt, self.p_st, self.p_stl, self.q_st, self.p_vwt, self.q_vwt, self.u_st, self.phi_st] ] df = pd.DataFrame(results, columns=["variable", "value", "size"]) df.to_csv(results_file, index=False) return allocation else: if results_file: status_message = { ProblemStatus.PrimalInfeasible: "Model is primal infeasible", ProblemStatus.DualInfeasible: "Model is dual infeasible", ProblemStatus.PrimalAndDualInfeasible: "Model is primal and dual infeasible", ProblemStatus.IllPosed: "Model is ill-posed", ProblemStatus.PrimalInfeasibleOrUnbounded: "Model is primal infeasible or unbounded", ProblemStatus.Unknown: "Solver encountered an error", }.get(self.problem_status, "Optimization failed with unknown status") error_data = [ {"status": self.problem_status, "message": status_message} ] df = pd.DataFrame(error_data, columns=["status", "message"]) df.to_csv(results_file, index=False) print(f"{self} allocation error with code {self.problem_status}. {self.solution_status}") return Error(str(self.problem_status))
[docs] def get_allocation(self): """ Build a lightweight allocation object from the solved model values. Returns a `RelaxAllocation` object with buyer, seller, and network allocations, summary stats, helper methods for welfare and feasibility checks, and reconstructed voltage components `V_vt`. Raises `ValueError` if solve status is missing or the solution is not feasible/optimal. """ if self.model is not None and self.problem_status is not None and self.solution_status is not None: if self.solution_status not in [SolutionStatus.Feasible, SolutionStatus.Optimal]: raise ValueError( f"The solution is not optimal or feasible: {self.solution_status}" ) self.model.dataReport() stats = { "welfare": self.model.primalObjValue(), "runtime": self.solve_time if self.solve_time is not None else self.model.getSolverDoubleInfo("optimizerTime"), "MIP_gap": self.model.getSolverDoubleInfo("mioObjRelGap"), } # Lightweight allocation object compatible with run_relaxations expectations class SimpleBuyersAllocation: def __init__(self, p_bt, p_btl, df_buyers, blocks_buyers): self.x_bt = p_bt self.x_btl = p_btl self.df_buyers = df_buyers self.blocks_buyers = blocks_buyers class SimpleSellersAllocation: def __init__(self, p_st, p_stl, u_st, phi_st, df_sellers): self.y_st = p_st self.y_stl = p_stl self.u_st = u_st self.phi_st = phi_st self.df_sellers = df_sellers class SimpleTransmissionNetworkAllocation: def __init__(self, f_vwt, q_vwt, network, periods): self.f_vwt = f_vwt self.q_vwt = q_vwt self.network = network # periods may be a list of ints or list of (idx, value) self.periods = [t if not isinstance(t, tuple) else t[1] for t in periods] class RelaxAllocation: def __init__(self, buyers_alloc, sellers_alloc, network_alloc, stats, scenario, V_vt): self.BuyersAllocation = buyers_alloc self.SellersAllocation = sellers_alloc self.TransmissionNetworkAllocation = network_alloc self.stats = stats self.scenario = scenario self.V_vt = V_vt def compute_welfare(self): return self.stats["welfare"] def compute_feasibility_violations(self, violations=None, print_summary=False): requested = set(violations or ["line", "line_current_limit"]) flow_mismatch = [] thermal_excess = [] for (v, w, period), p_vw in self.TransmissionNetworkAllocation.f_vwt.items(): q_vw = self.TransmissionNetworkAllocation.q_vwt.get((v, w, period), 0.0) # Thermal limit check based on per-branch apparent flow magnitude. line_mag = float(np.hypot(p_vw, q_vw)) v_min = self.scenario.network.nodes[v].get("V_min", 1.0) f_max = self.scenario.network[v][w].get("F_max", 0.0) thermal_limit = abs(v_min * f_max) thermal_excess.append(max(0.0, line_mag - thermal_limit)) # Flow equation mismatch using recovered complex voltages. if (v, period) in self.V_vt and (w, period) in self.V_vt: Vd_v, Vq_v = self.V_vt[(v, period)] Vd_w, Vq_w = self.V_vt[(w, period)] g_vw = self.scenario.network[v][w].get("G", 0.0) b_vw = self.scenario.network[v][w].get("B", 0.0) vv_sq = Vd_v ** 2 + Vq_v ** 2 re_vvw = Vd_v * Vd_w + Vq_v * Vq_w im_vvw = Vq_v * Vd_w - Vd_v * Vq_w p_calc = g_vw * (vv_sq - re_vvw) + b_vw * im_vvw q_calc = -b_vw * (vv_sq - re_vvw) + g_vw * im_vvw flow_mismatch.append(float(np.hypot(p_vw - p_calc, q_vw - q_calc))) result = {} if "line" in requested: result["line (A)"] = float(np.mean(flow_mismatch)) if flow_mismatch else np.nan if "line_current_limit" in requested or not violations: result["line_current_limit (A)"] = float(np.mean(thermal_excess)) if thermal_excess else np.nan return result p_bt = self.get_p_bt_values() p_st = self.get_p_st_values() p_btl = self.get_p_btl_values() p_stl = self.get_p_stl_values() q_bt = self.get_q_bt_values() # kept for completeness q_st = self.get_q_st_values() p_vwt = self.get_p_vwt_values() _ = self.get_q_vwt_values() # computed but not used in RelaxAllocation V_vt = self.get_V_vt_values() u_st = self.get_u_st_values() phi_st = self.get_phi_st_values() buyers_alloc = SimpleBuyersAllocation(p_bt, p_btl, self.scenario.df_buyers, self.scenario.blocks_buyers) sellers_alloc = SimpleSellersAllocation(p_st, p_stl, u_st, phi_st, self.scenario.df_sellers) network_alloc = SimpleTransmissionNetworkAllocation(p_vwt, self.get_q_vwt_values(), self.scenario.network, self.scenario.periods) return RelaxAllocation(buyers_alloc, sellers_alloc, network_alloc, stats, self.scenario, V_vt) else: raise ValueError( "No allocation has been computed yet. Please call the solve() method first." )
@abstractmethod def __str__(self) -> str: raise NotImplementedError( "The __str__ method should be implemented in subclasses to provide a meaningful string representation." )