"""
This file defines the WindowGenerator model.
"""
from typing import Callable, Union, Tuple, List, Optional
import pandas as pd
import numpy as np
import math
import tensorflow as tf
from keras.backend import floatx
from .. import AutopycoinBaseClass
from ..utils import features, date_features, convert_to_list
class WindowGenerator(AutopycoinBaseClass):
"""Transform a time series into a usable format for a tensorflow model.
It can be either a pandas dataframe, tensorflow tensor or numpy array.
Parameters
----------
input_width : int
The number of historical time steps to use during the forecasting.
label_width : int
the number of time steps to forecast.
shift : int
Compute the shift between input time steps (`input_width`) and
labels time steps (`label_width`).
Hence if `label_width` is higher than `shift`, the input and label datasets
will have some identical values.
valid_size : int or float
The number of examples in the validation set. Use a float between 0 and 1 to use a proportion.
test_size : int or float
The number of examples in the test set. Use a float between 0 and 1 to use a proportion.
flat : bool
Flatten the inputs and labels tensors.
batch_size : int
The number of examples per batch. If None, then all examples are stacked in one batch.
Default to None.
preprocessing : callable or None
Preprocessing function to use on the data.
This function needs to take input of shape ((inputs, ...), labels).
It is applied after the train, validation and test split.
Default to None.
Attributes
----------
input_width : int
label_width : int
shift : int
valid_size : int
test_size : int
flat : bool
batch_size : int or None
train : :literal:`dataset`
valid : :literal:`dataset`
test : :literal:`dataset`
data : DataFrame or ndarray or :literal:`Tensor`
Notes
-----
The dataset's shape depends on the columns defined in :literal:`from_array` method.
There are currently four input tensors which can be added inside the inputs dataset.
Output shape:
when all columns components are defined:
Tuple of shape ((inputs, known, date_inputs, date_labels), labels)
inputs tensor:
The input tensor of shape (batch_size, input_width, input_columns) or (batch_size, input_width * input_columns)
depending if flat is set to True. Basically, they are historical values.
known tensor:
The known tensor of shape (batch_size, label_width, known_columns) or (batch_size, label_width * known_columns)
depending on whether flat is set to True. These are the variables whose values
are known in advance or estimated over the label period.
For example: time dates or temperatures.
date_inputs tensor:
Dates of shape (batch_size, input_width) are the dates associated to the inputs tensor.
Default to a tensor generated by :literal:`tf.range`.
date_labels tensor:
Dates of shape (batch_size, label_width) are the dates associated to the labels tensor.
Default to a tensor generated by :literal:`tf.range`.
labels tensor:
The Output variables of shape (batch_size, label_width, label_columns) or (batch_size, label_width * label_columns)
depending if flat is set to True. They are the values to predict.
Examples
--------
>>> import pandas as pd
>>> from autopycoin.data import random_ts
>>> from autopycoin.dataset import WindowGenerator
...
... # We generate data
>>> data = random_ts(n_steps=100,
... trend_degree=2,
... periods=[10],
... fourier_orders=[10],
... trend_mean=0,
... trend_std=1,
... seasonality_mean=0,
... seasonality_std=1,
... batch_size=1,
... n_variables=1,
... noise=True,
... seed=42)
...
>>> w_oneshot = WindowGenerator(input_width=3,
... label_width=2,
... shift=10,
... valid_size=2,
... test_size=3,
... flat=True,
... batch_size=None,
... preprocessing=None)
...
... # Here only the inputs and labels tensors are generated
>>> w_oneshot = w_oneshot.from_array(data[0],
... input_columns=[0],
... label_columns=[0])
"""
def __init__(
self,
input_width: int,
label_width: int,
shift: Union[None, int] = None,
valid_size: Union[int, float] = 0,
test_size: Union[int, float] = 0,
flat: bool = False,
sequence_stride: int = 1,
batch_size: int = None,
preprocessing: Union[None, Callable] = None,
):
self._input_width = input_width
self._label_width = label_width
self._shift = shift if shift is not None else label_width
self._sequence_stride = sequence_stride
self._valid_size = valid_size
self._test_size = test_size
self._batch_size = batch_size
self._flat = flat
# We separate init functions in order to perfom validation.
self._compute_window_parameters()
# Preprocessing layers
self._preprocessing = preprocessing
self._initialized = False
def _compute_window_parameters(self) -> None:
"""Calculate the window parameters."""
self._total_window_size = self.input_width + self.shift
self._input_slice = slice(0, self.input_width)
self._input_indices = np.arange(self._total_window_size)[self._input_slice]
self._label_start = self._total_window_size - self.label_width
self._label_slice = slice(self._label_start, self._total_window_size)
self._label_indices = np.arange(self._total_window_size)[self._label_slice]
[docs] def from_array(
self,
data: Union[pd.DataFrame, np.ndarray, tf.Tensor, pd.Series],
input_columns: Union[None, List[Union[int, str]]] = None,
label_columns: Union[None, List[Union[int, str]]] = None,
known_columns: Union[None, List[Union[int, str]]] = None,
date_columns: Union[None, List[Union[int, str]]] = None,
):
"""Feed :literal:`WindowGenerator` with a pandas dataframe or a numpy ndarray.
This method has to be called before using `train, `test` or `valid` methods
as it initializes the data.
Parameters
----------
data : :literal:`DataFrame, Serie, list, ndarray or Tensor of shape (timesteps, variables)`
The time series dataframe on which train, valid and test datasets are built.
input_columns : list[str or int]
The input column names. Variables used to forecast target values.
label_columns : list[str or int]
The label column names. Target variables to forecast, default to None.
known_columns : list[str or int]
The known column names, default to None.
Those variables that we know exact or strong estimated values which happen during target period.
Example: Dates or temperatures.
date_columns : list[str or int]
The date column names. Dates associated to each steps, default to None.
Date columns will be cast to string and join by
'-' delimiter to be used as xticks in plot function.
Returns
-------
self : :literal:`WindowGenerator`
return the instance.
"""
if isinstance(data, pd.Series):
data = data.values
if len(data.shape) == 1:
data = tf.expand_dims(data, axis=-1)
if input_columns is None:
input_columns = [col for col in range(data.shape[-1])]
if label_columns is None:
label_columns = [col for col in range(data.shape[-1])]
if isinstance(data, pd.DataFrame):
self._from_dataframe(
data, input_columns, label_columns, known_columns, date_columns
)
elif isinstance(data, (np.ndarray, tf.Tensor)):
self._from_array(
data, input_columns, label_columns, known_columns, date_columns
)
else:
raise ValueError(
f"{type(data)} is not handled, please provide a pandas dataframe, a numpy array or a tensor."
)
self._split_train_valid_test()
return self
def _from_dataframe(
self,
data: pd.DataFrame,
input_columns: Union[None, List[Union[int, str]]],
label_columns: Union[None, List[Union[int, str]]] = None,
known_columns: Union[None, List[Union[int, str]]] = None,
date_columns: Union[None, List[Union[int, str]]] = None,
):
"""Handle dataframe."""
self._initialized = True
# Avoid replacing original dataframe
data = data.copy()
# Convert dataframe into array
self._data_columns = data.columns
self._data = data.values
# Get index for each columns
# In case if columns are not defined
try:
self._input_columns = [
self._data_columns.get_loc(col) for col in input_columns
]
self._label_columns = (
[self._data_columns.get_loc(col) for col in label_columns]
if label_columns
else None
)
self._known_columns = (
[self._data_columns.get_loc(col) for col in known_columns]
if known_columns
else None
)
self._date_columns = (
[self._data_columns.get_loc(col) for col in date_columns]
if date_columns
else None
)
except KeyError as error:
raise KeyError(
f"Columns are not found inside data, got input_columns: {input_columns},"
f"label_columns: {label_columns}, known_columns: {known_columns} and date_columns: {date_columns}."
f"Expected {self._data_columns}."
) from error
def _from_array(
self,
data: Union[np.ndarray, tf.Tensor],
input_columns: Union[None, List[Union[slice, int]]],
label_columns: Union[None, List[Union[slice, int]]] = None,
known_columns: Union[None, List[Union[slice, int]]] = None,
date_columns: Union[None, List[Union[slice, int]]] = None,
):
"""Handle array and tensor."""
self._initialized = True
# Converting data into array
data = np.array(data)
self._data = data
self._data_columns = None # Used in `production`
# In case if columns are not defined
self._input_columns = input_columns if input_columns else None
self._label_columns = label_columns if label_columns else None
self._known_columns = known_columns if known_columns else None
self._date_columns = date_columns if date_columns else None
def _split_train_valid_test(self):
"""Create train, valid and test dataset."""
self._dataset = self._make_dataset(self.data)
n_train_examples, n_valid_examples, n_shift_examples = self._get_dataset_sizes(
self._dataset
)
self._train = self._dataset.take(n_train_examples)
self._valid = self._dataset.skip(n_train_examples + n_shift_examples).take(
n_valid_examples
)
self._test = self._dataset.skip(
n_train_examples + n_valid_examples + 2 * n_shift_examples
)
if self.batch_size:
self._train = self._train.unbatch().batch(self.batch_size)
self._valid = self._valid.unbatch().batch(self.batch_size)
self._test = self._test.unbatch().batch(self.batch_size)
def _get_dataset_sizes(self, dataset: tf.data.Dataset):
"""Calculate the sizes of train, valid and test dataset from the provided dataset and window parameters."""
cardinality = dataset.cardinality()
n_shift_examples = math.floor(self.label_width / self.sequence_stride) - 1
n_test_examples = self.test_size
if isinstance(self.test_size, float) and self.test_size <= 1:
n_test_examples = int(cardinality.numpy() * n_test_examples)
n_valid_examples = self.valid_size
if isinstance(self.valid_size, float) and self.valid_size <= 1:
n_valid_examples = int(
(cardinality.numpy() - n_test_examples) * self.valid_size
)
n_train_examples = (
cardinality.numpy()
- n_valid_examples
- n_test_examples
- 2 * n_shift_examples
)
return n_train_examples, n_valid_examples, n_shift_examples
def _make_dataset(
    self, data: Union[pd.DataFrame, np.ndarray, tf.Tensor],
) -> tf.data.Dataset:
    """Compute a tensorflow dataset object.

    Parameters
    ----------
    data : :literal:`DataFrame`, ndarray or `Tensor of shape (timestep, variables)`
        The time series dataset.

    Returns
    -------
    ds : :literal:`PrefetchDataset`
        Dataset of windows, one window per batch, split by
        `_split_window` and passed through the preprocessing function
        when one is defined.
    """
    # `np.asarray` accepts dataframes, arrays and tensors alike, whereas
    # `.astype` is not available on every supported container.
    data = np.asarray(data, dtype=floatx())

    dataset = tf.keras.preprocessing.timeseries_dataset_from_array(
        data=data,
        targets=None,
        sequence_length=self._total_window_size,
        sequence_stride=self.sequence_stride,
        shuffle=False,
        batch_size=1,
    )
    dataset = dataset.map(self._split_window, num_parallel_calls=tf.data.AUTOTUNE)

    if self._preprocessing is not None:
        dataset = dataset.map(
            self._preprocessing, num_parallel_calls=tf.data.AUTOTUNE
        )

    return dataset.prefetch(tf.data.AUTOTUNE)
