Appendix · Source Code

Analysis Scripts

Four Python scripts powering the quantitative analysis — spread computation, LP-optimised backtests, duration comparison, and Monte Carlo simulation. Approximately 1,200 lines of Python in total.

Scripts
4
Total Lines
~1,200
Language
Python 3
Dependencies
NumPy + SciPy

Click any script below to expand its full source code with syntax highlighting. Each script is self-contained and documented with docstrings.

analyze_24month_spreads.py
Best-N-hours spread analysis across 24 months of SEM day-ahead prices. Computes daily achievable arbitrage spreads for 1h, 2h, and 4h batteries, revenue concentration distributions, and forward scenario modelling (wind growth, Celtic Interconnector, BESS fleet expansion, data centre demand).
457 lines Spread Analysis Scenarios
#!/usr/bin/env python3
"""
24-month SEM price analysis for BESS arbitrage.
Computes daily achievable spreads, revenue distribution, and forward scenarios.

Input: 4 JSON files from Energy-Charts API (H1 2024, H2 2024, H1 2025, H2 2025)
Output: JSON for webapp + printed summary statistics
"""

import json
import os
import sys
from datetime import datetime, timezone, timedelta
from collections import defaultdict
import math

# Directory holding the Energy-Charts JSON exports; output JSON is written
# back to the same directory.
DATA_DIR = "/home/nicky/research/energy-markets/data"
OUTPUT_DIR = "/home/nicky/research/energy-markets/data"

# Half-year SEM day-ahead price files covering 2024-01 through 2025-12.
FILES = [
    "sem_2024_h1.json",
    "sem_2024_h2.json",
    "sem_2025_h1.json",
    "sem_2025_h2.json",
]


def load_all_data():
    """Read every SEM price file and average sub-hourly points to hourly.

    Returns dict[(date_str, hour)] -> mean price for that clock hour (UTC).
    """
    buckets = defaultdict(list)  # (YYYY-MM-DD, hour) -> raw sub-hourly prices

    for fname in FILES:
        with open(os.path.join(DATA_DIR, fname)) as fh:
            payload = json.load(fh)

        for ts, price in zip(payload["unix_seconds"], payload["price"]):
            if price is None:
                continue  # gap in the API feed
            moment = datetime.fromtimestamp(ts, tz=timezone.utc)
            buckets[(moment.strftime("%Y-%m-%d"), moment.hour)].append(price)

    # Collapse each bucket (e.g. half-hourly SEM data) to its mean.
    return {key: sum(vals) / len(vals) for key, vals in buckets.items()}


def group_by_day(hourly_data):
    """Turn {(date, hour): price} into {date: [24 hourly prices]}.

    Days with fewer than 20 known hours are dropped; missing hours in the
    surviving days are filled with that day's mean price.
    """
    by_date = defaultdict(dict)
    for (date_str, hour), price in hourly_data.items():
        by_date[date_str][hour] = price

    complete = {}
    for date_str in sorted(by_date):
        day = by_date[date_str]
        if len(day) < 20:
            continue
        slots = [day.get(h) for h in range(24)]
        known = [p for p in slots if p is not None]
        if len(known) < 20:
            continue
        fill = sum(known) / len(known)  # mean-fill the gaps
        complete[date_str] = [fill if p is None else p for p in slots]
    return complete


def daily_arbitrage(prices_24, duration_hrs, rte=0.85):
    """
    Best-N-hours arbitrage value for one day.

    Buys the `duration_hrs` cheapest hours and sells the same number of most
    expensive hours. The round-trip-efficiency loss is charged to the buy
    side: 1/rte MWh must be bought for every MWh sold. All figures are
    EUR/MWh of capacity.
    """
    ranked = sorted(prices_24)
    buy_avg = sum(ranked[:duration_hrs]) / duration_hrs
    sell_avg = sum(ranked[-duration_hrs:]) / duration_hrs

    return {
        "net_spread": sell_avg - buy_avg / rte,  # loss-adjusted buy cost
        "charge_avg": buy_avg,
        "discharge_avg": sell_avg,
        "gross_spread": sell_avg - buy_avg,
    }


def percentile(data, pct):
    """Linearly interpolated percentile of an already-sorted list (0 if empty)."""
    if not data:
        return 0
    rank = (len(data) - 1) * pct / 100
    lo = math.floor(rank)
    hi = math.ceil(rank)
    if lo == hi:
        return data[int(rank)]
    # Weight each neighbour by its distance from the fractional rank.
    return data[lo] * (hi - rank) + data[hi] * (rank - lo)


def analyze_spreads(daily_data, duration_hrs=4, rte=0.85):
    """Per-day spread metrics for every date in `daily_data`, sorted by date.

    Each row combines calendar facts (month, weekday), raw price stats, the
    arbitrage figures from daily_arbitrage, and negative-price counts.
    """
    rows = []
    for date_str, prices in sorted(daily_data.items()):
        arb = daily_arbitrage(prices, duration_hrs, rte)
        day = datetime.strptime(date_str, "%Y-%m-%d")
        negatives = sum(1 for p in prices if p < 0)
        rows.append({
            "date": date_str,
            "month": day.strftime("%Y-%m"),
            "weekday": day.weekday(),
            "is_weekend": day.weekday() >= 5,  # Saturday=5, Sunday=6
            "daily_mean": sum(prices) / len(prices),
            "daily_min": min(prices),
            "daily_max": max(prices),
            "max_min_spread": max(prices) - min(prices),
            "net_spread": arb["net_spread"],
            "gross_spread": arb["gross_spread"],
            "charge_avg": arb["charge_avg"],
            "discharge_avg": arb["discharge_avg"],
            "has_negative": negatives > 0,
            "negative_hours": negatives,
        })
    return rows


def revenue_distribution(daily_results, power_mw=50, duration_hrs=4):
    """
    Revenue-concentration profile for a power_mw battery of the given
    duration: total revenue, per-MW/per-MWh figures, and how much of the
    total the best days contribute.
    """
    capacity_mwh = power_mw * duration_hrs
    # One cycle per day; days with a negative net spread sit out (0 EUR).
    ranked = sorted(
        ({"date": d["date"], "revenue": max(0, d["net_spread"]) * capacity_mwh}
         for d in daily_results),
        key=lambda r: r["revenue"],
        reverse=True,
    )
    total = sum(r["revenue"] for r in ranked)

    # Report cumulative share at these day-count checkpoints (plus the end).
    checkpoints = {1, 5, 10, 20, 36, 50, 73, 100, 183, 365, len(ranked)}
    cumulative = []
    running = 0
    for idx, r in enumerate(ranked, start=1):
        running += r["revenue"]
        if idx in checkpoints:
            cumulative.append({
                "days": idx,
                "pct_days": round(idx / len(ranked) * 100, 1),
                "pct_revenue": round(running / total * 100 if total > 0 else 0, 1),
                "daily_rev_eur": round(r["revenue"]),
            })

    return {
        "total_annual_eur": round(total),
        "per_mw_eur": round(total / power_mw) if power_mw else 0,
        "per_mwh_eur": round(total / capacity_mwh) if capacity_mwh else 0,
        "profitable_days": sum(1 for d in daily_results if d["net_spread"] > 0),
        "total_days": len(daily_results),
        "cumulative": cumulative,
    }


