from shapely.geometry import Point, LineString
import geopandas as gpd
import networkx as nx
import os
import pandas as pd
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
from apem.unit_based_model.utils.paths import RAW_DATA_DIR
[docs]
class Scenario:
"""
Container for all inputs that define one unit-based market scenario.
A scenario bundles bid tables, network topology, node metadata, and time/block
structure into a single object passed through allocation, pricing, and
evaluation workflows.
The object is intentionally lightweight and mutable so parsing steps and
algorithms can derive adjusted views (for example, relaxed or scaled variants)
from the same base scenario.
Expected table structure:
- ``df_buyers`` typically includes at least ``buyer``, ``period``, ``node``,
``max_dem`` and one or more valuation block columns (for example ``val1``,
``val2``, ...).
- ``df_sellers`` typically includes at least ``seller``, ``period``, ``node``,
``max_prod`` and one or more production cost block columns.
- ``network`` edges are expected to carry electrical attributes used by
clearing/flow models (for example susceptance and capacity limits).
:param name: human-readable scenario identifier used in logs and output paths
:param df_buyers: buyer-side bids or demand blocks by period and node
:param df_sellers: seller-side offers or generation blocks by period and node
:param network: transmission graph used by nodal/zonal clearing models
:param nodes_agents: node metadata and node-to-agent mapping; entries are
keyed by node and commonly include ``buyers``,
``sellers``, ``latitude`` and ``longitude``
:param periods: ordered list of market periods represented in the scenario
:param blocks_buyers: buyer block index range used by multi-block demand bids
:param blocks_sellers: seller block index range used by multi-block supply bids
:param r_star: identifier of the reference node used by DC-style formulations
"""
def __init__(self, name: str, df_buyers: pd.DataFrame, df_sellers: pd.DataFrame, network: nx.Graph,
nodes_agents: dict, periods: list, blocks_buyers: range, blocks_sellers: range,
r_star: str):
self.name = name
self.df_buyers = df_buyers
self.df_sellers = df_sellers
self.network = network
self.nodes_agents = nodes_agents
self.periods = periods
self.blocks_buyers = blocks_buyers
self.blocks_sellers = blocks_sellers
self.r_star = r_star
def __str__(self):
return self.name
[docs]
def analyse_scenario(self, results_root: str = "") -> None:
"""
Compute and persist descriptive scenario statistics.
The method writes aggregate counts and demand/supply totals (overall and
per period) to ``<results_root>/<scenario>_base_scenario.txt``. If
seller carrier information is available, renewable shares are included.
:param results_root: output directory; if empty, a default scenario result
folder under ``./results/unit_based_model`` is used
:return: ``None``
"""
count_sellers = len(self.df_sellers['seller'].unique())
count_buyers = len(self.df_buyers['buyer'].unique())
count_nodes = len(self.network.nodes)
nodes_without_agents = [
node for node, data in self.nodes_agents.items()
if not data['buyers'] and not data['sellers']
]
count_nodes_without_agents = len(nodes_without_agents)
count_lines = len(self.network.edges)
demand = self.df_buyers['max_dem'].sum()
supply = self.df_sellers['max_prod'].sum()
if 'carrier' in self.df_sellers.columns:
energy_carriers = self.df_sellers['carrier'].unique().tolist()
res_carriers = ['onwind', 'solar', 'offwind-ac', 'offwind-dc']
res_sellers = self.df_sellers[self.df_sellers['carrier'].isin(res_carriers)]
res_proportion = round(len(res_sellers) / len(self.df_sellers), 2)
res_supply_proportion = round(res_sellers['max_prod'].sum() / supply, 2)
# Define and create results directory, if not exists
results_directory = results_root or f"./results/unit_based_model/{self.name}_results"
os.makedirs(results_directory, exist_ok=True)
# Write contents to scenario.txt file
output_file = os.path.join(results_directory, f"{self.name}_base_scenario.txt")
f = open(output_file, 'w+')
f.write(f'Sellers: {count_sellers}\n')
f.write(f'Buyers: {count_buyers}\n')
f.write(f'Nodes: {count_nodes}\n')
f.write(f'Nodes without agents: {count_nodes_without_agents}\n')
f.write(f'Transmission lines: {count_lines}\n')
f.write(f'Periods: {len(self.periods)}\n')
if 'carrier' in self.df_sellers.columns:
f.write(f'Energy carriers: {energy_carriers}\n')
f.write(f'RES generator proportion in energy mix: {res_proportion}\n')
f.write(f'RES supply proportion in energy mix: {res_supply_proportion}\n')
f.write('\n')
f.write(f'Demand: {demand}\n\n')
for t in self.periods:
demand_t = self.df_buyers[self.df_buyers['period'] == t]['max_dem'].sum()
f.write(f'Demand period {t}: {demand_t}\n')
f.write('\n')
f.write(f'Available supply: {supply}\n\n')
for t in self.periods:
supply_t = self.df_sellers[self.df_sellers['period'] == t]['max_prod'].sum()
f.write(f'Available supply period {t}: {supply_t}\n')
f.close()
[docs]
def plot_network(self, power_flow_model, zonal_config: str = "", results_root: str = "") -> None:
"""
Plot the scenario network and optionally color nodes by zone.
The figure is stored in the scenario results directory and uses node
coordinates from ``nodes_agents`` together with line endpoints from
``network``.
:param power_flow_model: selected power-flow model name, used in output
directory routing
:param zonal_config: optional zonal configuration identifier; when
provided, nodes are colored by ``node_to_zone.csv``
:param results_root: base output directory; if empty, a default scenario
result folder under ``./results/unit_based_model``
is used
:return: ``None``
"""
# Define power flow model and create results directory, if not exists
base_dir = results_root or os.path.join("results", "unit_based_model", f"{self.name}_results")
results_directory = os.path.join(base_dir, str(power_flow_model))
os.makedirs(results_directory, exist_ok=True)
zone_results_directory = os.path.join(results_directory, zonal_config) if zonal_config else results_directory
# Get nodes and edges from the network
nodes = list(self.network.nodes)
edges = self.network.edges(data=True)
CRS = "EPSG:4326"
# Create GeoDataFrame for nodes with longitude & latitude
geo_df = gpd.GeoDataFrame(
{
"node": nodes,
"latitude": [self.nodes_agents[node]["latitude"] for node in nodes],
"longitude": [self.nodes_agents[node]["longitude"] for node in nodes],
},
crs=CRS,
geometry=[
Point(self.nodes_agents[node]["longitude"], self.nodes_agents[node]["latitude"])
for node in nodes
]
)
geo_df.index = geo_df["node"].astype(str)
# Assign colors based on zonal configuration
geo_df["color"] = "black" # default color
if zonal_config:
# Load csv file with node-to-zone mapping, tolerate suffix variants
os.makedirs(zone_results_directory, exist_ok=True)
node_to_zone_path = os.path.join(zone_results_directory, "node_to_zone.csv")
if not os.path.exists(node_to_zone_path):
base_config = zonal_config.rsplit("_", 1)[0] if "_" in zonal_config else zonal_config
alt_dir = os.path.join(results_directory, base_config)
alt_path = os.path.join(alt_dir, "node_to_zone.csv")
if os.path.exists(alt_path):
node_to_zone_path = alt_path
if not os.path.exists(node_to_zone_path):
print(f"plot_network: node_to_zone.csv not found for {zonal_config}, skipping plot.")
return
df_zones = pd.read_csv(node_to_zone_path, dtype={"node": str, "zone": str})
# Merge geo_df with df_zones on the 'node'
geo_df = geo_df.merge(df_zones[["node", "zone"]], left_index=True, right_on="node", how="inner")
# Create colormap for zones
unique_zones = sorted(geo_df["zone"].dropna().unique())
ACER_BZR_colors = ['dodgerblue', 'coral', 'gold', 'darkviolet', 'green']
# Determine color mapping based on the number of zones
if len(unique_zones) <= 5:
zone_to_color_mapping = {zone: color for zone, color in zip(unique_zones, ACER_BZR_colors)}
else:
cmap = plt.get_cmap("plasma", len(unique_zones) - 5)
additional_colors = [cmap(i) for i in range(len(unique_zones) - 5)]
zone_to_color_mapping = {zone: color for zone, color in zip(unique_zones[:5], ACER_BZR_colors)}
zone_to_color_mapping.update({zone: color for zone, color in zip(unique_zones[5:], additional_colors)})
# Assign colors to buses based on zones
geo_df["color"] = geo_df["zone"].map(zone_to_color_mapping)
# Create GeoDataFrame for edges
line_df = gpd.GeoDataFrame(
{
"from_node": [u for u, v, data in edges],
"to_node": [v for u, v, data in edges],
"B": [data["B"] for u, v, data in edges],
"F_max": [data["F_max"] for u, v, data in edges]
},
crs=CRS,
geometry=[
LineString([
Point(self.nodes_agents[u]["longitude"], self.nodes_agents[u]["latitude"]),
Point(self.nodes_agents[v]["longitude"], self.nodes_agents[v]["latitude"])
])
for u, v, data in edges
]
)
# Clear previous plot and create figure
plt.clf()
fig, ax = plt.subplots(figsize=(15, 15))
# Load and plot GADM map of Germany
map_germany = gpd.read_file(RAW_DATA_DIR / "gadm41_DEU_shp" / "gadm41_DEU_1.shp")
map_germany.plot(ax=ax, color="lightgray", alpha=0.5)
map_germany.boundary.plot(ax=ax, color="lightgray", linewidth=1.0) # show state borders
# Plot power lines
line_df.plot(ax=ax, color="gray", linewidth=1.0, alpha=0.7)
# Plot bus nodes with longitude & latitude
geo_df.plot(ax=ax, markersize=100, marker="o", color=geo_df["color"])
if zonal_config:
# Add legend with zone colors to the plot
sorted_zones = sorted(unique_zones, key=str)
legend_handles = [mpatches.Patch(color=zone_to_color_mapping[zone], label=f"Zone {zone}") for zone in
sorted_zones]
ax.legend(handles=legend_handles, loc="upper left", title=f"Zones in {zonal_config}")
# Save figure
file_name = f"{self.name}_{zonal_config}" if zonal_config else self.name
plt.savefig(f"{results_directory}/{file_name}_network.png", bbox_inches='tight', dpi=300)
plt.close(fig)