# Source code for apem.unit_based_model.evaluation.price_analysis

"""Utilities for validating, summarizing, and comparing price tables across algorithms."""

from __future__ import annotations

from itertools import combinations
from typing import Sequence

import numpy as np
import pandas as pd

REQUIRED_COLUMNS = ("algorithm", "price")


def validate_price_table(df: pd.DataFrame) -> pd.DataFrame:
    """
    Validate and normalize a generic price-analysis input table.

    :param df: input table containing at least ``algorithm`` and ``price``
        columns; additional columns are preserved
    :return: normalized copy with trimmed column names, normalized algorithm
        labels, and numeric ``price`` values
    :raises ValueError: if required columns are missing, algorithm labels are
        empty, or no numeric prices are available
    """
    table = df.copy()
    # Trim whitespace from header names before checking the requirements.
    table.columns = [str(name).strip() for name in table.columns]

    absent = [name for name in REQUIRED_COLUMNS if name not in table.columns]
    if absent:
        raise ValueError(f"Missing required columns: {absent}. Required columns: {list(REQUIRED_COLUMNS)}.")

    # Normalize labels and coerce prices; unparseable prices become NaN.
    table["algorithm"] = table["algorithm"].astype(str).str.strip()
    table["price"] = pd.to_numeric(table["price"], errors="coerce")

    if table["algorithm"].eq("").any():
        raise ValueError("Column 'algorithm' contains empty values.")
    if table["price"].notna().sum() == 0:
        raise ValueError("Column 'price' does not contain any numeric values.")
    return table
def summarize_prices(
    df: pd.DataFrame,
    *,
    group_by: Sequence[str] = ("algorithm",),
) -> pd.DataFrame:
    """
    Compute descriptive statistics for prices grouped by one or more columns.

    :param df: input price table
    :param group_by: grouping columns to summarize by; defaults to
        ``("algorithm",)``
    :return: one row per group with counts, central moments, quantiles,
        spread, and additional quality metrics
    """
    checked = validate_price_table(df)
    keys = list(_normalize_group_columns(checked, group_by))
    # Reuse a single lazy groupby object for both aggregation passes.
    grouped = checked.groupby(keys, dropna=False)["price"]

    stats = grouped.agg(
        count="count",
        mean="mean",
        median="median",
        std="std",
        variance="var",
        min="min",
        max="max",
        p05=lambda s: s.quantile(0.05),
        p25=lambda s: s.quantile(0.25),
        p75=lambda s: s.quantile(0.75),
        p95=lambda s: s.quantile(0.95),
    ).reset_index()

    # Quality metrics (missing/negative counts, spread, CV) per group.
    extras = grouped.apply(_auxiliary_price_metrics).unstack().reset_index()

    merged = stats.merge(extras, on=keys, how="left")
    column_order = keys + [
        "count",
        "missing_count",
        "negative_price_count",
        "mean",
        "median",
        "std",
        "variance",
        "coefficient_of_variation",
        "min",
        "max",
        "spread",
        "p05",
        "p25",
        "p75",
        "p95",
    ]
    return merged.loc[:, column_order]