def forward_scenarios(daily_results, daily_data):
    """
    Model how projected supply/demand changes affect the spread distribution.

    Parameters:
        daily_results: accepted for interface compatibility; the scenarios
            recompute spreads from `daily_data` directly and do not read it.
        daily_data: dict[date_str] -> list of 24 hourly prices (EUR/MWh).

    Scenarios:
    1. Baseline (current data as-is)
    2. +5 GW wind (2030): more low-price hours, more negatives
    3. Celtic Interconnector: peaks capped at ~EUR 150 (French nuclear + congestion)
    4. +2 GW BESS fleet: spreads compressed ~30% (GB precedent)
    5. Data centre demand +30%: raises floor prices
    6. Combined 2028: Celtic + 1.5 GW BESS + moderate wind growth
    7. Combined 2030: Celtic + 3 GW BESS + high wind + data centres

    Returns dict[scenario_key] -> summary stats (spread percentiles,
    profitable-day share, annualised revenue per MW for a 4h battery).
    """
    import hashlib  # hoisted: previously re-imported inside three closures

    def day_seed(date_str):
        # Deterministic pseudo-random draw per date (stable across runs),
        # used to mark a reproducible subset of days as "windy".
        return int(hashlib.md5(date_str.encode()).hexdigest()[:8], 16)

    scenarios = {}

    # Helper: apply transformation to all daily price arrays and recompute spreads
    def apply_and_analyze(transform_fn, label):
        transformed = {}
        for date_str, prices in daily_data.items():
            transformed[date_str] = transform_fn(list(prices), date_str)
        results = analyze_spreads(transformed, duration_hrs=4, rte=0.85)
        spreads = sorted([d["net_spread"] for d in results])
        profitable = [s for s in spreads if s > 0]
        return {
            "label": label,
            "median_spread": round(percentile(spreads, 50), 1),
            "p25_spread": round(percentile(spreads, 25), 1),
            "p75_spread": round(percentile(spreads, 75), 1),
            "p90_spread": round(percentile(spreads, 90), 1),
            "mean_spread": round(sum(spreads) / len(spreads), 1) if spreads else 0,
            "profitable_days_pct": round(len(profitable) / len(spreads) * 100, 1),
            "negative_price_days": sum(1 for d in results if d["has_negative"]),
            # A 4h battery discharges 4 MWh per MW per daily cycle; only
            # profitable days cycle. /2 annualises the 24-month sample.
            "annual_revenue_per_mw": round(
                sum(max(0, s) * 4 for s in spreads) / 2
            ),
        }

    # 1. Baseline
    scenarios["baseline"] = apply_and_analyze(lambda p, d: p, "Current (2024-2025)")

    # 2. More wind: on ~45% of days, depress low-price hours further
    def more_wind(prices, date_str):
        is_windy = (day_seed(date_str) % 100) < 45  # 45% of days windy (up from ~32%)
        if is_windy:
            return [p * 0.75 if p < 80 else p * 0.90 for p in prices]
        return prices

    scenarios["more_wind"] = apply_and_analyze(more_wind, "+5 GW Wind (2030)")

    # 3. Celtic Interconnector: cap peaks at ~EUR 150
    def celtic(prices, date_str):
        return [min(p, 150) for p in prices]

    scenarios["celtic"] = apply_and_analyze(celtic, "Celtic Interconnector (2028)")

    # 4. +2 GW BESS: compress spreads ~30% toward the daily mean
    def more_bess(prices, date_str):
        mean = sum(prices) / len(prices)
        return [mean + (p - mean) * 0.70 for p in prices]

    scenarios["bess_2gw"] = apply_and_analyze(more_bess, "+2 GW BESS Fleet")

    # 5. Data centre demand: raises floor by ~EUR 10 (off-peak), ~EUR 5 (peak)
    def datacentre(prices, date_str):
        return [p + 10 if p < 80 else p + 5 for p in prices]

    scenarios["datacentre"] = apply_and_analyze(datacentre, "+30% Data Centre Demand")

    # 6. Combined 2028: Celtic + 1.5 GW BESS + moderate wind
    def combined_2028(prices, date_str):
        # Celtic cap
        prices = [min(p, 150) for p in prices]
        # 1.5 GW BESS: ~20% spread compression
        mean = sum(prices) / len(prices)
        prices = [mean + (p - mean) * 0.80 for p in prices]
        # Moderate wind increase on ~38% of days
        if (day_seed(date_str) % 100) < 38:
            prices = [p * 0.85 if p < 80 else p * 0.95 for p in prices]
        # Data centre uplift
        prices = [p + 5 for p in prices]
        return prices

    scenarios["combined_2028"] = apply_and_analyze(combined_2028, "Combined 2028 Projection")

    # 7. Combined 2030: tighter cap, deeper compression, more wind + demand
    def combined_2030(prices, date_str):
        # Celtic cap
        prices = [min(p, 140) for p in prices]
        # 3 GW BESS: ~40% spread compression
        mean = sum(prices) / len(prices)
        prices = [mean + (p - mean) * 0.60 for p in prices]
        # High wind on ~50% of days
        if (day_seed(date_str) % 100) < 50:
            prices = [p * 0.70 if p < 80 else p * 0.85 for p in prices]
        # Higher data centre demand
        prices = [p + 8 for p in prices]
        return prices

    scenarios["combined_2030"] = apply_and_analyze(combined_2030, "Combined 2030 Projection")

    return scenarios


