Click any script below to expand its full source code with syntax highlighting. Each script is self-contained and documented with docstrings.
analyze_24month_spreads.py
Best-N-hours spread analysis across 24 months of SEM day-ahead prices.
Computes daily achievable arbitrage spreads for 1h, 2h, and 4h batteries,
revenue concentration distributions, and forward scenario modelling
(wind growth, Celtic Interconnector, BESS fleet expansion, data centre demand).
analyze_24month_spreads.py
Best-N-hours spread analysis across 24 months of SEM day-ahead prices.
Computes daily achievable arbitrage spreads for 1h, 2h, and 4h batteries,
revenue concentration distributions, and forward scenario modelling
(wind growth, Celtic Interconnector, BESS fleet expansion, data centre demand).
#!/usr/bin/env python3
"""
24-month SEM price analysis for BESS arbitrage.
Computes daily achievable spreads, revenue distribution, and forward scenarios.
Input: 4 JSON files from Energy-Charts API (H1 2024, H2 2024, H1 2025, H2 2025)
Output: JSON for webapp + printed summary statistics
"""
import json
import os
import sys
from datetime import datetime, timezone, timedelta
from collections import defaultdict
import math
# Directory holding the raw Energy-Charts JSON price dumps.
DATA_DIR = "/home/nicky/research/energy-markets/data"
# Webapp JSON output goes to the same directory as the inputs.
OUTPUT_DIR = "/home/nicky/research/energy-markets/data"
# Four half-year SEM day-ahead price files covering 2024-2025.
FILES = [
    "sem_2024_h1.json",
    "sem_2024_h2.json",
    "sem_2025_h1.json",
    "sem_2025_h2.json",
]
def load_all_data():
    """Read every input JSON file and reduce the series to hourly means.

    Returns a dict mapping ``(YYYY-MM-DD, hour)`` (UTC) to the average
    price for that hour. ``None`` prices are dropped; sub-hourly samples
    (e.g. half-hourly SEM data) are averaged into one hourly value.
    """
    buckets = defaultdict(list)  # (date_str, hour) -> raw sub-hourly prices
    for fname in FILES:
        with open(os.path.join(DATA_DIR, fname)) as fh:
            payload = json.load(fh)
        for ts, price in zip(payload["unix_seconds"], payload["price"]):
            if price is None:
                continue  # missing observation
            stamp = datetime.fromtimestamp(ts, tz=timezone.utc)
            buckets[(stamp.strftime("%Y-%m-%d"), stamp.hour)].append(price)
    # Collapse each bucket of sub-hourly samples to its mean.
    return {key: sum(vals) / len(vals) for key, vals in buckets.items()}
def group_by_day(hourly_data):
    """Pivot ``{(date, hour): price}`` into ``{date: [24 hourly prices]}``.

    Days with fewer than 20 observed hours are dropped entirely; any
    remaining gaps in a kept day are filled with that day's mean price
    (a flat fill, not interpolation).
    """
    by_date = defaultdict(dict)
    for (date_str, hour), price in hourly_data.items():
        by_date[date_str][hour] = price
    out = {}
    for date_str in sorted(by_date):
        day = by_date[date_str]
        if len(day) < 20:
            continue  # too sparse to trust
        series = [day.get(h) for h in range(24)]
        observed = [p for p in series if p is not None]
        if len(observed) < 20:
            continue
        fill = sum(observed) / len(observed)
        out[date_str] = [fill if p is None else p for p in series]
    return out
def daily_arbitrage(prices_24, duration_hrs, rte=0.85):
    """Best-N-hours daily arbitrage value for a battery of given duration.

    Charges during the cheapest ``duration_hrs`` hours and discharges
    during the most expensive ones. Spreads are EUR per MWh discharged;
    the net figure loads the buy side with the round-trip efficiency
    loss (1/rte MWh must be bought per MWh delivered).
    """
    ranked = sorted(prices_24)
    buy_avg = sum(ranked[:duration_hrs]) / duration_hrs    # avg charging price
    sell_avg = sum(ranked[-duration_hrs:]) / duration_hrs  # avg discharging price
    return {
        "net_spread": sell_avg - buy_avg / rte,
        "charge_avg": buy_avg,
        "discharge_avg": sell_avg,
        "gross_spread": sell_avg - buy_avg,
    }
def percentile(data, pct):
    """Linear-interpolated percentile of an already-sorted list (0 if empty)."""
    if not data:
        return 0
    rank = (len(data) - 1) * pct / 100
    lo = math.floor(rank)
    hi = math.ceil(rank)
    if lo == hi:
        return data[int(rank)]
    # Weight the two bracketing values by proximity to the fractional rank.
    return data[lo] * (hi - rank) + data[hi] * (rank - lo)
def analyze_spreads(daily_data, duration_hrs=4, rte=0.85):
    """Per-day spread and price statistics for every day in ``daily_data``.

    Returns one dict per day, in chronological order, combining the
    arbitrage figures from daily_arbitrage with simple daily price stats
    (mean/min/max, negative-hour counts, weekend flag).
    """
    rows = []
    for date_str in sorted(daily_data):
        prices = daily_data[date_str]
        arb = daily_arbitrage(prices, duration_hrs, rte)
        day = datetime.strptime(date_str, "%Y-%m-%d")
        lo, hi = min(prices), max(prices)
        rows.append({
            "date": date_str,
            "month": day.strftime("%Y-%m"),
            "weekday": day.weekday(),
            "is_weekend": day.weekday() >= 5,  # Saturday=5, Sunday=6
            "daily_mean": sum(prices) / len(prices),
            "daily_min": lo,
            "daily_max": hi,
            "max_min_spread": hi - lo,
            "net_spread": arb["net_spread"],
            "gross_spread": arb["gross_spread"],
            "charge_avg": arb["charge_avg"],
            "discharge_avg": arb["discharge_avg"],
            "has_negative": any(p < 0 for p in prices),
            "negative_hours": sum(1 for p in prices if p < 0),
        })
    return rows
def revenue_distribution(daily_results, power_mw=50, duration_hrs=4):
    """Revenue totals and concentration curve for one battery configuration.

    Assumes one cycle per day; loss-making days earn zero (the battery
    sits idle rather than cycling at a loss). The concentration table
    reports what share of total revenue the best N days contribute, at a
    fixed set of N checkpoints.
    """
    capacity_mwh = power_mw * duration_hrs
    # Daily revenue = positive net spread * energy capacity, losses clipped to 0.
    ranked = sorted(
        ({"date": d["date"], "revenue": max(0, d["net_spread"]) * capacity_mwh}
         for d in daily_results),
        key=lambda r: r["revenue"],
        reverse=True,
    )
    total = sum(r["revenue"] for r in ranked)
    checkpoints = {1, 5, 10, 20, 36, 50, 73, 100, 183, 365, len(ranked)}
    concentration = []
    running = 0
    for rank, row in enumerate(ranked, start=1):
        running += row["revenue"]
        if rank in checkpoints:
            concentration.append({
                "days": rank,
                "pct_days": round(rank / len(ranked) * 100, 1),
                "pct_revenue": round(running / total * 100, 1) if total > 0 else 0,
                "daily_rev_eur": round(row["revenue"]),
            })
    return {
        "total_annual_eur": round(total),
        "per_mw_eur": round(total / power_mw) if power_mw else 0,
        "per_mwh_eur": round(total / capacity_mwh) if capacity_mwh else 0,
        "profitable_days": sum(1 for d in daily_results if d["net_spread"] > 0),
        "total_days": len(daily_results),
        "cumulative": concentration,
    }
