-
Notifications
You must be signed in to change notification settings - Fork 632
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Example on multiple targets (with proposed solution) #1440
Comments
For this to work, I had to do the following: __all__ = ["AutoRegressiveBaseModel"]
from loguru import logger
from typing import List, Union, Any, Sequence, Tuple, Dict, Callable
import torch
from torch import Tensor
from pytorch_forecasting.metrics import MultiLoss, DistributionLoss
from pytorch_forecasting.utils import to_list, apply_to_list
from pytorch_forecasting.models.base_model import AutoRegressiveBaseModel as AutoRegressiveBaseModel_
class AutoRegressiveBaseModel(AutoRegressiveBaseModel_):  # pylint: disable=abstract-method
    """Basically AutoRegressiveBaseModel from `pytorch_forecasting` but fixed for multi-target. Worked for `LSTM`."""

    def output_to_prediction(
        self,
        normalized_prediction_parameters: torch.Tensor,
        target_scale: Union[List[torch.Tensor], torch.Tensor],
        n_samples: int = 1,
        **kwargs: Any,
    ) -> Tuple[Union[List[torch.Tensor], torch.Tensor], torch.Tensor]:
        """
        Convert network output to rescaled and normalized prediction.

        Function is typically not called directly but via :py:meth:`~decode_autoregressive`.

        Args:
            normalized_prediction_parameters (torch.Tensor): network prediction output
            target_scale (Union[List[torch.Tensor], torch.Tensor]): target scale to rescale network output
            n_samples (int, optional): Number of samples to draw independently. Defaults to 1.
            **kwargs: extra arguments for dictionary passed to :py:meth:`~transform_output` method.

        Returns:
            Tuple[Union[List[torch.Tensor], torch.Tensor], torch.Tensor]: tuple of rescaled prediction and
                normalized prediction (e.g. for input into next auto-regressive step)
        """
        logger.trace(f"normalized_prediction_parameters={normalized_prediction_parameters.size()}")
        # Remember batch size and last-dim size so we can sanity-check the
        # normalized prediction that is fed back into the next decoding step.
        B = normalized_prediction_parameters.size(0)
        D = normalized_prediction_parameters.size(-1)
        # A 2-dim (batch, features) output means there is no time dimension yet.
        single_prediction = to_list(normalized_prediction_parameters)[0].ndim == 2
        logger.trace(f"single_prediction={single_prediction}")
        if single_prediction:  # add time dimension as it is expected
            normalized_prediction_parameters = apply_to_list(
                normalized_prediction_parameters, lambda x: x.unsqueeze(1)
            )
        # transform into real space
        prediction_parameters = self.transform_output(
            prediction=normalized_prediction_parameters, target_scale=target_scale, **kwargs
        )
        logger.trace(
            f"prediction_parameters ({len(prediction_parameters)}): {[p.size() for p in prediction_parameters]}"
        )
        # sample value(s) from distribution and select first sample
        if isinstance(self.loss, DistributionLoss) or (
            isinstance(self.loss, MultiLoss) and isinstance(self.loss[0], DistributionLoss)
        ):
            if n_samples > 1:
                # Batch entries were repeated `n_samples` times upstream: fold the
                # repetition into its own dimension, sample once per entry, then
                # flatten back to the repeated-batch layout.
                prediction_parameters = apply_to_list(
                    prediction_parameters, lambda x: x.reshape(int(x.size(0) / n_samples), n_samples, -1)
                )
                prediction = self.loss.sample(prediction_parameters, 1)
                prediction = apply_to_list(prediction, lambda x: x.reshape(x.size(0) * n_samples, 1, -1))
            else:
                prediction = self.loss.sample(normalized_prediction_parameters, 1)
        else:
            # Point-forecast losses: the transformed parameters ARE the prediction.
            prediction = prediction_parameters
        logger.trace(f"prediction ({len(prediction)}): {[p.size() for p in prediction]}")
        # normalize prediction (for input into the next auto-regressive step)
        normalized_prediction = self.output_transformer.transform(prediction, target_scale=target_scale)
        if isinstance(normalized_prediction, list):
            logger.trace(f"normalized_prediction: {[p.size() for p in normalized_prediction]}")
            # NOTE(review): only the LAST normalized target is fed back as the next
            # input; the commented alternative concatenates all targets — confirm
            # which is intended for the multi-target case.
            input_target = normalized_prediction[-1]  # torch.cat(normalized_prediction, dim=-1) # dim=-1
        else:
            logger.trace(f"normalized_prediction: {normalized_prediction.size()}")
            input_target = normalized_prediction  # set next input target to normalized prediction
        logger.trace(f"input_target: {input_target.size()}")
        # The tensor fed into the next step must preserve batch and feature sizes.
        assert input_target.size(0) == B
        assert input_target.size(-1) == D, f"{input_target.size()} but D={D}"
        # remove time dimension
        if single_prediction:
            prediction = apply_to_list(prediction, lambda x: x.squeeze(1))
            input_target = input_target.squeeze(1)
        logger.trace(f"input_target: {input_target.size()}")
        return prediction, input_target

    def decode_autoregressive(
        self,
        decode_one: Callable,
        first_target: Union[List[torch.Tensor], torch.Tensor],
        first_hidden_state: Any,
        target_scale: Union[List[torch.Tensor], torch.Tensor],
        n_decoder_steps: int,
        n_samples: int = 1,
        **kwargs: Any,
    ) -> Union[List[torch.Tensor], torch.Tensor]:
        """
        Make predictions in auto-regressive manner. Supports only continuous targets.

        Args:
            decode_one (Callable): function that takes at least the following arguments:

                * ``idx`` (int): index of decoding step (from 0 to n_decoder_steps-1)
                * ``lagged_targets`` (List[torch.Tensor]): list of normalized targets.
                  List is ``idx + 1`` elements long with the most recent entry at the end,
                  i.e. ``previous_target = lagged_targets[-1]`` and in general ``lagged_targets[-lag]``.
                * ``hidden_state`` (Any): Current hidden state required for prediction.
                  Keys are variable names. Only lags that are greater than ``idx`` are included.
                * additional arguments are not dynamic but can be passed via the ``**kwargs`` argument

                And returns tuple of (not rescaled) network prediction output and hidden state
                for next auto-regressive step.
            first_target (Union[List[torch.Tensor], torch.Tensor]): first target value to use for decoding
            first_hidden_state (Any): first hidden state used for decoding
            target_scale (Union[List[torch.Tensor], torch.Tensor]): target scale as in ``x``
            n_decoder_steps (int): number of decoding/prediction steps
            n_samples (int): number of independent samples to draw from the distribution -
                only relevant for multivariate models. Defaults to 1.
            **kwargs: additional arguments that are passed to the decode_one function.

        Returns:
            Union[List[torch.Tensor], torch.Tensor]: re-scaled prediction
        """
        # make predictions which are fed into next step
        output: List[Union[List[Tensor], Tensor]] = []
        current_hidden_state = first_hidden_state
        normalized_output = [first_target]
        for idx in range(n_decoder_steps):
            # get lagged targets
            current_target, current_hidden_state = decode_one(
                idx, lagged_targets=normalized_output, hidden_state=current_hidden_state, **kwargs
            )
            assert isinstance(current_target, Tensor)
            logger.trace(f"current_target: {current_target.size()}")
            # get prediction and its normalized version for the next step
            prediction, current_target = self.output_to_prediction(
                current_target, target_scale=target_scale, n_samples=n_samples
            )
            logger.trace(f"current_target: {current_target.size()}")
            if isinstance(prediction, Tensor):
                logger.trace(f"prediction ({type(prediction)}): {prediction.size()}")
            else:
                logger.trace(
                    f"prediction ({type(prediction)}|{len(prediction)}): {[p.size() for p in prediction]}"
                )
            # save normalized output for lagged targets
            normalized_output.append(current_target)
            # set output to unnormalized samples, append each target as n_batch_samples x n_random_samples
            output.append(prediction)
        # Check things before finishing (`prediction` is the last step's output here)
        if isinstance(prediction, Tensor):
            logger.trace(f"output ({len(output)}): {[o.size() for o in output]}")  # type: ignore
        else:
            logger.trace(f"output ({len(output)}): {[{len(o)} for o in output]}")
        if isinstance(self.hparams.target, str):
            # Here, output is List[Tensor]: stack per-step tensors along a new time dim.
            final_output = torch.stack(output, dim=1)  # type: ignore
            logger.trace(f"final_output: {final_output.size()}")
            return final_output
        # For multi-targets: output is List[List[Tensor]]
        # final_output_multitarget = [
        #     torch.stack([out[idx] for out in output], dim=1) for idx in range(len(self.target_positions))
        # ]
        # self.target_positions is always Tensor([0]), so len() of that is always 1...
        final_output_multitarget = torch.stack([out[0] for out in output], dim=1)
        if final_output_multitarget.dim() > 3:
            # drop the singleton time dim left by the per-step predictions
            final_output_multitarget = final_output_multitarget.squeeze(2)
        if isinstance(final_output_multitarget, Tensor):
            logger.trace(f"final_output_multitarget: {final_output_multitarget.size()}")
        else:
            logger.trace(
                f"final_output_multitarget ({type(final_output_multitarget)}): {[o.size() for o in final_output_multitarget]}"
            )
        # Split the stacked tensor back into one tensor per target, as callers expect
        # a list of Tensors in the multi-target case.
        r = [final_output_multitarget[..., i] for i in range(final_output_multitarget.size(-1))]
        return r