def main():
    """Entry point: load 24 months of SEM prices, print spread statistics for
    1h/2h/4h batteries, run forward scenarios, and export a JSON bundle for
    the webapp."""
    print("=" * 70)
    print("24-Month SEM Price Analysis for BESS Arbitrage")
    print("Period: January 2024 - December 2025")
    print("=" * 70)
    print()

    # Load and process data
    print("Loading data...")
    hourly = load_all_data()
    print(f"  {len(hourly)} hourly price points loaded")

    daily_data = group_by_day(hourly)
    print(f"  {len(daily_data)} complete days")

    # Date range
    dates = sorted(daily_data.keys())
    print(f"  Range: {dates[0]} to {dates[-1]}")
    print()

    # Analyze for different battery durations
    for dur in [1, 2, 4]:
        print(f"{'=' * 70}")
        print(f"  {dur}-HOUR BATTERY ANALYSIS (85% RTE)")
        print(f"{'=' * 70}")

        results = analyze_spreads(daily_data, duration_hrs=dur, rte=0.85)

        # Net spread statistics
        net_spreads = sorted([d["net_spread"] for d in results])
        gross_spreads = sorted([d["gross_spread"] for d in results])
        profitable = [s for s in net_spreads if s > 0]
        negative_days = [d for d in results if d["has_negative"]]

        print(f"\n  Daily Achievable Spread (EUR/MWh, net of 85% RTE):")
        print(f"    Mean:    EUR {sum(net_spreads)/len(net_spreads):6.1f}/MWh")
        print(f"    Median:  EUR {percentile(net_spreads, 50):6.1f}/MWh")
        print(f"    P25:     EUR {percentile(net_spreads, 25):6.1f}/MWh")
        print(f"    P75:     EUR {percentile(net_spreads, 75):6.1f}/MWh")
        print(f"    P90:     EUR {percentile(net_spreads, 90):6.1f}/MWh")
        print(f"    P95:     EUR {percentile(net_spreads, 95):6.1f}/MWh")
        print(f"    Max:     EUR {max(net_spreads):6.1f}/MWh")
        print(f"    Min:     EUR {min(net_spreads):6.1f}/MWh")
        print(f"\n  Profitability:")
        print(f"    Profitable days: {len(profitable)}/{len(net_spreads)} ({len(profitable)/len(net_spreads)*100:.1f}%)")
        print(f"    Days with negative prices: {len(negative_days)} ({len(negative_days)/len(results)*100:.1f}%)")

        # Revenue distribution is only reported for the 4h reference config.
        if dur == 4:
            # Revenue distribution for 50 MW / 200 MWh
            print(f"\n  Revenue Distribution (50 MW / 200 MWh, annualized from 24mo):")
            rev = revenue_distribution(results, power_mw=50, duration_hrs=4)
            print(f"    Total 24-month revenue: EUR {rev['total_annual_eur']:,}")
            print(f"    Annualized per MW:      EUR {rev['per_mw_eur']:,}/MW/yr")
            print(f"    Annualized per MWh:     EUR {rev['per_mwh_eur']:,}/MWh/yr")
            print(f"\n    Revenue concentration:")
            for c in rev["cumulative"]:
                print(f"      Top {c['pct_days']:5.1f}% of days ({c['days']:3d} days): "
                      f"{c['pct_revenue']:5.1f}% of revenue "
                      f"(worst day in group: EUR {c['daily_rev_eur']:,})")

        # Monthly breakdown
        print(f"\n  Monthly Median Spread (EUR/MWh):")
        monthly = defaultdict(list)
        for d in results:
            monthly[d["month"]].append(d["net_spread"])
        for month in sorted(monthly):
            spreads = sorted(monthly[month])
            med = percentile(spreads, 50)
            # ASCII bar chart: one '#' per EUR 2/MWh, clamped at zero.
            bar = "#" * max(0, int(med / 2))
            print(f"    {month}: {med:6.1f} {bar}")

        # Weekday vs weekend
        wd_spreads = [d["net_spread"] for d in results if not d["is_weekend"]]
        we_spreads = [d["net_spread"] for d in results if d["is_weekend"]]
        print(f"\n  Weekday vs Weekend:")
        print(f"    Weekday mean spread: EUR {sum(wd_spreads)/len(wd_spreads):.1f}/MWh ({len(wd_spreads)} days)")
        print(f"    Weekend mean spread: EUR {sum(we_spreads)/len(we_spreads):.1f}/MWh ({len(we_spreads)} days)")

    # Forward scenarios (4hr battery)
    print(f"\n{'=' * 70}")
    print(f"  FORWARD SCENARIO ANALYSIS (4-hour battery, 50 MW)")
    print(f"{'=' * 70}")

    # NOTE(review): forward_scenarios does not read its first argument; it
    # recomputes from daily_data. Kept for signature compatibility.
    scenarios = forward_scenarios(
        analyze_spreads(daily_data, duration_hrs=4, rte=0.85),
        daily_data
    )

    print(f"\n  {'Scenario':<30s} {'Median':>8s} {'Mean':>8s} {'P75':>8s} {'P90':>8s} {'Prof%':>7s} {'Rev/MW':>10s}")
    print(f"  {'-'*30} {'-'*8} {'-'*8} {'-'*8} {'-'*8} {'-'*7} {'-'*10}")
    for key, s in scenarios.items():
        print(f"  {s['label']:<30s} {s['median_spread']:>7.1f} {s['mean_spread']:>7.1f} "
              f"{s['p75_spread']:>7.1f} {s['p90_spread']:>7.1f} "
              f"{s['profitable_days_pct']:>6.1f}% {s['annual_revenue_per_mw']:>9,}")

    # Export data for webapp
    print(f"\n\nExporting webapp data...")
    results_4hr = analyze_spreads(daily_data, duration_hrs=4, rte=0.85)

    # Build histogram data: fixed-width bins padded one bin below the min and
    # two above the max so every spread falls in some bin.
    spreads_for_hist = [d["net_spread"] for d in results_4hr]
    bin_width = 10
    min_bin = int(min(spreads_for_hist) / bin_width) * bin_width - bin_width
    max_bin = int(max(spreads_for_hist) / bin_width) * bin_width + bin_width * 2
    bins = list(range(min_bin, max_bin, bin_width))
    hist = [0] * len(bins)
    for s in spreads_for_hist:
        for i, b in enumerate(bins):
            if b <= s < b + bin_width:
                hist[i] += 1
                break

    # Monthly summary keyed by "YYYY-MM" for the webapp charts.
    monthly_summary = {}
    monthly_data = defaultdict(list)
    for d in results_4hr:
        monthly_data[d["month"]].append(d)
    for month in sorted(monthly_data):
        days = monthly_data[month]
        spreads = [d["net_spread"] for d in days]
        monthly_summary[month] = {
            "median_spread": round(percentile(sorted(spreads), 50), 1),
            "mean_spread": round(sum(spreads) / len(spreads), 1),
            "mean_price": round(sum(d["daily_mean"] for d in days) / len(days), 1),
            "profitable_pct": round(sum(1 for s in spreads if s > 0) / len(spreads) * 100, 1),
            "negative_price_days": sum(1 for d in days if d["has_negative"]),
            "days": len(days),
        }

    # Single JSON payload consumed by the webapp.
    webapp_data = {
        "period": f"{dates[0]} to {dates[-1]}",
        "total_days": len(results_4hr),
        "total_hours": len(hourly),
        "battery_config": "4-hour, 85% RTE",
        "histogram": {"bins": bins, "counts": hist, "bin_width": bin_width},
        "monthly": monthly_summary,
        "scenarios": scenarios,
        "revenue_50mw": revenue_distribution(results_4hr, power_mw=50, duration_hrs=4),
        "summary": {
            "mean_spread": round(sum(spreads_for_hist) / len(spreads_for_hist), 1),
            "median_spread": round(percentile(sorted(spreads_for_hist), 50), 1),
            "p25": round(percentile(sorted(spreads_for_hist), 25), 1),
            "p75": round(percentile(sorted(spreads_for_hist), 75), 1),
            "p90": round(percentile(sorted(spreads_for_hist), 90), 1),
            "p95": round(percentile(sorted(spreads_for_hist), 95), 1),
            "profitable_pct": round(len([s for s in spreads_for_hist if s > 0]) / len(spreads_for_hist) * 100, 1),
        },
        "durations": {},
    }

    # Add 1hr and 2hr summaries
    for dur in [1, 2, 4]:
        res = analyze_spreads(daily_data, duration_hrs=dur, rte=0.85)
        spreads = sorted([d["net_spread"] for d in res])
        webapp_data["durations"][f"{dur}hr"] = {
            "mean_spread": round(sum(spreads) / len(spreads), 1),
            "median_spread": round(percentile(spreads, 50), 1),
            "p75": round(percentile(spreads, 75), 1),
            "p90": round(percentile(spreads, 90), 1),
            "profitable_pct": round(len([s for s in spreads if s > 0]) / len(spreads) * 100, 1),
            # dur MWh discharged per MW per cycle; /2 annualises 24 months.
            "annual_rev_per_mw": round(sum(max(0, s) * dur for s in spreads) / 2),
        }

    output_path = os.path.join(OUTPUT_DIR, "spread_analysis_24month.json")
    with open(output_path, "w") as f:
        json.dump(webapp_data, f, indent=2)
    print(f"  Written to {output_path}")


if __name__ == "__main__":
    main()
backtest_battery.py
LP-optimised perfect foresight backtest and fixed-schedule simulation for a 50 MW / 200 MWh battery in the Irish SEM. Solves a daily linear programme for optimal charge/discharge decisions, then compares against a naive fixed-window strategy (charge 02:00–05:59, discharge 17:00–20:59).
261 lines Backtest LP Optimisation
#!/usr/bin/env python3
"""
Battery backtest: simulate a 50 MW / 200 MWh battery in the Irish SEM.

Two strategies:
1. Perfect foresight (PF): LP-optimized daily arbitrage knowing all 24 hourly prices.
2. Fixed schedule (FS): charge hours 2-5 (02:00-05:59), discharge hours 17-20
   (17:00-20:59). No foresight required.

SoC resets to SOC_MIN at the start of each day. This is standard for daily
arbitrage backtests -- the battery fully cycles each day.

RTE convention (per user spec): RTE is applied on charge.
  - Charging 50 MWh from grid stores 50 * 0.85 = 42.5 MWh in the battery.
  - Discharging 50 MWh from the battery delivers 50 MWh to the grid.

Output: data/backtest_results.csv
"""

