"""
N-BEATS implementation
"""
from typing import Callable, Union, Tuple, List, Optional
from typing import List, Optional
import tensorflow as tf
from keras.engine import data_adapter
from .training import UnivariateModel
from ..layers import TrendBlock, SeasonalityBlock, GenericBlock, BaseBlock
from ..layers.nbeats_layers import SEASONALITY_TYPE
from .pool import BasePool
[docs]class Stack(UnivariateModel):
"""
A stack is a series of blocks where each block produces two outputs,
the forecast and the backcast.
Inside a stack all forecasts are sum up and compose the stack output.
In the meantime, the backcast is given to the following block.
Parameters
----------
blocks : tuple[:class:`autopycoin.models.BaseBlock`]
Blocks layers. they can be generic, seasonal or trend ones.
You can also define your own block by subclassing `BaseBlock`.
Attributes
----------
blocks : tuple[:class:`autopycoin.models.BaseBlock`]
label_width : int
input_width : int
is_interpretable : bool
stack_type : str
Examples
--------
>>> from autopycoin.layers import TrendBlock, SeasonalityBlock
>>> from autopycoin.models import Stack, NBEATS
>>> from autopycoin.losses import QuantileLossError
...
>>> trend_block = TrendBlock(label_width=20,
... p_degree=2,
... n_neurons=16,
... drop_rate=0.1,
... name="trend_block")
...
>>> seasonality_block = SeasonalityBlock(label_width=20,
... forecast_periods=[10],
... backcast_periods=[20],
... forecast_fourier_order=[10],
... backcast_fourier_order=[20],
... n_neurons=15,
... drop_rate=0.1,
... name="seasonality_block")
...
... # blocks creation
>>> trend_blocks = [trend_block for _ in range(3)]
>>> seasonality_blocks = [seasonality_block for _ in range(3)]
...
... # Stacks creation
>>> trend_stacks = Stack(trend_blocks, name="trend_stack")
>>> seasonality_stacks = Stack(seasonality_blocks, name="seasonality_stack")
...
... # model definition and compiling
>>> model = NBEATS([trend_stacks, seasonality_stacks], name="interpretable_NBEATS")
>>> model.compile(loss=QuantileLossError(quantiles=[0.5]))
Notes
-----
input shape:
N-D tensor with shape: (..., batch_size, time step).
The most common situation would be a 2D input with shape (batch_size, time step).
output shape:
N-D tensor with shape: (..., batch_size, units).
For instance, for a 2D input with shape (batch_size, units),
the output would have shape (batch_size, units).
With a QuantileLossError with 2 quantiles or higher the output would have shape (quantiles, batch_size, units).
If you add 2 variables, the output would have shape (variables, quantiles, batch_size, units).
"""
def __init__(
self,
blocks: Tuple[BaseBlock, ...],
apply_quantiles_transpose: bool = False,
apply_multivariate_transpose: bool = False,
*args: list,
**kwargs: dict,
):
super().__init__(
apply_quantiles_transpose=apply_quantiles_transpose,
apply_multivariate_transpose=apply_multivariate_transpose,
*args,
**kwargs,
)
self._blocks = blocks
self._stack_type = self._set_type()
self._is_interpretable = self._set_interpretability()
def call(
self, inputs: Union[tuple, dict, list, tf.Tensor], **kwargs: dict
) -> Tuple[tf.Tensor, ...]:
"""Call method from tensorflow."""
if isinstance(inputs, (tuple, list)):
inputs = inputs[0]
outputs = tf.constant(0.0) # init output
for block in self.blocks:
reconstructed_inputs, residual_outputs = block(inputs)
inputs = tf.subtract(inputs, reconstructed_inputs)
outputs = tf.add(outputs, residual_outputs)
return inputs, outputs
def get_config(self) -> dict:
"""See tensorflow documentation."""
config = super().get_config()
config.update({"blocks": self.blocks})
return config
def _set_type(self) -> str:
"""Return the type of the stack."""
block_type = self.blocks[0].block_type
for block in self.blocks:
if block.block_type != block_type:
return "CustomStack"
return block_type.replace("Block", "") + "Stack"
def _set_interpretability(self) -> bool:
"""True if the stack is interpretable else False."""
interpretable = all([block.is_interpretable for block in self.blocks])
if interpretable:
return True
return False
@property
def label_width(self) -> int:
"""Return the label width."""
return self.blocks[0].label_width
@property
def input_width(self) -> int:
"""Return the input width."""
return self.blocks[0].input_width
@property
def blocks(self) -> List[BaseBlock]:
"""Return the list of blocks."""
return self._blocks
@property
def stack_type(self) -> str:
"""Return the type of the stack.
`CustomStack` if the blocks are all differents."""
return self._stack_type
@property
def is_interpretable(self) -> bool:
"""Return True if the stack is interpretable."""
return self._is_interpretable
def __repr__(self):
return self.stack_type
[docs]class NBEATS(UnivariateModel):
"""
Tensorflow model defining the N-BEATS architecture.
N-BEATS is a univariate model, see :class:`autopycoin.models.UnivariateModel` for more information.
Its strong advantage resides in its structure which allows us to extract the trend and the seasonality of
temporal series. They are available from the attributes `seasonality` and `trend`.
This is an unofficial implementation of the paper https://arxiv.org/abs/1905.10437.
Parameters
----------
stacks : tuple[:class:`autopycoin.models.Stack`]
Stacks can be created from :class:`autopycoin.models.TrendBlock`,
:class:`autopycoin.models.SeasonalityBlock` or :class:`autopycoin.models.GenericBlock`.
See stack documentation for more details.
Attributes
----------
stacks : tuple[`Tensor`]
seasonality : `Tensor`
Seasonality component of the output.
trend : `Tensor`
Trend component of the output.
stack_outputs : `Tensor`
is_interpretable : bool
nbeats_type : str
label_width : int
input_width : int
Examples
--------
>>> from autopycoin.layers import TrendBlock, SeasonalityBlock
>>> from autopycoin.models import Stack, NBEATS
>>> from autopycoin.losses import QuantileLossError
>>> from autopycoin.data import random_ts
>>> from autopycoin.dataset import WindowGenerator
>>> import tensorflow as tf
>>> import pandas as pd
...
>>> data = random_ts(n_steps=1000,
... trend_degree=2,
... periods=[10],
... fourier_orders=[10],
... trend_mean=0,
... trend_std=1,
... seasonality_mean=0,
... seasonality_std=1,
... batch_size=1,
... n_variables=1,
... noise=True,
... seed=42)
>>> data = pd.DataFrame(data[0].numpy(), columns=['test'])
...
>>> w = WindowGenerator(
... input_width=20,
... label_width=10,
... shift=10,
... test_size=50,
... valid_size=10,
... flat=True,
... batch_size=32,
... preprocessing=lambda x,y: (x, (x, y))
... )
...
>>> w = w.from_array(data=data,
... input_columns=['test'],
... label_columns=['test'])
>>>
>>> trend_block = TrendBlock(label_width=w.label_width,
... p_degree=2,
... n_neurons=16,
... drop_rate=0.1,
... name="trend_block")
>>>
>>> seasonality_block = SeasonalityBlock(label_width=w.label_width,
... forecast_periods=[10],
... backcast_periods=[20],
... forecast_fourier_order=[10],
... backcast_fourier_order=[20],
... n_neurons=15,
... drop_rate=0.1,
... name="seasonality_block")
>>>
>>> trend_blocks = [trend_block for _ in range(3)]
>>> seasonality_blocks = [seasonality_block for _ in range(3)]
>>> trend_stacks = Stack(trend_blocks, name="trend_stack")
>>> seasonality_stacks = Stack(seasonality_blocks, name="seasonality_stack")
>>>
>>> model = NBEATS([trend_stacks, seasonality_stacks], name="interpretable_NBEATS")
>>> model.compile(loss=QuantileLossError(quantiles=[0.5]))
>>> history = model.fit(w.train, verbose=0)
Notes
-----
NBEATS supports the estimation of aleotoric and epistemic errors with:
- Aleotoric interval : :class:`autopycoin.loss.QuantileLossError`
- Epistemic interval : MCDropout
You can use :class:`autopycoin.loss.QuantileLossError` as loss error to estimate the
aleotoric error. Also, run multiple times a prediction with `drop_date` > 0 to estimate
the epistemic error.
*Input shape*
N-D tensor with shape: (batch_size, time step, variables) or (batch_size, time step).
The most common situation would be a 2D input with shape (batch_size, time step).
*Output shape*
Two N-D tensor with shape: (batch_size, time step, variables, quantiles) or (batch_size, time step, quantiles)
or (batch_size, time step).
For instance, for a 2D input with shape (batch_size, units),
the output would have shape (batch_size, units).
With a QuantileLossError with 2 quantiles or higher the output
would have shape (batch_size, units, quantiles).
With a multivariate inputs the output
would have shape (batch_size, units, variates, quantiles).
"""
def __init__(self, stacks: Tuple[Stack, ...], *args: list, **kwargs: dict):
super().__init__(*args, **kwargs)
# Stacks where blocks are defined
self._stacks = stacks
self._is_interpretable = self._set_interpretability()
self._nbeats_type = self._set_type()
def build(
self, input_shape: Union[tf.TensorShape, Tuple[tf.TensorShape, ...]]
) -> None:
"""See tensorflow documentation."""
if isinstance(input_shape, tuple):
input_shape = input_shape[0]
super().build(input_shape)
[docs] def compile(
self,
optimizer="rmsprop",
loss=None,
metrics=None,
loss_weights=[0.0, 1.0],
weighted_metrics=None,
run_eagerly=None,
steps_per_execution=None,
**kwargs,
) -> None:
super().compile(
optimizer=optimizer,
loss=loss,
metrics=metrics,
loss_weights=loss_weights,
weighted_metrics=weighted_metrics,
run_eagerly=run_eagerly,
steps_per_execution=steps_per_execution,
**kwargs,
)
def call(
self, inputs: Union[tuple, dict, list, tf.Tensor], **kwargs: dict
) -> tf.Tensor:
"""Call method from tensorflow."""
if isinstance(inputs, (tuple, list)):
inputs = inputs[0]
residual_inputs = tf.identity(inputs)
outputs = tf.constant(0.0)
for stack in self.stacks:
residual_inputs, residual_outputs = stack(residual_inputs)
outputs = tf.math.add(outputs, residual_outputs)
reconstructed_inputs = inputs - residual_inputs
return reconstructed_inputs, outputs
[docs] def seasonality(self, data: tf.Tensor) -> tf.Tensor:
"""
Based on the paper, the seasonality is available if
the previous stacks are composed by trend blocks.
Else, it doesn't correspond to seasonality.
Parameters
----------
data : `Tensor`
input data.
Returns
-------
seasonality : `Tensor`
Same shape as call inputs (see notes).
Raises
------
AttributeError
If all previous stacks are not composed
by trend blocks then an error is raised.
AssertionError
if no `SeasonalityStack` are defined.
"""
msg_error = f"""The first stack has to be a `TrendStack`,
hence seasonality doesn't exists . Got {self.stacks}."""
for idx, stack in enumerate(self.stacks):
if stack.stack_type != "TrendStack" and idx == 0:
raise AttributeError(msg_error)
elif stack.stack_type == "SeasonalityStack":
start = idx
elif stack.stack_type == "TrendStack":
continue
else:
break
if "start" not in locals():
raise AttributeError(f"No `SeasonalityStack` defined. Got {self.stacks}")
for stack in self.stacks[:start]:
data, _ = stack(data)
for stack in self.stacks[start : idx + 1]:
data, residual_seas = stack(data)
if "seasonality" not in locals():
seasonality = residual_seas
else:
seasonality += residual_seas
return seasonality
[docs] def trend(self, data: tf.Tensor) -> tf.Tensor:
"""
The trend component of the output.
Returns
-------
trend : `Tensor`
Same shape as call inputs (see notes).
Raises
------
AttributeError
Raises an error if previous stacks are not `TrendBlock`.
"""
for idx, stack in enumerate(self.stacks):
if stack.stack_type != "TrendStack":
break
msg = f"""No `TrendStack` defined. Got {self.stacks}.
`TrendStack` has to be defined as first stack."""
if idx == 0:
raise AttributeError(msg)
for stack in self.stacks[: idx + 1]:
data, residual_trend = stack(data)
if "trend" not in locals():
trend = residual_trend
else:
trend += residual_trend
return trend
def get_config(self) -> dict:
"""Get_config from tensorflow."""
return {"stacks": self.stacks}
def _set_interpretability(self) -> bool:
"""check if interpretable or not."""
return all(stack.is_interpretable for stack in self.stacks)
def _set_type(self):
"""Defines the type of Nbeats."""
if self.is_interpretable:
return "InterpretableNbeats"
return "Nbeats"
@property
def label_width(self) -> int:
"""Return the label width."""
return self.stacks[0].label_width
@property
def input_width(self) -> int:
"""Return the input width."""
return self.input_width
@property
def stacks(self) -> int:
"""Return the input width."""
return self._stacks
@property
def is_interpretable(self) -> bool:
"""Return True if the model is interpretable."""
return self._is_interpretable
@property
def nbeats_type(self) -> str:
"""Return the Nbeats type."""
return self._nbeats_type
def __repr__(self):
return self._nbeats_type
NbeatsModelsOptions = Union[
Union[List[NBEATS], NBEATS], Union[List[Callable], Callable],
]
[docs]def create_interpretable_nbeats(
label_width: int,
forecast_periods: SEASONALITY_TYPE = None,
backcast_periods: SEASONALITY_TYPE = None,
forecast_fourier_order: SEASONALITY_TYPE = None,
backcast_fourier_order: SEASONALITY_TYPE = None,
p_degree: int = 1,
trend_n_neurons: int = 252,
seasonality_n_neurons: int = 2048,
drop_rate: float = 0.0,
share: bool = True,
name: str = "interpretable_NBEATS",
**kwargs: dict,
):
"""
Wrapper which create an interpretable model as described in the original paper.
Two stacks are created with 3 blocks each. The first entirely composed by trend blocks,
The second entirely composed by seasonality blocks.
Within the same stack, it is possible to share the weights between blocks.
Parameters
----------
label_width : int
Past to rebuild. Usually, label_width = n * input width with n between 1 and 7.
forecast_periods : Tuple[int, ...]
Compute the fourier serie period in the forecast equation.
if a list is provided then all periods are taken.
backcast_periods : Tuple[int, ...]
Compute the fourier serie period in the backcast equation.
if a list is provided then all periods are taken.
forecast_fourier_order : Tuple[int, ...]
Compute the fourier order. each order element refers to its respective period.
backcast_fourier_order : Tuple[int, ...]
Compute the fourier order. each order element refers to its respective back period.
p_degree : int
Degree of the polynomial function. It needs to be > 0.
trend_n_neurons : int
Number of neurons in th Fully connected trend layers.
seasonality_n_neurons: int
Number of neurons in Fully connected seasonality layers.
drop_rate : float
Rate of the dropout layer. This is used to estimate the epistemic error.
Expected a value between 0 and 1. Default to 0.
share : bool
If True, the weights are shared between blocks inside a stack. Dafault to True.
Returns
-------
model : :class:`autopycoin.models.NBEATS`
Return an interpetable model with two stacks. One composed by 3 `TrendBlock`
objects and a second composed by 3 `SeasonalityBlock` objects.
Examples
--------
>>> from autopycoin.models import create_interpretable_nbeats
>>> from autopycoin.losses import QuantileLossError
>>> model = create_interpretable_nbeats(label_width=3,
... forecast_periods=[2],
... backcast_periods=[3],
... forecast_fourier_order=[2],
... backcast_fourier_order=[3],
... p_degree=1,
... trend_n_neurons=16,
... seasonality_n_neurons=16,
... drop_rate=0.1,
... share=True)
>>> model.compile(loss=QuantileLossError(quantiles=[0.5]))
"""
if share is True:
trend_block = TrendBlock(
label_width=label_width,
p_degree=p_degree,
n_neurons=trend_n_neurons,
drop_rate=drop_rate,
name="trend_block",
)
seasonality_block = SeasonalityBlock(
label_width=label_width,
forecast_periods=forecast_periods,
backcast_periods=backcast_periods,
forecast_fourier_order=forecast_fourier_order,
backcast_fourier_order=backcast_fourier_order,
n_neurons=seasonality_n_neurons,
drop_rate=drop_rate,
name="seasonality_block",
)
trend_blocks = [trend_block for _ in range(3)]
seasonality_blocks = [seasonality_block for _ in range(3)]
else:
trend_blocks = [
TrendBlock(
label_width=label_width,
p_degree=p_degree,
n_neurons=trend_n_neurons,
drop_rate=drop_rate,
name="trend_block",
)
for _ in range(3)
]
seasonality_blocks = [
SeasonalityBlock(
label_width=label_width,
forecast_periods=forecast_periods,
backcast_periods=backcast_periods,
forecast_fourier_order=forecast_fourier_order,
backcast_fourier_order=backcast_fourier_order,
n_neurons=seasonality_n_neurons,
drop_rate=drop_rate,
name="seasonality_block",
)
for _ in range(3)
]
trend_stacks = Stack(trend_blocks, name="trend_stack")
seasonality_stacks = Stack(seasonality_blocks, name="seasonality_stack")
model = NBEATS([trend_stacks, seasonality_stacks], name=name, **kwargs)
return model
[docs]def create_generic_nbeats(
label_width: int,
g_forecast_neurons: int = 524,
g_backcast_neurons: int = 524,
n_neurons: int = 524,
n_blocks: int = 1,
n_stacks: int = 30,
drop_rate: float = 0.0,
share: bool = False,
name: str = "generic_NBEATS",
**kwargs: dict,
):
"""
Wrapper which create a generic model as described in the original paper.
In the same stack, it is possible to share the weights between blocks.
Parameters
----------
label_width : int
Past to rebuild. Usually, label_width = n * input width with n between 1 and 7.
n_neurons : int
Number of neurons in th Fully connected generic layers.
n_blocks : int
Number of blocks per stack.
n_stacks : int
Number of stacks in the model.
drop_rate : float
Rate of the dropout layer. This is used to estimate the epistemic error.
Expected a value between 0 and 1. Default to 0.
share : bool
If True, the weights are shared between blocks inside a stack. Default to True.
Returns
-------
model : :class:`autopycoin.models.NBEATS`
Return an generic model with n stacks defined by the parameter `n_stack`
and respoectively n blocks defined by `n_blocks`.
Examples
--------
>>> from autopycoin.models import create_generic_nbeats
>>> from autopycoin.losses import QuantileLossError
>>> model = create_generic_nbeats(label_width=3,
... g_forecast_neurons=16,
... g_backcast_neurons=16,
... n_neurons=16,
... n_blocks=3,
... n_stacks=3,
... drop_rate=0.1,
... share=True)
>>> model.compile(loss=QuantileLossError(quantiles=[0.5]))
"""
generic_stacks = []
if share is True:
for _ in range(n_stacks):
generic_block = GenericBlock(
label_width=label_width,
g_forecast_neurons=g_forecast_neurons,
g_backcast_neurons=g_backcast_neurons,
n_neurons=n_neurons,
drop_rate=drop_rate,
name="generic_block",
)
generic_blocks = [generic_block for _ in range(n_blocks)]
generic_stacks.append(Stack(generic_blocks, name="generic_stack"))
else:
for _ in range(n_stacks):
generic_blocks = [
GenericBlock(
label_width=label_width,
g_forecast_neurons=g_forecast_neurons,
g_backcast_neurons=g_backcast_neurons,
n_neurons=n_neurons,
drop_rate=drop_rate,
name="generic_block",
)
for _ in range(n_blocks)
]
generic_stacks.append(Stack(generic_blocks, name="generic_stack"))
model = NBEATS(generic_stacks, name=name, **kwargs)
return model
# TODO: finish doc and unit testing.
[docs]class PoolNBEATS(BasePool):
"""
Tensorflow model defining a pool of N-BEATS models.
As described in the paper https://arxiv.org/abs/1905.10437, the state-of-the-art results
are reached with a bagging method of N-BEATS models including interpretable and generic ones.
The aggregation function is used in predict `method` if it is possible, i.e when the outputs shape are not differents.
As the reconstructed inputs are masked randomly the aggregation is not perfomed on them.
Fore more information about poll model see :class:`autopycoin.models.Pool`.
Parameters
----------
label_width : int
Width of the targets.
It can be not defined if `nbeats_model` is a list of NBEATS instances.
Default to None.
n_models : int
Number of models inside the pool.
The minimum value according to the paper to get SOTA results is 18.
If NBEATS instances are provided then n_models is not used.
Default to 18.
nbeats_models : list[callable] or list[NBEATS]
A list of callables which create a NBEATS model or a list of :class:`autopycoin.models.NBEATS` instances.
If None then use a mix of generic and interpretable NBEATs model.
Default to None.
fn_agg : Callable
Function of aggregation which takes an parameter axis.
It aggregates the models outputs. Default to mean.
seed: int
Used in combination with tf.random.set_seed to create a
reproducible sequence of tensors across multiple calls.
Returns
-------
outputs : Tuple[Tuple[`Tensor` | QuantileTensor | UnivariateTensor], Tuple[`Tensor` | QuantileTensor | UnivariateTensor]]
Return the reconstructed inputs and inferred outputs as tuple (reconstructed inputs, outputs).
Reconstructed inputs is a tuple of tensors as the mask is not the same through models.
Outputs can be a tuple of tensors or an aggregated tensor if the prediction is used through `predict` method.
Attributes
----------
see :class:`autopycoin.models.Pool`
Examples
--------
>>> from autopycoin.data import random_ts
>>> from autopycoin.models import PoolNBEATS, create_interpretable_nbeats
>>> from autopycoin.dataset import WindowGenerator
>>> import tensorflow as tf
>>> import pandas as pd
...
>>> data = random_ts(n_steps=1000,
... trend_degree=2,
... periods=[10],
... fourier_orders=[10],
... trend_mean=0,
... trend_std=1,
... seasonality_mean=0,
... seasonality_std=1,
... batch_size=1,
... n_variables=1,
... noise=True,
... seed=42)
>>> data = pd.DataFrame(data[0].numpy(), columns=['test'])
...
>>> w = WindowGenerator(
... input_width=70,
... label_width=10,
... shift=10,
... test_size=50,
... valid_size=10,
... flat=True,
... batch_size=32,
... preprocessing=lambda x,y: (x, (x, y))
... )
...
>>> w = w.from_array(data=data,
... input_columns=['test'],
... label_columns=['test'])
...
>>> model = PoolNBEATS(
... label_width=10,
... n_models=2,
... nbeats_models=create_interpretable_nbeats,
... )
>>> model.compile(tf.keras.optimizers.Adam(
... learning_rate=0.015, beta_1=0.9, beta_2=0.999, epsilon=1e-07, amsgrad=True,
... name='Adam'), loss=[['mse', 'mse'], ['mae', 'mae'], ['mape', 'mape']], metrics=['mae'])
>>> history = model.fit(w.train, validation_data=w.valid, epochs=1, verbose=0)
>>> model.predict(w.test.take(1))[1].shape
(32, 10)
Notes
-----
PoolNBEATS is just a wrapper around nbeats models hence you can use epistemic loss error
or multivariates inputs.
This class only applies mask to its inputs.
*Input shape*
N-D tensor with shape: (batch_size, time step, variables) or (batch_size, time step).
The most common situation would be a 2D input with shape (batch_size, time step).
*Output shape*
n N-D tensors with shape: (batch_size, time step, variables, quantiles) or (batch_size, time step, quantiles)
or (batch_size, time step) with n the number of models generated randomly or registered in the constructor.
For instance, for a 2D input with shape (batch_size, units) and three models,
the output would have shape
(((batch_size, units), (batch_size, units), (batch_size, units)), ((batch_size, units), (batch_size, units), (batch_size, units)))
if call is used else
(((batch_size, units), (batch_size, units), (batch_size, units)), (batch_size, units)) if predict is used.
The outputs tensors can be aggregated during `predict` method only if all tensors are similar in shape.
"""
def __init__(
self,
label_width: int = None,
n_models: int = 18,
nbeats_models: Union[None, NbeatsModelsOptions] = [
create_interpretable_nbeats,
create_generic_nbeats,
],
fn_agg: Callable = tf.reduce_mean,
seed: Optional[int] = None,
**kwargs: dict,
):
super().__init__(
label_width=label_width,
n_models=n_models,
models=nbeats_models,
fn_agg=fn_agg,
seed=seed,
**kwargs,
)
[docs] def compile(
self,
optimizer="rmsprop",
loss=None,
metrics=None,
loss_weights=[0.0, 1.0],
weighted_metrics=None,
run_eagerly=None,
steps_per_execution=None,
**kwargs,
) -> None:
self.check_valid_structure(loss, name='loss')
self.check_valid_structure(loss_weights, name='loss_weights')
super().compile(
optimizer=optimizer,
loss=loss,
metrics=metrics,
loss_weights=loss_weights,
weighted_metrics=weighted_metrics,
run_eagerly=run_eagerly,
steps_per_execution=steps_per_execution,
**kwargs,
)
# TODO: test
[docs] def check_valid_structure(self, structure, name):
"""Check if loss and loss_weights are list of lists or a list of two elements."""
if not isinstance(structure, (list, tuple)):
raise ValueError(f'{name} has to be a list or a tuple, got {structure}')
elif all(isinstance(l, (list, tuple)) for l in structure):
if any(len(l)!=2 for l in structure):
raise ValueError(f'In case of list of list, {name} elements has to be length 2, got {structure}')
elif len(structure) != 2 :
raise ValueError(f'if {name} is not a list of list, {name} has to be length 2, got {structure}')
[docs] def checks(self, nbeats_models: List[NBEATS]) -> None:
"""Check if `label_width` are equals through models instances."""
labels_width = [model.label_width for model in nbeats_models]
# If `label_width` is defined in the init then use it to check models else use the first model value.
self._label_width = self.label_width or labels_width[0]
assert all([label_width == self.label_width for label_width in labels_width]), (
f"`label_width` parameter has to be identical through models and against the value given in the init method. "
f"Got {labels_width} for models and `label_width` = {self.label_width}"
)
def build(
self, input_shape: Union[tf.TensorShape, Tuple[tf.TensorShape, ...]]
) -> None:
"""See tensorflow documentation."""
# Defines masks
mask = tf.random.uniform(
(self.n_models,),
minval=0,
maxval=int(input_shape[1] / self.label_width) or 1,
dtype=tf.int32,
seed=self.seed,
)
self._mask = input_shape[1] - (mask * self.label_width)
super().build(input_shape)
def call(
self, inputs: Union[tuple, dict, list, tf.Tensor], **kwargs: dict
) -> tf.Tensor:
"""Call method from tensorflow Model.
Make prediction with every models generated during the constructor method.
"""
output_fn = lambda idx: self.models[idx](inputs[:, -self._mask[idx] :])
outputs = tf.nest.map_structure(
output_fn, [idx for idx in range(self.n_models)]
)
return outputs
[docs] @tf.function
def preprocessing_x(
self,
x: Union[None, Union[Union[tf.Tensor, tf.data.Dataset], Tuple[tf.Tensor, ...]]],
) -> Union[Tuple[None, None], Tuple[Callable, tuple]]:
"Apply mask inside `train_step`"
# Build masks from PoolNBEATS `build` method
self._maybe_build(x)
masked_x = None
if x is not None:
masked_x = [x[:, -self._mask[idx] :] for idx in range(self.n_models)]
return masked_x
[docs] @tf.function
def preprocessing_y(
self,
y: Union[None, Union[Union[tf.Tensor, tf.data.Dataset], Tuple[tf.Tensor, ...]]],
) -> Union[Tuple[None, None], Tuple[Callable, tuple]]:
"Apply mask inside `train_step`, `test_step`"
masked_y = None
if y is not None:
masked_y = [
(y[0][:, -self._mask[idx] :], y[1]) for idx in range(self.n_models)
]
return masked_y
[docs] @tf.function
def postprocessing_y(self, y):
"Apply mask inside `predict_step`"
inputs_reconstucted = [outputs[0] for outputs in y]
y = [outputs[1] for outputs in y]
if any(outputs.quantiles for outputs in y):
return inputs_reconstucted, y
return inputs_reconstucted, self.fn_agg(y, axis=0)