def _split_window(self, feature_tensor: tf.Tensor) -> Tuple[tf.Tensor]:
    """Split a window into its inputs, optional extras and labels.

    Parameters
    ----------
    feature_tensor : :literal:`tensor of shape (Batch_size, timestep, variables)`
        The window defined by `timeseries_dataset_from_array`.

    Returns
    -------
    output : :literal:`Tensor` or tuple of :literal:`Tensor`
        The inputs alone, or a tuple made of the inputs followed by the
        known features and/or the input dates and label dates when the
        matching columns are defined. Known features and label dates are
        taken on the label slice, inputs and input dates on the input slice.
        Every tensor is flattened when `flat` is True.
    labels : :literal:`Tensor`
        Returned as second element, `(output, labels)`, only when label
        columns are defined.
    """
    # Every extracted tensor goes through the same reshaping function.
    reshape = tf.keras.layers.Flatten() if self.flat else tf.identity

    output = reshape(
        features(feature_tensor, self._input_slice, self._input_columns)
    )

    # TODO: unit testing
    if self.known_columns:
        known = features(feature_tensor, self._label_slice, self._known_columns)
        output = convert_to_list(output)
        output.append(reshape(known))

    if self.date_columns:
        dates = [
            date_features(feature_tensor, window_slice, self._date_columns)
            for window_slice in (self._input_slice, self._label_slice)
        ]
        output = convert_to_list(output)
        for date_tensor in dates:
            output.append(reshape(date_tensor))

    if isinstance(output, list):
        output = tuple(output)

    if not self.label_columns:
        return output

    labels = features(feature_tensor, self._label_slice, self._label_columns)
    return output, reshape(labels)