import csv
import sys
import numpy as np
from scipy.optimize import linprog
from collections import defaultdict

# --- Battery parameters -----------------------------------------------------------
# 50 MW / 200 MWh reference system. SoC is held within 5-95% of nameplate,
# leaving 180 MWh of usable energy per full cycle.
POWER_MW = 50
CAPACITY_MWH = 200
SOC_MIN_FRAC = 0.05
SOC_MAX_FRAC = 0.95
SOC_MIN = CAPACITY_MWH * SOC_MIN_FRAC    # 10 MWh
SOC_MAX = CAPACITY_MWH * SOC_MAX_FRAC    # 190 MWh
USABLE_MWH = SOC_MAX - SOC_MIN           # 180 MWh
RTE = 0.85  # round-trip efficiency, applied on charge (see module docstring)

# Fixed schedule hours (clock hours, UTC)
CHARGE_HOURS = {2, 3, 4, 5}         # 02:00-05:59
DISCHARGE_HOURS = {17, 18, 19, 20}  # 17:00-20:59


def load_hourly_data(path):
    """Read the hourly price CSV and bucket rows by calendar date.

    Returns dict[date_str] -> chronologically sorted [(hour, price)].
    """
    days = defaultdict(list)
    with open(path) as f:
        for row in csv.DictReader(f):
            day_key = row['timestamp_utc'][:10]  # YYYY-MM-DD prefix
            days[day_key].append((int(row['hour']), float(row['price_eur_mwh'])))
    # Tuples sort by hour first, giving chronological order within each day.
    for day_key in days:
        days[day_key].sort()
    return days


def perfect_foresight_lp(prices_arr):
    """
    LP-optimal daily arbitrage with full knowledge of the day's hourly prices.

    Decision vector x = [c_0..c_{n-1}, d_0..d_{n-1}]:
      c_h = grid draw while charging in hour h (MW, 0..POWER_MW)
      d_h = grid delivery while discharging in hour h (MW, 0..POWER_MW)

    Maximises sum_h price_h * (d_h - c_h) subject to the SoC staying inside
    [SOC_MIN, SOC_MAX] after every hour, where the stored energy above the
    floor after hour k is RTE * sum(c[0..k]) - sum(d[0..k]).

    Returns None on empty input or solver failure, else a dict with revenue
    and total MWh charged/discharged.
    """
    n = len(prices_arr)
    if n == 0:
        return None

    # linprog minimises, so the objective is cost = sum(price*c) - sum(price*d),
    # the negative of arbitrage revenue.
    objective = np.concatenate([prices_arr, -prices_arr])
    bounds = [(0, POWER_MW)] * (2 * n)

    # tril(ones) is the running-sum operator, so the per-hour SoC window
    #   0 <= RTE*cumsum(c) - cumsum(d) <= USABLE_MWH
    # becomes two stacked families of <= rows.
    running_sum = np.tril(np.ones((n, n)))
    A_ub = np.vstack([
        np.hstack([RTE * running_sum, -running_sum]),   # cum[k] <= USABLE_MWH
        np.hstack([-RTE * running_sum, running_sum]),   # -cum[k] <= 0
    ])
    b_ub = np.concatenate([np.full(n, USABLE_MWH), np.zeros(n)])

    sol = linprog(objective, A_ub=A_ub, b_ub=b_ub, bounds=bounds, method='highs')
    if not sol.success:
        return None

    charge = sol.x[:n]
    discharge = sol.x[n:]

    return {
        'revenue': float(np.sum(prices_arr * (discharge - charge))),
        'mwh_charged': float(np.sum(charge)),
        'mwh_discharged': float(np.sum(discharge)),
    }


def fixed_schedule_sim(hours, prices):
    """
    No-foresight baseline: buy in CHARGE_HOURS, sell in DISCHARGE_HOURS.

    Walks the 24 clock hours chronologically, clipping every move to the
    battery's SoC window. Returns revenue and total MWh charged/discharged.
    """
    price_by_hour = dict(zip(hours, prices))
    soc = SOC_MIN
    revenue = 0.0
    charged = 0.0
    discharged = 0.0

    for hour in range(24):
        if hour not in price_by_hour:
            continue
        price = price_by_hour[hour]

        if hour in CHARGE_HOURS:
            # One hour at full power stores POWER_MW * RTE MWh of SoC,
            # unless the battery tops out first.
            stored = min(POWER_MW * RTE, SOC_MAX - soc)
            if stored > 0:
                drawn = stored / RTE  # grid purchase is pre-loss
                soc += stored
                charged += drawn
                revenue -= price * drawn
        elif hour in DISCHARGE_HOURS:
            # Deliver at full power or down to the SoC floor, whichever binds.
            delivered = min(POWER_MW, soc - SOC_MIN)
            if delivered > 0:
                soc -= delivered
                discharged += delivered
                revenue += price * delivered

    return {
        'revenue': revenue,
        'mwh_charged': charged,
        'mwh_discharged': discharged,
    }


def main():
    """Run the perfect-foresight vs fixed-schedule backtest over every day of
    hourly SEM prices and write per-day results to data/backtest_results.csv."""
    print("Loading hourly price data...")
    days = load_hourly_data('data/sem_complete_hourly.csv')
    dates = sorted(days.keys())
    print(f"Loaded {len(dates)} days ({dates[0]} to {dates[-1]})")

    results = []
    lp_failures = 0

    for i, date in enumerate(dates):
        day_data = days[date]
        hours = [h for h, _ in day_data]
        prices = [p for _, p in day_data]

        # Skip days with fewer than half the hours present.
        if len(hours) < 12:
            continue

        prices_arr = np.array(prices)

        # Perfect foresight (LP)
        pf = perfect_foresight_lp(prices_arr)
        if pf is None:
            lp_failures += 1
            continue

        # Fixed schedule (simulation)
        fs = fixed_schedule_sim(hours, prices)

        # Cycles are measured against usable energy (180 MWh), not nameplate.
        row = {
            'date': date,
            'year': int(date[:4]),
            'month': int(date[5:7]),
            'n_hours': len(hours),
            'pf_revenue': round(pf['revenue'], 2),
            'pf_mwh_charged': round(pf['mwh_charged'], 2),
            'pf_mwh_discharged': round(pf['mwh_discharged'], 2),
            'pf_cycles': round(pf['mwh_discharged'] / USABLE_MWH, 4),
            'fs_revenue': round(fs['revenue'], 2),
            'fs_mwh_charged': round(fs['mwh_charged'], 2),
            'fs_mwh_discharged': round(fs['mwh_discharged'], 2),
            'fs_cycles': round(fs['mwh_discharged'] / USABLE_MWH, 4),
        }
        results.append(row)

        if (i + 1) % 500 == 0:
            print(f"  Processed {i + 1}/{len(dates)} days...")

    print(f"\nProcessed {len(results)} days ({lp_failures} LP failures skipped)")

    # Write CSV
    fieldnames = [
        'date', 'year', 'month', 'n_hours',
        'pf_revenue', 'pf_mwh_charged', 'pf_mwh_discharged', 'pf_cycles',
        'fs_revenue', 'fs_mwh_charged', 'fs_mwh_discharged', 'fs_cycles',
    ]

    with open('data/backtest_results.csv', 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)

    # Print year-by-year summary.
    # Capture = share of perfect-foresight revenue achieved by the fixed schedule.
    print("\n-- Year-by-year summary --")
    print(f"{'Year':>6} {'Days':>5} {'PF Revenue':>14} {'FS Revenue':>14} {'Capture':>8}")
    for year in sorted(set(r['year'] for r in results)):
        yr = [r for r in results if r['year'] == year]
        pf_total = sum(r['pf_revenue'] for r in yr)
        fs_total = sum(r['fs_revenue'] for r in yr)
        n_days = len(yr)
        capture = (fs_total / pf_total * 100) if pf_total > 0 else 0
        print(f"{year:>6} {n_days:>5} EUR{pf_total:>12,.0f} EUR{fs_total:>12,.0f} {capture:>7.1f}%")

    # Sanity checks
    print("\n-- Sanity checks --")
    sample = results[0]
    print(f"First day ({sample['date']}): PF EUR{sample['pf_revenue']:.0f}, "
          f"FS EUR{sample['fs_revenue']:.0f}")
    neg_pf = sum(1 for r in results if r['pf_revenue'] < 0)
    neg_fs = sum(1 for r in results if r['fs_revenue'] < 0)
    print(f"Negative revenue days: PF={neg_pf}, FS={neg_fs}")
    avg_pf_cycles = np.mean([r['pf_cycles'] for r in results])
    avg_fs_cycles = np.mean([r['fs_cycles'] for r in results])
    print(f"Avg cycles/day: PF={avg_pf_cycles:.2f}, FS={avg_fs_cycles:.2f}")
    print(f"Max PF cycles in a day: {max(r['pf_cycles'] for r in results):.2f}")

    print("\nDone. Output: data/backtest_results.csv")