Then, in the LSTM model: __all__ = ["LSTMModel"]
from loguru import logger
from typing import List, Union, Any, Sequence, Tuple, Dict, Callable
import torch
from torch import nn, Tensor
from torch.nn.utils import rnn
from pytorch_forecasting.metrics import MAE, Metric, MultiLoss
from pytorch_forecasting.models.nn import LSTM
from ._base_autoregressive import AutoRegressiveBaseModel
class LSTMModel(AutoRegressiveBaseModel):
    """Simple LSTM model."""

    def __init__(
        self,
        target: Union[str, Sequence[str]],
        target_lags: Dict[str, Dict[str, int]],
        n_layers: int,
        hidden_size: int,
        dropout: float = 0.1,
        input_size: Union[int, None] = None,
        loss: Union[Metric, None] = None,
        **kwargs: Any,
    ):
        """Prefer using the `LSTMModel.from_dataset()` method rather than this constructor.

        Args:
            target (Union[str, Sequence[str]]):
                Name (or list of names) of target variable(s).
            target_lags (Dict[str, Dict[str, int]]):
                Lags per target variable; required by the autoregressive base class
                (even though lags cannot be used without covariates).
            n_layers (int):
                Number of LSTM layers.
            hidden_size (int):
                Hidden size for LSTM model.
            dropout (float, optional):
                Dropout probability (<1). Defaults to 0.1.
            input_size (int, optional):
                Input size. Defaults to: inferred from `target`.
            loss (Metric):
                Loss criterion. Can be different for each target in multi-target setting
                thanks to `MultiLoss`. Defaults to `MAE`.
            **kwargs:
                See :class:`pytorch_forecasting.models.base_model.AutoRegressiveBaseModel`.
        """
        n_targets = len(target) if isinstance(target, (list, tuple)) else 1
        if input_size is None:
            # one input feature per target: targets are also the model inputs
            input_size = n_targets
        logger.debug(f"input_size={input_size} | n_targets={n_targets}")
        # arguments target and target_lags are required for autoregressive models
        # even though target_lags cannot be used without covariates
        # saves arguments in signature to `.hparams` attribute, mandatory call - do not skip this
        self.save_hyperparameters()
        # loss
        if loss is None:
            loss = MultiLoss([MAE() for _ in range(n_targets)]) if n_targets > 1 else MAE()
        # pass additional arguments to BaseModel.__init__, mandatory call - do not skip this
        super().__init__(loss=loss, **kwargs)
        # use version of LSTM that can handle zero-length sequences
        self.lstm = LSTM(
            hidden_size=hidden_size,
            input_size=input_size,
            num_layers=n_layers,
            dropout=dropout,
            batch_first=True,
        )
        # output layer
        self.output_layer = nn.Linear(hidden_size, n_targets)
        # others
        # set in `decode()` (prediction mode), read by `decode_one()`
        self._input_vector: Tensor

    def encode(self, x: Dict[str, torch.Tensor]) -> Tuple[Tensor, Tensor]:
        """Encode method.

        Args:
            x (Dict[str, torch.Tensor]):
                First item returned by a `DataLoader` obtained from `TimeSeriesDataset.to_dataloader()`.

        Returns:
            Tuple[Tensor, Tensor]:
                Tuple of hidden and cell state.
        """
        # we need at least one encoding step because the target needs to be lagged by one time step
        # because we use the custom LSTM, we do not have to require encoder lengths of > 1
        # but can handle lengths of >= 1
        assert x["encoder_lengths"].min() >= 1
        input_vector = x["encoder_cont"].clone()
        # lag target by one
        input_vector[..., self.target_positions] = torch.roll(
            input_vector[..., self.target_positions],
            shifts=1,
            dims=1,
        )
        input_vector = input_vector[:, 1:]  # first time step cannot be used because of lagging
        # determine effective encoder_length length
        effective_encoder_lengths = x["encoder_lengths"] - 1
        # run through LSTM network
        hidden_state: Tuple[Tensor, Tensor]
        _, hidden_state = self.lstm(
            input_vector,
            lengths=effective_encoder_lengths,
            enforce_sorted=False,  # passing the lengths directly
        )  # first output (the sequence output) is not needed, only the hidden state
        return hidden_state

    def decode(
        self,
        x: Dict[str, torch.Tensor],
        hidden_state: Tuple[Tensor, Tensor],
    ) -> Union[List[Tensor], Tensor]:
        """Decode the encoder's hidden state into predictions.

        Args:
            x (Dict[str, torch.Tensor]):
                First item returned by a `DataLoader` obtained from `TimeSeriesDataset.to_dataloader()`.
            hidden_state (Tuple[Tensor, Tensor]):
                Tuple of hidden and cell state.

        Returns:
            (Union[List[Tensor], Tensor]):
                Tensor if one target, list of Tensors if multi-target.
        """
        # again lag target by one
        input_vector = x["decoder_cont"].clone()  # (B,L,D)
        logger.trace(f"input_vector: {input_vector.size()}")
        B, L, D = input_vector.size()
        input_vector[..., self.target_positions] = torch.roll(
            input_vector[..., self.target_positions], shifts=1, dims=1
        )
        # but this time fill in missing target from encoder_cont at the first time step instead of throwing it away
        last_encoder_target = x["encoder_cont"][
            torch.arange(x["encoder_cont"].size(0), device=x["encoder_cont"].device),
            x["encoder_lengths"] - 1,
            self.target_positions.unsqueeze(-1),
        ].T
        input_vector[:, 0, self.target_positions] = last_encoder_target
        # Training mode: run the whole decoder sequence in one pass using the
        # lagged ground-truth targets as inputs.
        if self.training:  # training mode
            lstm_output, _ = self.lstm(
                input_vector, hidden_state, lengths=x["decoder_lengths"], enforce_sorted=False
            )
            logger.trace(f"lstm_output ({type(lstm_output)}): {lstm_output.size()}")
            # transform into right shape
            out: Tensor = self.output_layer(lstm_output)
            logger.trace(f"out ({type(out)}): {out.size()}")
            if self.n_targets > 1:
                # NOTE(review): iterates over D (decoder feature count), assumed equal
                # to the number of targets here — confirm if covariates are ever added.
                out = [out[:, :, i].view(B, L, -1) for i in range(D)]  # type: ignore
            prediction: List[Tensor] = self.transform_output(out, target_scale=x["target_scale"])
            # predictions are not yet rescaled
            logger.trace(
                f"prediction ({type(prediction)}|{len(prediction)}): {[p.size() for p in prediction]})"
            )
            return prediction
        # Prediction mode: decode step by step, feeding each prediction back in.
        self._input_vector = input_vector  # consumed by `decode_one()`
        n_decoder_steps = input_vector.size(1)
        logger.trace(f"n_decoder_steps={n_decoder_steps}")
        first_target = input_vector[:, 0, :]  # self.target_positions?
        first_target = first_target.view(B, 1, D)
        target_scale = x["target_scale"]
        output: Union[List[Tensor], Tensor] = self.decode_autoregressive(
            self.decode_one,  # make predictions which are fed into next step
            first_target=first_target,
            first_hidden_state=hidden_state,
            target_scale=target_scale,
            n_decoder_steps=n_decoder_steps,
        )
        # predictions are already rescaled
        if isinstance(output, Tensor):
            logger.trace(f"output ({type(output)}): {output.size()}")
        else:
            logger.trace(f"output ({type(output)}|{len(output)}): {[o.size() for o in output]})")
        return output

    def forward(self, x: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        """Full forward pass: encode the history, then decode predictions.

        Args:
            x (Dict[str, torch.Tensor]):
                First item returned by a `DataLoader` obtained from `TimeSeriesDataset.to_dataloader()`.

        Returns:
            Dict[str, torch.Tensor]:
                Network output as produced by `to_network_output`, containing the
                `prediction` entry.
        """
        hidden_state = self.encode(x)  # encode to hidden state
        output = self.decode(x, hidden_state)  # decode leveraging hidden state
        out: Dict[str, torch.Tensor] = self.to_network_output(prediction=output)
        return out

    def decode_one(
        self,
        idx: int,
        lagged_targets: List[Tensor],
        hidden_state: Tuple[Tensor, Tensor],
    ) -> Tuple[Tensor, Tuple[Tensor, Tensor]]:
        """Run a single auto-regressive decoding step.

        Args:
            idx (int):
                Index of the current decoding step (0-based); selects the row of the
                decoder input vector prepared by :py:meth:`~decode`.
            lagged_targets (List[Tensor]):
                Normalized targets from previous steps; the most recent entry
                (``lagged_targets[-1]``, i.e. lag=1) is used as input.
            hidden_state (Tuple[Tensor, Tensor]):
                `(h,c)` (hidden state, cell state).

        Returns:
            Tuple[Tensor, Tuple[Tensor, Tensor]]:
                One-step-ahead prediction and tuple of `(h,c)` (hidden state, cell state).
        """
        B, _, D = self._input_vector.size()
        logger.trace(f"idx: {idx}")
        logger.trace(
            f"lagged_targets ({type(lagged_targets)}|{len(lagged_targets)}): {[h.size() for h in lagged_targets]}"
        )
        logger.trace(f"hidden_state ({type(hidden_state)}): {[h.size() for h in hidden_state]}")
        logger.trace(f"input_vector ({type(self._input_vector)}): {self._input_vector.size()}")
        # input has shape (B,L,D); keep a singleton time dim by indexing with [idx]
        x = self._input_vector[:, [idx]]
        logger.trace(f"x ({type(x)}): {x.size()}")
        # take most recent target (i.e. lag=1)
        lag = lagged_targets[-1]
        logger.trace(f"lag: {lag.size()}")
        assert lag.size(0) == B
        assert lag.size(-1) == D
        # make sure it has shape (B,D)
        lag = lag.view(B, D)
        logger.trace(f"lag: {lag.size()}")
        # overwrite at target positions
        # NOTE(review): this assigns the lag to ALL features of the step, not only
        # `self.target_positions` — fine while targets are the only inputs; confirm
        # before adding covariates.
        x[:, 0, :] = lag
        logger.trace(f"x ({type(x)}): {x.size()}")
        lstm_output, hidden_state = self.lstm(x, hidden_state)
        logger.trace(f"lstm_output ({type(lstm_output)}): {lstm_output.size()}")
        # transform into right shape
        prediction: Tensor = self.output_layer(lstm_output)[:, 0]  # take first timestep
        if self.n_targets > 1:
            prediction = prediction.view(B, 1, D)
        logger.trace(f"prediction ({type(prediction)}): {prediction.size()}")
        return prediction, hidden_state
Here is the test: import pytest
import sys
import typing as ty
from loguru import logger
import numpy as np
import pandas as pd
from torch import Tensor
from lightning.pytorch import Trainer
from lightning.pytorch.utilities.model_summary import ModelSummary
from pytorch_forecasting import TimeSeriesDataSet
from pytorch_forecasting.data.encoders import TorchNormalizer, MultiNormalizer
from <my-package>.models import LSTMModel
@pytest.fixture(scope="session")
def timeseriesdataset(prediction_length: int) -> TimeSeriesDataSet:
    """Dummy `TimeSeriesDataSet`: one random target, three groups of ten steps each."""
    n_groups, n_steps = 3, 10
    frame = pd.DataFrame(
        {
            "target": np.random.rand(n_groups * n_steps),
            "group": np.repeat(np.arange(n_groups), n_steps),
            "time_idx": np.tile(np.arange(n_steps), n_groups),
        }
    )
    return TimeSeriesDataSet(
        frame,
        group_ids=["group"],
        target="target",
        time_idx="time_idx",
        min_encoder_length=5,
        max_encoder_length=5,
        min_prediction_length=prediction_length,
        max_prediction_length=prediction_length,
        time_varying_unknown_reals=["target"],
        target_normalizer=TorchNormalizer(),
    )
@pytest.fixture(scope="session")
def timeseriesdataset_multitarget(prediction_length: int) -> TimeSeriesDataSet:
    """Dummy multi-target `TimeSeriesDataSet`: two random targets, three groups of ten steps."""
    n_groups, n_steps = 3, 10
    frame = pd.DataFrame(
        {
            "target1": np.random.rand(n_groups * n_steps),
            "target2": np.random.rand(n_groups * n_steps),
            "group": np.repeat(np.arange(n_groups), n_steps),
            "time_idx": np.tile(np.arange(n_steps), n_groups),
        }
    )
    return TimeSeriesDataSet(
        frame,
        group_ids=["group"],
        target=["target1", "target2"],
        time_idx="time_idx",
        min_encoder_length=5,
        max_encoder_length=5,
        min_prediction_length=prediction_length,
        max_prediction_length=prediction_length,
        time_varying_unknown_reals=["target1", "target2"],
        target_normalizer=MultiNormalizer([TorchNormalizer(), TorchNormalizer()]),
    )
@pytest.mark.parametrize("multitarget", [False, True])
def test_lstm_on_device_dataset(
    timeseriesdataset: TimeSeriesDataSet,
    timeseriesdataset_multitarget: TimeSeriesDataSet,
    multitarget: bool,
) -> None:
    """Test we can train a `LSTMModel` model, in single- and multi-target mode."""
    # BUG FIX: `fake_data` was referenced below (max_epochs, loss assertion) but
    # never defined, so the test crashed with NameError. The fixtures always
    # generate random data, so treat it as fake: train a single epoch and skip
    # the loss-decrease assertion, which is meaningless on random data.
    fake_data = True
    # Data
    if multitarget:
        dataset = timeseriesdataset_multitarget
    else:
        dataset = timeseriesdataset
    prediction_length = dataset.max_prediction_length
    # batch info
    batch_size = 4
    loader = dataset.to_dataloader(batch_size=batch_size)
    X, y = next(iter(loader))
    targets: ty.List[Tensor] = y[0]
    if multitarget:
        for t in targets:
            _assert_size(t, batch_size, prediction_length)
    else:
        assert isinstance(targets, Tensor)
        logger.info(f"Target: {targets.size()}")
    # create model
    model = LSTMModel.from_dataset(
        dataset,
        hidden_size=10,
        n_layers=2,
        learning_rate=1e-3,
    )
    logger.info(ModelSummary(model, max_depth=-1))
    logger.info(model.hparams)
    # check prediction shape/type before training
    out: dict = model(X)
    prediction = out["prediction"]
    _assert_prediction(prediction, targets, batch_size, prediction_length)
    # check we can compute an anomaly score
    scores: Tensor = model.anomaly_score(X, y)
    logger.info(scores.size())
    assert scores.size(0) == batch_size
    # loss before training
    logger.info("Evaluating loss...")
    loss_before = model.loss(prediction, y)
    logger.info(f"Loss (before training): {loss_before}")
    # train
    trainer = Trainer(
        logger=False,
        enable_checkpointing=False,
        max_epochs=1 if fake_data else 50,
        accelerator="cpu",
    )
    trainer.fit(model, loader)
    # check prediction again after training
    logger.info(f"Evaluating prediction (prediction_length={prediction_length})...")
    model.eval()
    out = model(X)
    prediction = out["prediction"]
    _assert_prediction(prediction, targets, batch_size, prediction_length)
    logger.info("Evaluating loss...")
    model.train()
    loss = model.loss(model(X)["prediction"], y)
    logger.info(f"Loss (after training): {loss}")
    # test loss has decreased (only meaningful on real, learnable data)
    if not fake_data:
        assert loss < loss_before, f"{loss} is not lower than {loss_before}"
def _assert_size(t: Tensor, batch_size: int, prediction_length: int) -> None:
"""helper."""
print(f"\t{t.size()}")
assert t.dim() == 3 or t.dim() == 2, f"{t.size()}"
assert int(t.size(0)) == batch_size
assert int(t.size(1)) == prediction_length
if t.dim() == 3:
assert int(t.size(2)) == 1
def _assert_prediction(
    prediction: ty.Union[ty.List[Tensor], Tensor],
    targets: ty.Union[ty.List[Tensor], Tensor],
    batch_size: int,
    prediction_length: int,
) -> None:
    """helper: check prediction matches targets in structure and size."""
    if isinstance(targets, Tensor):
        logger.info(f"Targets ({type(targets)}): {targets.size()}")
    else:
        logger.info(f"Targets ({type(targets)}): {(len(targets), [t.size() for t in targets])}")
    # Multi-target case: one tensor per target, each of the expected size.
    if isinstance(prediction, (list, tuple)):
        logger.info(f"Prediction ({type(prediction)}|{len(prediction)}): {[p.size() for p in prediction]}")
        assert len(prediction) == len(targets)
        for p in prediction:
            _assert_size(p, batch_size, prediction_length)
        return
    # Single-target case: one tensor whose shape matches the reshaped targets.
    if isinstance(prediction, Tensor):
        assert isinstance(targets, Tensor)
        logger.info(f"Prediction ({type(prediction)}: {prediction.size()}):")
        assert prediction.size() == targets.view(batch_size, prediction_length, -1).size()
        return
    raise ValueError(f"Prediction: {type(prediction)}")
if __name__ == "__main__":
    # Allow running this test module directly, with TRACE-level logging to stderr.
    logger.remove()  # drop loguru's default handler first
    logger.add(sys.stderr, level="TRACE")
    pytest.main([__file__, "-x", "-s"])
svnv-svsv-jm
changed the title
Example on multiple targets
Example on multiple targets (with proposed solution)
Nov 22, 2023
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
1.0.0
2.1.0
3.10.10
There is currently no example about how to set up any model to predict multiple targets (auto-regressively).
My
targets
are also the input to the model, as it is an auto-regressive use case. I've updated the code; see my comments below. Now it will work with multi-targets! :)
The text was updated successfully, but these errors were encountered: