Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Sliding predictions for an arbitrary DF #940

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 62 additions & 22 deletions nbs/core.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -942,24 +942,12 @@
" fcsts_df : pandas.DataFrame\n",
" DataFrame with insample predictions for all fitted `models`. \n",
" \"\"\"\n",
" if not self._fitted:\n",
" raise Exception('The models must be fitted first with `fit` or `cross_validation`.')\n",
"\n",
" for model in self.models:\n",
" if model.SAMPLING_TYPE == 'recurrent':\n",
" warnings.warn(f'Predict insample might not provide accurate predictions for \\\n",
" recurrent model {repr(model)} class yet due to scaling.')\n",
" print(f'WARNING: Predict insample might not provide accurate predictions for \\\n",
" recurrent model {repr(model)} class yet due to scaling.')\n",
" \n",
" cols = []\n",
" count_names = {'model': 0}\n",
" for model in self.models:\n",
" model_name = repr(model)\n",
" count_names[model_name] = count_names.get(model_name, -1) + 1\n",
" if count_names[model_name] > 0:\n",
" model_name += str(count_names[model_name])\n",
" cols += [model_name + n for n in model.loss.output_names]\n",
"\n",
" # Remove test set from dataset and last dates\n",
" test_size = self.models[0].get_test_size()\n",
Expand All @@ -977,12 +965,70 @@
" else:\n",
" trimmed_dataset = self.dataset\n",
" times = self.ds\n",
" \n",
" # original y\n",
" original_y = {\n",
" self.id_col: ufp.repeat(self.uids, np.diff(self.dataset.indptr)),\n",
" self.time_col: self.ds,\n",
" self.target_col: self.dataset.temporal[:, 0].numpy(),\n",
" }\n",
"\n",
" return self._predict_sliding(trimmed_dataset, self.uids, times, test_size, original_y, step_size=step_size)\n",
"\n",
"\n",
" def predict_sliding(self, df, step_size : int= 1):\n",
" \"\"\"Sliding window prediction with core.NeuralForecast.\n",
"\n",
" `core.NeuralForecast`'s `predict_sliding` uses stored fitted `models`\n",
" to predict values of a time series given a dataframe.\n",
"\n",
" Parameters\n",
" ----------\n",
" df : pandas or polars DataFrame \n",
" step_size : int (default=1)\n",
" Step size between each window.\n",
"\n",
" Returns\n",
" -------\n",
" fcsts_df : pandas.DataFrame\n",
" DataFrame with insample predictions for all fitted `models`. \n",
" \"\"\"\n",
" for model in self.models:\n",
" if model.SAMPLING_TYPE == 'recurrent':\n",
" warnings.warn(f'Sliding predictions might not provide accurate predictions for \\\n",
" recurrent model {repr(model)} class yet due to scaling.')\n",
" print(f'WARNING: Sliding predictions might not provide accurate predictions for \\\n",
" recurrent model {repr(model)} class yet due to scaling.')\n",
" \n",
" # TODO does sorting need to happen?\n",
" dataset, uids, _, times = TimeSeriesDataset.from_df(df, id_col=self.id_col, time_col=self.time_col, target_col=self.target_col)\n",
" \n",
" original_y = {\n",
" self.id_col: ufp.repeat(uids, np.diff(dataset.indptr)),\n",
" self.time_col: times,\n",
" self.target_col: dataset.temporal[:, 0].numpy(),\n",
" }\n",
"\n",
" return self._predict_sliding(dataset, uids, times, 0, original_y, step_size=step_size)\n",
"\n",
" def _predict_sliding(self, dataset, uids, times, test_size, original_y, step_size: int = 1):\n",
" if not self._fitted:\n",
" raise Exception('The models must be fitted first with `fit` or `cross_validation`.')\n",
"\n",
" cols = []\n",
" count_names = {'model': 0}\n",
" for model in self.models:\n",
" model_name = repr(model)\n",
" count_names[model_name] = count_names.get(model_name, -1) + 1\n",
" if count_names[model_name] > 0:\n",
" model_name += str(count_names[model_name])\n",
" cols += [model_name + n for n in model.loss.output_names]\n",
"\n",
" # Generate dates\n",
" fcsts_df = _insample_times(\n",
" times=times,\n",
" uids=self.uids,\n",
" indptr=trimmed_dataset.indptr,\n",
" uids=uids,\n",
" indptr=dataset.indptr,\n",
" h=self.h,\n",
" freq=self.freq,\n",
" step_size=step_size,\n",
Expand All @@ -995,22 +1041,16 @@
"\n",
" for model in self.models:\n",
" # Test size is the number of periods to forecast (full size of trimmed dataset)\n",
" model.set_test_size(test_size=trimmed_dataset.max_size)\n",
" model.set_test_size(test_size=dataset.max_size)\n",
"\n",
" # Predict\n",
" model_fcsts = model.predict(trimmed_dataset, step_size=step_size)\n",
" model_fcsts = model.predict(dataset, step_size=step_size)\n",
" # Append predictions in memory placeholder\n",
" output_length = len(model.loss.output_names)\n",
" fcsts[:,col_idx:(col_idx + output_length)] = model_fcsts\n",
" col_idx += output_length \n",
" model.set_test_size(test_size=test_size) # Set original test_size\n",
"\n",
" # original y\n",
" original_y = {\n",
" self.id_col: ufp.repeat(self.uids, np.diff(self.dataset.indptr)),\n",
" self.time_col: self.ds,\n",
" self.target_col: self.dataset.temporal[:, 0].numpy(),\n",
" }\n",
"\n",
" # Add predictions to forecasts DataFrame\n",
" if isinstance(self.uids, pl_Series):\n",
Expand Down
4 changes: 4 additions & 0 deletions neuralforecast/_modidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@
'neuralforecast/core.py'),
'neuralforecast.core.NeuralForecast._no_refit_cross_validation': ( 'core.html#neuralforecast._no_refit_cross_validation',
'neuralforecast/core.py'),
'neuralforecast.core.NeuralForecast._predict_sliding': ( 'core.html#neuralforecast._predict_sliding',
'neuralforecast/core.py'),
'neuralforecast.core.NeuralForecast._prepare_fit': ( 'core.html#neuralforecast._prepare_fit',
'neuralforecast/core.py'),
'neuralforecast.core.NeuralForecast._reset_models': ( 'core.html#neuralforecast._reset_models',
Expand All @@ -146,6 +148,8 @@
'neuralforecast/core.py'),
'neuralforecast.core.NeuralForecast.predict_insample': ( 'core.html#neuralforecast.predict_insample',
'neuralforecast/core.py'),
'neuralforecast.core.NeuralForecast.predict_sliding': ( 'core.html#neuralforecast.predict_sliding',
'neuralforecast/core.py'),
'neuralforecast.core.NeuralForecast.save': ('core.html#neuralforecast.save', 'neuralforecast/core.py'),
'neuralforecast.core._id_as_idx': ('core.html#_id_as_idx', 'neuralforecast/core.py'),
'neuralforecast.core._insample_times': ('core.html#_insample_times', 'neuralforecast/core.py'),
Expand Down
105 changes: 80 additions & 25 deletions neuralforecast/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,11 +874,6 @@ def predict_insample(self, step_size: int = 1):
fcsts_df : pandas.DataFrame
DataFrame with insample predictions for all fitted `models`.
"""
if not self._fitted:
raise Exception(
"The models must be fitted first with `fit` or `cross_validation`."
)

for model in self.models:
if model.SAMPLING_TYPE == "recurrent":
warnings.warn(
Expand All @@ -890,15 +885,6 @@ def predict_insample(self, step_size: int = 1):
recurrent model {repr(model)} class yet due to scaling."
)

cols = []
count_names = {"model": 0}
for model in self.models:
model_name = repr(model)
count_names[model_name] = count_names.get(model_name, -1) + 1
if count_names[model_name] > 0:
model_name += str(count_names[model_name])
cols += [model_name + n for n in model.loss.output_names]

# Remove test set from dataset and last dates
test_size = self.models[0].get_test_size()
if test_size > 0:
Expand All @@ -918,11 +904,87 @@ def predict_insample(self, step_size: int = 1):
trimmed_dataset = self.dataset
times = self.ds

# original y
original_y = {
self.id_col: ufp.repeat(self.uids, np.diff(self.dataset.indptr)),
self.time_col: self.ds,
self.target_col: self.dataset.temporal[:, 0].numpy(),
}

return self._predict_sliding(
trimmed_dataset,
self.uids,
times,
test_size,
original_y,
step_size=step_size,
)

def predict_sliding(self, df, step_size: int = 1):
    """Sliding window prediction with core.NeuralForecast.

    `core.NeuralForecast`'s `predict_sliding` uses stored fitted `models`
    to produce rolling-origin (sliding window) predictions over every
    feasible window of an arbitrary dataframe, rather than only the
    training data (see `predict_insample`).

    Parameters
    ----------
    df : pandas or polars DataFrame
        Long-format dataframe containing at least the columns named by
        `self.id_col`, `self.time_col` and `self.target_col`
        (it is converted with `TimeSeriesDataset.from_df`).
    step_size : int (default=1)
        Step size between each window.

    Returns
    -------
    fcsts_df : pandas or polars DataFrame
        DataFrame with sliding-window predictions for all fitted `models`,
        aligned with the original target values.
    """
    for model in self.models:
        if model.SAMPLING_TYPE == "recurrent":
            warnings.warn(
                f"Sliding predictions might not provide accurate predictions for \
                recurrent model {repr(model)} class yet due to scaling."
            )
            print(
                f"WARNING: Sliding predictions might not provide accurate predictions for \
                recurrent model {repr(model)} class yet due to scaling."
            )

    # NOTE(review): from_df sorts by (id, time) internally in recent
    # utilsforecast versions — TODO confirm df ordering is not assumed here.
    dataset, uids, _, times = TimeSeriesDataset.from_df(
        df, id_col=self.id_col, time_col=self.time_col, target_col=self.target_col
    )

    # Keep the observed target alongside the forecasts so the returned
    # frame can be compared/merged against ground truth directly.
    # temporal[:, 0] is the target column of the converted dataset.
    original_y = {
        self.id_col: ufp.repeat(uids, np.diff(dataset.indptr)),
        self.time_col: times,
        self.target_col: dataset.temporal[:, 0].numpy(),
    }

    # test_size=0: the whole provided dataframe is forecastable — there is
    # no held-out test split to restore, unlike the predict_insample path.
    return self._predict_sliding(
        dataset, uids, times, 0, original_y, step_size=step_size
    )

def _predict_sliding(
self, dataset, uids, times, test_size, original_y, step_size: int = 1
):
if not self._fitted:
raise Exception(
"The models must be fitted first with `fit` or `cross_validation`."
)

cols = []
count_names = {"model": 0}
for model in self.models:
model_name = repr(model)
count_names[model_name] = count_names.get(model_name, -1) + 1
if count_names[model_name] > 0:
model_name += str(count_names[model_name])
cols += [model_name + n for n in model.loss.output_names]

# Generate dates
fcsts_df = _insample_times(
times=times,
uids=self.uids,
indptr=trimmed_dataset.indptr,
uids=uids,
indptr=dataset.indptr,
h=self.h,
freq=self.freq,
step_size=step_size,
Expand All @@ -935,23 +997,16 @@ def predict_insample(self, step_size: int = 1):

for model in self.models:
# Test size is the number of periods to forecast (full size of trimmed dataset)
model.set_test_size(test_size=trimmed_dataset.max_size)
model.set_test_size(test_size=dataset.max_size)

# Predict
model_fcsts = model.predict(trimmed_dataset, step_size=step_size)
model_fcsts = model.predict(dataset, step_size=step_size)
# Append predictions in memory placeholder
output_length = len(model.loss.output_names)
fcsts[:, col_idx : (col_idx + output_length)] = model_fcsts
col_idx += output_length
model.set_test_size(test_size=test_size) # Set original test_size

# original y
original_y = {
self.id_col: ufp.repeat(self.uids, np.diff(self.dataset.indptr)),
self.time_col: self.ds,
self.target_col: self.dataset.temporal[:, 0].numpy(),
}

# Add predictions to forecasts DataFrame
if isinstance(self.uids, pl_Series):
fcsts = pl_DataFrame(dict(zip(cols, fcsts.T)))
Expand Down