from typing import Tuple, List, Union
import numpy as np
import abc
import tensorflow as tf
from keras.layers import Dropout, InputSpec
from keras.backend import floatx
from ..utils import range_dims, convert_to_list
from .base_layer import UnivariateLayer
from ..asserts import greater_or_equal, equal_length, is_between
from ..constant import TENSOR_TYPE
class BaseBlock(UnivariateLayer):
    """
    Base class of a nbeats block.

    Your custom block needs to inherit from it and implement
    `get_coefficients` and `coefficient_factory`.

    Parameters
    ----------
    label_width : int
        Horizon time to forecast.
    n_neurons : int
        Number of neurons in the fully connected layers.
    drop_rate : float
        Rate of the dropout layer.
        This is used to estimate the epistemic error.
        Expected a value between 0 and 1.
    g_trainable : bool
        Stored and exposed through `is_g_trainable`. Default to False.
    interpretable : bool
        Stored and exposed through `is_interpretable`. Default to False.
    block_type : str
        Stored and exposed through `block_type`; it has to contain `Block`.
        Default to `BaseBlock`.
    **kwargs : dict
        Forwarded to the parent layer constructor (e.g. `name`).

    Attributes
    ----------
    label_width : int
    input_width : int
        Only available once `build` has been called.
    input_spec : `InputSpec`
    drop_rate : float
    is_interpretable : bool
    is_g_trainable : bool
    block_type : str

    Raises
    ------
    ValueError
        If `block_type` doesn't contain `Block`.
        `drop_rate` is not between 0 and 1.
        `label_width` or `n_neurons` are rejected by `greater_or_equal`.
    """

    NOT_INSPECT = ["call", "_output", "build"]

    def __init__(
        self,
        label_width: int,
        n_neurons: int,
        drop_rate: float,
        g_trainable: bool = False,
        interpretable: bool = False,
        block_type: str = "BaseBlock",
        **kwargs: dict,
    ):
        super().__init__(**kwargs)

        self._label_width = label_width
        self._drop_rate = drop_rate
        self._n_neurons = n_neurons
        self._is_g_trainable = g_trainable
        self._is_interpretable = interpretable
        self._block_type = block_type

    # TODO: put input_spec in UnivariateLayer
    def build(self, inputs_shape: tf.TensorShape) -> None:
        """
        Create the weights of the block. See tensorflow documentation.

        Raises
        ------
        TypeError
            If the layer dtype is neither floating nor complex.
        ValueError
            If the last dimension of `inputs_shape` is not defined.
        """
        # `tf.float32` is a DType instance and is not callable: fall back on
        # the keras default float type when the layer has no dtype.
        dtype = tf.as_dtype(self.dtype or floatx())
        if not (dtype.is_floating or dtype.is_complex):
            raise TypeError(
                f"Unable to build `{self.name}` layer with "
                f"non-floating point dtype {dtype}"
            )

        inputs_shape = tf.TensorShape(inputs_shape)
        self._input_width = tf.compat.dimension_value(inputs_shape[-1])
        if self.input_width is None:
            raise ValueError(
                f"The last dimension of the inputs"
                f" should be defined. Found {self.input_width}."
            )

        self.input_spec = InputSpec(min_ndim=2, axes={-1: self.input_width})

        # Computing fc layers: four (kernel, bias) pairs. The first kernel maps
        # `input_width` to `n_neurons`, the next ones `n_neurons` to `n_neurons`.
        dim = self.input_width
        self.fc_stack = []
        for count in range(4):
            self.fc_stack.append(
                (
                    self.add_weight(
                        shape=self.n_variates + [dim, self._n_neurons],
                        name=f"fc_kernel_{self.name}_{count}",
                    ),
                    self.add_weight(
                        shape=self.n_variates + [1, self._n_neurons],
                        initializer="zeros",
                        name=f"fc_bias_{self.name}_{count}",
                    ),
                )
            )
            dim = self._n_neurons

        self.dropout = Dropout(self.drop_rate)

        self.fc_forecast, self.forecast_coef = self._build_branch(
            self.label_width, branch_name="forecast"
        )
        self.fc_backcast, self.backcast_coef = self._build_branch(
            self.input_width, branch_name="backcast"
        )

        super().build(inputs_shape)

    def _build_branch(
        self, output_last_dim: int, branch_name: str
    ) -> Tuple[tf.Variable, tf.Tensor]:
        """
        Build forecast and backcast branches.

        Parameters
        ----------
        output_last_dim : int
            Last dimension produced by the branch.
        branch_name : str
            Either `forecast` or `backcast`.

        Returns
        -------
        fc, coef
            The fully connected weight of the branch and the coefficients
            returned by `get_coefficients`.
        """
        coef = self.get_coefficients(output_last_dim, branch_name=branch_name)

        # If the model is compiling with a loss error defining uncertainty then
        # broadcast the output to take into account this uncertainty.
        shape_fc = [self._n_neurons, coef.shape[-2]]
        if branch_name == "forecast":
            # We place quantiles after multivariates
            shape_fc = self.get_additional_shapes(0) + shape_fc
        elif branch_name == "backcast":
            # If multivariate inputs then we modify the shape of fc layers
            shape_fc = self.n_variates + shape_fc

        fc = self.add_weight(shape=shape_fc, name=f"fc_{branch_name}_{self.name}")
        return fc, coef

    @abc.abstractmethod
    def get_coefficients(self, output_last_dim: int, branch_name: str) -> tf.Tensor:
        """
        Return the coefficients used in the forecast and backcast layer a.k.a g layer
        by calling coefficient_factory.

        This method needs to be overriden.

        Raises
        ------
        NotImplementedError
            If not overriden
        ValueError
            Raise an error if the coefficients tensor shape is not equal to
            (d0, ..., output_first_dim_forecast, label_width).
        """
        raise NotImplementedError(
            "When subclassing the `BaseBlock` class, you should "
            "implement a `get_coefficients` method."
        )

    @abc.abstractmethod
    def coefficient_factory(self, *args: list, **kwargs: dict) -> tf.Tensor:
        """
        Create the coefficients used in the last layer a.k.a g constrained layer.

        This method needs to be overriden.

        Raises
        ------
        NotImplementedError
            If not overriden
        """
        raise NotImplementedError(
            "When subclassing the `BaseBlock` class, you should "
            "implement a `coefficient_factory` method."
        )

    def call(
        self, inputs: TENSOR_TYPE, **kwargs: dict
    ) -> Tuple[TENSOR_TYPE, TENSOR_TYPE]:  # pylint: disable=arguments-differ
        """
        See tensorflow documentation.

        Returns
        -------
        reconstructed_inputs, outputs
            The backcast and the forecast of the block.
        """
        for kernel, bias in self.fc_stack:
            # shape: (Batch_size, n_neurons)
            inputs = tf.add(tf.matmul(inputs, kernel), bias)
            inputs = tf.keras.activations.relu(inputs)
            # TODO: change dropout to let the user choose
            # Dropout is forced on (training=True) whatever the keras phase.
            inputs = self.dropout(inputs, training=True)

        # shape: (Batch_size, backcast)
        reconstructed_inputs = self._output(
            inputs, self.fc_backcast, self.backcast_coef
        )  # layers fc and coef created in _build_branch

        # shape: (quantiles, Batch_size, forecast)
        outputs = self._output(
            inputs, self.fc_forecast, self.forecast_coef
        )  # layers fc and coef created in _build_branch

        return reconstructed_inputs, outputs

    def _output(
        self, inputs: tf.Tensor, fc: tf.Tensor, coef: tf.Tensor
    ) -> tf.Tensor:  # pylint: disable=arguments-differ
        """Project `inputs` through the `fc` weight then through `coef`."""
        # shape: (Batch_size, output_first_dim_backcast)
        theta = tf.matmul(inputs, fc)
        return tf.matmul(theta, coef)

    def compute_output_shape(
        self, inputs_shape: tf.TensorShape
    ) -> List[tf.TensorShape]:
        """
        See tensorflow documentation.

        Returns
        -------
        list of `tf.TensorShape`
            The backcast shape (same as the inputs) and the forecast shape.

        Raises
        ------
        ValueError
            If the innermost dimension of `inputs_shape` is not defined.
        """
        inputs_shape = tf.TensorShape(inputs_shape)
        inputs_shape = inputs_shape.with_rank_at_least(2)
        last_dim_input = tf.compat.dimension_value(inputs_shape[-1])
        if last_dim_input is None:
            raise ValueError(
                "The innermost dimension of inputs_shape must be defined, but saw: %s"
                % (inputs_shape,)
            )

        output_shape_forecast = (
            [inputs_shape[0]] + self.get_additional_shapes(0) + [self.label_width]
        )
        return [
            inputs_shape,
            tf.TensorShape(output_shape_forecast),
        ]

    @property
    def label_width(self) -> int:
        """Return the label_width."""
        return self._label_width

    @property
    def input_width(self) -> int:
        """Return the input_width. It is set during `build`."""
        return self._input_width

    @property
    def drop_rate(self) -> float:
        """Return the drop rate."""
        return self._drop_rate

    @property
    def is_interpretable(self) -> bool:
        """Return True if the block is interpretable."""
        return self._is_interpretable

    @property
    def is_g_trainable(self) -> bool:
        """Return True if the last layer is trainable."""
        return self._is_g_trainable

    @property
    def block_type(self) -> str:
        """Return the block type. Default to `BaseBlock`."""
        return self._block_type

    def __repr__(self) -> str:
        """Return the representation."""
        return self._block_type

    def _val___init__(
        self, output: None, *args: list, **kwargs: dict
    ) -> None:  # pylint: disable=unused-argument
        """Validate the attributes set by `__init__`."""
        is_between(self.drop_rate, 0, 1, "drop_rate")
        greater_or_equal(self.label_width, 0, "label_width")
        greater_or_equal(self._n_neurons, 0, "n_neurons")
        # NOTE(review): the check is done on `block_type` while the message
        # reports `name`; message kept as is, confirm which one is intended.
        if "Block" not in self.block_type:
            raise ValueError(f"`name` has to contain `Block`. Got {self.name}")