def compare_price_algorithms(
    df: pd.DataFrame,
    *,
    align_on: Sequence[str] | None = None,
    baseline: str | None = None,
) -> pd.DataFrame:
    """
    Compare algorithm price series on aligned observations.

    :param df: input price table with one ``algorithm`` and ``price`` value
        per aligned observation
    :param align_on: explicit alignment key columns; when omitted, all
        non-required columns are used, or an inferred row index if none are
        available
    :param baseline: optional algorithm name; if provided, only pairs
        including this baseline are returned
    :return: pairwise comparison table with mean levels, mean differences,
        absolute differences, and correlation per algorithm pair
    :raises ValueError: if duplicates exist for the same alignment key and
        algorithm, if fewer than two algorithms are present, or if
        ``baseline`` is not present
    """
    validated = validate_price_table(df)
    alignment_columns, prepared = _prepare_alignment(validated, align_on)

    # A duplicate (alignment key, algorithm) pair would make the pivot
    # below ambiguous, so fail early with a small preview of offenders.
    duplicates = prepared.duplicated(subset=[*alignment_columns, "algorithm"], keep=False)
    if duplicates.any():
        duplicate_rows = prepared.loc[duplicates, [*alignment_columns, "algorithm"]]
        preview = duplicate_rows.head(5).to_dict(orient="records")
        raise ValueError(
            "Found duplicate rows for the same algorithm and alignment key. "
            f"Examples: {preview}"
        )

    wide = prepared.pivot(index=list(alignment_columns), columns="algorithm", values="price")
    algorithm_names = list(wide.columns)
    if len(algorithm_names) < 2:
        raise ValueError("At least two algorithms are required for pairwise comparison.")

    pairs = combinations(algorithm_names, 2)
    if baseline is not None:
        if baseline not in algorithm_names:
            # Fix: this message was previously split by a raw newline inside
            # the f-string literal (a syntax error); rejoined into one string.
            raise ValueError(
                f"Baseline '{baseline}' not found. "
                f"Available algorithms: {algorithm_names}."
            )
        pairs = ((left, right) for left, right in pairs if baseline in (left, right))

    comparisons: list[dict[str, float | int | str]] = []
    for left, right in pairs:
        # Compare only on observations both algorithms actually share.
        pair = wide[[left, right]].dropna()
        if pair.empty:
            continue
        difference = pair[left] - pair[right]
        absolute_difference = difference.abs()
        comparisons.append(
            {
                "algorithm_left": left,
                "algorithm_right": right,
                "shared_observations": int(pair.shape[0]),
                "mean_left": float(pair[left].mean()),
                "mean_right": float(pair[right].mean()),
                "mean_difference": float(difference.mean()),
                "mean_absolute_difference": float(absolute_difference.mean()),
                "max_absolute_difference": float(absolute_difference.max()),
                "correlation": float(pair[left].corr(pair[right])),
            }
        )
    return pd.DataFrame(comparisons)
def round_numeric_columns(df: pd.DataFrame, digits: int = 2) -> pd.DataFrame:
    """
    Round every numeric column to a fixed number of decimal places.

    :param df: input table
    :param digits: number of decimals used for rounding numeric columns
    :return: copy of ``df`` with rounded numeric columns
    """
    result = df.copy()
    # Only numeric dtypes are touched; all other columns pass through as-is.
    for column in result.select_dtypes(include="number").columns:
        result[column] = result[column].round(digits)
    return result
def _normalize_group_columns(df: pd.DataFrame, group_by: Sequence[str]) -> tuple[str, ...]: group_columns = tuple(group_by) if group_by else ("algorithm",) missing = [column for column in group_columns if column not in df.columns] if missing: raise ValueError(f"Grouping columns not found: {missing}.") return group_columns def _prepare_alignment( df: pd.DataFrame, align_on: Sequence[str] | None, ) -> tuple[tuple[str, ...], pd.DataFrame]: prepared = df.copy() if align_on is None: inferred = [column for column in prepared.columns if column not in REQUIRED_COLUMNS] if inferred: alignment_columns = tuple(inferred) else: prepared["row_number"] = prepared.groupby("algorithm").cumcount() alignment_columns = ("row_number",) else: alignment_columns = tuple(align_on) missing = [column for column in alignment_columns if column not in prepared.columns] if missing: raise ValueError(f"Alignment columns not found: {missing}.") return alignment_columns, prepared def _auxiliary_price_metrics(series: pd.Series) -> pd.Series: valid = series.dropna() mean = valid.mean() std = valid.std() coefficient_of_variation = np.nan if pd.notna(mean) and mean != 0 and pd.notna(std): coefficient_of_variation = float(std / mean) spread = np.nan if not valid.empty: spread = float(valid.max() - valid.min()) return pd.Series( { "missing_count": int(series.isna().sum()), "negative_price_count": int((valid < 0).sum()), "spread": spread, "coefficient_of_variation": coefficient_of_variation, } )