if __name__ == '__main__':
    main()
backtest_duration_comparison.py
Head-to-head comparison of 2-hour vs 4-hour battery configurations using identical methodology. Both run at 50 MW with 85% RTE and 5–95% SoC limits. Produces per-year EUR/MW revenue breakdowns and capture-ratio comparisons to quantify the marginal value of the extra 2 hours of duration.
292 lines Duration Study 2h vs 4h
#!/usr/bin/env python3
"""
Battery duration comparison backtest: 2-hour vs 4-hour in the Irish SEM.

Runs the SAME methodology for both configurations:
  - 50 MW / 100 MWh (2-hour) -- same power, half the energy
  - 50 MW / 200 MWh (4-hour) -- existing reference case

Both with: 85% RTE, 5-95% SoC limits, perfect foresight + fixed schedule strategies.

Fixed schedules:
  2h: charge 03:00-04:59, discharge 17:00-18:59
  4h: charge 02:00-05:59, discharge 17:00-20:59

RTE convention: applied on charge. Drawing X MWh from grid stores X * 0.85 in battery.
SoC resets to minimum at the start of each day.

Output: data/backtest_2h_results.csv (2h), data/backtest_4h_results.csv (4h)
"""

import csv
import sys
import numpy as np
from scipy.optimize import linprog
from collections import defaultdict

# --- Common parameters ------------------------------------------------------------
# Both configurations share the same power rating, round-trip efficiency
# (applied on charge, per the module docstring) and 5-95% SoC window.
POWER_MW = 50
RTE = 0.85
SOC_MIN_FRAC = 0.05
SOC_MAX_FRAC = 0.95

# --- Configuration per duration ---------------------------------------------------
# Fixed windows per the module docstring; the 2h windows sit inside the 4h ones.
CONFIGS = {
    '2h': {
        'capacity_mwh': 100,
        'charge_hours': {3, 4},           # 03:00-04:59
        'discharge_hours': {17, 18},      # 17:00-18:59
    },
    '4h': {
        'capacity_mwh': 200,
        'charge_hours': {2, 3, 4, 5},     # 02:00-05:59
        'discharge_hours': {17, 18, 19, 20},  # 17:00-20:59
    },
}


def load_hourly_data(path):
    """Parse the hourly price CSV into {date: [(hour, price)] sorted by hour}."""
    raw = defaultdict(list)
    with open(path) as fh:
        reader = csv.DictReader(fh)
        for record in reader:
            stamp = record['timestamp_utc']
            raw[stamp[:10]].append(
                (int(record['hour']), float(record['price_eur_mwh']))
            )
    # Sort each day's entries chronologically before returning.
    for key in raw:
        raw[key].sort()
    return raw


def perfect_foresight_lp(prices_arr, capacity_mwh):
    """
    LP-optimal daily arbitrage for a battery of the given energy capacity.

    Same formulation as backtest_battery.py, but the SoC limits are derived
    from `capacity_mwh` so the 2h and 4h systems share one code path.
    Returns None on empty input or solver failure.
    """
    n = len(prices_arr)
    if n == 0:
        return None

    soc_floor = capacity_mwh * SOC_MIN_FRAC
    soc_ceiling = capacity_mwh * SOC_MAX_FRAC
    usable = soc_ceiling - soc_floor

    # Minimise buy cost minus sell revenue over x = [charge_0.., discharge_0..].
    objective = np.concatenate([prices_arr, -prices_arr])
    bounds = [(0, POWER_MW)] * (2 * n)

    # tril(ones) is the running-sum operator: the per-hour SoC window
    #   0 <= RTE*cumsum(c) - cumsum(d) <= usable
    # becomes two stacked families of <= rows.
    running_sum = np.tril(np.ones((n, n)))
    A_ub = np.vstack([
        np.hstack([RTE * running_sum, -running_sum]),   # cum[k] <= usable
        np.hstack([-RTE * running_sum, running_sum]),   # -cum[k] <= 0
    ])
    b_ub = np.concatenate([np.full(n, usable), np.zeros(n)])

    sol = linprog(objective, A_ub=A_ub, b_ub=b_ub, bounds=bounds, method='highs')
    if not sol.success:
        return None

    charge = sol.x[:n]
    discharge = sol.x[n:]

    return {
        'revenue': float(np.sum(prices_arr * (discharge - charge))),
        'mwh_charged': float(np.sum(charge)),
        'mwh_discharged': float(np.sum(discharge)),
        'usable': usable,
    }


def fixed_schedule_sim(hours, prices, capacity_mwh, charge_hours, discharge_hours):
    """
    Simulate a fixed daily schedule: charge during charge_hours, discharge
    during discharge_hours, walking the day chronologically while honouring
    the state-of-charge limits.
    """
    floor = capacity_mwh * SOC_MIN_FRAC
    ceiling = capacity_mwh * SOC_MAX_FRAC

    price_by_hour = dict(zip(hours, prices))
    state_of_charge = floor
    revenue = 0.0
    charged = 0.0
    discharged = 0.0

    for hour in range(24):
        if hour not in price_by_hour:
            continue  # missing settlement period for this day
        price = price_by_hour[hour]

        if hour in charge_hours:
            headroom = ceiling - state_of_charge
            stored = min(POWER_MW * RTE, headroom)
            if stored > 0:
                # Grid draw exceeds stored energy by the round-trip losses.
                drawn = stored / RTE
                state_of_charge += stored
                charged += drawn
                revenue -= price * drawn
        elif hour in discharge_hours:
            deliverable = min(POWER_MW, state_of_charge - floor)
            if deliverable > 0:
                state_of_charge -= deliverable
                discharged += deliverable
                revenue += price * deliverable

    return {
        'revenue': revenue,
        'mwh_charged': charged,
        'mwh_discharged': discharged,
        'usable': ceiling - floor,
    }