def forward_scenarios(daily_results, daily_data):
    """
    Model how projected supply/demand changes affect the spread distribution.

    Scenarios:
    1. Baseline (current data as-is)
    2. +5 GW wind (2030): more low-price hours, more negatives
    3. Celtic Interconnector: peaks capped at ~EUR 150 (French nuclear + congestion)
    4. +2 GW BESS fleet: spreads compressed ~30% (GB precedent)
    5. Data centre demand +30%: raises floor prices
    6. Combined 2028: Celtic + 1.5 GW BESS + moderate wind growth
    7. Combined 2030: Celtic + 3 GW BESS + high wind + data centres

    NOTE(review): ``daily_results`` is currently unused -- every scenario is
    recomputed from ``daily_data`` via analyze_spreads.
    """
    scenarios = {}

    # Helper: apply transformation to all daily price arrays and recompute
    # 4-hour battery spread statistics on the transformed prices.
    def apply_and_analyze(transform_fn, label):
        transformed = {}
        for date_str, prices in daily_data.items():
            # list(prices) copies, so transform_fn may mutate freely.
            transformed[date_str] = transform_fn(list(prices), date_str)
        results = analyze_spreads(transformed, duration_hrs=4, rte=0.85)
        spreads = sorted([d["net_spread"] for d in results])
        profitable = [s for s in spreads if s > 0]
        return {
            "label": label,
            "median_spread": round(percentile(spreads, 50), 1),
            "p25_spread": round(percentile(spreads, 25), 1),
            "p75_spread": round(percentile(spreads, 75), 1),
            "p90_spread": round(percentile(spreads, 90), 1),
            "mean_spread": round(sum(spreads) / len(spreads), 1) if spreads else 0,
            "profitable_days_pct": round(len(profitable) / len(spreads) * 100, 1),
            "negative_price_days": sum(1 for d in results if d["has_negative"]),
            # Spread is per MWh; x4 converts to per MW of a 4h battery.
            "annual_revenue_per_mw": round(
                sum(max(0, s) * 4 for s in spreads) / 2  # /2 for annualized from 24mo
            ),
        }

    # 1. Baseline
    scenarios["baseline"] = apply_and_analyze(lambda p, d: p, "Current (2024-2025)")

    # 2. More wind: on ~40% of days, reduce prices by 15-30% during off-peak
    def more_wind(prices, date_str):
        # Simulate higher wind penetration: depress low-price hours further.
        # md5-of-date gives a deterministic "windy day" draw, stable across runs.
        import hashlib
        seed = int(hashlib.md5(date_str.encode()).hexdigest()[:8], 16)
        is_windy = (seed % 100) < 45  # 45% of days are windy (up from ~32%)
        if is_windy:
            return [p * 0.75 if p < 80 else p * 0.90 for p in prices]
        return prices

    scenarios["more_wind"] = apply_and_analyze(more_wind, "+5 GW Wind (2030)")

    # 3. Celtic Interconnector: cap peaks at ~EUR 150
    def celtic(prices, date_str):
        return [min(p, 150) for p in prices]

    scenarios["celtic"] = apply_and_analyze(celtic, "Celtic Interconnector (2028)")

    # 4. +2 GW BESS: compress spreads ~30% (shrink deviations around the daily mean)
    def more_bess(prices, date_str):
        mean = sum(prices) / len(prices)
        return [mean + (p - mean) * 0.70 for p in prices]

    scenarios["bess_2gw"] = apply_and_analyze(more_bess, "+2 GW BESS Fleet")

    # 5. Data centre demand: raises floor by ~EUR 10 (less for already-high hours)
    def datacentre(prices, date_str):
        return [p + 10 if p < 80 else p + 5 for p in prices]

    scenarios["datacentre"] = apply_and_analyze(datacentre, "+30% Data Centre Demand")

    # 6. Combined 2028: Celtic + 1.5 GW BESS + moderate wind
    def combined_2028(prices, date_str):
        # Celtic cap
        prices = [min(p, 150) for p in prices]
        # 1.5 GW BESS: ~20% spread compression
        mean = sum(prices) / len(prices)
        prices = [mean + (p - mean) * 0.80 for p in prices]
        # Moderate wind increase (same deterministic md5 draw as more_wind)
        import hashlib
        seed = int(hashlib.md5(date_str.encode()).hexdigest()[:8], 16)
        if (seed % 100) < 38:
            prices = [p * 0.85 if p < 80 else p * 0.95 for p in prices]
        # Data centre uplift
        prices = [p + 5 for p in prices]
        return prices

    scenarios["combined_2028"] = apply_and_analyze(combined_2028, "Combined 2028 Projection")

    # 7. Combined 2030
    def combined_2030(prices, date_str):
        # Celtic cap
        prices = [min(p, 140) for p in prices]
        # 3 GW BESS: ~40% spread compression
        mean = sum(prices) / len(prices)
        prices = [mean + (p - mean) * 0.60 for p in prices]
        # High wind
        import hashlib
        seed = int(hashlib.md5(date_str.encode()).hexdigest()[:8], 16)
        if (seed % 100) < 50:
            prices = [p * 0.70 if p < 80 else p * 0.85 for p in prices]
        # Higher data centre demand
        prices = [p + 8 for p in prices]
        return prices

    scenarios["combined_2030"] = apply_and_analyze(combined_2030, "Combined 2030 Projection")

    return scenarios
