Add AR(p) and MA(q) models #117

Merged 9 commits on Apr 2, 2024
302 changes: 298 additions & 4 deletions financetoolkit/technicals/statistic_model.py
@@ -1,8 +1,10 @@
"""Statistic Module"""

__docformat__ = "google"

import numpy as np
import pandas as pd
import scipy.linalg
import scipy.optimize


def get_beta(returns: pd.Series, benchmark_returns: pd.Series) -> pd.Series:
@@ -81,7 +83,7 @@ def get_spearman_correlation(series1: pd.Series, series2: pd.Series) -> float:
return spearman_corr


def get_variance(prices: pd.Series) -> float:
def get_variance(data: pd.Series) -> float:
"""
Calculate the Variance of a given data series.

@@ -95,10 +97,10 @@ def get_variance(prices: pd.Series) -> float:
Returns:
float: Variance value.
"""
return prices.var()
return data.var()


def get_standard_deviation(prices: pd.Series) -> float:
def get_standard_deviation(data: pd.Series) -> float:
"""
Calculate the Standard Deviation of a given data series.

@@ -112,4 +114,296 @@ def get_standard_deviation(prices: pd.Series) -> float:
Returns:
float: Standard Deviation value.
"""
return prices.std()
return data.std()


def get_ar_weights_lsm(series: np.ndarray, p: int) -> tuple:
"""
Fit an AR(p) model to a time series.

The LSM finds the coefficients (parameters) that minimize the sum of the squared residuals
between the actual and predicted values in a time series, providing a straightforward
and often efficient way to estimate the parameters of an Autoregressive (AR) model.

Upsides:
- Doesn't require stationarity.
- Consistent and unbiased for large datasets.

Downsides:
- Computationally expensive for large datasets.
    - Sensitive to outliers.

Args:
        series (np.ndarray): The time series data to model.
p (int): The order of the autoregressive model, indicating how many past values to consider.

Returns:
tuple: A tuple containing three elements:
- numpy array of estimated parameters phi,
- float representing the estimated constant c,
- float representing the sigma squared of the white noise.
"""
X = np.column_stack(
[np.ones(len(series) - p)] + [series[i : len(series) - p + i] for i in range(p)]
)
Y = series[p:]

# Solving for AR coefficients and constant term using the Least Squares Method
params = np.linalg.lstsq(X, Y, rcond=None)[0]
phi = params[1:]
c = params[0]

residuals = Y - X @ params
sigma2 = residuals @ residuals / len(Y)

return phi, c, sigma2
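
# Example (illustrative sketch, not tied to any particular dataset): recover the
# coefficients of a simulated AR(2) process x_t = 1.0 + 0.6 * x_(t-1) - 0.2 * x_(t-2) + e_t.
# With a reasonably long sample, the recovered lag-1 and lag-2 coefficients should
# be close to 0.6 and -0.2; note that phi is returned oldest lag first, so
# phi_hat[-1] is the lag-1 coefficient. The names below are only used for this example.
rng = np.random.default_rng(seed=42)
simulated = np.zeros(500)
for t in range(2, 500):
    simulated[t] = 1.0 + 0.6 * simulated[t - 1] - 0.2 * simulated[t - 2] + rng.normal()
phi_hat, c_hat, sigma2_hat = get_ar_weights_lsm(simulated, p=2)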


def estimate_ar_weights_yule_walker(series: pd.Series, p: int) -> tuple:
"""
Estimate the weights (parameters) of an Autoregressive (AR) model using the Yule-Walker Method.

This method computes the Yule-Walker equations which are a set of linear equations
to estimate the parameters of an AR model based on the autocorrelation function of
the input series. It is specifically designed for AR models and is highly efficient
for stationary time series data. Make sure the series is stationary before using this method.

Args:
series (pd.Series): The time series data to model.
p (int): The order of the autoregressive model.

Returns:
tuple: A tuple containing three elements:
- numpy array of estimated parameters phi,
- float representing the estimated constant c,
- float representing the sigma squared of the white noise.
"""
    # Autocovariances are computed on the demeaned series
    demeaned = np.asarray(series) - np.mean(series)
    autocov = [
        np.correlate(demeaned, demeaned, mode="full")[len(demeaned) - 1 - i]
        / len(demeaned)
        for i in range(p + 1)
    ]

# Create the Yule-Walker matrices
R = scipy.linalg.toeplitz(autocov[:p])
r = autocov[1 : p + 1]

    # Solve the Yule-Walker equations; the solution is ordered [phi_1, ..., phi_p]
    phi = np.linalg.solve(R, r)
    mu = series.mean()
    c = mu * (1 - np.sum(phi))
    sigma2 = autocov[0] - phi @ r

    # Return phi oldest lag first so it matches get_ar_weights_lsm and the
    # ordering that get_ar uses when forming predictions
    return phi[::-1], c, sigma2
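
# Example (illustrative sketch): for p = 2 the Yule-Walker step solves the system
#   [gamma(0) gamma(1)] [phi_1]   [gamma(1)]
#   [gamma(1) gamma(0)] [phi_2] = [gamma(2)]
# where gamma(k) is the lag-k autocovariance. On stationary white noise the
# estimated coefficients should be close to zero; the variable names below are
# only used for this example.
white_noise = np.random.default_rng(seed=1).normal(size=250)
phi_yw, c_yw, sigma2_yw = estimate_ar_weights_yule_walker(white_noise, p=2)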


def get_ar(
data: np.ndarray | pd.Series | pd.DataFrame,
p: int = 1,
steps: int = 1,
phi: np.ndarray | None = None,
c: float | None = None,
method: str = "lsm",
) -> np.ndarray | pd.Series | pd.DataFrame:
"""
Predict values using an AR(p) model.

Generates future values of a time series based on an Autoregressive (AR) model.
    This function uses the most recent observations and the fitted AR coefficients
    to forecast the next 'steps' values. The predictions are made iteratively, where
    each new prediction is treated as an observation for the following step.

Often the following is used to estimate the order, p, of the AR model:
1. Plot the Partial Autocorrelation Function (PACF): Plot the PACF of the time series.
2. Identify Significant Lags: Look for the lag after which most partial autocorrelations
are not significantly different from zero. A common rule of thumb is that the PACF
'cuts off' after lag p.

Args:
data (np.ndarray | pd.Series | pd.DataFrame):
The data to predict values for with AR(p). The number of observations should be at
least equal to the order of the AR model. Note that it calculates the AR model
for each column of the DataFrame separately.
        p (int): The order of the autoregressive model, indicating how many past values
            to consider. It is only used if phi is not provided. Defaults to 1.
steps (int, optional): The number of future time steps to predict. Defaults to 1.
phi (np.ndarray | None): Estimated parameters of the AR model.
c (float | None): The constant term of the AR model.
method (str, optional): The method to use to estimate the AR parameters. Can be
'lsm' (Least Squares Method) or 'yw' (Yule-Walker Method). Defaults to 'lsm'.
See the weight calculation functions documentation for more details.

Returns:
np.ndarray | pd.Series | pd.DataFrame: Predicted values for the specified
number of future steps.
"""
if isinstance(data, pd.DataFrame):
        if data.index.nlevels != 1:
            raise ValueError("Expects a single-index DataFrame.")
        return data.aggregate(
            lambda x: get_ar(x, p=p, steps=steps, phi=phi, c=c, method=method)
        )
if isinstance(data, pd.Series):
data = data.to_numpy()

if phi is None:
if method == "lsm":
phi, c, _ = get_ar_weights_lsm(data, p)
elif method == "yw":
phi, c, _ = estimate_ar_weights_yule_walker(data, p)
else:
raise ValueError("Method must be 'lsm' or 'yw'.")

predictions = np.zeros(steps)
recent_values = list(data[-p:])

    for i in range(steps):
        # recent_values runs oldest to newest, matching the oldest-lag-first
        # ordering of the phi arrays returned by the estimation functions above
        next_value = c + phi @ recent_values
predictions[i] = next_value

recent_values.append(next_value)
recent_values.pop(0)

return predictions
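
# Example (illustrative sketch): fit an AR(2) by least squares and forecast three
# steps ahead of a toy series; the names below are only used for this example.
# With a single-index DataFrame the same call returns one forecast per column.
toy_prices = pd.Series(np.cumsum(np.random.default_rng(seed=7).normal(size=100)))
ar_forecast = get_ar(toy_prices, p=2, steps=3, method="lsm")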


def ma_likelihood(params: np.ndarray, data: np.ndarray) -> tuple:
    """
    Calculate the negative log-likelihood and the residuals/errors for an MA(q) model.

Args:
params (np.ndarray): Model parameters where the last element is the variance of the
white noise and the others are the MA parameters (theta_1, ..., theta_q).
data (np.ndarray): Observed time series data.

Returns:
tuple:
- float: The negative log-likelihood of the MA model.
- np.ndarray: Residuals/errors from the MA model.
"""
q = len(params) - 1
theta = params[:-1]
sigma2 = params[-1]
n = len(data)

errors = np.zeros(n)

    # Conditional errors: e_t = x_t - (theta_1 * e_(t-1) + ... + theta_q * e_(t-q)),
    # with the first q errors initialised to zero
    for t in range(q, n):
        errors[t] = data[t] - theta @ errors[t - q : t][::-1]

    likelihood = -n / 2 * np.log(2 * np.pi * sigma2) - np.sum(errors[q:] ** 2) / (
        2 * sigma2
    )

return -likelihood, errors
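
# Example (illustrative sketch): evaluate the negative log-likelihood of an MA(1)
# guess with theta_1 = 0.4 and unit white-noise variance on a toy series; lower
# values indicate a better fit. The names below are only used for this example.
toy_data = np.random.default_rng(seed=3).normal(size=200)
neg_loglik, conditional_errors = ma_likelihood(np.array([0.4, 1.0]), toy_data)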


def fit_ma_model(data: np.ndarray, q: int) -> tuple:
"""
Fit an MA(q) model to the time series data.

    Finds the parameters of the MA model that minimize the negative log-likelihood of
    the observed data and calculates the residuals.

This MLE method is described in:
@inbook{NBERc12707,
Crossref = "NBERaesm76-1",
title = "Maximum Likelihood Estimation of Moving Average Processes",
author = "Denise R. Osborn",
BookTitle = "Annals of Economic and Social Measurement, Volume 5, number 1",
Publisher = "NBER",
pages = "75-87",
year = "1976",
URL = "http://www.nber.org/chapters/c12707",
}
@book{NBERaesm76-1,
title = "Annals of Economic and Social Measurement, Volume 5, number 1",
author = "Sanford V. Berg",
institution = "National Bureau of Economic Research",
type = "Book",
publisher = "NBER",
year = "1976",
URL = "https://www.nber.org/books-and-chapters/annals-economic-and-social-measurement-volume-5-number-1",
}

Args:
data (np.ndarray): Observed time series data.
q (int): The order of the MA model.

Returns:
tuple:
- np.ndarray: The parameters theta of the fitted MA model.
            - float: The variance sigma squared of the fitted MA model.
- np.ndarray: The residuals/errors from the fitted MA model.
"""
initial_params = np.zeros(q + 1)
initial_params[-1] = np.var(data)

    # Minimize the negative log-likelihood, keeping the white-noise variance positive
    result = scipy.optimize.minimize(
        lambda params, data: ma_likelihood(params, data)[0],
        initial_params,
        args=(data,),
        method="L-BFGS-B",
        bounds=[(None, None)] * q + [(1e-8, None)],
    )

if not result.success:
raise RuntimeError("Optimization failed to converge.")
_, errors = ma_likelihood(result.x, data)

return result.x[:-1], result.x[-1], errors[q:]
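
# Example (illustrative sketch): fit an MA(1) model to a simulated series
# x_t = e_t + 0.5 * e_(t-1); with a long enough sample the estimated theta should
# be roughly 0.5. The simulated shocks and names below are only for this example.
ma_shocks = np.random.default_rng(seed=11).normal(size=300)
ma_series = ma_shocks[1:] + 0.5 * ma_shocks[:-1]
theta_hat, noise_variance, ma_residuals = fit_ma_model(ma_series, q=1)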


def get_ma(
data: np.ndarray | pd.Series | pd.DataFrame,
q: int,
steps: int = 1,
    theta: np.ndarray | None = None,
    errors: np.ndarray | None = None,
) -> np.ndarray | pd.Series | pd.DataFrame:
"""
Predict values using an MA(q) model.

Generates future values of a time series based on a Moving Average (MA) model.
This function uses the series of errors (innovations) and the MA model coefficients
to forecast the next 'steps' values. The predictions are made based on the assumption
that future errors are expected to be zero, which is a common approach in MA forecasting.

Args:
data (np.ndarray | pd.Series | pd.DataFrame): The data to predict values for with MA(q).
            Note that it calculates the MA model for each column of the DataFrame separately.
q (int): The order of the moving average model, indicating how many past error terms to consider.
It is only used if theta or errors isn't provided.
steps (int, optional): The number of future time steps to predict. Defaults to 1.
theta (np.ndarray | None): Estimated parameters of the MA model.
errors (np.ndarray | None): Array of past errors (residuals) from the model.

Returns:
np.ndarray | pd.Series | pd.DataFrame: Predicted values for the specified number of future steps.
"""
if isinstance(data, pd.DataFrame):
if data.index.nlevels != 1:
raise ValueError("Expects single index DataFrame.")
return data.aggregate(
lambda x: get_ma(x, q=q, steps=steps, theta=theta, errors=errors)
)
if isinstance(data, pd.Series):
data = data.to_numpy()

    if len(data) < q:
        raise ValueError("Data length must be at least equal to the MA order (q).")

    if theta is None or errors is None:
        theta, _, errors = fit_ma_model(data, q)

mu = np.mean(data)
predictions = np.full(steps, mu)

for i in range(steps):
        if i < q:
            # Known residuals contribute to the first q forecasts; future
            # errors are assumed to be zero
            relevant_errors = (
errors[-q:]
if i == 0
else np.concatenate((errors[-(q - i) :], np.zeros(i)))
)
predictions[i] += theta[: len(relevant_errors)] @ relevant_errors[::-1]

return predictions
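
# Example (illustrative sketch): forecast two steps ahead with an MA(1) model.
# Only the first q forecasts depend on the fitted residuals; later forecasts
# revert to the sample mean. The names below are only used for this example.
ma_example_series = pd.Series(np.random.default_rng(seed=5).normal(size=250))
ma_forecast = get_ma(ma_example_series, q=1, steps=2)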