def run_backtest(days, dates, config_name, config):
    """Backtest one battery configuration over all days.

    Runs both the perfect-foresight LP and the fixed-schedule simulation for
    each day with enough data, and returns a list of daily result dicts.
    """
    capacity = config['capacity_mwh']
    charge_set = config['charge_hours']
    discharge_set = config['discharge_hours']
    usable = capacity * (SOC_MAX_FRAC - SOC_MIN_FRAC)

    daily_rows = []
    lp_failures = 0

    for i, date in enumerate(dates):
        observations = days[date]
        hour_list = [hr for hr, _ in observations]
        price_list = [px for _, px in observations]

        # Skip days with too many missing settlement periods.
        if len(hour_list) < 12:
            continue

        # Perfect-foresight benchmark.
        pf = perfect_foresight_lp(np.array(price_list), capacity)
        if pf is None:
            lp_failures += 1
            continue

        # Fixed charge/discharge windows for comparison.
        fs = fixed_schedule_sim(hour_list, price_list, capacity,
                                charge_set, discharge_set)

        daily_rows.append({
            'date': date,
            'year': int(date[:4]),
            'month': int(date[5:7]),
            'duration': config_name,
            'capacity_mwh': capacity,
            'n_hours': len(hour_list),
            'pf_revenue': round(pf['revenue'], 2),
            'pf_mwh_charged': round(pf['mwh_charged'], 2),
            'pf_mwh_discharged': round(pf['mwh_discharged'], 2),
            'pf_cycles': round(pf['mwh_discharged'] / usable, 4),
            'fs_revenue': round(fs['revenue'], 2),
            'fs_mwh_charged': round(fs['mwh_charged'], 2),
            'fs_mwh_discharged': round(fs['mwh_discharged'], 2),
            'fs_cycles': round(fs['mwh_discharged'] / usable, 4),
        })

        if (i + 1) % 500 == 0:
            print(f"  [{config_name}] Processed {i + 1}/{len(dates)} days...")

    print(f"  [{config_name}] Done: {len(daily_rows)} days ({lp_failures} LP failures)")
    return daily_rows


def write_csv(results, path):
    """Dump daily backtest rows to CSV with a fixed column order."""
    columns = [
        'date', 'year', 'month', 'duration', 'capacity_mwh', 'n_hours',
        'pf_revenue', 'pf_mwh_charged', 'pf_mwh_discharged', 'pf_cycles',
        'fs_revenue', 'fs_mwh_charged', 'fs_mwh_discharged', 'fs_cycles',
    ]
    with open(path, 'w', newline='') as out:
        dict_writer = csv.DictWriter(out, fieldnames=columns)
        dict_writer.writeheader()
        dict_writer.writerows(results)
    print(f"  Written: {path} ({len(results)} rows)")


def print_summary(results, label):
    """Print a year-by-year revenue summary for one configuration.

    Columns: day count, perfect-foresight (PF) and fixed-schedule (FS) revenue
    totals, the same normalised per MW, the FS/PF capture rate, and average PF
    cycles per day.

    Note: totals are raw sums over the days present — they are NOT annualised,
    so partial years understate a full year's revenue. (An earlier version
    computed an annualisation factor but never applied it; that dead code has
    been removed.)
    """
    print(f"\n-- {label} --")
    print(f"{'Year':>6} {'Days':>5} {'PF Rev (EUR)':>14} {'FS Rev (EUR)':>14} "
          f"{'PF/MW':>10} {'FS/MW':>10} {'Capture':>8} {'PF cyc/d':>9}")

    for year in sorted({r['year'] for r in results}):
        yr = [r for r in results if r['year'] == year]
        pf_total = sum(r['pf_revenue'] for r in yr)
        fs_total = sum(r['fs_revenue'] for r in yr)
        n_days = len(yr)
        pf_per_mw = pf_total / POWER_MW
        fs_per_mw = fs_total / POWER_MW
        # Capture rate = share of the perfect-foresight ceiling the fixed
        # schedule realises; guarded against zero/negative PF totals.
        capture = (fs_total / pf_total * 100) if pf_total > 0 else 0
        avg_pf_cycles = sum(r['pf_cycles'] for r in yr) / n_days
        print(f"{year:>6} {n_days:>5} {pf_total:>14,.0f} {fs_total:>14,.0f} "
              f"{pf_per_mw:>10,.0f} {fs_per_mw:>10,.0f} {capture:>7.1f}% "
              f"{avg_pf_cycles:>8.2f}")


def main():
    """Entry point: load prices, backtest both durations, write and report."""
    print("Loading hourly price data...")
    days = load_hourly_data('data/sem_complete_hourly.csv')
    dates = sorted(days.keys())
    print(f"Loaded {len(dates)} days ({dates[0]} to {dates[-1]})")

    # (config key, output CSV, summary label) for each battery duration.
    duration_specs = [
        ('2h', 'data/backtest_2h_results.csv', '2-HOUR (50 MW / 100 MWh)'),
        ('4h', 'data/backtest_4h_results.csv', '4-HOUR (50 MW / 200 MWh)'),
    ]

    # Run both configurations (2h first, then 4h — matches progress output).
    results = {name: run_backtest(days, dates, name, CONFIGS[name])
               for name, _, _ in duration_specs}

    # Write separate CSVs.
    for name, out_path, _ in duration_specs:
        write_csv(results[name], out_path)

    # Print summaries.
    for name, _, label in duration_specs:
        print_summary(results[name], label)

    # Quick comparison
    print("\n-- HEAD-TO-HEAD (EUR/MW/year) --")
    print(f"{'Year':>6} {'2h PF/MW':>10} {'4h PF/MW':>10} {'2h/4h PF':>9} "
          f"{'2h FS/MW':>10} {'4h FS/MW':>10} {'2h/4h FS':>9}")

    common_years = ({r['year'] for r in results['2h']}
                    & {r['year'] for r in results['4h']})
    for year in sorted(common_years):
        yr2 = [r for r in results['2h'] if r['year'] == year]
        yr4 = [r for r in results['4h'] if r['year'] == year]
        pf2 = sum(r['pf_revenue'] for r in yr2) / POWER_MW
        pf4 = sum(r['pf_revenue'] for r in yr4) / POWER_MW
        fs2 = sum(r['fs_revenue'] for r in yr2) / POWER_MW
        fs4 = sum(r['fs_revenue'] for r in yr4) / POWER_MW
        ratio_pf = (pf2 / pf4 * 100) if pf4 > 0 else 0
        ratio_fs = (fs2 / fs4 * 100) if fs4 > 0 else 0
        print(f"{year:>6} {pf2:>10,.0f} {pf4:>10,.0f} {ratio_pf:>8.1f}% "
              f"{fs2:>10,.0f} {fs4:>10,.0f} {ratio_fs:>8.1f}%")

    print("\nDone.")


if __name__ == '__main__':
    main()
monte_carlo_sim.py
1,000-scenario Monte Carlo simulation of annual battery revenue. Generates synthetic price curves from a fitted regime-switching model with hourly correlation structure, price spikes, and seasonal patterns. Dispatches each day using an adaptive day-ahead LP with realistic ~12% MAPE forecast error. Outputs full distributional statistics including VaR, probability thresholds, and regime breakdowns.
315 lines Monte Carlo 1,000 Scenarios
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Part 3a: Monte Carlo Battery Revenue Simulation

Generates 1,000 price scenarios and runs battery dispatch on each.
Uses an adaptive day-ahead strategy (not PF, not FS).

Battery spec: 50 MW / 200 MWh / 85% RTE / ~0.85 cycles/day avg

Strategy: Adaptive day-ahead optimization with forecast error
  - Uses the true day-ahead prices but adds forecast noise
  - Optimizes charge/discharge schedule using noisy forecast
  - Executes against actual prices (revenue based on actual)