def main():
    """Run the full 24-month analysis and export the webapp JSON.

    Steps: load hourly prices, assemble complete days, print spread/
    profitability statistics for 1h/2h/4h batteries, run the forward
    scenarios, then write spread_analysis_24month.json for the webapp.
    """
    print("=" * 70)
    print("24-Month SEM Price Analysis for BESS Arbitrage")
    print("Period: January 2024 - December 2025")
    print("=" * 70)
    print()
    # Load and process data
    print("Loading data...")
    hourly = load_all_data()
    print(f" {len(hourly)} hourly price points loaded")
    daily_data = group_by_day(hourly)
    print(f" {len(daily_data)} complete days")
    # Date range
    dates = sorted(daily_data.keys())
    print(f" Range: {dates[0]} to {dates[-1]}")
    print()
    # Analyze for different battery durations
    for dur in [1, 2, 4]:
        print(f"{'=' * 70}")
        print(f" {dur}-HOUR BATTERY ANALYSIS (85% RTE)")
        print(f"{'=' * 70}")
        results = analyze_spreads(daily_data, duration_hrs=dur, rte=0.85)
        # Net spread statistics
        net_spreads = sorted([d["net_spread"] for d in results])
        # NOTE(review): gross_spreads is computed but never printed below.
        gross_spreads = sorted([d["gross_spread"] for d in results])
        profitable = [s for s in net_spreads if s > 0]
        negative_days = [d for d in results if d["has_negative"]]
        print(f"\n Daily Achievable Spread (EUR/MWh, net of 85% RTE):")
        print(f" Mean: EUR {sum(net_spreads)/len(net_spreads):6.1f}/MWh")
        print(f" Median: EUR {percentile(net_spreads, 50):6.1f}/MWh")
        print(f" P25: EUR {percentile(net_spreads, 25):6.1f}/MWh")
        print(f" P75: EUR {percentile(net_spreads, 75):6.1f}/MWh")
        print(f" P90: EUR {percentile(net_spreads, 90):6.1f}/MWh")
        print(f" P95: EUR {percentile(net_spreads, 95):6.1f}/MWh")
        print(f" Max: EUR {max(net_spreads):6.1f}/MWh")
        print(f" Min: EUR {min(net_spreads):6.1f}/MWh")
        print(f"\n Profitability:")
        print(f" Profitable days: {len(profitable)}/{len(net_spreads)} ({len(profitable)/len(net_spreads)*100:.1f}%)")
        print(f" Days with negative prices: {len(negative_days)} ({len(negative_days)/len(results)*100:.1f}%)")
        if dur == 4:
            # Revenue distribution for 50 MW / 200 MWh
            print(f"\n Revenue Distribution (50 MW / 200 MWh, annualized from 24mo):")
            rev = revenue_distribution(results, power_mw=50, duration_hrs=4)
            print(f" Total 24-month revenue: EUR {rev['total_annual_eur']:,}")
            print(f" Annualized per MW: EUR {rev['per_mw_eur']:,}/MW/yr")
            print(f" Annualized per MWh: EUR {rev['per_mwh_eur']:,}/MWh/yr")
            print(f"\n Revenue concentration:")
            for c in rev["cumulative"]:
                print(f" Top {c['pct_days']:5.1f}% of days ({c['days']:3d} days): "
                      f"{c['pct_revenue']:5.1f}% of revenue "
                      f"(worst day in group: EUR {c['daily_rev_eur']:,})")
        # Monthly breakdown
        # NOTE(review): indentation reconstructed -- monthly and weekday
        # sections assumed to print once per duration.
        print(f"\n Monthly Median Spread (EUR/MWh):")
        monthly = defaultdict(list)
        for d in results:
            monthly[d["month"]].append(d["net_spread"])
        for month in sorted(monthly):
            spreads = sorted(monthly[month])
            med = percentile(spreads, 50)
            # ASCII bar, one '#' per 2 EUR/MWh of median spread
            bar = "#" * max(0, int(med / 2))
            print(f" {month}: {med:6.1f} {bar}")
        # Weekday vs weekend
        wd_spreads = [d["net_spread"] for d in results if not d["is_weekend"]]
        we_spreads = [d["net_spread"] for d in results if d["is_weekend"]]
        print(f"\n Weekday vs Weekend:")
        print(f" Weekday mean spread: EUR {sum(wd_spreads)/len(wd_spreads):.1f}/MWh ({len(wd_spreads)} days)")
        print(f" Weekend mean spread: EUR {sum(we_spreads)/len(we_spreads):.1f}/MWh ({len(we_spreads)} days)")
    # Forward scenarios (4hr battery)
    print(f"\n{'=' * 70}")
    print(f" FORWARD SCENARIO ANALYSIS (4-hour battery, 50 MW)")
    print(f"{'=' * 70}")
    scenarios = forward_scenarios(
        analyze_spreads(daily_data, duration_hrs=4, rte=0.85),
        daily_data
    )
    print(f"\n {'Scenario':<30s} {'Median':>8s} {'Mean':>8s} {'P75':>8s} {'P90':>8s} {'Prof%':>7s} {'Rev/MW':>10s}")
    print(f" {'-'*30} {'-'*8} {'-'*8} {'-'*8} {'-'*8} {'-'*7} {'-'*10}")
    for key, s in scenarios.items():
        print(f" {s['label']:<30s} {s['median_spread']:>7.1f} {s['mean_spread']:>7.1f} "
              f"{s['p75_spread']:>7.1f} {s['p90_spread']:>7.1f} "
              f"{s['profitable_days_pct']:>6.1f}% {s['annual_revenue_per_mw']:>9,}")
    # Export data for webapp
    print(f"\n\nExporting webapp data...")
    results_4hr = analyze_spreads(daily_data, duration_hrs=4, rte=0.85)
    # Build histogram data
    spreads_for_hist = [d["net_spread"] for d in results_4hr]
    bin_width = 10
    # int() truncates toward zero, so pad a bin on each side to cover extremes.
    min_bin = int(min(spreads_for_hist) / bin_width) * bin_width - bin_width
    max_bin = int(max(spreads_for_hist) / bin_width) * bin_width + bin_width * 2
    bins = list(range(min_bin, max_bin, bin_width))
    hist = [0] * len(bins)
    for s in spreads_for_hist:
        for i, b in enumerate(bins):
            if b <= s < b + bin_width:
                hist[i] += 1
                break
    # Monthly summary
    monthly_summary = {}
    monthly_data = defaultdict(list)
    for d in results_4hr:
        monthly_data[d["month"]].append(d)
    for month in sorted(monthly_data):
        days = monthly_data[month]
        spreads = [d["net_spread"] for d in days]
        monthly_summary[month] = {
            "median_spread": round(percentile(sorted(spreads), 50), 1),
            "mean_spread": round(sum(spreads) / len(spreads), 1),
            "mean_price": round(sum(d["daily_mean"] for d in days) / len(days), 1),
            "profitable_pct": round(sum(1 for s in spreads if s > 0) / len(spreads) * 100, 1),
            "negative_price_days": sum(1 for d in days if d["has_negative"]),
            "days": len(days),
        }
    webapp_data = {
        "period": f"{dates[0]} to {dates[-1]}",
        "total_days": len(results_4hr),
        "total_hours": len(hourly),
        "battery_config": "4-hour, 85% RTE",
        "histogram": {"bins": bins, "counts": hist, "bin_width": bin_width},
        "monthly": monthly_summary,
        "scenarios": scenarios,
        "revenue_50mw": revenue_distribution(results_4hr, power_mw=50, duration_hrs=4),
        "summary": {
            "mean_spread": round(sum(spreads_for_hist) / len(spreads_for_hist), 1),
            "median_spread": round(percentile(sorted(spreads_for_hist), 50), 1),
            "p25": round(percentile(sorted(spreads_for_hist), 25), 1),
            "p75": round(percentile(sorted(spreads_for_hist), 75), 1),
            "p90": round(percentile(sorted(spreads_for_hist), 90), 1),
            "p95": round(percentile(sorted(spreads_for_hist), 95), 1),
            "profitable_pct": round(len([s for s in spreads_for_hist if s > 0]) / len(spreads_for_hist) * 100, 1),
        },
        "durations": {},
    }
    # Add 1hr and 2hr summaries
    for dur in [1, 2, 4]:
        res = analyze_spreads(daily_data, duration_hrs=dur, rte=0.85)
        spreads = sorted([d["net_spread"] for d in res])
        webapp_data["durations"][f"{dur}hr"] = {
            "mean_spread": round(sum(spreads) / len(spreads), 1),
            "median_spread": round(percentile(spreads, 50), 1),
            "p75": round(percentile(spreads, 75), 1),
            "p90": round(percentile(spreads, 90), 1),
            "profitable_pct": round(len([s for s in spreads if s > 0]) / len(spreads) * 100, 1),
            # Spread x duration = EUR/MW; /2 annualizes the 24-month window.
            "annual_rev_per_mw": round(sum(max(0, s) * dur for s in spreads) / 2),
        }
    output_path = os.path.join(OUTPUT_DIR, "spread_analysis_24month.json")
    with open(output_path, "w") as f:
        json.dump(webapp_data, f, indent=2)
    print(f" Written to {output_path}")


if __name__ == "__main__":
    main()