class TrendBlock(BaseBlock):
    """
    Trend block definition.

    This layer represents the smaller part of the nbeats model.
    Final layers are constrained which define a polynomial function of small degree p.
    Therefore it is possible to get explanation from this block.

    Parameters
    ----------
    label_width : int
        Horizon time to forecast.
    p_degree : int | float
        Degree of the polynomial function. Default to 2.
    n_neurons : int
        Number of neurons in Fully connected layers. Default to 32.
    drop_rate : float
        Rate of the dropout layer. This is used to estimate the epistemic error.
        Expected a value between 0 and 1. Default to 0.

    Attributes
    ----------
    label_width : int
    input_width : int
    p_degree : int
    input_spec : `InputSpec`
    drop_rate : float
    is_interpretable : bool
    is_g_trainable : bool
    block_type : str

    Raises
    ------
    ValueError
        `drop_rate` is not between 0 and 1.
        All others arguments are not strictly positive integers.

    Examples
    --------
    >>> from autopycoin.layers import TrendBlock, SeasonalityBlock
    >>> from autopycoin.models import Stack, NBEATS
    >>> from autopycoin.losses import QuantileLossError
    >>> trend_block = TrendBlock(label_width=10,
    ...                          p_degree=2,
    ...                          n_neurons=16,
    ...                          drop_rate=0.1,
    ...                          name="trend_block")
    >>> seasonality_block = SeasonalityBlock(label_width=10,
    ...                                      forecast_periods=[10],
    ...                                      backcast_periods=[20],
    ...                                      forecast_fourier_order=[10],
    ...                                      backcast_fourier_order=[20],
    ...                                      n_neurons=15,
    ...                                      drop_rate=0.1,
    ...                                      name="seasonality_block")
    >>> trend_blocks = [trend_block for _ in range(3)]
    >>> seasonality_blocks = [seasonality_block for _ in range(3)]
    >>> trend_stacks = Stack(trend_blocks, name="trend_stack")
    >>> seasonality_stacks = Stack(seasonality_blocks, name="seasonality_stack")
    >>> model = NBEATS([trend_stacks, seasonality_stacks], name="interpretable_NBEATS")

    Notes
    -----
    input shape:
    N-D tensor with shape: (..., batch_size, time step).
    The most common situation would be a 2D input with shape (batch_size, time step).

    output shape:
    N-D tensor with shape: (..., batch_size, units).
    For instance, for a 2D input with shape (batch_size, units),
    the output would have shape (batch_size, units).
    With a QuantileLossError with 2 quantiles or higher the output would have shape (quantiles, batch_size, units).
    If you add 2 variables, the output would have shape (variables, quantiles, batch_size, units).
    """

    def __init__(
        self,
        label_width: int,
        p_degree: Union[int, float] = 2,
        n_neurons: int = 32,
        drop_rate: float = 0.0,
        **kwargs: dict,
    ):
        # A trend block is always interpretable and its g layer is frozen.
        super().__init__(
            block_type="TrendBlock",
            interpretable=True,
            g_trainable=False,
            label_width=label_width,
            n_neurons=n_neurons,
            drop_rate=drop_rate,
            **kwargs,
        )

        self._p_degree = p_degree

    def coefficient_factory(
        self, output_last_dim: int, p_degrees: tf.Tensor
    ) -> tf.Tensor:
        """
        Compute the coefficients used in the last layer a.k.a g layer.

        The time steps `0 .. output_last_dim - 1` are divided by
        `output_last_dim` and raised to the power of `p_degrees`.

        Parameters
        ----------
        output_last_dim : int
        p_degrees : tensor
            Exponents applied to the normalized time steps.

        Returns
        -------
        coefficients : `tensor with shape (p_degree, label_width)`
            Coefficients of the g layer.
        """
        time_steps = tf.range(output_last_dim, dtype=floatx())
        normalized_steps = time_steps / output_last_dim
        return tf.math.pow(normalized_steps, p_degrees)

    def get_coefficients(self, output_last_dim: int, branch_name: str) -> tf.Tensor:
        """
        Return the coefficients calculated by the `coefficient_factory` method.

        Parameters
        ----------
        output_last_dim : int
        branch_name : str

        Returns
        -------
        coefficients : `weight with shape (p_degree, label_width)`
            Coefficients of the g layer.
        """
        # Shape (-1, 1) in order to broadcast the time steps to all p degrees
        degrees = range_dims(self.p_degree + 1, shape=(-1, 1))
        values = self.coefficient_factory(output_last_dim, degrees)

        # Set weights with calculated coef
        weight_name = f"g_{branch_name}_{self.name}"
        return self.add_weight(
            name=weight_name,
            shape=values.shape,
            initializer=tf.constant_initializer(values.numpy()),
            trainable=self.is_g_trainable,
        )

    def get_config(self) -> dict:
        """get_config method from tensorflow."""
        config = super().get_config()
        config["p_degree"] = self.p_degree
        config["label_width"] = self.label_width
        config["n_neurons"] = self._n_neurons
        config["drop_rate"] = self.drop_rate
        return config

    @property
    def p_degree(self) -> int:
        """Return the degree of the trend equation."""
        return self._p_degree

    def _val___init__(
        self, output: None, *args: list, **kwargs: dict
    ) -> None:  # pylint: disable=unused-argument
        """Valid p_degree"""
        greater_or_equal(self.p_degree, 0, "p_degree")
