# Source code for apem.unit_based_model.allocation.algorithms.zonal_clearing

from itertools import combinations
import os.path
from typing import Optional, Tuple, Union

import networkx as nx
import pandas as pd

from apem.unit_based_model.allocation.algorithms.nodal_clearing.dcopf import DCOPF
from apem.unit_based_model.allocation.allocation import Allocation
from apem.unit_based_model.solver_configuration import SolverConfiguration
from apem.unit_based_model.error import Error
from apem.unit_based_model.allocation.power_flow_model import PowerFlowModel
from apem.unit_based_model.allocation.algorithms.zonal_clearing.zonal_configuration import node_zone_mapper
from apem.unit_based_model.data.parsing.scenario import Scenario


class Zonal_NTC_multiedge(PowerFlowModel):
    """
    Variant of the zonal NTC model that preserves individual cross-zonal lines (no aggregation).

    Each interzonal line in the nodal network becomes its own edge in a zonal ``MultiGraph`` so
    parallel connections are modeled independently. Works only with PyPSA data.

    :param zonal_configuration: name of the zonal configuration handed to ``node_zone_mapper``.
    :param factor: multiplier applied to ``F_max`` of every cross-zonal line.
    """

    def __init__(self, zonal_configuration: str = 'zonal_DE3', factor: float = 0.8):
        self.zonal_configuration = zonal_configuration
        self.factor = factor

    def _map_nodes_to_zones(self, base_scenario: Scenario) -> Tuple[dict, dict]:
        """
        Assign every network node that has an entry in ``nodes_agents`` to a zone.

        :param base_scenario: nodal scenario whose ``network`` and ``nodes_agents`` are read.
        :return: ``(node_to_zone, zones)`` where ``node_to_zone`` maps node -> zone and ``zones``
            maps zone -> the first node that was assigned to it (its representative node).
        """
        node_to_zone, zones = {}, {}
        for node in base_scenario.network.nodes:
            agent_info = base_scenario.nodes_agents.get(node) if node in base_scenario.nodes_agents else None
            if node not in base_scenario.nodes_agents:
                continue  # skip nodes without coordinate information
            # map node to zone based on latitude and longitude coordinates
            zone = node_zone_mapper(
                self.zonal_configuration,
                lat=agent_info["latitude"],
                lon=agent_info["longitude"],
            )
            node_to_zone[node] = zone
            # ensure each zone is represented by a single node within that zone
            zones.setdefault(zone, node)
        return node_to_zone, zones

    def _build_zonal_network(self, nodal_network, node_to_zone: dict, zones: dict):
        """
        Build the zonal graph in which every cross-zonal line is kept as an individual edge.

        :param nodal_network: networkx graph of the nodal scenario; edges carry ``B`` and ``F_max``.
        :param node_to_zone: mapping node -> zone.
        :param zones: mapping zone -> representative node (only the keys are used here).
        :return: a ``MultiGraph`` over the zones, or a plain edge-less ``Graph`` if only one zone exists.
        """
        zonal_network = nx.MultiGraph()
        zonal_network.add_nodes_from(zones)

        # edges(data=True) yields one (u, v, data) triple per edge for Graph and MultiGraph alike,
        # so parallel lines of a nodal MultiGraph are all visited without asking for edge keys.
        for v, w, data in nodal_network.edges(data=True):
            if v not in node_to_zone or w not in node_to_zone:
                continue
            zone_v = node_to_zone[v]
            zone_w = node_to_zone[w]
            # keep only inter-zonal lines; intra-zonal lines vanish in zonal model
            if zone_v == zone_w:
                continue
            # NOTE(review): factor=None is tolerated when naming the default results path but
            # raises TypeError here -- confirm whether None is meant to be supported.
            zonal_network.add_edge(zone_v, zone_w, B=data['B'], F_max=data['F_max'] * self.factor)

        # if only a single zone exists: create network without edges
        if len(zonal_network.nodes) == 1:
            zonal_network = nx.Graph()
            zonal_network.add_node(next(iter(zones)))
        return zonal_network

    def create_zonal_scenario_NTC(self, base_scenario: Scenario, results_root: Optional[str] = None) -> Scenario:
        """
        Convert a nodal scenario into a zonal one while keeping every cross-zonal line as a separate edge.

        Side effect: writes the node -> zone assignment to ``node_to_zone.csv`` inside
        ``results_root`` (or, if that is empty/``None``, inside a default directory derived from the
        scenario name, the zonal configuration and the factor). The directory is created if needed.

        :param base_scenario: nodal scenario to convert; it is not modified.
        :param results_root: directory for ``node_to_zone.csv``; falsy values select the default path.
        :return: new ``Scenario`` whose network nodes are zones.
        :raises ValueError: if no network node could be mapped to a zone.
        """
        node_to_zone, zones = self._map_nodes_to_zones(base_scenario)
        if not zones:
            raise ValueError(
                f"{self}: no nodes could be mapped to zones for zonal_configuration={self.zonal_configuration}. "
                "Ensure nodes_agents includes latitude/longitude for network nodes."
            )

        # Work on copies to avoid mutating the input nodal scenario in place.
        df_sellers = base_scenario.df_sellers.copy()
        df_buyers = base_scenario.df_buyers.copy()
        # Relabel all agents in one pass over the finished mapping. Looking each original node up
        # exactly once also means a zone label that happens to equal a node id is never relabelled
        # a second time; agents at unmapped nodes keep their original node value.
        for frame in (df_sellers, df_buyers):
            frame['node'] = frame['node'].map(lambda n: node_to_zone.get(n, n))

        # save node_to_zone assignment as .csv file (include factor for consistency with result paths)
        if results_root:
            results_path = results_root
        else:
            factor_str = f"_f{self.factor}" if self.factor is not None else ""
            results_path = (
                f"results/unit_based_model/{base_scenario.name}_results/Zonal_NTC_multiedge/"
                f"{self.zonal_configuration}{factor_str}"
            )
        os.makedirs(results_path, exist_ok=True)
        node_to_zone_df = pd.DataFrame(list(node_to_zone.items()), columns=['node', 'zone'])
        node_to_zone_df.to_csv(os.path.join(results_path, "node_to_zone.csv"), index=False)

        # create network that keeps every cross-zonal line as an individual edge
        aggregated_network = self._build_zonal_network(base_scenario.network, node_to_zone, zones)
        r_star = next(iter(aggregated_network.nodes))

        # for each zone, we store its sellers and buyers
        nodes_agents = {}
        for zone, rep_node in zones.items():
            zone_entry = {
                'sellers': df_sellers.loc[df_sellers['node'] == zone, 'seller'].unique().tolist(),
                'buyers': df_buyers.loc[df_buyers['node'] == zone, 'buyer'].unique().tolist(),
            }
            # keep representative coordinates for plotting
            rep_coords = base_scenario.nodes_agents.get(rep_node, {})
            if rep_coords:
                zone_entry['latitude'] = rep_coords.get('latitude')
                zone_entry['longitude'] = rep_coords.get('longitude')
            nodes_agents[zone] = zone_entry

        return Scenario(f'{base_scenario.name}', df_buyers, df_sellers, aggregated_network, nodes_agents,
                        base_scenario.periods, base_scenario.blocks_buyers, base_scenario.blocks_sellers, r_star)

    def solve(self, scenario: Scenario, configuration: SolverConfiguration, results_file: Optional[str] = None,
              stats_file: Optional[str] = None, u_fixed: Optional[dict] = None) \
            -> Tuple[Scenario, Union[Allocation, Error]]:
        """
        Build the zonal multi-edge scenario and solve a DCOPF problem on it.

        :param scenario: nodal scenario to be converted and cleared.
        :param configuration: solver configuration forwarded to ``DCOPF.solve``.
        :param results_file: forwarded to ``DCOPF.solve``; its grandparent directory, if non-empty,
            is used as the location for ``node_to_zone.csv``.
        :param stats_file: forwarded to ``DCOPF.solve``.
        :param u_fixed: accepted but not used by this method.
        :return: ``(zonal_scenario, result)`` where ``result`` is whatever ``DCOPF.solve`` returns.
        """
        # create a zonal NTC scenario with explicit lines; a falsy root (None or '') makes
        # create_zonal_scenario_NTC fall back to its default results directory
        zone_results_root = os.path.dirname(os.path.dirname(results_file)) if results_file else None
        zonal_scenario = self.create_zonal_scenario_NTC(base_scenario=scenario, results_root=zone_results_root)

        # solve a DCOPF problem for the constructed zonal network
        dcopf = DCOPF()
        return zonal_scenario, dcopf.solve(zonal_scenario, configuration, results_file, stats_file)

    def __str__(self):
        return 'Zonal_NTC_multiedge'