backtest_battery.py
LP-optimised perfect foresight backtest and fixed-schedule simulation for a
50 MW / 200 MWh battery in the Irish SEM. Solves a daily linear programme
for optimal charge/discharge decisions, then compares against a naive
fixed-window strategy (charge 02:00–05:59, discharge 17:00–20:59).
backtest_battery.py
LP-optimised perfect foresight backtest and fixed-schedule simulation for a
50 MW / 200 MWh battery in the Irish SEM. Solves a daily linear programme
for optimal charge/discharge decisions, then compares against a naive
fixed-window strategy (charge 02:00–05:59, discharge 17:00–20:59).
#!/usr/bin/env python3
"""
Battery backtest: simulate a 50 MW / 200 MWh battery in the Irish SEM.
Two strategies:
1. Perfect foresight (PF): LP-optimized daily arbitrage knowing all 24 hourly prices.
2. Fixed schedule (FS): charge hours 2-5 (02:00-05:59), discharge hours 17-20
(17:00-20:59). No foresight required.
SoC resets to SOC_MIN at the start of each day. This is standard for daily
arbitrage backtests -- the battery fully cycles each day.
RTE convention (per user spec): RTE is applied on charge.
- Charging 50 MWh from grid stores 50 * 0.85 = 42.5 MWh in the battery.
- Discharging 50 MWh from the battery delivers 50 MWh to the grid.
Output: data/backtest_results.csv
"""
import csv
import sys
import numpy as np
from scipy.optimize import linprog
from collections import defaultdict
# --- Battery parameters -----------------------------------------------------------
POWER_MW = 50            # symmetric charge/discharge power rating
CAPACITY_MWH = 200       # nameplate energy capacity (4-hour battery)
SOC_MIN_FRAC = 0.05      # SoC floor as a fraction of nameplate capacity
SOC_MAX_FRAC = 0.95      # SoC ceiling as a fraction of nameplate capacity
SOC_MIN = CAPACITY_MWH * SOC_MIN_FRAC  # 10 MWh
SOC_MAX = CAPACITY_MWH * SOC_MAX_FRAC  # 190 MWh
USABLE_MWH = SOC_MAX - SOC_MIN  # 180 MWh
RTE = 0.85               # round-trip efficiency, applied on charge (see module docstring)
# Fixed schedule hours (clock hours, UTC)
CHARGE_HOURS = {2, 3, 4, 5}  # 02:00-05:59
DISCHARGE_HOURS = {17, 18, 19, 20}  # 17:00-20:59
def load_hourly_data(path):
    """Read the hourly price CSV into dict[date] -> [(hour, price)], sorted by hour."""
    by_date = defaultdict(list)
    with open(path) as fh:
        for row in csv.DictReader(fh):
            by_date[row['timestamp_utc'][:10]].append(
                (int(row['hour']), float(row['price_eur_mwh']))
            )
    # Sort each day's entries chronologically (tuples sort by hour first).
    for entries in by_date.values():
        entries.sort()
    return by_date
def perfect_foresight_lp(prices_arr):
    """
    Optimal daily arbitrage given full knowledge of the day's hourly prices.

    Decision vector x = [c_0..c_{n-1}, d_0..d_{n-1}] where c_h / d_h are the
    grid-side charge / discharge power (MW) in hour h, each bounded by
    POWER_MW. linprog minimises sum(price*c) - sum(price*d), i.e. the
    negative of trading revenue.

    State of charge is tracked implicitly through cumulative energy above
    the SoC floor:
        cum[k] = RTE * sum(c[0..k]) - sum(d[0..k])
    which must stay within [0, USABLE_MWH] after every hour (SoC starts at
    SOC_MIN each day, per the daily-reset convention).

    Returns a dict with revenue and grid-side MWh totals, or None when the
    day is empty or the solver fails.
    """
    n = len(prices_arr)
    if n == 0:
        return None
    objective = np.concatenate([prices_arr, -prices_arr])
    var_bounds = [(0, POWER_MW)] * (2 * n)
    # Two inequality rows per hour k:
    #   row k      :  cum[k] <= USABLE_MWH
    #   row n + k  : -cum[k] <= 0
    A = np.zeros((2 * n, 2 * n))
    b = np.zeros(2 * n)
    for k in range(n):
        A[k, :k + 1] = RTE
        A[k, n:n + k + 1] = -1.0
        b[k] = USABLE_MWH
        A[n + k, :k + 1] = -RTE
        A[n + k, n:n + k + 1] = 1.0
        # b[n + k] stays 0 from np.zeros
    sol = linprog(objective, A_ub=A, b_ub=b, bounds=var_bounds, method='highs')
    if not sol.success:
        return None
    charge, discharge = sol.x[:n], sol.x[n:]
    return {
        'revenue': float(np.sum(prices_arr * (discharge - charge))),
        'mwh_charged': float(np.sum(charge)),
        'mwh_discharged': float(np.sum(discharge)),
    }
def fixed_schedule_sim(hours, prices):
    """
    Naive no-foresight strategy: charge in CHARGE_HOURS, discharge in
    DISCHARGE_HOURS, walking all 24 clock hours in order and clamping every
    move to the [SOC_MIN, SOC_MAX] window. RTE is applied on charge, so the
    grid draw exceeds the stored energy by a factor of 1/RTE.
    """
    price_at = dict(zip(hours, prices))
    soc = SOC_MIN
    revenue = 0.0
    bought = 0.0
    sold = 0.0
    for clock in range(24):
        if clock not in price_at:
            continue  # hour missing from the data
        price = price_at[clock]
        if clock in CHARGE_HOURS:
            # Store up to one hour of charging, limited by battery headroom.
            stored = min(POWER_MW * RTE, SOC_MAX - soc)
            if stored > 0:
                draw = stored / RTE  # MWh bought from the grid
                soc += stored
                bought += draw
                revenue -= price * draw
        elif clock in DISCHARGE_HOURS:
            # Deliver up to one hour of discharging, limited by energy above floor.
            delivered = min(POWER_MW, soc - SOC_MIN)
            if delivered > 0:
                soc -= delivered
                sold += delivered
                revenue += price * delivered
    return {
        'revenue': revenue,
        'mwh_charged': bought,
        'mwh_discharged': sold,
    }
def main():
    """Backtest PF vs FS strategies over every day and write the results CSV.

    Reads data/sem_complete_hourly.csv, runs the perfect-foresight LP and
    the fixed-schedule simulation per day, writes data/backtest_results.csv,
    then prints a year-by-year summary and sanity checks.
    """
    print("Loading hourly price data...")
    days = load_hourly_data('data/sem_complete_hourly.csv')
    dates = sorted(days.keys())
    print(f"Loaded {len(dates)} days ({dates[0]} to {dates[-1]})")
    results = []
    lp_failures = 0
    for i, date in enumerate(dates):
        day_data = days[date]
        hours = [h for h, _ in day_data]
        prices = [p for _, p in day_data]
        # Skip days with too many missing hours to backtest meaningfully.
        if len(hours) < 12:
            continue
        prices_arr = np.array(prices)
        # Perfect foresight (LP)
        pf = perfect_foresight_lp(prices_arr)
        if pf is None:
            lp_failures += 1
            continue
        # Fixed schedule (simulation)
        fs = fixed_schedule_sim(hours, prices)
        row = {
            'date': date,
            'year': int(date[:4]),
            'month': int(date[5:7]),
            'n_hours': len(hours),
            'pf_revenue': round(pf['revenue'], 2),
            'pf_mwh_charged': round(pf['mwh_charged'], 2),
            'pf_mwh_discharged': round(pf['mwh_discharged'], 2),
            # One "cycle" = discharging the full usable energy once.
            'pf_cycles': round(pf['mwh_discharged'] / USABLE_MWH, 4),
            'fs_revenue': round(fs['revenue'], 2),
            'fs_mwh_charged': round(fs['mwh_charged'], 2),
            'fs_mwh_discharged': round(fs['mwh_discharged'], 2),
            'fs_cycles': round(fs['mwh_discharged'] / USABLE_MWH, 4),
        }
        results.append(row)
        if (i + 1) % 500 == 0:
            print(f" Processed {i + 1}/{len(dates)} days...")
    print(f"\nProcessed {len(results)} days ({lp_failures} LP failures skipped)")
    # Write CSV
    fieldnames = [
        'date', 'year', 'month', 'n_hours',
        'pf_revenue', 'pf_mwh_charged', 'pf_mwh_discharged', 'pf_cycles',
        'fs_revenue', 'fs_mwh_charged', 'fs_mwh_discharged', 'fs_cycles',
    ]
    with open('data/backtest_results.csv', 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)
    # Print year-by-year summary
    print("\n-- Year-by-year summary --")
    print(f"{'Year':>6} {'Days':>5} {'PF Revenue':>14} {'FS Revenue':>14} {'Capture':>8}")
    for year in sorted(set(r['year'] for r in results)):
        yr = [r for r in results if r['year'] == year]
        pf_total = sum(r['pf_revenue'] for r in yr)
        fs_total = sum(r['fs_revenue'] for r in yr)
        n_days = len(yr)
        # Capture ratio: fixed-schedule revenue as a % of perfect foresight.
        capture = (fs_total / pf_total * 100) if pf_total > 0 else 0
        print(f"{year:>6} {n_days:>5} EUR{pf_total:>12,.0f} EUR{fs_total:>12,.0f} {capture:>7.1f}%")
    # Sanity checks
    print("\n-- Sanity checks --")
    sample = results[0]
    print(f"First day ({sample['date']}): PF EUR{sample['pf_revenue']:.0f}, "
          f"FS EUR{sample['fs_revenue']:.0f}")
    neg_pf = sum(1 for r in results if r['pf_revenue'] < 0)
    neg_fs = sum(1 for r in results if r['fs_revenue'] < 0)
    print(f"Negative revenue days: PF={neg_pf}, FS={neg_fs}")
    avg_pf_cycles = np.mean([r['pf_cycles'] for r in results])
    avg_fs_cycles = np.mean([r['fs_cycles'] for r in results])
    print(f"Avg cycles/day: PF={avg_pf_cycles:.2f}, FS={avg_fs_cycles:.2f}")
    print(f"Max PF cycles in a day: {max(r['pf_cycles'] for r in results):.2f}")
    print("\nDone. Output: data/backtest_results.csv")


