Source code for apem.order_book_based_model.euphemia.model.setup_model

from typing import TYPE_CHECKING

import gurobipy as gp
from gurobipy import GRB

from apem.order_book_based_model.euphemia.utils.extraction import get

if TYPE_CHECKING:
    from apem.order_book_based_model.euphemia.master_problem.master_problem import MasterProblem


[docs] def add_objective(self: "MasterProblem") -> None: """Define the surplus-maximizing objective of the master problem. The objective aggregates the contribution of every supported order family: step orders, block orders, complex-order step rows, scalable-complex step rows, and piecewise-linear orders. The final model objective is posted as a maximization of total economic surplus using the sign conventions already encoded in the parsed bid tables. :param self: Active Euphemia master-problem instance whose Gurobi model is being populated. :return: ``None``. """ # 1) step orders step_orders_obj = gp.quicksum( self.accept_step[i] * get(self.step_orders, 'q', i) * get(self.step_orders, 'p', i) for i in list(self.step_orders['id'])) # 3) block orders block_orders_obj = gp.quicksum( self.accept_block[i] * gp.quicksum(get(self.block_orders, f'q{t}', i) for t in self.periods) * get(self.block_orders, 'p', i) for i in list(self.block_orders['id'])) # 4) complex orders - consider step suborders complex_orders_obj = gp.quicksum( self.accept_complex_step[i] * get(self.complex_step_orders, 'q', i) * get(self.complex_step_orders, 'p', i) for i in list(self.complex_step_orders['id'])) # 5) scalable complex orders scalable_orders_obj = gp.quicksum( self.accept_scalable_step[i] * get(self.scalable_step_orders, 'q', i) * get(self.scalable_step_orders, 'p', i) for i in list(self.scalable_step_orders['id'])) # 6) piecewise linear orders piecewise_linear_orders_obj = gp.quicksum( self.accept_piecewise_linear[i] * get(self.piecewise_linear_orders, 'q', i) * ( get(self.piecewise_linear_orders, 'p0', i) + self.accept_piecewise_linear[i] * (get(self.piecewise_linear_orders, 'p1', i) - get(self.piecewise_linear_orders, 'p0', i)) / 2) for i in list(self.piecewise_linear_orders['id'])) self.model.setObjective( -step_orders_obj - block_orders_obj - complex_orders_obj - scalable_orders_obj - piecewise_linear_orders_obj, GRB.MAXIMIZE)
[docs] def add_market_constraints(self: "MasterProblem") -> None: """Add market-clearing and order-linking constraints to the master problem. This routine builds the core allocation structure of the Euphemia master problem. It enforces: - market balance for the active network representation - block-order acceptance logic including MAR, exclusive, linked, and flexible blocks - linkage between parent complex/scalable-complex orders and their step rows - load-gradient and MAP restrictions for complex-order variants Balance is formulated differently depending on the current network mode: - no network constraints: one global balance equation per period - ``ATC``: zonal injection balanced with directed interconnector flows - ``FBMC``: zonal net-position definitions plus global balance :param self: Active Euphemia master-problem instance whose Gurobi model is being populated. :return: ``None``. """ def order_zone(df, order_id): if "zone" not in df.columns: return self.default_zone return self.resolve_zone(get(df, "zone", order_id)) def zonal_injection(z, t): return ( gp.quicksum( self.accept_step[i] * get(self.step_orders, "q", i) for i in list(self.step_orders["id"]) if get(self.step_orders, "t", i) == t and order_zone(self.step_orders, i) == z ) + gp.quicksum( self.accept_block[i] * get(self.block_orders, f"q{t}", i) for i in list(self.block_orders["id"]) if get(self.block_orders, "block_type", i) != "flexible" and order_zone(self.block_orders, i) == z ) + gp.quicksum( self.accept_block[i] * self.flex_period[i, t] * get(self.block_orders, f"q{t}", i) for i in list(self.block_orders["id"]) if get(self.block_orders, "block_type", i) == "flexible" and order_zone(self.block_orders, i) == z ) + gp.quicksum( self.accept_complex_step[i] * get(self.complex_step_orders, "q", i) for i in list(self.complex_step_orders["id"]) if get(self.complex_step_orders, "t", i) == t and order_zone(self.complex_step_orders, i) == z ) + gp.quicksum( self.accept_scalable_step[i] * get(self.scalable_step_orders, "q", i) for i in list(self.scalable_step_orders["id"]) if get(self.scalable_step_orders, "t", i) == t and order_zone(self.scalable_step_orders, i) == z ) + gp.quicksum( self.accept_piecewise_linear[i] * get(self.piecewise_linear_orders, "q", i) for i in list(self.piecewise_linear_orders["id"]) if get(self.piecewise_linear_orders, "t", i) == t and order_zone(self.piecewise_linear_orders, i) == z ) ) if not self.network_constraints_enabled: # single-zone market clearing self.model.addConstrs( (gp.quicksum(self.accept_step[i] * get(self.step_orders, 'q', i) for i in list(self.step_orders['id']) if get(self.step_orders, 't', i) == t) + gp.quicksum(self.accept_block[i] * get(self.block_orders, f'q{t}', i) for i in list(self.block_orders['id']) if get(self.block_orders, 'block_type', i) != 'flexible') + gp.quicksum(self.accept_block[i] * self.flex_period[i, t] * get(self.block_orders, f'q{t}', i) for i in list(self.block_orders['id']) if get(self.block_orders, 'block_type', i) == 'flexible') + gp.quicksum(self.accept_complex_step[i] * get(self.complex_step_orders, 'q', i) for i in list(self.complex_step_orders['id']) if get(self.complex_step_orders, 't', i) == t) + gp.quicksum(self.accept_scalable_step[i] * get(self.scalable_step_orders, 'q', i) for i in list(self.scalable_step_orders['id']) if get(self.scalable_step_orders, 't', i) == t) + gp.quicksum(self.accept_piecewise_linear[i] * get(self.piecewise_linear_orders, 'q', i) for i in list(self.piecewise_linear_orders['id']) if get(self.piecewise_linear_orders, 't', i) == t) == 0 for t in self.periods), name='power_balance') elif self.network_model == "ATC": # zonal market clearing with directed ATC flows self.model.addConstrs( ( zonal_injection(z, t) - gp.quicksum( self.f_atc[i, j, tt] for (i, j, tt) in self.atc_index if i == z and tt == t ) + gp.quicksum( self.f_atc[i, j, tt] for (i, j, tt) in self.atc_index if j == z and tt == t ) == 0 for z in self.zones for t in self.periods ), name="zonal_power_balance", ) else: # FBMC zonal net positions linked to accepted quantities self.model.addConstrs( ( self.net_position[z, t] == zonal_injection(z, t) for z in self.zones for t in self.periods ), name="fbmc_net_position_def", ) self.model.addConstrs( (gp.quicksum(self.net_position[z, t] for z in self.zones) == 0 for t in self.periods), name="fbmc_global_balance", ) # block order acceptance # accept_block[i] = 0 or accept_block[i] >= MAR self.model.addConstrs( self.accept_block[i] >= get(self.block_orders, 'MAR', i) * self.MAR_aux[i] for i in list(self.block_orders['id'])) self.model.addConstrs(self.accept_block[i] <= self.M * self.MAR_aux[i] for i in list(self.block_orders['id'])) # exclusive groups exclusive_groups = list(self.block_orders[self.block_orders['block_type'] == 'exclusive']['code_prm']) exclusive_blocks = list(self.block_orders[self.block_orders['block_type'] == 'exclusive']['id']) self.model.addConstrs( gp.quicksum( self.MAR_aux[i] for i in exclusive_blocks if get(self.block_orders, 'code_prm', i) == eg) <= 1 for eg in exclusive_groups) # linked blocks linked_blocks = list(self.block_orders[self.block_orders['block_type'] == 'linked']['id']) block_to_parent = {i: get(self.block_orders, 'code_prm', i) for i in linked_blocks} self.model.addConstrs(self.accept_block[i] <= self.accept_block[block_to_parent[i]] for i in linked_blocks) # flexible blocks flexible_blocks = list(self.block_orders[self.block_orders['block_type'] == 'flexible']['id']) self.model.addConstrs(self.accept_block[i] == gp.quicksum(self.flex_period[i, t] for t in self.periods) for i in flexible_blocks) # complex orders complex_step_orders = list(self.complex_step_orders['id']) self.model.addConstrs( self.accept_complex_step[i] <= self.accept_complex[get(self.complex_step_orders, 'complex_order_id', i)] for i in complex_step_orders) # scalable complex orders scalable_step_orders = list(self.scalable_step_orders['id']) self.model.addConstrs( self.accept_scalable_step[i] <= self.accept_scalable[get(self.scalable_step_orders, 'scalable_order_id', i)] for i in scalable_step_orders) # load gradient condition # complex orders load_gradient_complex_ids = self.complex_orders.loc[self.complex_orders['load_gradient'].notna(), 'id'].tolist() for i in load_gradient_complex_ids: periods_orders = {} inc_dec = get(self.complex_orders, 'load_gradient', i) for t in self.periods: # for a specific complex order sum over all associated step orders from the current period orders_i_t = self.complex_step_orders.loc[ (self.complex_step_orders['complex_order_id'] == i) & ( self.complex_step_orders['t'] == t), 'id'].tolist() periods_orders[t] = gp.quicksum( self.accept_complex_step[j] * abs(get(self.complex_step_orders, 'q', j)) for j in orders_i_t) self.model.addConstrs( periods_orders[t] - periods_orders[t - 1] <= inc_dec * self.accept_complex[i] for t in self.periods[1:]) self.model.addConstrs( periods_orders[t - 1] - periods_orders[t] <= inc_dec * self.accept_complex[i] for t in self.periods[1:]) # scalable complex orders load_gradient_scalable_ids = self.scalable_complex_orders.loc[ self.scalable_complex_orders['load_gradient'].notna(), 'id'].tolist() for i in load_gradient_scalable_ids: periods_orders = {} inc_dec = get(self.scalable_complex_orders, 'load_gradient', i) for t in self.periods: orders_i_t = self.scalable_step_orders.loc[ (self.scalable_step_orders['scalable_order_id'] == i) & ( self.scalable_step_orders['t'] == t), 'id'].tolist() periods_orders[t] = gp.quicksum( self.accept_scalable_step[j] * abs(get(self.scalable_step_orders, 'q', j)) for j in orders_i_t) self.model.addConstrs( periods_orders[t] - periods_orders[t - 1] <= inc_dec * self.accept_scalable[i] for t in self.periods[1:]) self.model.addConstrs( periods_orders[t - 1] - periods_orders[t] <= inc_dec * self.accept_scalable[i] for t in self.periods[1:]) # MAP self.model.addConstrs( periods_orders[t] >= get(self.scalable_complex_orders, f'MAP{t}', i) * self.accept_scalable[i] for t in self.periods) # MAP for scalable complex orders that do not have the load gradient condition for i in self.scalable_complex_orders['id'].tolist(): if i not in load_gradient_scalable_ids: periods_orders = {} for t in self.periods: orders_i_t = self.scalable_step_orders.loc[ (self.scalable_step_orders['scalable_order_id'] == i) & ( self.scalable_step_orders['t'] == t), 'id'].tolist() periods_orders[t] = gp.quicksum( self.accept_scalable_step[i] * abs(get(self.scalable_step_orders, 'q', i)) for i in orders_i_t) self.model.addConstrs( periods_orders[t] >= get(self.scalable_complex_orders, f'MAP{t}', i) * self.accept_scalable[i] for t in self.periods)
[docs] def add_network_constraints(self: "MasterProblem") -> None: """Add network feasibility constraints for the selected network model. The function is a no-op when ``network_constraints_enabled`` is ``False``. Otherwise it complements :func:`add_market_constraints` with the physical transmission limits implied by the configured network representation: - ``ATC``: capacity bounds on directed interconnectors plus optional ramp-up and ramp-down limits between consecutive periods - ``FBMC``: PTDF-based upper RAM limits and optional lower-bound limits on zonal net positions :param self: Active Euphemia master-problem instance whose Gurobi model is being populated. :return: ``None``. """ if not self.network_constraints_enabled: return if self.network_model == "ATC": # ATC capacity limits for directed interconnectors self.model.addConstrs( (self.f_atc[i, j, t] <= self.atc_cap[(i, j, t)] for (i, j, t) in self.atc_index), name="atc_capacity", ) # Optional directional flow ramping limits if self.atc_ramp_up: for (i, j), ramp_up in self.atc_ramp_up.items(): for prev_t, t in zip(self.periods[:-1], self.periods[1:]): if (i, j, prev_t) in self.atc_cap and (i, j, t) in self.atc_cap: self.model.addConstr( self.f_atc[i, j, t] - self.f_atc[i, j, prev_t] <= ramp_up, name=f"atc_ramp_up_{i}_{j}_{t}", ) if self.atc_ramp_down: for (i, j), ramp_down in self.atc_ramp_down.items(): for prev_t, t in zip(self.periods[:-1], self.periods[1:]): if (i, j, prev_t) in self.atc_cap and (i, j, t) in self.atc_cap: self.model.addConstr( self.f_atc[i, j, prev_t] - self.f_atc[i, j, t] <= ramp_down, name=f"atc_ramp_down_{i}_{j}_{t}", ) return # FBMC PTDF * net_position <= RAM constraints self.model.addConstrs( ( gp.quicksum(self.fb_ptdf_map.get((cnec_id, t, z), 0.0) * self.net_position[z, t] for z in self.zones) <= self.fb_ram[(cnec_id, t)] for (cnec_id, t) in self.fb_index ), name="fbmc_ram", ) if self.fb_lb: self.model.addConstrs( ( gp.quicksum(self.fb_ptdf_map.get((cnec_id, t, z), 0.0) * self.net_position[z, t] for z in self.zones) >= self.fb_lb[(cnec_id, t)] for (cnec_id, t) in self.fb_index if (cnec_id, t) in self.fb_lb ), name="fbmc_lb", )