Source code for apem.order_book_based_model.euphemia.data.conversion.data_conversion

import hashlib
import json
from collections import defaultdict
from typing import List, Tuple

import numpy as np
import pandas as pd

from apem.unit_based_model.data.parsing.scenario import Scenario
from apem.unit_based_model.utils.paths import ensure_dir

from apem.order_book_based_model.euphemia.utils.paths import EUPHEMIA_ROOT


class DataConversion:
    """
    Convert a unit-based market scenario into order-book Euphemia inputs.

    The source scenario uses the unit-based APEM data model with buyer-side
    demand tables and seller-side supply tables. This helper maps those
    tables into the order-book structures expected by the Euphemia workflow.
    """

    def __init__(self, scenario: Scenario):
        self.df_buyers = scenario.df_buyers
        self.df_sellers = scenario.df_sellers
        self.periods = scenario.periods
        self.blocks_buyers = scenario.blocks_buyers
        self.blocks_sellers = scenario.blocks_sellers
        self.block_bids = None
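    # Illustrative usage sketch (assumes `scenario` is a parsed unit-based
    # Scenario instance; the variable names here are hypothetical):
    #
    #   conv = DataConversion(scenario)
    #   df_inelastic = conv.compute_buyers_inelastic_bids()
    #   df_elastic = conv.compute_buyers_elastic_bids()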
    def compute_buyers_inelastic_bids(self) -> pd.DataFrame:
        """
        Convert price-inelastic demand into order-book block orders.

        One block order is created per demand entity. For each period ``t``,
        the block quantity equals the inelastic demand in that period. The
        resulting order is assigned ``MAR = 1`` and a very large bid price so
        that it behaves as must-serve demand in the order-book model.
        """
        info = defaultdict(dict)
        df_dict_records = self.df_buyers.to_dict(orient='records')
        for bid in df_dict_records:
            info[bid['buyer']][bid['period']] = -bid['inelastic_dem']
            info[bid['buyer']]['id'] = 'b' + str(bid['buyer'])
        df = pd.DataFrame.from_dict(info, orient='index').reset_index(drop=True)
        df = df.rename(columns={i: f'q{i}' for i in self.periods})
        df['block_type'] = 'normal'
        df['code_prm'] = pd.NA
        df['MAR'] = 1
        df['p'] = 10 ** 6
        columns = ['id', 'block_type', 'code_prm', 'p'] + [f'q{i}' for i in self.periods] + ['MAR']
        df = df[columns]
        return df
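    # Sketch of the resulting block-order layout for two periods (values are
    # hypothetical; demand quantities carry a negative sign):
    #
    #   id  block_type  code_prm        p   q1   q2  MAR
    #   b1  normal      <NA>      1000000  -50  -60    1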
    def compute_buyers_elastic_bids(self) -> pd.DataFrame:
        """
        Convert price-elastic demand segments into order-book step orders.

        One step order is created per buyer, period, and demand segment,
        carrying the segment's valuation as its price and the (negative)
        segment size as its quantity.
        """
        elastic_dem = [f'val{i}' for i in self.blocks_buyers] + [f'size{i}' for i in self.blocks_buyers]
        info = self.df_buyers[['buyer', 'period'] + elastic_dem]
        data = []
        buyers = self.df_buyers['buyer'].unique().tolist()
        for b in buyers:
            for t in self.periods:
                buyer_info = info[(info['buyer'] == b) & (info['period'] == t)]
                if len(buyer_info) == 0:
                    continue
                for i in self.blocks_buyers:
                    order = {'id': 'b' + str(b) + 't' + str(t) + 'l' + str(i),
                             't': t,
                             'p': buyer_info[f'val{i}'].values[0],
                             'q': -buyer_info[f'size{i}'].values[0]
                             }
                    data.append(order)
        df_step_orders = pd.DataFrame(data)
        print(df_step_orders)
        return df_step_orders
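    # Example of one emitted step order for (buyer=3, period=2, segment=1),
    # with hypothetical valuation and size values:
    #
    #   {'id': 'b3t2l1', 't': 2, 'p': 42.0, 'q': -15.0}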
    def generate_zero_no_load_cost_bids(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Convert simple unit-based supply offers into scalable complex Euphemia orders.

        This routine targets supply units with zero no-load cost and no
        meaningful minimum-uptime restriction. Each qualifying unit is mapped
        to one scalable complex order plus its associated step orders. The
        ``MAP_t`` parameters are taken from the unit's minimum production
        profile, while the step orders are derived from the period-wise
        supply curve segments.
        """
        sellers = self.df_sellers[(self.df_sellers['min_uptime'].isin([0, 1]))
                                  & (self.df_sellers['no_load_cost'] == 0)]['seller'].unique().tolist()
        scalable_orders, scalable_step_orders = [], []
        for s in sellers:
            suborders_ids = []
            scalable_id = 's' + str(s) + 'MAP'
            map_values = {}
            for t in self.periods:
                id_step_min_prod = 's' + str(s) + 'min_prod_t' + f'{t}'
                seller_info = self.df_sellers[(self.df_sellers['seller'] == s)
                                              & (self.df_sellers['period'] == t)]
                # Skip periods for which s submitted no bids.
                if seller_info.empty:
                    map_values[f'MAP{t}'] = 0
                    continue
                min_prod_t = seller_info['min_prod'].values[0]
                map_values[f'MAP{t}'] = min_prod_t
                if min_prod_t != 0:
                    scalable_step_order_min = {'id': id_step_min_prod,
                                               'scalable_order_id': scalable_id,
                                               't': t,
                                               'p': seller_info['cost1'].values[0],
                                               'q': min_prod_t
                                               }
                    suborders_ids.append(id_step_min_prod)
                    scalable_step_orders.append(scalable_step_order_min)
                for block in self.blocks_sellers:
                    id_step_block = 's' + str(s) + 't' + f'{t}' + 'l' + f'{block}'
                    q = seller_info[f'size{block}'].values[0]
                    if block == 1:
                        # The minimum production is already covered by the
                        # dedicated min-prod step order above.
                        q -= min_prod_t
                    if q == 0:
                        continue
                    scalable_step_order_block = {'id': id_step_block,
                                                 'scalable_order_id': scalable_id,
                                                 't': t,
                                                 'p': seller_info[f'cost{block}'].values[0],
                                                 'q': q
                                                 }
                    suborders_ids.append(id_step_block)
                    scalable_step_orders.append(scalable_step_order_block)
            scalable_order = {'id': scalable_id,
                              'step_orders': suborders_ids,
                              'fixed_term': 0,
                              'condition': 'MIC',
                              'load_gradient': pd.NA,
                              **map_values
                              }
            scalable_orders.append(scalable_order)
        df_scalable_orders = pd.DataFrame(scalable_orders)
        df_scalable_step_orders = pd.DataFrame(scalable_step_orders)
        return df_scalable_orders, df_scalable_step_orders
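    # Sketch of one converted unit (hypothetical values): a scalable complex
    # order with a MIC condition and per-period MAP taken from min_prod,
    #
    #   {'id': 's7MAP', 'step_orders': ['s7min_prod_t1', 's7t1l2'],
    #    'fixed_term': 0, 'condition': 'MIC', 'load_gradient': <NA>,
    #    'MAP1': 20, 'MAP2': 0}
    #
    # plus step orders such as
    #   {'id': 's7t1l2', 'scalable_order_id': 's7MAP', 't': 1, 'p': 35.0, 'q': 30}.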
    def generate_positive_no_load_cost_bids(self, reduce_linked_blocks: bool) -> pd.DataFrame:
        """
        Convert commitment-coupled unit offers into order-book block-order families.

        This routine targets supply units whose behavior cannot be
        represented by a simple scalable complex order, for example because
        they have a non-trivial minimum-uptime requirement or a positive
        no-load cost. Each such unit is translated into one exclusive parent
        block per commitment pattern together with linked child blocks that
        represent additional dispatch segments.

        :param reduce_linked_blocks: When ``True``, merge same-price linked
            child segments across active periods into one linked block per
            segment.
        :return: Block-order dataframe for the converted commitment-coupled offers.
        """
        sellers = self.df_sellers[(self.df_sellers['min_uptime'] > 1)
                                  | (self.df_sellers['no_load_cost'] > 0)]['seller'].unique().tolist()
        # retrieve patterns that encode in which periods a seller is committed
        patterns = {}
        for val in range(2, 25):
            file_path = EUPHEMIA_ROOT / "data" / "conversion" / "patterns" / f"{val}.txt"
            patterns_val = []
            try:
                with open(file_path, 'r') as file:
                    for line in file:
                        row = list(map(int, line.strip().split()))
                        patterns_val.append(row)
            except FileNotFoundError:
                print(f"Error: The file '{file_path}' does not exist. Call generate_write_patterns().")
            # Store whatever was read; a missing file leaves an empty list
            # instead of raising a KeyError further down.
            patterns[val] = patterns_val
        print("Pattern retrieval finished")
        block_bids = []
        for s in sellers:
            sellers_general_info = self.df_sellers[self.df_sellers['seller'] == s]
            min_uptime = sellers_general_info['min_uptime'].values[0]
            if min_uptime <= 1:
                min_uptime = 2
            min_cost = sellers_general_info['cost1'].values[0]
            exclusive_id = f's{s}exclusive'
            count = 1
            seen_time_commitments = set()
            time_commitments = []
            # create time commitments for patterns
            for pattern in patterns[min_uptime]:
                current_commitment = []
                # Build a vector in which the value at index i is 1 if the
                # pattern indicates that the seller is committed in period
                # i + 1 and 0 otherwise. In the pattern encoding, a value of
                # 1 denotes a single off period, while any larger value
                # denotes an on-run of that length.
                for value in pattern:
                    if value != 1:
                        current_commitment.extend([1] * value)
                    else:
                        current_commitment.append(0)
                time_commitments.append(current_commitment)
            # add single-period commitment patterns for sellers whose
            # min_uptime is at most 1
            if not sellers_general_info['min_uptime'].values[0] > 1:
                for period in range(len(self.periods)):
                    current_commitment = [0] * len(self.periods)
                    current_commitment[period] = 1
                    time_commitments.append(current_commitment)
            for time_commitment in time_commitments:
                if sum(time_commitment) == 0:
                    continue
                # Skip commitment patterns that were already added as block orders.
                tc_tuple = tuple(time_commitment)
                if tc_tuple in seen_time_commitments:
                    continue
                seen_time_commitments.add(tc_tuple)
                # Support for orders with no-load cost > 0: spread the
                # no-load cost over the minimum production of the active hours.
                active_hours = sum(time_commitment)  # k
                min_prod = sellers_general_info['min_prod'].values[0]
                no_load = sellers_general_info['no_load_cost'].values[0]
                pattern_price = min_cost  # c_min,s
                if no_load > 0 and active_hours * min_prod > 0:
                    pattern_price += no_load / (active_hours * min_prod)
                bid_p = {
                    'id': f's{s}pattern{count}',
                    'block_type': 'exclusive',
                    'code_prm': exclusive_id,
                    'p': pattern_price,
                    **{
                        f'q{k}': (
                            self.df_sellers[
                                (self.df_sellers['seller'] == s)
                                & (self.df_sellers['period'] == k)
                            ]['min_prod'].iloc[0]
                            if not self.df_sellers[
                                (self.df_sellers['seller'] == s)
                                & (self.df_sellers['period'] == k)
                            ]['min_prod'].empty and time_commitment[k - 1] == 1
                            else 0
                        )
                        for k in self.periods
                    },
                    'MAR': 1
                }
                has_positive_q = any(value > 0 for key, value in bid_p.items() if key.startswith('q'))
                if not has_positive_q:
                    continue
                block_bids.append(bid_p)
                print(f"Exclusive blocks finished - Seller {s} - Commitment pattern {time_commitment}")
                self.add_linked_block_orders(time_commitment, s, count, block_bids,
                                             reduce_orders=reduce_linked_blocks)
                count += 1
        df_block_orders = pd.DataFrame(block_bids)
        print(df_block_orders)
        return df_block_orders
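    # Worked pattern-price example (hypothetical numbers): with
    # c_min,s = 30 EUR/MWh, no_load_cost = 200 EUR, min_prod = 10 MW, and a
    # commitment pattern active for k = 4 hours, the exclusive block price is
    # 30 + 200 / (4 * 10) = 35 EUR/MWh.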
    def generate_patterns(self, min_uptime: int) -> List[List[int]]:
        """
        Generate all commitment patterns for a unit with a given minimum uptime.

        A pattern is a composition of the horizon length ``T``: a value of 1
        encodes a single off period, and any value of at least ``min_uptime``
        encodes a contiguous on-run of that length.
        """
        T = len(self.periods)
        M = [1] + list(range(min_uptime, T + 1))
        dp = [[] for _ in range(T + 1)]
        dp[0] = [[]]
        for i in range(1, T + 1):
            # dp[i] holds all compositions of i using segment lengths from M
            for m in M:
                if i >= m:
                    for combination in dp[i - m]:
                        dp[i].append(combination + [m])
        return dp[T]
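    # Example: for T = 4 periods and min_uptime = 2, M = [1, 2, 3, 4] and the
    # eight compositions of 4 are [1,1,1,1], [2,1,1], [1,2,1], [3,1],
    # [1,1,2], [2,2], [1,3], and [4]; e.g. [1,2,1] decodes to the commitment
    # vector 0-1-1-0. Note that [2,2] and [4] both decode to the all-on
    # vector, which is why callers deduplicate decoded patterns.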
    def generate_contiguous_patterns(self, min_uptime: int) -> List[List[int]]:
        """
        Generate contiguous commitment patterns for a unit with a given minimum uptime.

        The returned patterns assume at most one start and one stop event
        across the modeled horizon.
        """
        T = len(self.periods)
        patterns: List[List[int]] = []
        for start_off in range(0, T - min_uptime + 1):  # off periods (encoded as 1) before the on-run
            for on_len in range(min_uptime, T - start_off + 1):  # length of the on-run
                end_off = T - start_off - on_len  # off periods (encoded as 1) after the on-run
                pattern = [1] * start_off
                pattern += [on_len]
                pattern += [1] * end_off
                patterns.append(pattern)
        return patterns
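    # Example: for T = 4 and min_uptime = 2 this yields six patterns,
    # [2,1,1], [3,1], [4], [1,2,1], [1,3], and [1,1,2], i.e. the commitment
    # vectors 1100, 1110, 1111, 0110, 0111, and 0011.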
    def add_linked_block_orders(
            self,
            time_commitment: list[int],
            s: int,
            count: int,
            block_bids: list[dict],
            reduce_orders: bool
    ) -> None:
        """
        Add linked child blocks for one commitment pattern of one supply unit.

        :param time_commitment: Binary commitment profile over the modeled periods.
        :param s: Unit identifier from the source unit-based scenario.
        :param count: Running index used to build stable order identifiers.
        :param block_bids: Output list to which linked block rows are appended.
        :param reduce_orders: Whether to merge same-price linked segments
            across active periods.
        """
        # --- Reduce the number of orders by putting all time periods into one
        # linked block per price step instead of one linked block per period. ---
        if reduce_orders:
            for block in self.blocks_sellers:
                id_block = f's{s}pattern{count}l{block}'
                q_dict = {f'q{k}': 0 for k in self.periods}
                p_block = None
                # Fill in the right q for each period of this price step.
                for t in self.periods:
                    if time_commitment[t - 1] == 0:
                        continue
                    seller_info = self.df_sellers[
                        (self.df_sellers['seller'] == s)
                        & (self.df_sellers['period'] == t)
                    ]
                    q = seller_info[f'size{block}'].values[0]
                    if block == 1:
                        q -= seller_info['min_prod'].values[0]
                    if q == 0:
                        continue
                    q_dict[f'q{t}'] = q
                    if p_block is None:
                        p_block = seller_info[f'cost{block}'].values[0]
                # Create only one order per price step if there is a q != 0
                # for at least one period.
                if any(q_dict.values()):
                    bid_p = {
                        'id': id_block,
                        'block_type': 'linked',
                        'code_prm': f's{s}pattern{count}',
                        'p': p_block,
                        **q_dict,
                        'MAR': 0
                    }
                    block_bids.append(bid_p)
        # --- Add one linked block order for each active period and price step. ---
        else:
            for t in self.periods:
                if time_commitment[t - 1] == 0:
                    continue
                seller_info = self.df_sellers[(self.df_sellers['seller'] == s)
                                              & (self.df_sellers['period'] == t)]
                for block in self.blocks_sellers:
                    id_block_t = f's{s}pattern{count}t{t}l{block}'
                    q = seller_info[f'size{block}'].values[0]
                    if block == 1:
                        q -= seller_info['min_prod'].values[0]
                    if q == 0:
                        continue
                    bid_p = {'id': id_block_t,
                             'block_type': 'linked',
                             'code_prm': f's{s}pattern{count}',
                             'p': seller_info[f'cost{block}'].values[0],
                             **{f'q{k}': q if k == t else 0 for k in self.periods},
                             'MAR': 0
                             }
                    block_bids.append(bid_p)
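    # Example (hypothetical values): with reduce_orders=True and commitment
    # vector 0110, price step 2 of seller 7 / pattern 3 yields one linked block
    #
    #   {'id': 's7pattern3l2', ..., 'q1': 0, 'q2': 30, 'q3': 30, 'q4': 0}
    #
    # whereas reduce_orders=False yields two single-period linked blocks
    # 's7pattern3t2l2' and 's7pattern3t3l2' instead.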
    def generate_write_patterns(self, use_contiguous_patterns: bool) -> None:
        """
        Generate commitment-pattern files used for block-order conversion.

        :param use_contiguous_patterns: Whether to restrict the generated
            patterns to contiguous on/off trajectories.
        """
        min_uptimes = range(2, 25)
        path = EUPHEMIA_ROOT / "data" / "conversion" / "patterns"
        ensure_dir(path)
        for min_uptime in min_uptimes:
            file_name = path / f'{min_uptime}.txt'
            patterns = (self.generate_contiguous_patterns(min_uptime)
                        if use_contiguous_patterns
                        else self.generate_patterns(min_uptime))
            with open(file_name, 'w') as f:
                for p in patterns:
                    row = ' '.join(map(str, p))
                    f.write(row + '\n')
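    # Usage sketch: write one pattern file per minimum uptime (2..24) under
    # EUPHEMIA_ROOT/data/conversion/patterns before calling
    # generate_positive_no_load_cost_bids():
    #
    #   conv.generate_write_patterns(use_contiguous_patterns=True)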
    def block_signature(self, row: pd.Series) -> str:
        """
        Generate a stable signature for one block order.

        The signature uses the block type, quantity profile, price, and MAR.
        Source unit or demand identifiers are intentionally ignored so that
        economically identical converted block orders can be merged later.
        """
        qty_cols = [c for c in row.index if c.startswith("q")]
        tup = (
            row["block_type"],
            tuple(round(float(row[c]), 6) for c in qty_cols),  # quantity vector
            round(float(row["p"]), 4),                         # price in EUR/MWh
            round(float(row["MAR"]), 4),                       # minimum acceptance ratio
        )
        # Short but collision-resistant fingerprint (SHA-1 over a JSON dump).
        return hashlib.sha1(json.dumps(tup).encode()).hexdigest()
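    # Illustrative call (hypothetical row): two rows that differ only in
    # their ids hash to the same fingerprint.
    #
    #   row = pd.Series({'id': 's1pattern1', 'block_type': 'exclusive',
    #                    'p': 35.0, 'q1': 10.0, 'q2': 10.0, 'MAR': 1})
    #   sig = conv.block_signature(row)  # 40-character hex digest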
    def compress_blocks(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Losslessly merge identical converted block-order chains.

        A linked chain consists of one exclusive parent block and zero or
        more linked child blocks. Two chains are mergeable when they have the
        same block signatures and the same tree structure. After merging,
        identical quantities are summed, the parent keeps ``MAR = 1``,
        original ids are concatenated for traceability, and child
        ``code_prm`` values are rewired to the merged parent id.
        """
        # Work on a copy so the helper column does not leak into the caller's frame.
        df = df.copy()
        qty_cols = [c for c in df.columns if c.startswith("q")]
        # --- Determine the parent ID for every row: ---
        # - exclusive blocks reference themselves,
        # - linked blocks reference the column `code_prm`.
        df["parent_id"] = np.where(
            df["block_type"] == "exclusive", df["id"], df["code_prm"]
        )
        # Build a mapping parent ID -> [child IDs] (linked rows only).
        children_map = (
            df[df["block_type"] == "linked"].groupby("parent_id")["id"].apply(list).to_dict()
        )
        # --- Build the per-block signature dict {order_id: signature}. ---
        sig_series = df.apply(self.block_signature, axis=1)
        blk_sig = dict(zip(df["id"], sig_series))

        def chain_sig(chain: str):
            """Recursively compute the signature of the entire chain below."""
            return (
                blk_sig[chain],
                tuple(sorted(chain_sig(c) for c in children_map.get(chain, [])))
            )

        # All roots = exclusive blocks (one per chain).
        roots = df.loc[df["block_type"] == "exclusive", "id"].tolist()
        chain_sigs = {r: chain_sig(r) for r in roots}
        # --- Group roots whose *full chain* signatures are identical. ---
        buckets: dict[tuple, list[str]] = defaultdict(list)
        for rid, sig in chain_sigs.items():
            buckets[sig].append(rid)
        merged_rows: list[pd.Series] = []
        for root_ids in buckets.values():
            # Collect every order ID in this chain family (roots + children).
            all_ids = root_ids + sum([children_map.get(r, []) for r in root_ids], [])
            sub = df[df["id"].isin(all_ids)]
            # ---- Merge the root (exclusive) blocks. ----
            p_rows = sub[sub["block_type"] == "exclusive"]
            parent = p_rows.iloc[0].copy()
            parent[qty_cols] = p_rows[qty_cols].sum()
            parent["MAR"] = 1
            parent["id"] = "+".join(p_rows["id"])
            parent_new_id = parent["id"]  # remembered for the children below
            merged_rows.append(parent)
            # ---- Merge the children (grouped by identical qty + price + MAR). ----
            linked_rows = sub[sub["block_type"] == "linked"]
            for _qty_price_mar, child_grp in linked_rows.groupby(qty_cols + ["p", "MAR"]):
                kid = child_grp.iloc[0].copy()
                kid[qty_cols] = child_grp[qty_cols].sum()
                kid["id"] = "+".join(child_grp["id"])
                kid["code_prm"] = parent_new_id  # rewire the parent reference
                merged_rows.append(kid)
        # Drop the helper column before returning.
        if merged_rows:
            return pd.DataFrame(merged_rows).drop(columns="parent_id")
        return pd.DataFrame(merged_rows)
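    # Usage sketch (hypothetical frame): two economically identical chains
    # rooted at 's1pattern1' and 's2pattern1' merge into one chain whose
    # parent id is 's1pattern1+s2pattern1', with quantities summed and the
    # children's code_prm rewired to the merged parent id.
    #
    #   df_compressed = conv.compress_blocks(df_block_orders)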
    def set_block_bids(self) -> None:
        """Placeholder for a future block-order assembly helper."""
        pass

    def set_step_orders(self) -> None:
        """Placeholder for a future step-order assembly helper."""
        pass