if __name__ == '__main__':
    main()
backtest_duration_comparison.py
Head-to-head comparison of 2-hour vs 4-hour battery configurations using
identical methodology. Both run at 50 MW with 85% RTE and 5–95% SoC limits.
Produces per-year EUR/MW revenue breakdowns and capture-ratio comparisons
to quantify the marginal value of the extra 2 hours of duration.
backtest_duration_comparison.py
Head-to-head comparison of 2-hour vs 4-hour battery configurations using
identical methodology. Both run at 50 MW with 85% RTE and 5–95% SoC limits.
Produces per-year EUR/MW revenue breakdowns and capture-ratio comparisons
to quantify the marginal value of the extra 2 hours of duration.
#!/usr/bin/env python3
"""
Battery duration comparison backtest: 2-hour vs 4-hour in the Irish SEM.
Runs the SAME methodology for both configurations:
- 50 MW / 100 MWh (2-hour) -- same power, half the energy
- 50 MW / 200 MWh (4-hour) -- existing reference case
Both with: 85% RTE, 5-95% SoC limits, perfect foresight + fixed schedule strategies.
Fixed schedules:
2h: charge 03:00-04:59, discharge 17:00-18:59
4h: charge 02:00-05:59, discharge 17:00-20:59
RTE convention: applied on charge. Drawing X MWh from grid stores X * 0.85 in battery.
SoC resets to minimum at the start of each day.
Output: data/backtest_2h_results.csv (2h), data/backtest_4h_results.csv (4h)
"""
import csv
import sys
import numpy as np
from scipy.optimize import linprog
from collections import defaultdict
# --- Common parameters ------------------------------------------------------------
POWER_MW = 50        # both configurations share the same 50 MW power rating
RTE = 0.85           # round-trip efficiency, applied on charge
SOC_MIN_FRAC = 0.05  # SoC floor as a fraction of nameplate capacity
SOC_MAX_FRAC = 0.95  # SoC ceiling as a fraction of nameplate capacity

# --- Configuration per duration ---------------------------------------------------
# Fixed-schedule windows are narrower for the 2h battery (half the energy).
CONFIGS = {
    '2h': {
        'capacity_mwh': 100,
        'charge_hours': {3, 4},  # 03:00-04:59
        'discharge_hours': {17, 18},  # 17:00-18:59
    },
    '4h': {
        'capacity_mwh': 200,
        'charge_hours': {2, 3, 4, 5},  # 02:00-05:59
        'discharge_hours': {17, 18, 19, 20},  # 17:00-20:59
    },
}
def load_hourly_data(path):
    """Group the hourly price CSV by date: dict[date] -> [(hour, price)] in hour order."""
    grouped = defaultdict(list)
    with open(path) as handle:
        reader = csv.DictReader(handle)
        for record in reader:
            day_key = record['timestamp_utc'][:10]
            pair = (int(record['hour']), float(record['price_eur_mwh']))
            grouped[day_key].append(pair)
    # Tuples sort on hour first, giving chronological order within each day.
    for pairs in grouped.values():
        pairs.sort()
    return grouped
def perfect_foresight_lp(prices_arr, capacity_mwh):
    """
    Optimal daily arbitrage LP, parameterised on energy capacity.

    Same formulation as backtest_battery.py: decision variables are the
    per-hour charge and discharge powers (each 0..POWER_MW); state of
    charge is tracked as cumulative energy above the SoC floor,
        cum[k] = RTE * sum(c[0..k]) - sum(d[0..k]),
    and must stay within [0, usable] after every hour.

    Returns revenue/MWh totals plus the usable energy, or None for an
    empty day or a failed solve.
    """
    n = len(prices_arr)
    if n == 0:
        return None
    floor = capacity_mwh * SOC_MIN_FRAC
    ceiling = capacity_mwh * SOC_MAX_FRAC
    usable = ceiling - floor
    # linprog minimises, so minimise (charge cost - discharge revenue).
    objective = np.concatenate([prices_arr, -prices_arr])
    power_bounds = [(0, POWER_MW)] * (2 * n)
    # Two rows per hour k: cum[k] <= usable, and -cum[k] <= 0.
    A = np.zeros((2 * n, 2 * n))
    b = np.zeros(2 * n)
    for k in range(n):
        A[k, :k + 1] = RTE
        A[k, n:n + k + 1] = -1.0
        b[k] = usable
        A[n + k, :k + 1] = -RTE
        A[n + k, n:n + k + 1] = 1.0
        # b[n + k] stays 0 from np.zeros
    sol = linprog(objective, A_ub=A, b_ub=b, bounds=power_bounds, method='highs')
    if not sol.success:
        return None
    charge, discharge = sol.x[:n], sol.x[n:]
    return {
        'revenue': float(np.sum(prices_arr * (discharge - charge))),
        'mwh_charged': float(np.sum(charge)),
        'mwh_discharged': float(np.sum(discharge)),
        'usable': usable,
    }
def fixed_schedule_sim(hours, prices, capacity_mwh, charge_hours, discharge_hours):
    """
    No-foresight fixed-window strategy for an arbitrary capacity/schedule.

    Walks the 24 clock hours in order, charging in ``charge_hours`` and
    discharging in ``discharge_hours``, clamping every move to the SoC
    window derived from ``capacity_mwh``. RTE applies on charge: grid
    draw = stored energy / RTE.
    """
    floor = capacity_mwh * SOC_MIN_FRAC
    ceiling = capacity_mwh * SOC_MAX_FRAC
    price_at = dict(zip(hours, prices))
    soc = floor
    revenue = 0.0
    bought = 0.0
    sold = 0.0
    for clock in range(24):
        if clock not in price_at:
            continue  # hour missing from the data
        price = price_at[clock]
        if clock in charge_hours:
            stored = min(POWER_MW * RTE, ceiling - soc)
            if stored > 0:
                draw = stored / RTE  # MWh bought from the grid
                soc += stored
                bought += draw
                revenue -= price * draw
        elif clock in discharge_hours:
            delivered = min(POWER_MW, soc - floor)
            if delivered > 0:
                soc -= delivered
                sold += delivered
                revenue += price * delivered
    return {
        'revenue': revenue,
        'mwh_charged': bought,
        'mwh_discharged': sold,
        'usable': ceiling - floor,
    }