# Accepted type for periods and fourier orders: a scalar or a list of scalars.
SEASONALITY_TYPE = Union[int, float, List[Union[int, float]]]
class SeasonalityBlock(BaseBlock):
    """
    Seasonality block definition.

    This layer represents the smaller part of nbeats model.
    Its internal layers are defining a fourier series.
    We introduced notion of fourier orders and seasonality periods which is not
    used in the original paper.
    It is possible to get explanation from this block.

    Parameters
    ----------
    label_width : int
        Horizon time to forecast.
    forecast_periods : int | float | List[int | float]
        Defines the periods used in the fourier equation.
        Default to `label_width`/2 as described in the original paper.
    backcast_periods : int | float | List[int | float]
        Compute the fourier serie period in the backcasting equation.
        Default to `input_width`/2 as described in the original paper.
    forecast_fourier_order : int | float | List[int | float]
        Compute the fourier orders.
        Each element is the order of its respective period.
        Default to `forecast_periods`.
    backcast_fourier_order : int | float | List[int | float]
        Compute the fourier orders.
        Each element is the order of its respective back period.
        Default to `backcast_periods`.
    n_neurons : int
        Number of neurons in the fully connected layers.
    drop_rate : float
        Rate of the dropout layer.
        This is used to estimate the epistemic error.
        Expected a value between 0 and 1. Default to 0.

    Attributes
    ----------
    label_width : int
    input_width : int
    input_spec : `InputSpec`
    drop_rate : float
    forecast_periods : int | float | list[int | float]
    backcast_periods : int | float | list[int | float]
        if not provided, then it is set during `build` method.
    forecast_fourier_order : int | float | list[int | float]
    backcast_fourier_order : int | float | list[int | float]
        if not provided, then it is set during `build` method.

    Raises
    ------
    ValueError:
        `drop_rate` not between 0 and 1.
        `forecast_periods` and `forecast_fourier_order` or their elements are not strictly positive values
        `backcast_periods` and `backcast_fourier_order` or their elements are not strictly positive values
        `backcast_fourier_order` and `forecast_fourier_order` don't have the same length.
        all others arguments are not strictly positive integers.

    Examples
    --------
    >>> from autopycoin.layers import TrendBlock, SeasonalityBlock
    >>> from autopycoin.models import Stack, NBEATS
    >>> from autopycoin.losses import QuantileLossError
    >>> trend_block = TrendBlock(label_width=10,
    ...                          p_degree=2,
    ...                          n_neurons=16,
    ...                          drop_rate=0.1,
    ...                          name="trend_block")
    >>> seasonality_block = SeasonalityBlock(label_width=10,
    ...                                      forecast_periods=[10],
    ...                                      backcast_periods=[20],
    ...                                      forecast_fourier_order=[10],
    ...                                      backcast_fourier_order=[20],
    ...                                      n_neurons=15,
    ...                                      drop_rate=0.1,
    ...                                      name="seasonality_block")
    >>> trend_blocks = [trend_block for _ in range(3)]
    >>> seasonality_blocks = [seasonality_block for _ in range(3)]
    >>> trend_stacks = Stack(trend_blocks, name="trend_stack")
    >>> seasonality_stacks = Stack(seasonality_blocks, name="seasonality_stack")
    >>> # model definition and compiling
    >>> model = NBEATS([trend_stacks, seasonality_stacks], name="interpretable_NBEATS")

    Notes
    -----
    input shape:
    N-D tensor with shape: (..., batch_size, time step).
    The most common situation would be a 2D input with shape (batch_size, time step).

    output shape:
    N-D tensor with shape: (..., batch_size, units).
    For instance, for a 2D input with shape (batch_size, units),
    the output would have shape (batch_size, units).
    With a QuantileLossError with 2 quantiles or higher the output would have shape (quantiles, batch_size, units).
    If you add 2 variables, the output would have shape (variables, quantiles, batch_size, units).
    """

    def __init__(
        self,
        label_width: int,
        forecast_periods: SEASONALITY_TYPE = None,
        backcast_periods: SEASONALITY_TYPE = None,
        forecast_fourier_order: SEASONALITY_TYPE = None,
        backcast_fourier_order: SEASONALITY_TYPE = None,
        n_neurons: int = 32,
        drop_rate: float = 0.0,
        **kwargs: dict,
    ):
        super().__init__(
            label_width=label_width,
            n_neurons=n_neurons,
            drop_rate=drop_rate,
            g_trainable=False,
            interpretable=True,
            block_type="SeasonalityBlock",
            **kwargs,
        )

        # forecast periods and fourier order can be calculated if not provided
        # backcast has to wait until `build` is called
        self._forecast_periods = (
            forecast_periods if forecast_periods else int(label_width / 2)
        )
        self._forecast_fourier_order = (
            forecast_fourier_order if forecast_fourier_order else self._forecast_periods
        )

        # backcast_fourier_order and backcast_periods can't be calculated
        self._backcast_periods = backcast_periods
        self._backcast_fourier_order = backcast_fourier_order

    def build(self, inputs_shape: tf.TensorShape):
        """
        Build method from tensorflow.

        Missing backcast periods default to half of the last input dimension
        and missing backcast fourier orders default to the backcast periods.

        Raises
        ------
        ValueError
            If a default backcast period has to be computed while the last
            dimension of `inputs_shape` is not defined.
        """
        # if None then set an default value based on the *input shape*
        if not self._backcast_periods:
            # Normalize the shape so that lists, tuples and `TensorShape` (with
            # or without `Dimension` objects) are all handled the same way.
            input_width = tf.compat.dimension_value(tf.TensorShape(inputs_shape)[-1])
            if input_width is None:
                raise ValueError(
                    f"The last dimension of the inputs"
                    f" should be defined. Found {input_width}."
                )
            self._backcast_periods = int(input_width / 2)

        self._backcast_fourier_order = (
            self._backcast_fourier_order or self._backcast_periods
        )

        super().build(inputs_shape)

    def coefficient_factory(
        self,
        output_last_dim: int,
        periods: Union[int, float, List[Union[int, float]]],
        fourier_orders: Union[int, float, List[Union[int, float]]],
    ) -> tf.Tensor:
        """
        Compute the coefficients used in the last layer a.k.a g constrained layer.

        Parameters
        ----------
        output_last_dim : int
        periods : int | float | Tuple[int | float]
        fourier_orders : int | float | Tuple[int | float]

        Returns
        -------
        coefficients : tensor
            Flat values of the sine terms followed by the cosine terms;
            the last dimension is `output_last_dim`.
        """
        # Shape (-1, 1, 1) in order to broadcast periods to all time units
        periods = tf.reshape(periods, shape=(-1, 1, 1))
        periods = tf.cast(periods, dtype=floatx())

        time_forecast = tf.range(output_last_dim, dtype=floatx())

        seasonality = 2.0 * np.pi * time_forecast / periods
        # One ragged row of harmonics (0 .. order - 1) per period.
        seasonality = (
            tf.expand_dims(tf.ragged.range(fourier_orders, dtype=floatx()), axis=-1)
            * seasonality
        )

        seasonality = tf.concat((tf.sin(seasonality), tf.cos(seasonality)), axis=0)
        return seasonality.flat_values

    def get_coefficients(self, output_last_dim: int, branch_name: str) -> tf.Tensor:
        """
        Return the coefficients calculated by the `coefficient_factory` method.

        Parameters
        ----------
        output_last_dim : int
        branch_name : str
            Prefix of the `*_periods` and `*_fourier_order` attributes to read.

        Returns
        -------
        coefficients : weight
            Weight initialized with the computed coefficients.
        """
        periods = convert_to_list(getattr(self, branch_name + "_periods"))
        fourier_order = convert_to_list(getattr(self, branch_name + "_fourier_order"))

        # Set weights with calculated coef
        coef = self.coefficient_factory(output_last_dim, periods, fourier_order)
        return self.add_weight(
            shape=coef.shape,
            initializer=tf.constant_initializer(coef.numpy()),
            trainable=self.is_g_trainable,
            name=f"g_{branch_name}_{self.name}",
        )

    def get_config(self) -> dict:
        """get_config method from tensorflow."""
        config = super().get_config()
        config.update(
            {
                "label_width": self.label_width,
                "forecast_periods": self.forecast_periods,
                "backcast_periods": self.backcast_periods,
                "forecast_fourier_order": self.forecast_fourier_order,
                "backcast_fourier_order": self.backcast_fourier_order,
                "n_neurons": self._n_neurons,
                "drop_rate": self.drop_rate,
            }
        )
        return config

    @property
    def forecast_periods(self) -> SEASONALITY_TYPE:
        """Return the forecast periods."""
        return self._forecast_periods

    @property
    def backcast_periods(self) -> SEASONALITY_TYPE:
        """Return the backcast periods. None until `build` if not provided."""
        return self._backcast_periods

    @property
    def forecast_fourier_order(self) -> SEASONALITY_TYPE:
        """Return the forecast fourier order."""
        return self._forecast_fourier_order

    @property
    def backcast_fourier_order(self) -> SEASONALITY_TYPE:
        """Return the backcast fourier order. None until `build` if not provided."""
        return self._backcast_fourier_order

    def _val___init__(
        self, output: None, *args: list, **kwargs: dict
    ) -> None:  # pylint: disable=unused-argument
        """Validate periods and fourier orders set by `__init__`."""
        greater_or_equal(self.forecast_periods, 0, "forecast_periods")
        greater_or_equal(self.forecast_fourier_order, 0, "forecast_fourier_order")
        equal_length(
            self.forecast_periods,
            self.forecast_fourier_order,
            "forecast_periods",
            "forecast_fourier_order",
        )

        # Backcast values may still be None here: they are completed in `build`.
        if self.backcast_periods is not None:
            greater_or_equal(self.backcast_periods, 0, "backcast_periods")
        if self.backcast_fourier_order is not None:
            greater_or_equal(self.backcast_fourier_order, 0, "backcast_fourier_order")
            # NOTE(review): `backcast_periods` can be None at this point when
            # only the fourier order was provided - confirm `equal_length`
            # handles it.
            equal_length(
                self.backcast_periods,
                self.backcast_fourier_order,
                "backcast_periods",
                "backcast_fourier_order",
            )
