# Batch Learning Forecasting Component

The component enables using external predictive models from the [PyTorch](https://pytorch.org/)
and [Scikit Learn](http://scikit-learn.org/stable/index.html) libraries
(for example the [Random Forest Regressor](http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestRegressor.html))
in a streaming scenario. Fitting, saving, loading and live prediction are supported. Live predictions work
via Kafka streams (reading feature vectors from Kafka and writing predictions to Kafka).

PyTorch models can include an additional hidden layer that processes missing
data by replacing each neuron's typical response with its expected value computed under a Gaussian mixture model (GMM).
The method is an implementation of the paper
[Processing of missing data by neural networks](https://arxiv.org/abs/1805.07405).
The original TensorFlow implementation is available in [this repository](https://github.com/lstruski/Processing-of-missing-data-by-neural-networks).
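The key identity behind this layer is that the expected value of a ReLU neuron under a Gaussian input has a closed form. As a self-contained sanity check (a pure-Python sketch, not code from this repository), the `nr` helper below mirrors the NR function used by the GMM layer, and a Monte Carlo estimate confirms that `E[ReLU(z)]` for `z ~ N(mu, sigma^2)` equals `sigma * NR(mu / sigma)`:

```python
import math
import random

def nr(w):
    # NR(w) = phi(w) + w * Phi(w): the expected value of ReLU(z) for z ~ N(w, 1),
    # where phi and Phi are the standard normal pdf and cdf.
    phi = math.exp(-w * w / 2) / math.sqrt(2 * math.pi)
    Phi = 0.5 * (1 + math.erf(w / math.sqrt(2)))
    return phi + w * Phi

# Monte Carlo check: E[ReLU(z)] for z ~ N(mu, sigma^2) is sigma * NR(mu / sigma).
random.seed(0)
mu, sigma = 0.5, 2.0
samples = [max(0.0, random.gauss(mu, sigma)) for _ in range(200_000)]
mc = sum(samples) / len(samples)
closed_form = sigma * nr(mu / sigma)
assert abs(mc - closed_form) < 0.02
```

This closed form is what allows the layer to propagate a whole Gaussian component through a linear+ReLU step analytically instead of sampling imputed values.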

The predictive model is designed in a decentralized fashion, meaning that several instances (submodels)
will be created and used for each specific sensor and horizon (`#submodels = #sensors * #horizons`).
The decentralized architecture enables parallelization.
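As an illustrative sketch (the names here are hypothetical, not the component's actual API), the decentralized design amounts to keeping one independent submodel per (sensor, horizon) pair:

```python
from itertools import product

# Hypothetical illustration of the decentralized layout:
# one independent submodel per (sensor, horizon) pair.
sensors = ["sensor1", "sensor2"]
horizons = [1, 3]  # in hours

submodels = {(s, h): None for s, h in product(sensors, horizons)}

# #submodels = #sensors * #horizons
assert len(submodels) == len(sensors) * len(horizons)
```

Because each submodel is fully independent, the pairs can be trained or served in parallel processes without coordination.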

The code is available in the `src/` directory.

| Short | Long | Description |
|----------|-------------|------|
| `-h` | `--help` | show help |
| `-c CONFIG` | `--config CONFIG` | path to config file (example: `config.json`) |
| `-f` | `--fit` | learn the model from the dataset (in `/data/fused`)|
| `-s` | `--save` | save model to file |
| `-l` | `--load` | load model from file |
| `-p` | `--predict` | start live predictions (via Kafka) |
| `-w` | `--watchdog` | start watchdog pinging |

#### Config file:

The config file specifies the Kafka server address, the algorithm to use, and the prediction horizons and sensors for which the model will be learned/loaded/saved/predicted. Config files are stored in `src/config/`.

General Parameters:

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| **prediction_horizons**| list(integer) | | List of prediction horizons (in the units specified by **time_offset**) for which the model will be trained to predict.|
| **time_offset**| string | H | [String alias](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases) defining the data time offsets. The aliases used in training and topic names are lowercase for backwards compatibility.|
| **sensors**| list(string) | | List of sensors for which this specific instance will train the models and make predictions.|
| **bootstrap_servers**| string or list(string)| | String (or list of `host[:port]` strings) that the consumer should contact to bootstrap initial cluster metadata.|
| **algorithm**| string | `torch` | Either a scikit-learn model constructor with initialization parameters, or the string `torch` to train a predefined PyTorch neural network with architecture \[torch.nn.Linear, torch.nn.ReLU, torch.nn.Linear\].|
| **evaluation_period**| integer | 512 | Time period (in the configured time offset, hours by default) over which the model will be evaluated during live predictions (evaluation metrics are added to the output record).|
| **evaluation_split_point**| float | 0.8 | Training/testing split point in the dataset, used for model evaluation during the learning phase (fitting takes about twice as long).|
| **retrain_period**| integer | None | Number of received samples after which the model will be re-trained. Optional; if not specified, no re-training is done.|
| **samples_for_retrain**| integer | None | Number of samples used for re-training. Ignored if retrain_period is not specified. Optional; if not specified (and retrain_period is), re-training is done on all samples received since the component was started.|
| **watchdog_path**| string | None | Watchdog path. |
| **watchdog_interval**| integer | 60 | Delay in seconds between Watchdog pings. |
| **watchdog_url**| string | `localhost` | Watchdog URL. |
| **watchdog_port**| integer | 3001 | Watchdog port. |
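Putting the general parameters together, a minimal config might look like the following (the values are illustrative, not defaults taken from the repository):

```json
{
    "bootstrap_servers": "localhost:9092",
    "algorithm": "torch",
    "prediction_horizons": [1, 3, 24],
    "time_offset": "H",
    "sensors": ["sensor1", "sensor2"],
    "evaluation_period": 512,
    "evaluation_split_point": 0.8,
    "retrain_period": 1000,
    "samples_for_retrain": 5000
}
```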

PyTorch parameters:

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| **learning_rate**| float| 4E-5 | Learning rate for the torch model.|
| **batch_size**| integer | 64 | Size of training batches for the torch model.|
| **training_rounds**| integer | 100 | Number of training rounds for the torch model.|
| **num_workers**| integer| 1 | Number of workers for the torch model.|

GMM Layer parameters:

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| **gmm_layer**| boolean| False | If `true`, the GMM layer is added to the model. |
| **initial_imputer**| string | `simple` | Options are `simple` or `iterative`, which use sklearn's [SimpleImputer](https://scikit-learn.org/stable/modules/generated/sklearn.impute.SimpleImputer.html) or [IterativeImputer](https://scikit-learn.org/stable/modules/generated/sklearn.impute.IterativeImputer.html), respectively. |
| **max_iter**| integer| 15 | If the iterative imputer is chosen, this argument defines its maximum number of iterations.|
| **n_gmm**| integer| 5 | Number of components of the GaussianMixture. If n_gmm is set to -1, all values between min_n_gmm and max_n_gmm are checked and the one with the best BIC score is chosen. |
| **min_n_gmm**| integer| 1 | Minimum number of components for the GMM if search is enabled. |
| **max_n_gmm**| integer| 10 | Maximum number of components for the GMM if search is enabled.|
| **gmm_seed**| integer| None | Random state seed for the GMM. |
| **verbose**| boolean| False | If set to `True`, the progress and results of the n_gmm parameter search are displayed.|
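The imputer and GMM options above map onto standard scikit-learn calls. A minimal sketch of what the `n_gmm = -1` BIC search amounts to (assuming the `simple` imputer; toy data, not the component's actual code path):

```python
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.mixture import GaussianMixture

# Toy data with missing values (NaN), as the GMM layer would receive it.
rng = np.random.default_rng(0)
X = rng.normal(size=(200, 3))
X[rng.random(X.shape) < 0.1] = np.nan

# initial_imputer = "simple": fill NaNs before fitting the mixture.
X_imp = SimpleImputer().fit_transform(X)

# n_gmm = -1: try min_n_gmm..max_n_gmm components and keep the best BIC score.
min_n_gmm, max_n_gmm, gmm_seed = 1, 4, 0
bics = {}
for n in range(min_n_gmm, max_n_gmm + 1):
    gmm = GaussianMixture(n_components=n, covariance_type="diag",
                          random_state=gmm_seed).fit(X_imp)
    bics[n] = gmm.bic(X_imp)
best_n = min(bics, key=bics.get)  # lower BIC is better
```

`covariance_type="diag"` matters here: the layer expects per-feature variances of shape `(n_components, n_features)`.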

Example of config file:
```json
{
    …
}
```

Example of cluster config file:
```json
["N7", "N8"]
```

Alternatively, process managers like `PM2` or `pman` would be a better fit for the task than `tmux`.

## Assumptions:
- **Training data**: all the training files should be stored in a subfolder called `/data/fused`. Data should be stored as JSON objects, one per line (e.g. `{ "timestamp": 1459926000, "ftr_vector": [1, 2, 3]}`), with a separate file for each sensor and prediction horizon. Files should be named the same as input Kafka topics, that is `{sensor}_{horizon}{time_offset.lower()}` (e.g. `sensor1_3h.json`). The target sensor is the first element of `ftr_vector`.
- **Re-training data**: all the re-training data (if re-training is specified) will be stored in a subfolder called `/data/retrain_data` in the same form as training data. Separate files will be made for each sensor and prediction horizon, named `{sensor}_{horizon}{time_offset.lower()}_retrain.json` (e.g. `sensor1_3h_retrain.json`).
- **Models**: all the models are stored in a subfolder called `/models`. Each sensor and horizon has its own model. The model name is composed of the sensor name and prediction horizon, `model_{sensor}_{horizon}{time_offset.lower()}` (e.g. `model_sensor1_3h`).
- **Input kafka topic**: the names of the input Kafka topics on which the prototype is listening for live data should be in the same format as training data file names, that is `features_{sensor}_{horizon}{time_offset.lower()}`.
- **Output kafka topic**: predictions are sent on different topics based on sensor names, that is `{sensor}` (e.g. `sensor1`).
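The naming conventions above can be sketched as plain string formatting (a hypothetical helper for illustration, not part of the component):

```python
def names_for(sensor: str, horizon: int, time_offset: str = "H"):
    """Hypothetical helper illustrating the documented naming scheme."""
    suffix = f"{sensor}_{horizon}{time_offset.lower()}"
    return {
        "training_file": f"data/fused/{suffix}.json",
        "retrain_file": f"data/retrain_data/{suffix}_retrain.json",
        "model_file": f"models/model_{suffix}",
        "input_topic": f"features_{suffix}",
        "output_topic": sensor,  # predictions go to a per-sensor topic
    }

names = names_for("sensor1", 3)
# e.g. names["training_file"] == "data/fused/sensor1_3h.json"
```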

## Examples:

## Requirements

* Python 3.9+

You can use `pip install -r requirements.txt` to install all the packages.

Unit tests are available in `src/tests`. They are invoked with: `python test.py`

```
codespace:~/workspace/forecasting/src/tests$ python test.py
test_eval_period (__main__.TestClassLGBMProperties) ... ok
test_horizon (__main__.TestClassLGBMProperties) ... ok
test_sensor (__main__.TestClassLGBMProperties) ... ok
test_split_point (__main__.TestClassLGBMProperties) ... ok
test_eval_period (__main__.TestClassProperties) ... ok
test_horizon (__main__.TestClassProperties) ... ok
test_sensor (__main__.TestClassProperties) ... ok
test_split_point (__main__.TestClassProperties) ... ok
test_perfect_score (__main__.TestModelEvaluation) ... ok
test_predictability_index (__main__.TestModelEvaluation) ... ok
test_fit (__main__.TestModelFunctionality) ... ok
test_predict (__main__.TestModelFunctionality) ... ok
test_retrain (__main__.TestModelFunctionality) ... ok
test_retrain_not_enough_samples (__main__.TestModelFunctionality) ... ok
test_unlimited_retrain_file (__main__.TestModelFunctionality) ... ok
test_load (__main__.TestModelSerialization) ... ok
test_save (__main__.TestModelSerialization) ... ok
test_fit (__main__.TestPyTorchEvaluation) ... ok
test_predict (__main__.TestPyTorchEvaluation) ... ok

----------------------------------------------------------------------
Ran 22 tests in 5.482s

OK
```
## `src/lib/gmm_linear_layer.py` (new file)

```python
import torch
from torch import nn
import numpy as np
from sklearn.mixture import GaussianMixture


def nr(x):
    """ Helper function for GMM Linear layer. Works for matrix x of shape (n_samples, n_features). """
    return 1 / (np.sqrt(2 * np.pi)) * torch.exp(-torch.square(x) / 2) + x / 2 * (1 + torch.erf(x / np.sqrt(2)))


def linear_relu_missing_values(W, b, x, p, mean, cov):
    """ Helper function for GMM Linear layer. It can take all samples at once, but it applies only one Gaussian. """
    m = torch.where(torch.isnan(x), mean, x)
    sigma = torch.where(torch.isnan(x), torch.abs(cov), torch.tensor(0.0))
    return p * nr((m @ W + b) / torch.sqrt(
        (torch.square(W) * torch.abs(sigma.view([sigma.shape[0], sigma.shape[1], 1]))).sum(axis=1)))


class GMMLinear(nn.Module):
    """ Layer for processing missing data from the paper Processing of missing data by neural networks. """

    def __init__(self, in_features, out_features, gmm_weights, gmm_means, gmm_covariances):
        """
        in_features and out_features are the numbers of input and output features;
        the other parameters are outputs of GaussianMixture.
        """
        super(GMMLinear, self).__init__()

        self.in_features = in_features
        self.out_features = out_features

        self.gmm_weights = nn.Parameter(torch.tensor(np.log(gmm_weights)).float())
        self.gmm_means = nn.Parameter(torch.tensor(gmm_means).float())
        self.gmm_covariances = nn.Parameter(torch.tensor(np.abs(gmm_covariances)).float())
        self.n_gmm = len(gmm_weights)

        if not self.gmm_means.shape == (self.n_gmm, self.in_features):
            raise Exception('gmm_means does not match correct shape (n_components, n_features)')
        if not self.gmm_covariances.shape == (self.n_gmm, self.in_features):
            raise Exception("gmm_covariances does not match correct shape (n_components, n_features). "
                            "GaussianMixture must be called with parameter covariance_type='diag'.")

        # weight matrix and bias of this layer
        self.W = nn.Parameter(torch.randn([self.in_features, self.out_features]))
        self.b = nn.Parameter(torch.randn([self.out_features]))

    def forward(self, x):
        # Split the batch into fully observed rows and rows with missing values.
        indices_full = torch.logical_not(torch.isnan(x).any(axis=1))
        indices_missing = torch.isnan(x).any(axis=1)
        x_full = x[indices_full]
        x_missing = x[indices_missing]
        p = nn.functional.softmax(self.gmm_weights, dim=0)

        # Accumulate the expected ReLU response over all mixture components.
        out_missing = linear_relu_missing_values(self.W, self.b, x_missing, p[0], self.gmm_means[0],
                                                 self.gmm_covariances[0])
        for i in range(1, self.n_gmm):
            out_missing += linear_relu_missing_values(self.W, self.b, x_missing, p[i], self.gmm_means[i],
                                                      self.gmm_covariances[i])

        # Fully observed rows go through an ordinary linear + ReLU step.
        out_full = nn.functional.relu(x_full @ self.W + self.b)

        out = torch.zeros(size=(x.shape[0], self.out_features))
        out[indices_full] = out_full
        out[indices_missing] = out_missing

        assert torch.logical_not(torch.any(torch.isnan(out)))
        return out


def create_gmm_linear_layer(X, in_features, out_features, initial_imputer, n_gmm, min_n_gmm=1, max_n_gmm=10,
                            verbose=True, gmm_seed=None):
    """
    Returns an object of class GMMLinear. X is input data with missing values, which are imputed with initial_imputer
    and then used as input to GaussianMixture. If initial_imputer is None, X is assumed to be already-imputed data
    without missing values. n_gmm is the number of components of GaussianMixture. If n_gmm is set to -1,
    all values between min_n_gmm and max_n_gmm are checked and the one with the best BIC score is chosen.
    """
    if initial_imputer is not None:
        x_imputed = initial_imputer.fit_transform(X)
    else:
        x_imputed = X

    if n_gmm == -1:
        n_gmm = best_gmm_n_components(x_imputed, min_n_gmm, max_n_gmm, verbose, gmm_seed)
        if verbose:
            print('Best n_components =', n_gmm)

    gmm = GaussianMixture(n_components=n_gmm, covariance_type='diag', random_state=gmm_seed).fit(x_imputed)
    return GMMLinear(in_features, out_features, gmm.weights_, gmm.means_, gmm.covariances_)


def best_gmm_n_components(X, n_min, n_max, verbose=True, gmm_seed=None):
    """ Returns the best number of components for GaussianMixture (between n_min and n_max) based on BIC score. """
    min_n = -1
    min_bic = np.inf
    for n_components in range(n_min, n_max + 1):
        gmm = GaussianMixture(n_components=n_components, covariance_type='diag', random_state=gmm_seed).fit(X)
        bic = gmm.bic(X)
        if verbose:
            print(f'n_components={n_components},\tBIC={bic}')
        if min_bic > bic:
            min_bic = bic
            min_n = n_components
    return min_n


def build_multilayer_model(X, dimensions, initial_imputer, n_gmm, gmm_seed):
    """
    Returns a neural network whose first layer is GMMLinear, followed by ordinary linear layers with ReLU activations.
    The number and sizes of the layers are defined by the array dimensions.
    """
    gmm_layer = create_gmm_linear_layer(X, dimensions[0], dimensions[1], initial_imputer, n_gmm, min_n_gmm=1,
                                        max_n_gmm=10, verbose=True, gmm_seed=gmm_seed)

    layers_list = [gmm_layer]
    for i in range(1, len(dimensions) - 1):
        if i > 1:
            layers_list.append(nn.ReLU())
        layers_list.append(nn.Linear(dimensions[i], dimensions[i + 1]))

    return nn.Sequential(*layers_list)
```