Output: data/monte_carlo_results.csv
"""

import numpy as np
import json
import csv
import sys
import time
from scipy.optimize import linprog

# Global seed for reproducibility of the forecast-noise draws; per-scenario
# price generation uses its own seeded RandomState inside generate_prices().
np.random.seed(42)

DATA_DIR = 'data'  # input model params and output CSV both live here

# --- Battery Parameters -----------------------------------------------------------

POWER_MW = 50        # inverter rating (MW): caps hourly charge/discharge rates
CAPACITY_MWH = 200   # nameplate energy capacity (4-hour battery at 50 MW)
SOC_MIN_FRAC = 0.05  # lower state-of-charge bound, fraction of nameplate
SOC_MAX_FRAC = 0.95  # upper state-of-charge bound, fraction of nameplate
SOC_MIN = CAPACITY_MWH * SOC_MIN_FRAC    # 10 MWh
SOC_MAX = CAPACITY_MWH * SOC_MAX_FRAC    # 190 MWh
USABLE_MWH = SOC_MAX - SOC_MIN           # 180 MWh
RTE = 0.85  # round-trip efficiency, applied on the charge side in the LP
TARGET_CYCLES = 0.85  # average cycles per day; NOTE(review): not referenced in this script — confirm intent

# --- Load Price Model -------------------------------------------------------------

def load_model_params():
    """Load the fitted price-model parameter dict from DATA_DIR."""
    param_path = f'{DATA_DIR}/price_model_params.json'
    with open(param_path) as fh:
        return json.load(fh)


# --- Price Generation (inline from price_model.py) --------------------------------

def generate_prices(params, n_days=365, seed=None):
    """Generate synthetic daily price curves.

    Simulates a 3-state regime-switching model: a Markov chain picks each
    day's regime, and hourly prices are built from the fitted
    regime/season/day-type profile plus cross-hour correlated noise, a
    lag-1 daily mean drift, occasional peak-hour price spikes, and (in
    regime 0) sporadic negative prices.

    Args:
        params: fitted model parameters (transition matrix, hourly profiles,
            monthly factors, spike/negative-price stats, noise correlation).
        n_days: number of consecutive days to simulate.
        seed: optional seed for a dedicated RandomState (reproducible runs).

    Returns:
        Tuple (prices, regimes): float ndarray of shape (n_days, 24) in
        EUR/MWh and int ndarray of shape (n_days,) with regime indices 0..2.
    """
    from scipy.linalg import cholesky

    if seed is not None:
        rng = np.random.RandomState(seed)
    else:
        rng = np.random.RandomState()

    trans = np.array(params['transition_matrix'])
    corr = np.array(params['noise_correlation'])
    # Cholesky factor maps iid standard normals onto hourly-correlated noise.
    L = cholesky(corr, lower=True)

    # Stationary distribution of the regime chain: eigenvector of trans.T at
    # eigenvalue 1, normalised to sum to 1. Used only to seed day 0.
    eigvals, eigvecs = np.linalg.eig(trans.T)
    idx = np.argmin(np.abs(eigvals - 1))
    stat_dist = np.real(eigvecs[:, idx])
    stat_dist = stat_dist / stat_dist.sum()

    regime = rng.choice(3, p=stat_dist)
    prev_daily_mean = None
    daily_ac = params['daily_autocorrelation']

    all_prices = np.zeros((n_days, 24))
    all_regimes = np.zeros(n_days, dtype=int)

    for d in range(n_days):
        if d > 0:
            # Markov step: sample the next regime from the current regime's row.
            regime = rng.choice(3, p=trans[regime])
        all_regimes[d] = regime

        # Map the day index onto a pseudo calendar: month 1..12 spread evenly
        # over n_days, 7-day week with days 5-6 as the weekend.
        month = min(12, (d * 12) // n_days + 1)
        dow = d % 7
        day_type = 'weekend' if dow >= 5 else 'weekday'
        season_map = {1: 'winter', 2: 'winter', 3: 'spring', 4: 'spring',
                      5: 'spring', 6: 'summer', 7: 'summer', 8: 'summer',
                      9: 'autumn', 10: 'autumn', 11: 'autumn', 12: 'winter'}
        season = season_map[month]

        key = f"{regime}_{season}_{day_type}"
        profile = params['hourly_profiles'].get(key)
        if profile is None:
            # Fall back to the weekday profile when this combo wasn't fitted.
            key = f"{regime}_{season}_weekday"
            profile = params['hourly_profiles'][key]

        hourly_mean = np.array(profile['mean'])
        hourly_std = np.array(profile['std'])

        monthly_factor = params['monthly_factors'].get(str(month), 1.0)

        # Correlated standard-normal noise across the 24 hours.
        z = L @ rng.randn(24)

        if prev_daily_mean is not None:
            # AR(1)-style pull of today's level toward yesterday's mean; the
            # 0.3 factor damps the shift — presumably calibrated during model
            # fitting, confirm against price_model.py.
            overall_mean = hourly_mean.mean()
            daily_innovation = (1 - daily_ac) * overall_mean + daily_ac * prev_daily_mean
            mean_shift = daily_innovation - overall_mean
            hourly_mean = hourly_mean + mean_shift * 0.3

        # The 0.6 scales down the profile std — assumed calibration constant,
        # TODO confirm against the fitting script.
        prices = hourly_mean * monthly_factor + hourly_std * z * 0.6

        if rng.random() < params['spike']['daily_probability']:
            # Spikes land in morning/evening peak hours and bleed ~40% of the
            # excess into the neighbouring hours.
            spike_hour = rng.choice([7, 8, 17, 18, 19], p=[0.15, 0.15, 0.25, 0.25, 0.2])
            spike_excess = max(0, rng.normal(
                params['spike']['mean_excess'],
                params['spike']['std_excess']
            ))
            prices[spike_hour] += spike_excess
            if spike_hour > 0:
                prices[spike_hour - 1] += spike_excess * 0.4
            if spike_hour < 23:
                prices[spike_hour + 1] += spike_excess * 0.4

        if regime == 0:
            # Low regime: small chance of negative prices in night/midday lulls.
            for h in [2, 3, 4, 5, 13, 14]:
                if rng.random() < 0.05:
                    prices[h] = rng.normal(params['negative']['mean'],
                                           abs(params['negative']['std']))

        # Clamp to a plausible range (floor -500, cap 4000 EUR/MWh).
        prices = np.clip(prices, -500, 4000)
        all_prices[d] = prices
        prev_daily_mean = prices.mean()

    return all_prices, all_regimes


# --- Battery Dispatch: Adaptive Day-Ahead ----------------------------------------

def solve_day_lp(prices_24, power_mw, usable_mwh, rte):
    """Solve the 24-hour arbitrage LP for a (possibly noisy) price vector.

    Variables are the 24 hourly charge rates followed by the 24 hourly
    discharge rates; cumulative-energy constraints keep the state of charge
    inside the usable window. Returns (charge, discharge) arrays, or two
    zero arrays if the solver fails.
    """
    n = 24
    objective = np.concatenate([prices_24, -prices_24])
    rate_bounds = [(0, power_mw)] * (2 * n)

    # Lower-triangular ones matrix converts hourly flows to cumulative energy.
    cum = np.tril(np.ones((n, n)))
    #  rte*cum(charge) - cum(discharge) <= usable   (cannot overfill)
    # -rte*cum(charge) + cum(discharge) <= 0        (cannot over-drain)
    A_ub = np.block([[rte * cum, -cum], [-rte * cum, cum]])
    b_ub = np.concatenate([np.full(n, usable_mwh), np.zeros(n)])

    outcome = linprog(objective, A_ub=A_ub, b_ub=b_ub, bounds=rate_bounds,
                      method='highs')
    if not outcome.success:
        return np.zeros(n), np.zeros(n)

    return outcome.x[:n], outcome.x[n:]


def simulate_battery_day(actual_prices, forecast_error_std=0.12):
    """
    One day of the adaptive day-ahead strategy:
    1. Perturb the actual prices with multiplicative forecast noise.
    2. Optimise the dispatch schedule against that noisy forecast.
    3. Settle the schedule's revenue against the actual prices.
    """
    n = 24
    actual = np.array(actual_prices)

    # Multiplicative noise: forecast = actual * (1 + eps), eps ~ N(0, std).
    # NOTE(review): upstream this sigma is described as "~12% MAPE"; the
    # exact MAPE implied by this noise model differs slightly — confirm.
    perturbation = np.random.normal(0, forecast_error_std, n)
    noisy_forecast = actual * (1 + perturbation)

    # Schedule is optimised on the forecast, not the realised prices.
    charge, discharge = solve_day_lp(noisy_forecast, POWER_MW, USABLE_MWH, RTE)

    # Settlement uses the ACTUAL prices.
    settled = float(np.sum(actual * (discharge - charge)))
    charged_total = float(np.sum(charge))
    discharged_total = float(np.sum(discharge))

    return {
        'revenue': settled,
        'mwh_charged': charged_total,
        'mwh_discharged': discharged_total,
        'cycles': discharged_total / USABLE_MWH,
    }


# --- Main Simulation Loop --------------------------------------------------------

def run_simulation(n_scenarios=1000, n_days=365):
    """Run the Monte Carlo revenue study and write per-scenario rows to CSV.

    For each scenario: generate a synthetic price year (seeded by the
    scenario index for reproducibility), dispatch the battery day by day
    with the adaptive day-ahead strategy, and collect annual revenue plus
    price/regime diagnostics. Writes data/monte_carlo_results.csv and prints
    a distributional summary (percentiles, probability thresholds, P10
    value-at-risk) of revenue per MW.

    Args:
        n_scenarios: number of independent price scenarios.
        n_days: simulated days per scenario (365 = one year).
    """
    print("Loading model parameters...")
    params = load_model_params()

    print(f"Running {n_scenarios} scenarios x {n_days} days = {n_scenarios * n_days:,} days total")
    print(f"Battery: {POWER_MW} MW / {CAPACITY_MWH} MWh / {RTE*100:.0f}% RTE")

    results = []
    t_start = time.time()

    for s in range(n_scenarios):
        # Generate prices for this scenario (seed = scenario index).
        prices, regimes = generate_prices(params, n_days=n_days, seed=s)

        # Run battery dispatch for each day
        annual_revenue = 0.0
        annual_charged = 0.0
        annual_discharged = 0.0
        annual_cycles = 0.0
        daily_revenues = []
        daily_spreads = []

        for d in range(n_days):
            day_result = simulate_battery_day(prices[d])
            annual_revenue += day_result['revenue']
            annual_charged += day_result['mwh_charged']
            annual_discharged += day_result['mwh_discharged']
            annual_cycles += day_result['cycles']
            daily_revenues.append(day_result['revenue'])
            # Max-minus-min hourly spread: the widest 1 MWh arbitrage that day.
            daily_spreads.append(prices[d].max() - prices[d].min())

        daily_revenues = np.array(daily_revenues)
        daily_spreads = np.array(daily_spreads)

        # Compute scenario statistics
        mean_daily_price = prices.mean()
        mean_daily_spread = daily_spreads.mean()
        # Best-4 minus cheapest-4 hours: the ideal spread for a 4h battery.
        mean_four_hr_spread = np.mean([
            np.sort(prices[d])[-4:].mean() - np.sort(prices[d])[:4].mean()
            for d in range(n_days)
        ])
        negative_pct = (prices < 0).mean() * 100

        # Regime mix: share of days spent in each of the three regimes.
        regime_pcts = [float((regimes == r).mean()) for r in range(3)]

        row = {
            'scenario': s,
            'annual_revenue': round(annual_revenue, 2),
            'revenue_per_mw': round(annual_revenue / POWER_MW, 2),
            'total_charged_mwh': round(annual_charged, 1),
            'total_discharged_mwh': round(annual_discharged, 1),
            'avg_cycles_per_day': round(annual_cycles / n_days, 4),
            'mean_daily_revenue': round(daily_revenues.mean(), 2),
            'std_daily_revenue': round(daily_revenues.std(), 2),
            'min_daily_revenue': round(daily_revenues.min(), 2),
            'max_daily_revenue': round(daily_revenues.max(), 2),
            'negative_revenue_days': int((daily_revenues < 0).sum()),
            'mean_price': round(mean_daily_price, 2),
            'mean_daily_spread': round(mean_daily_spread, 2),
            'mean_four_hr_spread': round(mean_four_hr_spread, 2),
            'negative_price_pct': round(negative_pct, 4),
            'regime_low_pct': round(regime_pcts[0] * 100, 1),
            'regime_normal_pct': round(regime_pcts[1] * 100, 1),
            'regime_high_pct': round(regime_pcts[2] * 100, 1),
        }
        results.append(row)

        if (s + 1) % 100 == 0:
            # Progress line with a crude ETA from the average rate so far.
            elapsed = time.time() - t_start
            rate = (s + 1) / elapsed
            remaining = (n_scenarios - s - 1) / rate
            print(f"  Scenario {s+1}/{n_scenarios} "
                  f"(rev: EUR {annual_revenue:,.0f}, "
                  f"per MW: EUR {annual_revenue/POWER_MW:,.0f}) "
                  f"[{elapsed:.0f}s elapsed, ~{remaining:.0f}s remaining]")

    elapsed = time.time() - t_start
    print(f"\nSimulation complete in {elapsed:.1f}s ({elapsed/n_scenarios:.3f}s per scenario)")

    # Write CSV
    fieldnames = list(results[0].keys())
    with open(f'{DATA_DIR}/monte_carlo_results.csv', 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)
    print(f"Saved results to {DATA_DIR}/monte_carlo_results.csv")

    # Quick summary
    revenues = np.array([r['revenue_per_mw'] for r in results])
    print(f"\n{'='*60}")
    print(f"REVENUE PER MW SUMMARY ({n_scenarios} scenarios)")
    print(f"{'='*60}")
    print(f"  Mean:   EUR {revenues.mean():>10,.0f}")
    print(f"  Median: EUR {np.median(revenues):>10,.0f}")
    print(f"  Std:    EUR {revenues.std():>10,.0f}")
    print(f"  P5:     EUR {np.percentile(revenues, 5):>10,.0f}")
    print(f"  P10:    EUR {np.percentile(revenues, 10):>10,.0f}")
    print(f"  P25:    EUR {np.percentile(revenues, 25):>10,.0f}")
    print(f"  P75:    EUR {np.percentile(revenues, 75):>10,.0f}")
    print(f"  P90:    EUR {np.percentile(revenues, 90):>10,.0f}")
    print(f"  P95:    EUR {np.percentile(revenues, 95):>10,.0f}")
    print(f"  Min:    EUR {revenues.min():>10,.0f}")
    print(f"  Max:    EUR {revenues.max():>10,.0f}")

    # Probability thresholds (e.g. chance of clearing a debt-service hurdle).
    prob_69k = (revenues >= 69000).mean() * 100
    prob_80k = (revenues >= 80000).mean() * 100
    prob_50k = (revenues >= 50000).mean() * 100
    print(f"\n  P(revenue >= EUR 50k/MW): {prob_50k:.1f}%")
    print(f"  P(revenue >= EUR 69k/MW): {prob_69k:.1f}%")
    print(f"  P(revenue >= EUR 80k/MW): {prob_80k:.1f}%")
    print(f"  Value-at-Risk (P10):      EUR {np.percentile(revenues, 10):,.0f}/MW")


if __name__ == '__main__':
    run_simulation(n_scenarios=1000, n_days=365)
About these scripts. These analysis scripts were written to support the Wholesale Market Data & Backtest page and the broader Irish BESS feasibility research. They process real SEM day-ahead price data (Jan 2024 – Dec 2025) and generate the quantitative outputs displayed throughout the webapp. Dependencies: Python 3.10+, NumPy, SciPy.