def run_backtest(days, dates, config_name, config):
    """Backtest one duration configuration over every date.

    Runs the perfect-foresight LP and the fixed-schedule simulation for
    each day with at least 12 priced hours, returning one result dict per
    day. Days where the LP fails to solve are counted and skipped.
    """
    capacity = config['capacity_mwh']
    charge_hrs = config['charge_hours']
    discharge_hrs = config['discharge_hours']
    # Usable energy defines one "cycle" for the cycle-count metrics.
    usable = capacity * (SOC_MAX_FRAC - SOC_MIN_FRAC)
    results = []
    lp_failures = 0
    for i, date in enumerate(dates):
        pairs = days[date]
        hours = [h for h, _ in pairs]
        prices = [p for _, p in pairs]
        if len(hours) < 12:
            continue  # too sparse to backtest
        # Perfect foresight
        pf = perfect_foresight_lp(np.array(prices), capacity)
        if pf is None:
            lp_failures += 1
            continue
        # Fixed schedule
        fs = fixed_schedule_sim(hours, prices, capacity, charge_hrs, discharge_hrs)
        results.append({
            'date': date,
            'year': int(date[:4]),
            'month': int(date[5:7]),
            'duration': config_name,
            'capacity_mwh': capacity,
            'n_hours': len(hours),
            'pf_revenue': round(pf['revenue'], 2),
            'pf_mwh_charged': round(pf['mwh_charged'], 2),
            'pf_mwh_discharged': round(pf['mwh_discharged'], 2),
            'pf_cycles': round(pf['mwh_discharged'] / usable, 4),
            'fs_revenue': round(fs['revenue'], 2),
            'fs_mwh_charged': round(fs['mwh_charged'], 2),
            'fs_mwh_discharged': round(fs['mwh_discharged'], 2),
            'fs_cycles': round(fs['mwh_discharged'] / usable, 4),
        })
        if (i + 1) % 500 == 0:
            print(f" [{config_name}] Processed {i + 1}/{len(dates)} days...")
    print(f" [{config_name}] Done: {len(results)} days ({lp_failures} LP failures)")
    return results
def write_csv(results, path):
    """Dump the daily backtest rows to a CSV with a fixed column order."""
    columns = [
        'date', 'year', 'month', 'duration', 'capacity_mwh', 'n_hours',
        'pf_revenue', 'pf_mwh_charged', 'pf_mwh_discharged', 'pf_cycles',
        'fs_revenue', 'fs_mwh_charged', 'fs_mwh_discharged', 'fs_cycles',
    ]
    with open(path, 'w', newline='') as fh:
        out = csv.DictWriter(fh, fieldnames=columns)
        out.writeheader()
        out.writerows(results)
    print(f" Written: {path} ({len(results)} rows)")
def print_summary(results, label):
    """Print a year-by-year PF/FS revenue table for one configuration.

    Columns: day count, total PF/FS revenue, revenue per MW of power,
    the FS capture ratio (fixed-schedule revenue as % of perfect
    foresight), and average PF cycles per day. Totals are raw sums over
    the days present in that year (not annualised).
    """
    print(f"\n-- {label} --")
    print(f"{'Year':>6} {'Days':>5} {'PF Rev (EUR)':>14} {'FS Rev (EUR)':>14} "
          f"{'PF/MW':>10} {'FS/MW':>10} {'Capture':>8} {'PF cyc/d':>9}")
    all_years = sorted(set(r['year'] for r in results))
    for year in all_years:
        yr = [r for r in results if r['year'] == year]
        pf_total = sum(r['pf_revenue'] for r in yr)
        fs_total = sum(r['fs_revenue'] for r in yr)
        n_days = len(yr)
        pf_per_mw = pf_total / POWER_MW
        fs_per_mw = fs_total / POWER_MW
        # Guard against a zero/negative PF year to avoid division by zero.
        capture = (fs_total / pf_total * 100) if pf_total > 0 else 0
        avg_pf_cycles = sum(r['pf_cycles'] for r in yr) / n_days
        # Fix: the original computed an unused 365/n_days annualisation
        # factor here (dead code with a misleading comment); removed --
        # the table reports raw totals.
        print(f"{year:>6} {n_days:>5} {pf_total:>14,.0f} {fs_total:>14,.0f} "
              f"{pf_per_mw:>10,.0f} {fs_per_mw:>10,.0f} {capture:>7.1f}% "
              f"{avg_pf_cycles:>8.2f}")
def main():
    """Entry point: load SEM hourly prices, backtest the 2h and 4h battery
    configurations, write result CSVs, and print summary comparisons."""
    print("Loading hourly price data...")
    days = load_hourly_data('data/sem_complete_hourly.csv')
    dates = sorted(days)
    print(f"Loaded {len(dates)} days ({dates[0]} to {dates[-1]})")
    # Backtest both durations against the same price history ('2h' first,
    # then '4h', matching the CSV/summary output order below).
    res = {name: run_backtest(days, dates, name, CONFIGS[name])
           for name in ('2h', '4h')}
    # Persist per-day results, one file per configuration.
    write_csv(res['2h'], 'data/backtest_2h_results.csv')
    write_csv(res['4h'], 'data/backtest_4h_results.csv')
    # Year-by-year summaries.
    print_summary(res['2h'], '2-HOUR (50 MW / 100 MWh)')
    print_summary(res['4h'], '4-HOUR (50 MW / 200 MWh)')
    # Head-to-head comparison over the years both configs cover.
    print("\n-- HEAD-TO-HEAD (EUR/MW/year) --")
    print(f"{'Year':>6} {'2h PF/MW':>10} {'4h PF/MW':>10} {'2h/4h PF':>9} "
          f"{'2h FS/MW':>10} {'4h FS/MW':>10} {'2h/4h FS':>9}")

    def _per_mw(rows, year):
        # Total PF and FS revenue per MW for one year's rows.
        subset = [r for r in rows if r['year'] == year]
        return (sum(r['pf_revenue'] for r in subset) / POWER_MW,
                sum(r['fs_revenue'] for r in subset) / POWER_MW)

    common_years = ({r['year'] for r in res['2h']}
                    & {r['year'] for r in res['4h']})
    for year in sorted(common_years):
        pf2, fs2 = _per_mw(res['2h'], year)
        pf4, fs4 = _per_mw(res['4h'], year)
        ratio_pf = (pf2 / pf4 * 100) if pf4 > 0 else 0
        ratio_fs = (fs2 / fs4 * 100) if fs4 > 0 else 0
        print(f"{year:>6} {pf2:>10,.0f} {pf4:>10,.0f} {ratio_pf:>8.1f}% "
              f"{fs2:>10,.0f} {fs4:>10,.0f} {ratio_fs:>8.1f}%")
    print("\nDone.")
if __name__ == '__main__':
    main()