class GenericBlock(BaseBlock):
    """
    Generic block definition as described in the paper.

    This layer represents the smaller part of a nbeats model.
    We can't have explanation from this block because g coefficients are learnt.

    Parameters
    ----------
    label_width : int
        Horizon time to forecast.
    g_forecast_neurons : int
        Dimensionality of the gf layer. Default to 32.
    g_backcast_neurons : int
        Dimensionality of the gb layer. Default to 32.
    n_neurons : int
        Number of neurons in Fully connected layers. Default to 32.
    drop_rate : float
        Rate of the dropout layer. This is used to estimate the epistemic error.
        Expected a value between 0 and 1. Default to 0.1.

    Attributes
    ----------
    label_width : int
    input_width : int
    input_spec : `InputSpec`
    drop_rate : float
    g_forecast_neurons : int
    g_backcast_neurons : int

    Examples
    --------
    >>> from autopycoin.layers import GenericBlock
    >>> from autopycoin.models import Stack, NBEATS
    >>> from autopycoin.losses import QuantileLossError
    >>> generic_block = GenericBlock(label_width=10,
    ...                              n_neurons=16,
    ...                              g_forecast_neurons=16,
    ...                              g_backcast_neurons=16,
    ...                              drop_rate=0.1,
    ...                              name="generic_block")
    >>> generic_blocks = [generic_block for _ in range(3)]
    >>> generic_stacks = Stack(generic_blocks, name="generic_stack")
    >>> # Model definition and compiling
    >>> model = NBEATS([generic_stacks, generic_stacks], name="generic_NBEATS")

    Notes
    -----
    input shape:
    N-D tensor with shape: (..., batch_size, time step).
    The most common situation would be a 2D input with shape (batch_size, time step).

    output shape:
    N-D tensor with shape: (..., batch_size, units).
    For instance, for a 2D input with shape (batch_size, units),
    the output would have shape (batch_size, units).
    With a QuantileLossError with 2 quantiles or higher the output would have shape (quantiles, batch_size, units).
    If you add 2 variables, the output would have shape (variables, quantiles, batch_size, units).
    """

    def __init__(
        self,
        label_width: int,
        g_forecast_neurons: int = 32,
        g_backcast_neurons: int = 32,
        n_neurons: int = 32,
        drop_rate: float = 0.1,
        **kwargs: dict,
    ):
        # A generic block learns its g layer, hence it is not interpretable.
        super().__init__(
            block_type="GenericBlock",
            interpretable=False,
            g_trainable=True,
            label_width=label_width,
            n_neurons=n_neurons,
            drop_rate=drop_rate,
            **kwargs,
        )

        self._g_forecast_neurons = g_forecast_neurons
        self._g_backcast_neurons = g_backcast_neurons

    def coefficient_factory(self, output_last_dim: int, neurons: int) -> tf.Tensor:
        """
        Compute the coefficients used in the last layer a.k.a g layer.

        This function is used in `get_coefficients`.

        Parameters
        ----------
        output_last_dim : int
        neurons : int

        Returns
        -------
        coefficients : `tensor with shape (..., neurons, output_last_dim)`
            Initial values of the g layer, drawn with a Glorot uniform
            initializer seeded with 42. Leading dimensions are `n_variates`.
        """
        glorot = tf.keras.initializers.GlorotUniform(seed=42)
        target_shape = self.n_variates + [neurons, output_last_dim]
        return glorot(shape=target_shape)

    def get_coefficients(self, output_last_dim: int, branch_name: str) -> tf.Tensor:
        """
        Return the coefficients used in forecast and backcast layer a.k.a g layer.

        Parameters
        ----------
        output_last_dim : int
        branch_name : str
            Selects the `g_<branch_name>_neurons` attribute.

        Returns
        -------
        coefficients: weight initialized with the values of `coefficient_factory`.
        """
        neurons = getattr(self, "g_" + branch_name + "_neurons")
        initial_values = self.coefficient_factory(output_last_dim, neurons)

        # Set weights with calculated coef
        weight_name = f"g_{branch_name}_{self.name}"
        return self.add_weight(
            name=weight_name,
            shape=initial_values.shape,
            initializer=tf.constant_initializer(initial_values.numpy()),
            trainable=self.is_g_trainable,
        )

    def get_config(self) -> dict:
        """get_config method from tensorflow."""
        config = super().get_config()
        config["label_width"] = self.label_width
        config["g_forecast_neurons"] = self.g_forecast_neurons
        config["g_backcast_neurons"] = self.g_backcast_neurons
        config["n_neurons"] = self._n_neurons
        config["drop_rate"] = self.drop_rate
        return config

    @property
    def g_forecast_neurons(self):
        """Return the dimension of the gf layer."""
        return self._g_forecast_neurons

    @property
    def g_backcast_neurons(self):
        """Return the dimension of the gb layer."""
        return self._g_backcast_neurons

    def _val___init__(
        self, output: None, *args: list, **kwargs: dict
    ) -> None:  # pylint: disable=unused-argument
        """Validate the dimensions of the gf and gb layers."""
        greater_or_equal(self.g_forecast_neurons, 0, "g_forecast_neurons")
        greater_or_equal(self.g_backcast_neurons, 0, "g_backcast_neurons")