"""
This implementation is based on: https://ieeexplore.ieee.org/abstract/document/9221922
"""
import pypsa
import pandas as pd
import numpy as np
import gurobipy as gp
from gurobipy import GRB
import logging
from apem.unit_based_model.solver_configuration import SolverConfiguration
# High penalty for non-served energy, consistent with the paper's approach
C_NSE = 10000
# A high penalty cost for deviating from the zonal schedule in Redispatch R1
C_DEV = 99999
# Helper functions
[docs]
def get_zone_maps(network: pypsa.Network, node_zone_mapper: callable, zonal_configuration: str):
"""
Creates mappings from nodes to zones and zones to nodes using the provided mapper function.
:param network: PyPSA network whose buses are assigned to zones.
:param node_zone_mapper: Function mapping ``(zonal_configuration, latitude, longitude)`` to a zone id.
:param zonal_configuration: Name of the zonal configuration to apply.
:return: Tuple ``(node_to_zone, zone_to_nodes)`` with a bus-to-zone series and reverse mapping.
"""
node_to_zone = pd.Series(
{bus: node_zone_mapper(zonal_configuration, network.buses.at[bus, 'y'], network.buses.at[bus, 'x'])
for bus in network.buses.index},
name="zone"
).dropna().astype(int)
zone_to_nodes = node_to_zone.groupby(node_to_zone).apply(lambda group: group.index.tolist()).to_dict()
return node_to_zone, zone_to_nodes
[docs]
def calculate_gsk(network: pypsa.Network, node_to_zone: pd.Series, zone_to_nodes: dict) -> pd.Series:
"""
Calculate generation shift keys from installed generator capacities.
:param network: PyPSA network containing generator capacities and bus assignments.
:param node_to_zone: Mapping from buses to zones.
:param zone_to_nodes: Mapping from each zone to the buses it contains.
:return: Series indexed by bus with one shift-key value per bus.
"""
gen_p_nom = network.generators.groupby('bus')['p_nom'].sum()
bus_p_nom = gen_p_nom.reindex(network.buses.index, fill_value=0)
gsk = pd.Series(index=network.buses.index, dtype=float)
for zone_id, nodes_in_zone in zone_to_nodes.items():
zone_total_p_nom = bus_p_nom.loc[nodes_in_zone].sum()
if zone_total_p_nom > 1e-6:
gsk.loc[nodes_in_zone] = bus_p_nom.loc[nodes_in_zone] / zone_total_p_nom
else: # Handle zones with no generation capacity by distributing evenly
num_nodes = len(nodes_in_zone)
if num_nodes > 0:
gsk.loc[nodes_in_zone] = 1.0 / num_nodes
else:
gsk.loc[nodes_in_zone] = 0.0
return gsk
[docs]
def calculate_zonal_ptdf(nodal_ptdf: pd.DataFrame, gsk: pd.Series, node_to_zone: pd.Series) -> pd.DataFrame:
"""
Aggregate a nodal PTDF matrix to the zonal level.
:param nodal_ptdf: Nodal PTDF matrix with lines as rows and buses as columns.
:param gsk: Generation shift keys used for zonal aggregation.
:param node_to_zone: Mapping from buses to zones.
:return: Zonal PTDF matrix with lines as rows and zones as columns.
"""
return nodal_ptdf.mul(gsk, axis='columns').T.groupby(node_to_zone).sum().T
[docs]
def select_fb_lines(zonal_ptdf: pd.DataFrame, network: pypsa.Network, node_to_zone: pd.Series,
threshold: float = 0.05) -> list:
"""
Select the transmission lines included in the flow-based domain.
The returned set contains all interzonal lines plus intrazonal lines whose zonal PTDF spread
exceeds ``threshold``.
:param zonal_ptdf: Zonal PTDF matrix.
:param network: PyPSA network containing the transmission lines.
:param node_to_zone: Mapping from buses to zones.
:param threshold: Minimum PTDF spread required to keep an intrazonal line.
:return: Sorted list of selected line identifiers.
"""
interzonal_lines = {
line for line, data in network.lines.iterrows()
if node_to_zone.get(data.bus0) != node_to_zone.get(data.bus1)
}
ptdf_impact = zonal_ptdf.max(axis=1) - zonal_ptdf.min(axis=1)
sensitive_intrazonal_lines = set(ptdf_impact[ptdf_impact > threshold].index)
return sorted(list(interzonal_lines | sensitive_intrazonal_lines))
[docs]
def aggregate_by_zone(df: pd.DataFrame, node_to_zone: pd.Series) -> pd.DataFrame:
"""
Aggregate a nodal time series table to the zonal level.
:param df: DataFrame indexed by time with buses as columns.
:param node_to_zone: Mapping from buses to zones.
:return: DataFrame indexed by time with zones as columns.
"""
return df.T.groupby(node_to_zone).sum().T
class BaseCaseGenerator:
def __init__(self, network: pypsa.Network, ptdf: pd.DataFrame,
node_zone_mapper: callable, zonal_configuration: str):
"""
Initialize the base-case generator used by the zonal FBMC workflow.
:param network: PyPSA network representing the nodal system.
:param ptdf: Nodal PTDF matrix used by the base-case models.
:param node_zone_mapper: Function mapping buses to zones from coordinates.
:param zonal_configuration: Name of the zonal configuration to apply.
"""
self.network = network
self.ptdf = ptdf
self.node_to_zone, self.zone_to_nodes = get_zone_maps(network, node_zone_mapper, zonal_configuration)
self.snapshots = network.snapshots
self.buses = network.buses.index
self.lines = network.lines.index
self.generators = network.generators.index
def _create_base_nodal_model(self, network_instance: pypsa.Network):
"""Helper to create the nodal unit commitment model for base case generation."""
model = gp.Model('BaseCaseNodal')
model.setParam('LogToConsole', 0)
# Extract parameters from the (potentially modified) network instance
gen_buses = network_instance.generators.bus
gen_costs = network_instance.generators.marginal_cost
startup_costs = network_instance.generators.start_up_cost
p_min_pu = network_instance.generators.p_min_pu
p_max_pu_t = network_instance.generators_t.p_max_pu
p_nom = network_instance.generators.p_nom
bus_demand = network_instance.loads_t.p_set.T.groupby(network_instance.loads.bus).sum().T
bus_demand = bus_demand.reindex(columns=self.buses, fill_value=0)
# Decision Variables (as per Nodal Model, Eqs. 2-6)
p_gen = model.addVars(self.generators, self.snapshots, name="p_gen", lb=0)
u = model.addVars(self.generators, self.snapshots, vtype=GRB.BINARY, name="u")
startup = model.addVars(self.generators, self.snapshots, vtype=GRB.BINARY, name="startup")
shutdown = model.addVars(self.generators, self.snapshots, vtype=GRB.BINARY, name="shutdown")
p_bus = model.addVars(self.buses, self.snapshots, lb=-GRB.INFINITY, name="p_bus")
flow = model.addVars(self.lines, self.snapshots, lb=-GRB.INFINITY, name="flow")
nse = model.addVars(self.buses, self.snapshots, name="nse", lb=0)
# Eq. (2) - Objective Function
objective = (gp.quicksum(p_gen[g, t] * gen_costs[g] for g, t in p_gen) +
gp.quicksum(startup[g, t] * startup_costs[g] for g, t in startup) +
gp.quicksum(nse[b, t] * C_NSE for b, t in nse))
model.setObjective(objective, GRB.MINIMIZE)
# --- Base Constraints (Eqs. 1, 3, 4, 5, 6) ---
# Eq. (3) - Nodal power balance
model.addConstrs((p_bus[b, t] == gp.quicksum(p_gen[g, t] for g in gen_buses[gen_buses == b].index) -
bus_demand.at[t, b] + nse[b, t]
for b in self.buses for t in self.snapshots), name="power_balance")
# Eq. (4) - System-wide power balance
model.addConstrs((gp.quicksum(p_bus[b, t] for b in self.buses) == 0 for t in self.snapshots),
name="system_balance")
# Eq. (6) - DC power flow calculation
model.addConstrs((flow[l, t] == gp.quicksum(self.ptdf.at[l, b] * p_bus[b, t] for b in self.buses)
for l in self.lines for t in self.snapshots), name="dc_flow")
# Eq. (5) - Line flow limits
model.addConstrs((flow[l, t] <= network_instance.lines.at[l, 's_nom'] for l, t in flow), name="flow_upper")
model.addConstrs((flow[l, t] >= -network_instance.lines.at[l, 's_nom'] for l, t in flow), name="flow_lower")
# Eq. (1) - Generator constraints (set P)
for g in self.generators:
for t_idx, t in enumerate(self.snapshots):
model.addConstr(p_gen[g, t] >= p_min_pu[g] * p_nom[g] * u[g, t])
model.addConstr(p_gen[g, t] <= p_max_pu_t.at[t, g] * p_nom[g] * u[g, t])
if t_idx > 0:
model.addConstr(u[g, t] - u[g, self.snapshots[t_idx - 1]] == startup[g, t] - shutdown[g, t])
else:
model.addConstr(u[g, t] == startup[g, t]); model.addConstr(shutdown[g, t] == 0)
model.addConstr(startup[g, t] + shutdown[g, t] <= 1)
model.addConstrs((nse[b, t] <= bus_demand.at[t, b] for b, t in nse), name="nse_limit")
return model, p_bus
def generate(self, base_case_type: str):
"""
Generate expected nodal net positions ``p_bus_expected`` used by Zonal FBMC.
Base-case variants:
- ``BC1``: Solve the standard nodal UC/DCOPF model and use its nodal injections directly.
- ``BC2``: Same nodal model as BC1, plus zero zonal net-position constraints for every zone and snapshot.
- ``BC3.1``: Same as BC1, but with all loads scaled by +20 percent before solving.
- ``BC3.2``: Same as BC1, but with random load perturbations in [-20 percent, +20 percent].
This variant is stochastic unless the caller sets a NumPy random seed beforehand.
- ``BC4``: Two-step approach:
1) Relax intrazonal line capacities (x10), solve nodal model, and derive zonal reference net positions.
2) Re-solve the original nodal model while fixing each zonal net position to that reference.
:param base_case_type: Identifier of the base-case construction to use.
:return: DataFrame of expected nodal injections by snapshot and bus, or ``None`` if the solve fails.
"""
if base_case_type == 'BC1': # Same as nodal solution
model, p_bus = self._create_base_nodal_model(self.network)
elif base_case_type == 'BC2': # Eq. (9)
model, p_bus = self._create_base_nodal_model(self.network)
for t in self.snapshots:
for zone_id, nodes in self.zone_to_nodes.items():
model.addConstr(gp.quicksum(p_bus[b, t] for b in nodes) == 0,
name=f"zonal_net_pos_zero_{zone_id}_{t}")
elif base_case_type == 'BC3.1': # Eq. (10)
net_mod = self.network.copy()
net_mod.loads_t.p_set *= 1.2
model, p_bus = self._create_base_nodal_model(net_mod)
elif base_case_type == 'BC3.2': # Eq. (11)
net_mod = self.network.copy()
perturbation = 1 + (np.random.rand(*net_mod.loads_t.p_set.shape) * 0.4 - 0.2)
net_mod.loads_t.p_set *= perturbation
model, p_bus = self._create_base_nodal_model(net_mod)
elif base_case_type == 'BC4': # Eq. (12)
net_relaxed = self.network.copy()
for line_name, line_data in net_relaxed.lines.iterrows():
if self.node_to_zone.get(line_data.bus0) == self.node_to_zone.get(line_data.bus1):
net_relaxed.lines.at[line_name, 's_nom'] *= 10
relaxed_model, relaxed_p_bus = self._create_base_nodal_model(net_relaxed)
relaxed_model.optimize()
if relaxed_model.Status != GRB.OPTIMAL: raise Exception("BC4 Step 1 (relaxed) failed.")
p_bus_relaxed = pd.DataFrame({b: {t: relaxed_p_bus[b, t].X for t in self.snapshots} for b in self.buses})
p_tz_ref = aggregate_by_zone(p_bus_relaxed, self.node_to_zone)
model, p_bus = self._create_base_nodal_model(self.network)
for t in self.snapshots:
for zone_id, nodes in self.zone_to_nodes.items():
model.addConstr(gp.quicksum(p_bus[b, t] for b in nodes) == p_tz_ref.at[t, zone_id],
name=f"zonal_net_pos_fixed_{zone_id}_{t}")
else:
raise ValueError(f"Unknown base_case_type: {base_case_type}")
model.optimize()
if model.Status == GRB.OPTIMAL:
return pd.DataFrame({b: {t: p_bus[b, t].X for t in self.snapshots} for b in self.buses})
else:
logging.error(f"Base Case Generation for {base_case_type} FAILED. IIS written to file.");
model.computeIIS();
model.write(f"base_case_{base_case_type}_iis.ilp")
return None
class ZonalDispatchModel:
def solve(self, network: pypsa.Network, nodal_ptdf: pd.DataFrame,
p_bus_expected: pd.DataFrame, node_zone_mapper: callable,
zonal_configuration: str, verbose: bool = True, configuration: SolverConfiguration = None):
"""
Formulate and solve the zonal FBMC dispatch model.
:param network: PyPSA network of the original nodal system.
:param nodal_ptdf: Nodal PTDF matrix used to derive zonal constraints.
:param p_bus_expected: Expected nodal injections from the selected base case.
:param node_zone_mapper: Function mapping buses to zones from coordinates.
:param zonal_configuration: Name of the zonal configuration to apply.
:param verbose: Whether Gurobi logging should be shown.
:param configuration: Optional optimizer configuration wrapper applied to the model.
:return: Dictionary containing dispatch results, FBMC network data, duals, and raw model output,
or ``None`` if the optimization fails.
"""
if verbose:
logging.info("--- Starting ZonalDispatchModel solve ---")
model = gp.Model('Zonal_Dispatch_FBMC')
if configuration:
configuration.apply_to_model(model)
# --- 1. Zonal Pre-calculations ---
node_to_zone, zone_to_nodes = get_zone_maps(network, node_zone_mapper, zonal_configuration)
zones = sorted(zone_to_nodes.keys())
buses_in_zones = node_to_zone.index
nodal_ptdf = nodal_ptdf.loc[:, buses_in_zones]
p_bus_expected = p_bus_expected.loc[:, buses_in_zones]
gsk = calculate_gsk(network, node_to_zone, zone_to_nodes)
zonal_ptdf = calculate_zonal_ptdf(nodal_ptdf, gsk, node_to_zone)
L_z = select_fb_lines(zonal_ptdf, network, node_to_zone, threshold=0.05)
bus_demand = network.loads_t.p_set.T.groupby(network.loads.bus).sum().T.reindex(columns=network.buses.index,
fill_value=0)
zonal_demand = aggregate_by_zone(bus_demand, node_to_zone)
p_tz_expected = aggregate_by_zone(p_bus_expected, node_to_zone)
flow_expected_nodal = nodal_ptdf.dot(p_bus_expected.T).T
flow_expected_zonal = zonal_ptdf.dot(p_tz_expected.T).T
delta_F = flow_expected_nodal - flow_expected_zonal # Eq. (16) correction term
# --- 2. Define Model Variables & Objective ---
snapshots, generators = network.snapshots, network.generators.index
p_gen = model.addVars(generators, snapshots, name="p_gen", lb=0)
u = model.addVars(generators, snapshots, vtype=GRB.BINARY, name="u")
startup = model.addVars(generators, snapshots, vtype=GRB.BINARY, name="startup")
shutdown = model.addVars(generators, snapshots, vtype=GRB.BINARY, name="shutdown")
p_tz = model.addVars(zones, snapshots, lb=-GRB.INFINITY, name="p_tz")
nse_tz = model.addVars(zones, snapshots, name="nse_tz", lb=0)
flow_zonal = model.addVars(L_z, snapshots, lb=-GRB.INFINITY, name="flow_zonal")
# Objective Function - Eq. (13)
gen_costs, startup_c = network.generators.marginal_cost, network.generators.start_up_cost
objective = (gp.quicksum(p_gen[g, t] * gen_costs[g] for g, t in p_gen) +
gp.quicksum(startup[g, t] * startup_c[g] for g, t in startup) +
gp.quicksum(nse_tz[z, t] * C_NSE for z, t in nse_tz))
model.setObjective(objective, GRB.MINIMIZE)
# --- 3. Define Model Constraints ---
# Generator constraints (similar to Eq. 1)
gen_to_zone = network.generators.bus.map(node_to_zone).dropna()
p_min_pu, p_nom = network.generators.p_min_pu, network.generators.p_nom
p_max_pu_t = network.generators_t.p_max_pu
for g in generators:
for t_idx, t in enumerate(snapshots):
model.addConstr(p_gen[g, t] >= p_min_pu[g] * p_nom[g] * u[g, t])
model.addConstr(p_gen[g, t] <= p_max_pu_t.at[t, g] * p_nom[g] * u[g, t]) # CORRECTED
if t_idx > 0:
model.addConstr(u[g, t] - u[g, snapshots[t_idx - 1]] == startup[g, t] - shutdown[g, t])
else:
model.addConstr(u[g, t] == startup[g, t]); model.addConstr(shutdown[g, t] == 0)
# Zonal Power Balance - Eq. (14)
for t in snapshots:
for z in zones:
gens_in_zone = gen_to_zone[gen_to_zone == z].index
model.addConstr(
gp.quicksum(p_gen[g, t] for g in gens_in_zone) - p_tz[z, t] + nse_tz[z, t] == zonal_demand.at[t, z],
name=f"zonal_balance_{z}_{t}")
# System-wide zonal balance - Eq. (15)
model.addConstrs((gp.quicksum(p_tz[z, t] for z in zones) == 0 for t in snapshots), name="system_zonal_balance")
# Zonal flow calculation & limits - Eq. (16), (17), (18)
model.addConstrs(
(flow_zonal[l, t] == gp.quicksum(zonal_ptdf.at[l, z] * p_tz[z, t] for z in zones) for l, t in flow_zonal),
name="zonal_flow_calc")
model.addConstrs((flow_zonal[l, t] <= network.lines.at[l, 's_nom'] - delta_F.at[t, l] for l, t in flow_zonal),
name="zonal_flow_upper")
model.addConstrs((flow_zonal[l, t] >= -network.lines.at[l, 's_nom'] - delta_F.at[t, l] for l, t in flow_zonal),
name="zonal_flow_lower")
model.addConstrs((nse_tz[z, t] <= zonal_demand.at[t, z] for z, t in nse_tz), name="nse_limit_zonal")
# --- 4. Solve and Prepare Results ---
model.optimize()
if model.Status == GRB.OPTIMAL:
all_vars_for_saving = [{"variable": v.VarName, "value": v.X} for v in model.getVars()]
p_tz_df = pd.DataFrame({z: {t: p_tz[z, t].X for t in snapshots} for z in zones})
flow_zonal_component_df = zonal_ptdf.dot(p_tz_df.T).T
final_line_flow_df = (flow_zonal_component_df + delta_F).rename(columns=str)
# Fix integer variables and re-solve as LP to get duals
for v in model.getVars():
if v.VType in (GRB.BINARY, GRB.INTEGER): v.LB = v.X; v.UB = v.X
relaxed = model.relax();
relaxed.setParam('LogToConsole', 0);
relaxed.optimize()
return {
"objective_value": model.ObjVal,
"p_gen": pd.DataFrame({g: {t: p_gen[g, t].X for t in snapshots} for g in generators}),
"p_tz": p_tz_df,
"u": pd.DataFrame({g: {t: u[g, t].X for t in snapshots} for g in generators}),
"nse_tz": pd.DataFrame({z: {t: nse_tz[z, t].X for t in snapshots} for z in zones}),
"final_line_flow": final_line_flow_df,
"fbmc_network_data": {
"constraint_ids": [str(line_id) for line_id in L_z],
"zonal_ptdf": zonal_ptdf.loc[L_z, zones].rename(index=str, columns=str),
"capacity_upper": pd.DataFrame(
{
str(line_id): {
t: network.lines.at[line_id, 's_nom']
for t in snapshots
}
for line_id in L_z
}
),
"capacity_lower": pd.DataFrame(
{
str(line_id): {
t: -network.lines.at[line_id, 's_nom']
for t in snapshots
}
for line_id in L_z
}
),
},
"model": model,
"duals": {"zonal_price": pd.DataFrame(
{z: {t: relaxed.getConstrByName(f"zonal_balance_{z}_{t}").Pi for t in snapshots} for z in zones})},
"all_vars": all_vars_for_saving
}
else:
logging.error(f"Zonal MILP failed. IIS written to zonal_model_iis.ilp")
model.computeIIS();
model.write("zonal_model_iis.ilp")
return None
class RedispatchModel:
def solve(self, network: pypsa.Network, ptdf: pd.DataFrame,
zonal_results: dict, node_zone_mapper: callable,
zonal_configuration: str, method: str, verbose: bool = True):
"""
Solve the nodal redispatch problem after zonal FBMC clearing.
:param network: PyPSA network of the nodal system.
:param ptdf: Nodal PTDF matrix used for redispatch flow constraints.
:param zonal_results: Result dictionary returned by ``ZonalDispatchModel.solve``.
:param node_zone_mapper: Function mapping buses to zones from coordinates.
:param zonal_configuration: Name of the zonal configuration to apply.
:param method: Redispatch formulation identifier, either ``R1`` or ``R2``.
:param verbose: Whether Gurobi logging should be shown.
:return: Dictionary summarizing redispatch status, costs, and dispatch outputs.
"""
if verbose:
logging.info(f"--- Starting RedispatchModel solve (Method: {method}) ---")
if method not in ['R1', 'R2']: raise ValueError("Method must be 'R1' or 'R2'")
model = gp.Model(f'Redispatch_{method}');
model.setParam('LogToConsole', 1 if verbose else 0)
# --- 1. Extract Sets, Parameters, and Schedules ---
node_to_zone, _ = get_zone_maps(network, node_zone_mapper, zonal_configuration)
snapshots, buses, lines, generators = network.snapshots, network.buses.index, network.lines.index, network.generators.index
p_gen_zonal, u_zonal = zonal_results['p_gen'], zonal_results['u']
# --- 2. Define Decision Variables ---
p_gen = model.addVars(generators, snapshots, name="p_gen_rd", lb=0)
u = model.addVars(generators, snapshots, vtype=GRB.BINARY, name="u_rd")
startup = model.addVars(generators, snapshots, vtype=GRB.BINARY, name="startup_rd")
shutdown = model.addVars(generators, snapshots, vtype=GRB.BINARY, name="shutdown_rd")
p_bus = model.addVars(buses, snapshots, lb=-GRB.INFINITY, name="p_bus_rd")
nse = model.addVars(buses, snapshots, name="nse_rd", lb=0)
gen_costs, startup_costs = network.generators.marginal_cost, network.generators.start_up_cost
# --- 3. Set Objective Function based on Method ---
if method == 'R1':
# Eq. (19) - Minimize total operating cost + deviation penalty
delta_tz = model.addVars(zonal_results['p_tz'].columns, snapshots, lb=0, name="delta_tz")
op_costs = (gp.quicksum(p_gen[g, t] * gen_costs[g] for g, t in p_gen) +
gp.quicksum(startup[g, t] * startup_costs[g] for g, t in startup) +
gp.quicksum(nse[b, t] * C_NSE for b, t in nse))
deviation_penalty = gp.quicksum(delta_tz[z, t] * C_DEV for z, t in delta_tz)
model.setObjective(op_costs + deviation_penalty, GRB.MINIMIZE)
elif method == 'R2':
# Eq. (20) - Minimize redispatch compensation costs
p_up = model.addVars(generators, snapshots, name="p_up", lb=0)
p_down = model.addVars(generators, snapshots, name="p_down", lb=0)
model.addConstrs((p_gen[g, t] - p_gen_zonal.at[t, g] == p_up[g, t] - p_down[g, t] for g, t in p_gen),
name="redispatch_delta_definition")
zonal_prices = zonal_results['duals']['zonal_price']
gen_zones = network.generators.bus.map(node_to_zone)
gen_prices = pd.Series({g: zonal_prices.at[snapshots[0], gen_zones[g]] for g in generators})
cost_up_reg = gp.quicksum(p_up[g, t] * gen_costs[g] for g, t in p_up)
profit_margin = (gen_prices - gen_costs).clip(lower=0)
cost_down_reg = gp.quicksum(p_down[g, t] * profit_margin[g] for g, t in p_down)
cost_new_startup = gp.quicksum(
startup[g, t] * startup_costs[g] for g, t in startup if u_zonal.at[t, g] < 0.5)
cost_nse = gp.quicksum(nse[b, t] * C_NSE for b, t in nse)
model.setObjective(cost_up_reg + cost_down_reg + cost_new_startup + cost_nse, GRB.MINIMIZE)
# --- 4. Add Nodal Redispatch Constraints ---
bus_demand = network.loads_t.p_set.T.groupby(network.loads.bus).sum().T.reindex(columns=buses, fill_value=0)
# Eq. (23) -> Nodal Power Balance
model.addConstrs((p_bus[b, t] == gp.quicksum(
p_gen[g, t] for g in network.generators.index[network.generators.bus == b]) - bus_demand.at[t, b] + nse[
b, t]
for b in buses for t in snapshots), name="redispatch_balance")
# Eq. (4) -> System-wide balance
model.addConstrs((gp.quicksum(p_bus[b, t] for b in buses) == 0 for t in snapshots),
name="redispatch_sys_balance")
# Eq. (25) & (6) -> Nodal flow calculation and limits
model.addConstrs(
(gp.quicksum(ptdf.at[l, b] * p_bus[b, t] for b in ptdf.columns) <= network.lines.at[l, 's_nom'] for l in
lines for t in snapshots), name="redispatch_flow_upper")
model.addConstrs(
(gp.quicksum(ptdf.at[l, b] * p_bus[b, t] for b in ptdf.columns) >= -network.lines.at[l, 's_nom'] for l in
lines for t in snapshots), name="redispatch_flow_lower")
# Part of set P in Eq. (19) -> Generator operating constraints
p_min_pu, p_nom = network.generators.p_min_pu, network.generators.p_nom
p_max_pu_t = network.generators_t.p_max_pu
for g in generators:
for t in snapshots:
model.addConstr(p_gen[g, t] >= p_min_pu[g] * p_nom[g] * u[g, t])
model.addConstr(p_gen[g, t] <= p_max_pu_t.at[t, g] * p_nom[g] * u[g, t])
u_previous = u_zonal.at[t, g]
model.addConstr(u[g, t] - u_previous == startup[g, t] - shutdown[g, t], name=f"uc_logic_{g}_{t}")
model.addConstr(startup[g, t] + shutdown[g, t] <= 1)
# Eq. (22) -> Fixed commitment for slow units
slow_carriers = ['nuclear', 'lignite', 'coal', 'oil']
slow_gens = network.generators[network.generators.carrier.isin(slow_carriers)].index
model.addConstrs((u[g, t] == u_zonal.at[t, g] for g in slow_gens for t in snapshots), name="fixed_slow_uc")
# Eq. (21) -> Deviation calculation (only for R1 method)
if method == 'R1':
for t in snapshots:
for z, nodes in get_zone_maps(network, node_zone_mapper, zonal_configuration)[1].items():
p_bus_in_zone = gp.quicksum(p_bus[b, t] for b in nodes)
model.addConstr(delta_tz[z, t] >= p_bus_in_zone - zonal_results['p_tz'].at[t, z],
name=f"dev_pos_{z}_{t}")
model.addConstr(delta_tz[z, t] >= -(p_bus_in_zone - zonal_results['p_tz'].at[t, z]),
name=f"dev_neg_{z}_{t}")
# --- 5. Solve and Prepare Results ---
model.optimize()
if model.Status == GRB.OPTIMAL:
p_gen_redispatch = pd.DataFrame({g: {t: p_gen[g, t].X for t in snapshots} for g in generators})
startup_redispatch = pd.DataFrame({g: {t: startup[g, t].X > 0.5 for t in snapshots} for g in generators})
nse_redispatch = pd.DataFrame({b: {t: nse[b, t].X for t in snapshots} for b in buses})
redispatch_cost = model.ObjVal
startup_cost_total = (startup_redispatch * startup_costs).sum().sum()
final_operating_cost = ((p_gen_redispatch * gen_costs).sum().sum() + startup_cost_total + (
nse_redispatch.sum().sum() * C_NSE))
return {
"method": method,
"status": "Optimal",
"redispatch_cost": redispatch_cost,
"final_operating_cost": final_operating_cost,
"p_gen_redispatch": p_gen_redispatch,
"nse_redispatch": nse_redispatch,
"startup_redispatch": startup_redispatch
}
else:
model.computeIIS()
model.write(f"redispatch_{method}_iis.ilp")
return {"status": "Failed", "method": method}