monte_carlo_sim.py
1,000-scenario Monte Carlo simulation of annual battery revenue.
Generates synthetic price curves from a fitted regime-switching model
with hourly correlation structure, price spikes, and seasonal patterns.
Dispatches each day using an adaptive day-ahead LP with realistic
~12% MAPE forecast error. Outputs full distributional statistics
including VaR, probability thresholds, and regime breakdowns.
monte_carlo_sim.py
1,000-scenario Monte Carlo simulation of annual battery revenue.
Generates synthetic price curves from a fitted regime-switching model
with hourly correlation structure, price spikes, and seasonal patterns.
Dispatches each day using an adaptive day-ahead LP with realistic
~12% MAPE forecast error. Outputs full distributional statistics
including VaR, probability thresholds, and regime breakdowns.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Part 3a: Monte Carlo Battery Revenue Simulation
Generates 1,000 price scenarios and runs battery dispatch on each.
Uses an adaptive day-ahead strategy (not PF, not FS).
Battery spec: 50 MW / 200 MWh / 85% RTE / ~0.85 cycles/day avg
Strategy: Adaptive day-ahead optimization with forecast error
- Uses the true day-ahead prices but adds forecast noise
- Optimizes charge/discharge schedule using noisy forecast
- Executes against actual prices (revenue based on actual)
Output: data/monte_carlo_results.csv
"""
import numpy as np
import json
import csv
import sys
import time
from scipy.optimize import linprog
# Seed the global NumPy RNG so the forecast-noise draws are reproducible
# (price generation uses its own per-scenario RandomState, seeded separately).
np.random.seed(42)
DATA_DIR = 'data'  # input/output directory, relative to the working directory
# --- Battery Parameters -----------------------------------------------------------
POWER_MW = 50                 # power rating (MW), bounds hourly charge/discharge
CAPACITY_MWH = 200            # nameplate energy capacity (MWh) -> 4-hour duration
SOC_MIN_FRAC = 0.05           # keep state of charge above 5% of nameplate
SOC_MAX_FRAC = 0.95           # keep state of charge below 95% of nameplate
SOC_MIN = CAPACITY_MWH * SOC_MIN_FRAC # 10 MWh
SOC_MAX = CAPACITY_MWH * SOC_MAX_FRAC # 190 MWh
USABLE_MWH = SOC_MAX - SOC_MIN # 180 MWh usable window between SOC limits
RTE = 0.85                    # round-trip efficiency
TARGET_CYCLES = 0.85 # average cycles per day
# --- Load Price Model -------------------------------------------------------------
def load_model_params():
    """Read the fitted price-model parameters from the data directory."""
    param_path = f'{DATA_DIR}/price_model_params.json'
    with open(param_path) as fh:
        return json.load(fh)
# --- Price Generation (inline from price_model.py) --------------------------------
def generate_prices(params, n_days=365, seed=None):
    """Generate synthetic daily price curves.

    Simulates `n_days` days of 24 hourly prices from the fitted model in
    `params`: a 3-state Markov regime chain, season/day-type hourly
    profiles, correlated hourly noise, monthly level factors, occasional
    peak-hour spikes, and rare negative-price hours in the low regime.

    Args:
        params: dict of fitted model parameters (transition matrix, noise
            correlation, hourly profiles, monthly factors, spike and
            negative-price settings).
        n_days: number of days to simulate.
        seed: optional RandomState seed for reproducibility.

    Returns:
        (all_prices, all_regimes): an (n_days, 24) float array of prices
        and an (n_days,) int array of the regime drawn for each day.
    """
    from scipy.linalg import cholesky
    if seed is not None:
        rng = np.random.RandomState(seed)
    else:
        rng = np.random.RandomState()
    trans = np.array(params['transition_matrix'])
    corr = np.array(params['noise_correlation'])
    # Cholesky factor turns i.i.d. normal draws into correlated hourly noise.
    L = cholesky(corr, lower=True)
    # Stationary distribution (eigenvector of trans.T for eigenvalue 1)
    # is used to draw the initial regime.
    eigvals, eigvecs = np.linalg.eig(trans.T)
    idx = np.argmin(np.abs(eigvals - 1))
    stat_dist = np.real(eigvecs[:, idx])
    stat_dist = stat_dist / stat_dist.sum()
    regime = rng.choice(3, p=stat_dist)
    prev_daily_mean = None
    daily_ac = params['daily_autocorrelation']
    all_prices = np.zeros((n_days, 24))
    all_regimes = np.zeros(n_days, dtype=int)
    for d in range(n_days):
        if d > 0:
            # Markov step: tomorrow's regime conditional on today's.
            regime = rng.choice(3, p=trans[regime])
        all_regimes[d] = regime
        # Map the day index onto an approximate calendar month (1..12).
        month = min(12, (d * 12) // n_days + 1)
        dow = d % 7
        day_type = 'weekend' if dow >= 5 else 'weekday'
        season_map = {1: 'winter', 2: 'winter', 3: 'spring', 4: 'spring',
                      5: 'spring', 6: 'summer', 7: 'summer', 8: 'summer',
                      9: 'autumn', 10: 'autumn', 11: 'autumn', 12: 'winter'}
        season = season_map[month]
        key = f"{regime}_{season}_{day_type}"
        profile = params['hourly_profiles'].get(key)
        if profile is None:
            # Fall back to the weekday profile if this combination is missing.
            key = f"{regime}_{season}_weekday"
            profile = params['hourly_profiles'][key]
        hourly_mean = np.array(profile['mean'])
        hourly_std = np.array(profile['std'])
        monthly_factor = params['monthly_factors'].get(str(month), 1.0)
        # Correlated hourly noise vector for this day.
        z = L @ rng.randn(24)
        if prev_daily_mean is not None:
            # AR(1)-style persistence of the daily mean level; the 0.3
            # factor damps how strongly it shifts today's profile.
            overall_mean = hourly_mean.mean()
            daily_innovation = (1 - daily_ac) * overall_mean + daily_ac * prev_daily_mean
            mean_shift = daily_innovation - overall_mean
            hourly_mean = hourly_mean + mean_shift * 0.3
        prices = hourly_mean * monthly_factor + hourly_std * z * 0.6
        # Occasional spike in a morning/evening peak hour, bleeding 40%
        # of the excess into each neighbouring hour.
        if rng.random() < params['spike']['daily_probability']:
            spike_hour = rng.choice([7, 8, 17, 18, 19], p=[0.15, 0.15, 0.25, 0.25, 0.2])
            spike_excess = max(0, rng.normal(
                params['spike']['mean_excess'],
                params['spike']['std_excess']
            ))
            prices[spike_hour] += spike_excess
            if spike_hour > 0:
                prices[spike_hour - 1] += spike_excess * 0.4
            if spike_hour < 23:
                prices[spike_hour + 1] += spike_excess * 0.4
        if regime == 0:
            # Low regime: small chance of negative prices in off-peak hours.
            for h in [2, 3, 4, 5, 13, 14]:
                if rng.random() < 0.05:
                    prices[h] = rng.normal(params['negative']['mean'],
                                           abs(params['negative']['std']))
        # Clamp to plausible day-ahead price bounds.
        prices = np.clip(prices, -500, 4000)
        all_prices[d] = prices
        prev_daily_mean = prices.mean()
    return all_prices, all_regimes
# --- Battery Dispatch: Adaptive Day-Ahead ----------------------------------------
def solve_day_lp(prices_24, power_mw, usable_mwh, rte):
    """Solve daily arbitrage LP given (possibly noisy) prices.

    Decision variables are 24 hourly charge rates followed by 24 hourly
    discharge rates (each in [0, power_mw]). The objective minimises
    charge cost minus discharge revenue; constraints keep the running
    state of charge (with charged energy scaled by `rte`) within
    [0, usable_mwh] after every hour.

    Returns (charge, discharge) arrays of length 24, or zeros if the
    solver fails.
    """
    n = 24
    # Objective vector: +price for charging, -price for discharging.
    cost = np.concatenate([prices_24, -prices_24])
    # Lower-triangular ones matrix computes cumulative sums up to hour k.
    cum = np.tril(np.ones((n, n)))
    soc = np.hstack([rte * cum, -cum])   # SOC after each hour
    A_ub = np.vstack([soc, -soc])        # SOC <= usable_mwh and SOC >= 0
    b_ub = np.concatenate([np.full(n, float(usable_mwh)), np.zeros(n)])
    lp = linprog(cost, A_ub=A_ub, b_ub=b_ub,
                 bounds=[(0, power_mw)] * (2 * n), method='highs')
    if not lp.success:
        return np.zeros(n), np.zeros(n)
    return lp.x[:n], lp.x[n:]
def simulate_battery_day(actual_prices, forecast_error_std=0.12):
    """
    Dispatch one day using the adaptive day-ahead strategy:
    1. Generate a noisy forecast of the day-ahead prices (~12% MAPE).
    2. Optimize the charge/discharge schedule against that forecast.
    3. Settle revenue against the actual prices.

    Returns a dict with revenue, MWh charged/discharged, and cycles.
    """
    hours = 24
    actual = np.array(actual_prices)
    # Multiplicative noise: forecast = actual * (1 + eps), eps ~ N(0, std),
    # which yields a realistic day-ahead forecasting error.
    eps = np.random.normal(0, forecast_error_std, hours)
    forecast = actual * (1 + eps)
    # Schedule against the forecast; revenue is computed on ACTUAL prices.
    charge, discharge = solve_day_lp(forecast, POWER_MW, USABLE_MWH, RTE)
    net = discharge - charge
    revenue = float(np.sum(actual * net))
    charged = float(np.sum(charge))
    discharged = float(np.sum(discharge))
    return {
        'revenue': revenue,
        'mwh_charged': charged,
        'mwh_discharged': discharged,
        'cycles': discharged / USABLE_MWH,
    }
# --- Main Simulation Loop --------------------------------------------------------
def run_simulation(n_scenarios=1000, n_days=365):
    """Run the Monte Carlo: generate `n_scenarios` synthetic price years,
    dispatch the battery on every day of each, write one row per scenario
    to data/monte_carlo_results.csv, and print distributional summaries
    (percentiles, probability thresholds, VaR) of revenue per MW."""
    print("Loading model parameters...")
    params = load_model_params()
    print(f"Running {n_scenarios} scenarios x {n_days} days = {n_scenarios * n_days:,} days total")
    print(f"Battery: {POWER_MW} MW / {CAPACITY_MWH} MWh / {RTE*100:.0f}% RTE")
    results = []
    t_start = time.time()
    for s in range(n_scenarios):
        # Generate prices for this scenario; seeding with the scenario
        # index makes each scenario independently reproducible.
        prices, regimes = generate_prices(params, n_days=n_days, seed=s)
        # Run battery dispatch for each day, accumulating annual totals.
        annual_revenue = 0.0
        annual_charged = 0.0
        annual_discharged = 0.0
        annual_cycles = 0.0
        daily_revenues = []
        daily_spreads = []
        for d in range(n_days):
            day_result = simulate_battery_day(prices[d])
            annual_revenue += day_result['revenue']
            annual_charged += day_result['mwh_charged']
            annual_discharged += day_result['mwh_discharged']
            annual_cycles += day_result['cycles']
            daily_revenues.append(day_result['revenue'])
            daily_spreads.append(prices[d].max() - prices[d].min())
        daily_revenues = np.array(daily_revenues)
        daily_spreads = np.array(daily_spreads)
        # Compute scenario statistics
        mean_daily_price = prices.mean()
        mean_daily_spread = daily_spreads.mean()
        # Average "top-4 minus bottom-4 hours" spread: the daily spread a
        # 4-hour battery could theoretically capture.
        mean_four_hr_spread = np.mean([
            np.sort(prices[d])[-4:].mean() - np.sort(prices[d])[:4].mean()
            for d in range(n_days)
        ])
        negative_pct = (prices < 0).mean() * 100
        # Regime mix: share of days spent in each of the 3 regimes.
        regime_pcts = [float((regimes == r).mean()) for r in range(3)]
        row = {
            'scenario': s,
            'annual_revenue': round(annual_revenue, 2),
            'revenue_per_mw': round(annual_revenue / POWER_MW, 2),
            'total_charged_mwh': round(annual_charged, 1),
            'total_discharged_mwh': round(annual_discharged, 1),
            'avg_cycles_per_day': round(annual_cycles / n_days, 4),
            'mean_daily_revenue': round(daily_revenues.mean(), 2),
            'std_daily_revenue': round(daily_revenues.std(), 2),
            'min_daily_revenue': round(daily_revenues.min(), 2),
            'max_daily_revenue': round(daily_revenues.max(), 2),
            'negative_revenue_days': int((daily_revenues < 0).sum()),
            'mean_price': round(mean_daily_price, 2),
            'mean_daily_spread': round(mean_daily_spread, 2),
            'mean_four_hr_spread': round(mean_four_hr_spread, 2),
            'negative_price_pct': round(negative_pct, 4),
            'regime_low_pct': round(regime_pcts[0] * 100, 1),
            'regime_normal_pct': round(regime_pcts[1] * 100, 1),
            'regime_high_pct': round(regime_pcts[2] * 100, 1),
        }
        results.append(row)
        # Progress report every 100 scenarios with an ETA estimate.
        if (s + 1) % 100 == 0:
            elapsed = time.time() - t_start
            rate = (s + 1) / elapsed
            remaining = (n_scenarios - s - 1) / rate
            print(f" Scenario {s+1}/{n_scenarios} "
                  f"(rev: EUR {annual_revenue:,.0f}, "
                  f"per MW: EUR {annual_revenue/POWER_MW:,.0f}) "
                  f"[{elapsed:.0f}s elapsed, ~{remaining:.0f}s remaining]")
    elapsed = time.time() - t_start
    print(f"\nSimulation complete in {elapsed:.1f}s ({elapsed/n_scenarios:.3f}s per scenario)")
    # Write CSV (one row per scenario; columns taken from the first row).
    fieldnames = list(results[0].keys())
    with open(f'{DATA_DIR}/monte_carlo_results.csv', 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(results)
    print(f"Saved results to {DATA_DIR}/monte_carlo_results.csv")
    # Quick summary
    revenues = np.array([r['revenue_per_mw'] for r in results])
    print(f"\n{'='*60}")
    print(f"REVENUE PER MW SUMMARY ({n_scenarios} scenarios)")
    print(f"{'='*60}")
    print(f" Mean: EUR {revenues.mean():>10,.0f}")
    print(f" Median: EUR {np.median(revenues):>10,.0f}")
    print(f" Std: EUR {revenues.std():>10,.0f}")
    print(f" P5: EUR {np.percentile(revenues, 5):>10,.0f}")
    print(f" P10: EUR {np.percentile(revenues, 10):>10,.0f}")
    print(f" P25: EUR {np.percentile(revenues, 25):>10,.0f}")
    print(f" P75: EUR {np.percentile(revenues, 75):>10,.0f}")
    print(f" P90: EUR {np.percentile(revenues, 90):>10,.0f}")
    print(f" P95: EUR {np.percentile(revenues, 95):>10,.0f}")
    print(f" Min: EUR {revenues.min():>10,.0f}")
    print(f" Max: EUR {revenues.max():>10,.0f}")
    # Probability of clearing key revenue thresholds (EUR/MW/year).
    prob_69k = (revenues >= 69000).mean() * 100
    prob_80k = (revenues >= 80000).mean() * 100
    prob_50k = (revenues >= 50000).mean() * 100
    print(f"\n P(revenue >= EUR 50k/MW): {prob_50k:.1f}%")
    print(f" P(revenue >= EUR 69k/MW): {prob_69k:.1f}%")
    print(f" P(revenue >= EUR 80k/MW): {prob_80k:.1f}%")
    print(f" Value-at-Risk (P10): EUR {np.percentile(revenues, 10):,.0f}/MW")
if __name__ == '__main__':
    run_simulation(n_scenarios=1000, n_days=365)
About these scripts.
These analysis scripts were written to support the
Wholesale Market Data & Backtest page
and the broader Irish BESS feasibility research. They process real SEM day-ahead
price data (Jan 2024 – Dec 2025) and generate the quantitative outputs
displayed throughout the webapp. Dependencies: Python 3.10+, NumPy, SciPy.