[docs] def production(
self,
data: Union[pd.DataFrame, np.array, tf.Tensor],
batch_size: Optional[int] = None,
) -> tf.data.Dataset:
"""
Build the production dataset.
Parameters
----------
data : :literal:`DataFrame of shape (input_width + shift, variables)`
Data to forecast. inputs steps need to be inside data.
Returns
-------
data : :literal:`PrefetchDataset of shape (inputs, known, date_inputs, date_labels), labels`
MapDataset which returns data with shape
((inputs, known, date_inputs, date_labels), labels).
Raises
------
AssertionError
It raises an error if not all columns defined in the constructor method are inside data.
"""
# If a dataframe has been previously initialized then variables columns from the current
# dataframe doesn't need to perfectly match self._data_columns.
if isinstance(data, pd.DataFrame) and self._data_columns is not None:
assert (
data.shape[0] >= self._input_width
), f"The given dataframe doesn't contain enough values, got {data.shape[0]} values, expected at least {self._input_width} values."
# Columns may be none then w e have to translates into []
columns = self._data_columns[
self.input_columns + self.label_columns
if self.label_columns
else [] + self.known_columns
if self.known_columns
else [] + self.date_columns
if self.date_columns
else []
]
assert all(
columns.isin(data.columns)
), f"The given data columns doesn't match the expected columns, got {data.columns}. Expected at least {columns}"
data = data.loc[:, self._data_columns].values
else:
# If an array is provided or a dataframe but `from_dataframe` was not used previously then
# Data shape has to match the specs saved from the methods `from_array` or `from_dataframe`.
assert (
data.shape[0] >= self._input_width
and data.shape[1:] == self.data.shape[1:]
), f"""The given array doesn't contain enough data, got data of shape {data.shape}.
Expected at least shape {(self._input_width, *self.data.shape[1:])}."""
data = self._make_dataset(data)
if batch_size is not None:
data = data.unbatch().batch(batch_size)
return data
def get_config(self):
"""Return the config values."""
return {
"input_width": self.input_width,
"label_width": self.label_width,
"shift": self.shift,
"valid_size": self.valid_size,
"test_size": self.test_size,
"flat": self.flat,
"batch_size": self.batch_size,
"preprocessing": self._preprocessing,
}
@property
def train(self) -> tf.data.Dataset:
"""
Return the train dataset.
Returns
-------
dataset: :literal:`Dataset`
Train dataset. It cannot be empty.
"""
return self._train
@property
def valid(self) -> tf.data.Dataset:
"""
Return the valid dataset.
Returns
-------
dataset: :literal:`Dataset`
"""
return self._valid
@property
def test(self) -> Union[tf.data.Dataset, None]:
"""
Build the test dataset.
Returns
-------
dataset: :literal:`Dataset`
"""
return self._test
@property
def data(self) -> np.ndarray:
"""
Return the original data.
"""
if self._initialized:
return self._data
raise AttributeError(
"""The instance is not initialized.
Call :literal:`from_array` to initialize it."""
)
@data.setter
def data(self, _) -> None:
"""
Set the new data.
"""
raise AttributeError(
"You cannot modify :literal:`data`, use :literal:`from_array` instead."
)
@property
def input_width(self) -> int:
"""
Return the input_width.
"""
return self._input_width
@property
def label_width(self) -> int:
"""
Return the label_width.
"""
return self._label_width
@property
def shift(self) -> int:
"""
Return the shift.
"""
return self._shift
@property
def valid_size(self) -> int:
"""
Return the valid_size.
"""
return self._valid_size
@property
def test_size(self) -> int:
"""
Return the test_size.
"""
return self._test_size
@property
def flat(self):
"""
Return the attribute flat.
"""
return self._flat
@property
def batch_size(self):
"""
Return the attribute batch_size.
"""
return self._batch_size
@property
def sequence_stride(self):
"""
Return the attribute sequence_stride.
"""
return self._sequence_stride
@property
def input_columns(self) -> List[Union[int, slice]]:
"""
Return the input_width.
"""
if self._initialized:
return self._input_columns
raise AttributeError(
"""The instance is not initialized.
Call `from_array` to initialize it."""
)
@input_columns.setter
def input_columns(self, _) -> None:
"""
Set the new data.
"""
raise AttributeError(
"You cannot modify `input_columns`, use `from_array` instead."
)
@property
def label_columns(self) -> List[Union[int, slice]]:
"""
Return the label_columns.
"""
if self._initialized:
return self._label_columns
raise AttributeError(
"""The instance is not initialized.
Call `from_array` to initialize it."""
)
@label_columns.setter
def label_columns(self, _) -> None:
"""
Set the new data.
"""
raise AttributeError(
"You cannot modify `label_columns`, use `from_array` instead."
)
@property
def known_columns(self) -> List[Union[int, slice]]:
"""
Return the known_columns.
"""
if self._initialized:
return self._known_columns
raise AttributeError(
"""The instance is not initialized.
Call `from_array` to initialize it."""
)
@known_columns.setter
def known_columns(self, _) -> None:
"""
Set the new data.
"""
raise AttributeError(
"You cannot modify `known_columns`, use `from_array` instead."
)
@property
def date_columns(self) -> List[Union[int, slice]]:
"""
Return date_columns.
"""
if self._initialized:
return self._date_columns
raise AttributeError(
"""The instance is not initialized.
Call `from_array` to initialize it."""
)
@date_columns.setter
def date_columns(self, _) -> None:
"""
Set the new data.
"""
raise AttributeError(
"You cannot modify `date_columns`, use `from_array` instead."
)
def _val___init__(
self, output: None, *args: list, **kwargs: dict
) -> None: # pylint: disable=unused-argument
"""
Validates attributes and args of __init__ method.
"""
assert (
self.input_width > 0
), f"The input width has to be strictly positive, got {self.input_width}."
assert (
self.label_width > 0
), f"The label width has to be strictly positive, got {self.label_width}."
assert (
self.shift > 0
), f"The shift has to be strictly positive, got {self.shift}."
assert (
self.label_width < self._total_window_size
), f"The label width has to be equal or lower than {self._total_window_size}, got {self.label_width}"
assert (
self.test_size >= 0
), f"The test size has to be positive or null, got {self.test_size}."
assert (
self.valid_size >= 0
), f"The valid size has to be positive or null, got {self.valid_size}."
if self.batch_size:
assert (
self.batch_size > 0
), f"The batch size has to be strictly positive, got {self.batch_size}."
def _val__from_dataframe(
self, output: None, *args: list, **kwargs: dict
) -> None: # pylint: disable=unused-argument
"""
Validates attributes and args of :literal:`_from_dataframe` method.
"""
assert len(self.input_columns) > 0, "The input columns list is empty."
assert np.size(self.data), "The given parameter `data` is an empty DataFrame."
def _val__from_array(
self, output: None, *args: list, **kwargs: dict
) -> None: # pylint: disable=unused-argument
"""
Validates attributes and args of :literal:`_from_array` method.
"""
assert len(self.input_columns) > 0, "The input columns list is empty."
assert np.size(self.data), "The given parameter `data` is an empty DataFrame."
def _val__split_train_valid_test(
self, output: None, *args: list, **kwargs: dict
) -> None: # pylint: disable=unused-argument
"""
Validates attributes and args of :literal:`_compute_train_valid_test_split` method.
"""
n_train_examples, _, _ = self._get_dataset_sizes(self._dataset)
assert (
n_train_examples
) > 0, f"""The training dataset is empty, please redefine the test size or valid size."""
def _val_from_array(
self, output: None, *args: list, **kwargs: dict
) -> None: # pylint: disable=unused-argument
"""
Validates attributes and args of :literal:`_val_from_array` method.
"""
assert (
max(self.input_columns) < self.data.shape[1]
), f"""Indice {max(self.input_columns)} superior to data shape {self.data.shape}."""
if self.label_columns:
assert (
max(self.label_columns) < self.data.shape[1]
), f"""Indice {max(self.label_columns)} superior to data shape {self.data.shape}."""
if self.known_columns:
assert (
max(self.known_columns) < self.data.shape[1]
), f"""Indice {max(self.known_columns)} superior to data shape {self.data.shape}."""
if self.date_columns:
assert (
max(self.date_columns) < self.data.shape[1]
), f"""Indice {max(self.date_columns)} superior to data shape {self.data.shape}."""