-
Notifications
You must be signed in to change notification settings - Fork 218
Open
Labels
Description
Describe the bug
Multiplicative ETS forecasters must be strictly positive, but on some settings during optimisation it can produce negative forecasts, which leads to a numerical error. We can guard against this but the simple solution was not universal.
Comments made here
Steps/Code to reproduce the bug
import numpy as np
import time
import pandas as pd
import traceback
from sklearn.metrics import mean_squared_error
from aeon.forecasting.stats import ETS
import numpy as np
from aeon.datasets import load_airline
import os
from statsforecast import StatsForecast
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.tsa.holtwinters import ExponentialSmoothing as SM_ETS
# from statsforecast import StatsForecast
# from statsforecast.models import ETS as SF_ETS
from aeon.forecasting.stats import ETS as AeonETS
import warnings
warnings.filterwarnings("ignore")
import numpy as np
import random
from typing import Optional, Dict, Tuple
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from aeon.forecasting.stats import ETS as AeonETS
from statsforecast import StatsForecast
from statsforecast.models import AutoETS as SF_AutoETS
from statsforecast import StatsForecast
import time
import numpy as np
import pandas as pd
from typing import List
from dataclasses import dataclass
from typing import Optional, List, Tuple
def simulate_random_ets(n: int = 100, sigma: float = 0.1, seed=None):
rng = np.random.default_rng(seed)
trend_type = rng.choice([None, "A", "M"])
season_type = rng.choice([None, "A", "M"])
error_type = rng.choice(["A", "M"])
# choose m only if seasonality exists, and ensure m >= 2
seasonal_choices = [4, 7, 12] # or whatever you want, but all >= 2
m = int(rng.choice(seasonal_choices)) if season_type is not None else None
alpha = float(rng.uniform(0.05, 0.4))
beta = float(rng.uniform(0.01, 0.15)) if trend_type in ("A","M") else 0.0
gamma_max = max(0.05, 1.0 - alpha - 1e-6)
gamma = float(rng.uniform(0.05, min(0.4, gamma_max))) if season_type in ("A","M") else 0.0
phi = float(rng.uniform(0.8, 0.99)) if trend_type in ("A","M") else 1.0
# initial states...
if (error_type == "M") or (trend_type == "M") or (season_type == "M"):
l0, b0, s0 = 1.0, rng.normal(0.0, 0.02) if trend_type else 0.0, None
else:
l0, b0, s0 = float(rng.uniform(0.5, 2.0)), float(rng.normal(0.0, 0.05)) if trend_type=="A" else 0.0, None
cfg = {
"error": error_type,
"trend": {"A":"add", "M":"mul"}.get(trend_type, None),
"seasonal": {"A":"add", "M":"mul"}.get(season_type, None),
"seasonal_periods": m, # None when no season
"alpha": alpha, "beta": beta, "gamma": gamma, "phi": phi,
"l0": l0, "b0": b0, "s0": s0,
}
# final sanity: if seasonal present but m invalid, drop seasonality
if cfg["seasonal"] is not None and (cfg["seasonal_periods"] is None or cfg["seasonal_periods"] < 2):
cfg["seasonal"] = None
cfg["seasonal_periods"] = None
y = _simulate_safe_ets(n=n, cfg=cfg, rng=rng, sigma=sigma)
if (error_type == "M") or (trend_type == "M") or (season_type == "M"):
assert (y > 0).all()
return y, cfg
def _simulate_safe_ets(
n: int,
cfg: dict,
rng: np.random.Generator,
sigma: float = 0.15,
growth_cap: float = 1e6,
) -> np.ndarray:
"""
Simulate one ETS series using a dict config (statsmodels-style).
cfg keys:
error: 'A' or 'M'
trend: 'add' / 'mul' / None
seasonal: 'add' / 'mul' / None
seasonal_periods: int or None
alpha, beta, gamma, phi: floats
l0, b0: floats
s0: Optional[np.ndarray] (len m)
"""
error = cfg["error"] # 'A' or 'M'
trend = cfg["trend"] # 'add'/'mul'/None
seasonal = cfg["seasonal"] # 'add'/'mul'/None
sp_cfg = cfg.get("seasonal_periods", None)
# Seasonality is active only if seasonal set and m >= 2
has_season = (seasonal in ("add", "mul")) and (sp_cfg is not None) and (int(sp_cfg) >= 2)
m = int(sp_cfg) if has_season else 1
alpha = float(cfg["alpha"])
beta = float(cfg["beta"])
gamma = float(cfg["gamma"])
phi = float(cfg["phi"])
l0 = float(cfg["l0"])
b0 = float(cfg["b0"])
s0 = cfg.get("s0", None)
has_trend = trend in ("add", "mul")
any_multiplicative = (error == "M") or (trend == "mul") or (seasonal == "mul")
if any_multiplicative:
# ----- simulate in log-space: x_t = log y_t -----
ell = np.log(l0) if l0 > 0.0 else 0.0
b = b0
if has_season:
if s0 is not None:
s = np.array(s0, dtype=float).copy()
if s.size != m:
s = np.resize(s, m)
else:
s = rng.normal(0.0, 0.2, size=m)
s -= s.mean()
else:
s = np.zeros(1, dtype=float) # dummy
x = np.empty(n, dtype=float)
for t in range(n):
st = s[t % m] if has_season else 0.0
xhat = ell + (phi * b if has_trend else 0.0) + st
e = rng.normal(0.0, sigma)
x[t] = xhat + e
et = x[t] - xhat
ell = ell + (phi * b if has_trend else 0.0) + alpha * et
if has_trend:
b = phi * b + beta * et
if has_season:
i = t % m
s[i] = s[i] + gamma * et
if i == m - 1:
s -= s.mean()
y = np.exp(x) # strictly positive
else:
# ----- purely additive on original scale -----
ell = l0
b = b0 if has_trend else 0.0
if has_season:
if s0 is not None:
s = np.array(s0, dtype=float).copy()
if s.size != m:
s = np.resize(s, m)
s -= s.mean()
else:
s = rng.normal(0.0, 1.0, size=m)
s -= s.mean()
else:
s = np.zeros(1, dtype=float)
y = np.empty(n, dtype=float)
for t in range(n):
st = s[t % m] if has_season else 0.0
yhat = ell + (phi * b if has_trend else 0.0) + st
e = rng.normal(0.0, sigma)
y[t] = yhat + e
et = y[t] - yhat
ell = ell + (phi * b if has_trend else 0.0) + alpha * et
if has_trend:
b = phi * b + beta * et
if has_season:
i = t % m
s[i] = s[i] + gamma * et
if i == m - 1:
s -= s.mean()
# Soft “don’t explode” guard; resample once with smaller sigma if absurd growth
ymin = y.min() if y.size else 0.0
ymax = y.max() if y.size else 0.0
denom = max(abs(ymin), 1e-12)
if (ymax / denom) > growth_cap:
return _simulate_safe_ets(n, cfg, rng, sigma * 0.5, growth_cap)
return y
def _to_sf_code(x):
"""Map our 'add'/'mul'/None to StatsForecast ETS code letters."""
if x is None:
return "N"
x = x.lower()
if x == "add":
return "A"
if x == "mul":
return "M"
# fallback
return "N"
def _to_aeon_component(x):
"""Map our 'add'/'mul'/None to aeon 'additive'/'multiplicative'/None."""
if x is None:
return None
x = x.lower()
if x == "add":
return "additive"
if x == "mul":
return "multiplicative"
return None
def _to_aeon_error(x):
"""'A'/'M' -> 'additive'/'multiplicative'"""
if x is None:
return None
xl = x.lower()
if xl == "a":
return "additive"
if xl == "m":
return "multiplicative"
return None
def run_ets_experiment(
lengths: List[int],
m: int = 5,
sigma: float = 1.0,
seed_base: int = 0,
):
"""
Compare ETS forecasting across statsmodels, aeon (non-auto ETS), and statsforecast AutoETS
using both the 'true' simulated config and a simple 'default' config.
For each series:
- simulate with simulate_random_ets(...)
- fit on y[:-1], predict y[-1]
- record runtime and MSE
Returns
-------
pd.DataFrame with columns:
length, rep,
sm_true_mse, sm_true_time, sm_default_mse, sm_default_time,
aeon_true_mse, aeon_true_time, aeon_default_mse, aeon_default_time,
sf_true_mse, sf_true_time, sf_default_mse, sf_default_time
"""
results = []
mse= []
# Cold start to compile Numba functions
y, cfg = simulate_random_ets(n=100, sigma=sigma, seed=0)
y_train, y_test = y[:-1], y[-1:]
sp_true = cfg["seasonal_periods"] if cfg["seasonal_periods"] is not None else None
t0 = time.time()
sm_model = ExponentialSmoothing(y_train).fit(optimized=True)
# TRUE CONFIG
t0 = time.time()
aeon_model = AeonETS()
aeon_model.fit(y_train)
for n in lengths:
print(f"Series length: {n}")
for rep in range(m):
seed = seed_base + rep + n * 1000
# --- simulate (must be your updated, positive-only simulator) ---
y, cfg = simulate_random_ets(n=n, sigma=sigma, seed=seed)
y = y +2.0
eps = 1e-6
# y_norm = (y - y.min()) / (y.max() - y.min() + eps)
# y_norm = np.clip(y_norm, eps, 1.0)
y_train, y_test = y[:-1], y[-1:]
sp_true = cfg["seasonal_periods"] if cfg["seasonal_periods"] is not None else None
# ===== statsmodels =====
# TRUE CONFIG
t0 = time.time()
sm_model = ExponentialSmoothing(
y_train,
trend=cfg["trend"], # 'add'/'mul'/None
seasonal=cfg["seasonal"], # 'add'/'mul'/None
seasonal_periods=sp_true if sp_true else None, # None if no season
).fit(optimized=True)
sm_forecast = sm_model.forecast(1)[0]
sm_time = time.time() - t0
sm_mse = float((y_test[0] - sm_forecast) ** 2)
if sm_mse > 100.0:
print(f"SM TRUE: high MSE {sm_mse:.4f} for series length {n}, "
f"rep {rep}, true {y_test[0]} forecast {sm_forecast}")
rep = rep-1
continue
# ===== aeon (non-auto ETS) =====
# Map to 'additive'/'multiplicative'/None
aeon_error = _to_aeon_error(cfg["error"])
aeon_trend = _to_aeon_component(cfg["trend"])
aeon_seasonal = _to_aeon_component(cfg["seasonal"])
aeon_sp = sp_true if sp_true else 1 # aeon uses sp >= 1, use 1 when no season
# TRUE CONFIG
t0 = time.time()
aeon_model = AeonETS(
error_type=aeon_error,
trend_type=aeon_trend,
seasonality_type=aeon_seasonal,
seasonal_period=aeon_sp,
)
try:
aeon_model.fit(y_train)
except Exception as e:
print(f"AEON TRUE: fit failed for series length {n}, rep {rep}, "
f"error: {e}, seed {seed}")
# traceback.print_exc()
aeon_forecast = aeon_model.forecast_
aeon_time = time.time() - t0
aeon_mse = float((y_test[0] - aeon_forecast) ** 2)
if aeon_mse > 100.0:
print(f"AEON TRUE: high MSE {aeon_mse:.4f} for series length {n}, "
f"rep {rep}, true {y_test[0]} forecast {aeon_forecast}")
rep = rep-1
continue
# ===== statsforecast (AutoETS with explicit model string) =====
# Build "ET S" code like 'AAA', 'AAN', etc.
E = _to_sf_code(cfg["error"])
T = _to_sf_code(cfg["trend"])
S = _to_sf_code(cfg["seasonal"])
model_str_true = E + T + S
sf_season_len_true = sp_true if sp_true else 1 # must be >= 1
# Prepare data frame for StatsForecast
df_sf = pd.DataFrame({
"unique_id": ["series1"] * len(y_train),
"ds": pd.date_range("2000-01-01", periods=len(y_train), freq="D"),
"y": y_train,
})
# TRUE CONFIG
t0 = time.time()
sf_true = StatsForecast(
models=[SF_AutoETS()],
freq="D",
n_jobs=1,
)
_ = sf_true.fit(df_sf)
fc_df = sf_true.predict(h=1)
# single model: yhat column is the forecast
sf_forecast = fc_df["AutoETS"].values[0]
sf_time = time.time() - t0
sf_mse = float((y_test[0] - sf_forecast) ** 2)
if sf_mse > 100.0:
print(f"SF DEFAULT: high MSE {sf_mse:.4f} for series length {n}, "
f"rep {rep}, true {y_test[0]} forecast {sf_forecast}")
rep = rep-1
continue
results.append([
n, rep,
sm_mse, sm_time,
aeon_mse, aeon_time,
sf_mse, sf_time,
])
cols = [
"length", "rep",
"sm_true_mse", "sm_true_time",
"aeon_true_mse", "aeon_true_time",
"sf_true_mse", "sf_true_time",
]
return pd.DataFrame(results, columns=cols)
def plot_ets_results(df):
"""
Plot MSE and runtime for ETS experiment results from run_ets_experiment.
Parameters
----------
df : pd.DataFrame
Output from run_ets_experiment.
"""
# Aggregate mean across repetitions
mean_df = df.groupby("length", as_index=False).mean()
lengths = mean_df["length"]
# --- True config ---
plt.figure(figsize=(12, 8))
# 1. TRUE MSE
plt.subplot(2, 2, 1)
plt.plot(lengths, mean_df["sm_true_mse"], marker='o', label="statsmodels")
plt.plot(lengths, mean_df["aeon_true_mse"], marker='s', label="aeon")
plt.plot(lengths, mean_df["sf_true_mse"], marker='^', label="statsforecast")
plt.xlabel("Series length")
plt.ylabel("MSE")
plt.title("True config - MSE")
plt.legend()
plt.grid(True)
# 3. TRUE runtime (log scale)
plt.subplot(2, 2, 3)
plt.plot(lengths, mean_df["sm_true_time"], marker='o', label="statsmodels")
plt.plot(lengths, mean_df["aeon_true_time"], marker='s', label="aeon")
plt.plot(lengths, mean_df["sf_true_time"], marker='^', label="statsforecast")
plt.xlabel("Series length")
plt.ylabel("Runtime (s, log scale)")
plt.yscale("log")
plt.title("True config - Runtime")
plt.legend()
plt.grid(True, which="both")
plt.tight_layout()
plt.show()
def debug():
seed=300077
n=300
sigma=1.0
# TRUE CONFIG
from aeon.forecasting.utils._loss_functions import _ets_update_states
_ets_update_states.recompile()
y, cfg = simulate_random_ets(n=n, sigma=sigma, seed=seed)
t0 = time.time()
sp_true = cfg["seasonal_periods"] if cfg["seasonal_periods"] is not None else None
aeon_error = _to_aeon_component(cfg["error"])
aeon_trend = _to_aeon_component(cfg["trend"])
aeon_seasonal = _to_aeon_component(cfg["seasonal"])
aeon_sp = sp_true if sp_true else 1 # aeon uses sp >= 1, use 1 when no season
eps = 1e-6
y_norm = (y - y.min()) / (y.max() - y.min() + eps)
y_norm = np.clip(y_norm, eps, 1.0)
y_train, y_test = y[:-1], y[-1:]
aeon_model = AeonETS(
error_type=aeon_error,
trend_type=aeon_trend,
seasonality_type=aeon_seasonal,
seasonal_period=aeon_sp,
)
try:
aeon_model.fit(y_train)
except Exception as e:
print(f"AEON FAILED")
traceback.print_exc()
print("Aeon finished in", time.time() - t0, "seconds")
if __name__ == "__main__":
# debug()
values = list(range(200, 2001, 200))
df_results = run_ets_experiment(values, m=50, sigma=1.0, seed_base=49)
# Plot the results
plot_ets_results(df_results)
Expected results
Handle it more gracefully
Actual results
Traceback (most recent call last):
File "C:\Code\aeon\local\ets_evaluate.py", line 314, in run_ets_experiment
aeon_model.fit(y_train)
File "C:\Code\aeon\aeon\forecasting\base.py", line 97, in fit
self._fit(y, exog)
File "C:\Code\aeon\aeon\forecasting\stats\_ets.py", line 182, in _fit
(self.parameters_, self.aic_) = nelder_mead(
^^^^^^^^^^^^
ZeroDivisionError: division by zero
Versions
No response