From 03c378ebbf5089d74d4a62340cb92e481541bfe9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=20=C5=A0ircelj?= Date: Thu, 23 Sep 2021 16:27:42 +0200 Subject: [PATCH 1/5] Torch ann first version --- README.md | 18 ++-- src/lib/predictive_model.py | 176 ++++++++++++++++++++++++---------- src/lib/regression_metrics.py | 16 ++-- src/main.py | 101 ++++++++++--------- src/tests/test.py | 139 ++++++++++++++------------- 5 files changed, 268 insertions(+), 182 deletions(-) diff --git a/README.md b/README.md index 7bb9305..a1cacaa 100644 --- a/README.md +++ b/README.md @@ -20,17 +20,17 @@ The code is available in the `src/` directory. | `-p` | `--predict` | Start live predictions (via Kafka) | #### Config file: -Config file specifies the Kafka server address, which scikit algorithm to use, prediction horizons and sesnsors for which the model will be learned/loaded/saved/predicted. Config files are stored in `src/config/`. +Config file specifies the Kafka server address, which scikit algorithm to use, prediction horizons and senssors for which the model will be learned/loaded/saved/predicted. Config files are stored in `src/config/`. 
Parameters: - **bootstrap_servers**: string (or list of `host[:port]` strings) that the consumer should contact to bootstrap initial cluster metadata - **algorithm**: string as scikit-learn model constructor with initialization parameters -- **evaluation_periode**: define time periode (in hours) for which the model will be evaluated during live predictions (evaluations metrics added to ouput record) -- **evaluation_split_point**: define training and testing spliting point in the dataset, for model evaluation during learning phase (fit takes twice as long time) +- **evaluation_period**: define time period (in hours) for which the model will be evaluated during live predictions (evaluations metrics added to output record) +- **evaluation_split_point**: define training and testing splitting point in the dataset, for model evaluation during learning phase (fit takes twice as long time) - **prediction_horizons**: list of prediction horizons (in hours) for which the model will be trained to predict for. -- **sesnors**: list of sensors for which this specific instance will train the models and will be making predictions. -- **retrain_period**: A number of recieved samples after which the model will be re-trained. This is an optional parameter. If it is not specified no re-training will be done. -- **samples_for_retrain**: A number of samples that will be used for re-training. If retrain_period is not specified this parameter will be ignored. This is an optional parameter. If it is not specified (and retrain_period is) the re-train will be done on all samples recieved since the component was started. +- **sensors**: list of sensors for which this specific instance will train the models and will be making predictions. +- **retrain_period**: A number of received samples after which the model will be re-trained. This is an optional parameter. If it is not specified no re-training will be done. +- **samples_for_retrain**: A number of samples that will be used for re-training. 
If retrain_period is not specified this parameter will be ignored. This is an optional parameter. If it is not specified (and retrain_period is) the re-train will be done on all samples received since the component was started. Example of config file: ```json @@ -63,13 +63,13 @@ Example of cluster config file: ["N7", "N8"] ``` -Alternetively, process managers like `PM2` or `pman` would be a better fit for the task than `tmux`. +Alternatively, process managers like `PM2` or `pman` would be a better fit for the task than `tmux`. ## Assumptions: - **Training data**: all the training files should be stored in a subfolder called `/data/fused`. Data should be stored as json objects per line (e.g. `{ "timestamp": 1459926000, "ftr_vector": [1, 2, 3]}`). Separate file for each sensor and prediction horizon. Files should be named the same as input kafka topics, that is `{sensor}_{horizon}h` (e.g. `sensor1_3h.json`) -- **Re-training data**: all the re-training data (if re-training is specified) will be stored in a subfolder called `/data/retrain_data` in the same form as training data. Seperate files will be made for each sensor and prediction horizon. The names of the files will be in the following form: `{sensor}_{horizon}h_retrain.json` (eg. `sensor1_3h_retrain.json`). +- **Re-training data**: all the re-training data (if re-training is specified) will be stored in a subfolder called `/data/retrain_data` in the same form as training data. Separate files will be made for each sensor and prediction horizon. The names of the files will be in the following form: `{sensor}_{horizon}h_retrain.json` (eg. `sensor1_3h_retrain.json`). - **Models**: all the models are stored in a subfolder called `/models`. Each sensor and horizon has its own model. The name of the models is composed of sensor name and prediction horizon, `model_{sensor}_{horizon}h` (e.g. 
`model_sensor1_3h`) -- **Input kafka topic**: The names of input kafka topics on which the prototype is listening for live data should be in the same format as trainng data file names, that is `features_{sensor}_{horizon}h`. +- **Input kafka topic**: The names of input kafka topics on which the prototype is listening for live data should be in the same format as training data file names, that is `features_{sensor}_{horizon}h`. - **Output kafka topic**: Predictions are sent on different topics based on a sensor names, that is `{sensor}` (e.g. `sensor1`). ## Examples: diff --git a/src/lib/predictive_model.py b/src/lib/predictive_model.py index b490715..541a7cb 100644 --- a/src/lib/predictive_model.py +++ b/src/lib/predictive_model.py @@ -10,6 +10,10 @@ import os import json from datetime import datetime +import torch +import numpy as np +from lib.regression_metrics import * + class PredictiveModel: """ @@ -17,18 +21,58 @@ class PredictiveModel: ref: http://scikit-learn.org/stable/supervised_learning.html#supervised-learning """ - def __init__(self, algorithm, sensor, prediction_horizon, evaluation_periode, error_metrics, split_point, - retrain_period = None, samples_for_retrain = None, retrain_file_location = None): + def __init__(self, + sensor, + prediction_horizon, + evaluation_period=512, + error_metrics=None, + split_point=0.8, + algorithm='torch', + encoder_length=None, + retrain_period=None, + samples_for_retrain=None, + retrain_file_location=None, + time_offset='H', + learning_rate=0.001, + batch_size=64, + training_rounds=100, + num_workers=1, + **kwargs): + + if not error_metrics: + self.err_metrics = [ + {'name': "R2 Score", 'short': "r2", 'function': sklearn.metrics.r2_score}, + {'name': "Mean Absolute Error", 'short': "mae", 'function': sklearn.metrics.mean_absolute_error}, + {'name': "Mean Squared Error", 'short': "mse", 'function': sklearn.metrics.mean_squared_error}, + {'name': "Root Mean Squared Error", 'short': "rmse", + 'function': lambda true, 
pred: math.sqrt(sklearn.metrics.mean_squared_error(true, pred))}, + {'name': "Mean Absolute Percentage Error", 'short': "mape", + 'function': mean_absolute_percentage_error} + ] + self.algorithm = algorithm - self.model = eval(self.algorithm) + if "torch" == algorithm: + self.model = self.TorchNetwork(self) + else: + self.model = eval(self.algorithm) self.sensor = sensor self.horizon = prediction_horizon - self.eval_periode = evaluation_periode + self.eval_period = evaluation_period self.split_point = split_point - self.err_metrics = error_metrics - self.measurements = collections.deque(maxlen=self.eval_periode) - self.predictions = collections.deque(maxlen=(self.eval_periode + self.horizon)) + self.encoder_length = encoder_length + self.measurements = collections.deque(maxlen=self.eval_period) + self.predictions = collections.deque(maxlen=(self.eval_period + self.horizon)) self.predictability = None + self.time_offset = time_offset + + # Alternate fit arguments for Torch networks + self.encoder_length = 1 + if encoder_length is not None: + self.encoder_length = encoder_length + self.learning_rate = learning_rate + self.batch_size = batch_size + self.training_rounds = training_rounds + self.num_workers = num_workers # Retrain configurations self.samples_for_retrain = samples_for_retrain @@ -37,8 +81,7 @@ def __init__(self, algorithm, sensor, prediction_horizon, evaluation_periode, er self.samples_in_train_file = 0 self.retrain_memory = {"timestamp": [], "ftr_vector": []} - - if(self.retrain_period is not None): + if self.retrain_period is not None: # Initialize file filename = "{}_{}h_retrain.json".format(sensor, prediction_horizon) self.train_file_path = os.path.join(retrain_file_location, filename) @@ -47,59 +90,58 @@ def __init__(self, algorithm, sensor, prediction_horizon, evaluation_periode, er def fit(self, filename): with open(filename) as data_file: - #data = pd.read_json(data_file) - data = pd.read_json(data_file, lines=True) # if not valid json + data = 
pd.read_json(data_file, lines=True) # if not valid json # set datetime as index - data.set_index('timestamp',inplace=True) - - # transform ftr_vector from array to seperate fields + data.set_index('timestamp', inplace=True) + # data.index = [datetime.fromtimestamp(i) for i in data.index] + # transform ftr_vector from array to separate fields data = data['ftr_vector'].apply(pd.Series) - - #print(data) + + # print(data) # get features all_features = list(data) # prepare target based on prediction horizon (first one is measurement to shift) measurements = data[[data.columns[0]]] - # this line removed duplicate target values; makes no sense ... - # removed by Klemen Kenda, 2020/09/09 - #measurements = measurements.loc[~measurements.duplicated(keep='first')] - data['target'] = measurements.shift(periods = -self.horizon, freq = 'H') - data = data.dropna() # No need for this any more + data['target'] = measurements.shift(periods=-self.horizon, freq=self.time_offset) + data = data.dropna() # No need for this any more # prepare learning data - X = data[all_features].values + x = data[all_features].values y = data['target'].values # fit the model - self.model.fit(X, y) + self.model.fit(x, y) # start evaluation # split data to training and testing set - split = int(X.shape[0] * self.split_point) - X_train = X[:split] + split = int(x.shape[0] * self.split_point) + x_train = x[:split] y_train = y[:split] - X_test = X[split:] + x_test = x[split:] y_test = y[split:] # train evaluation model - evaluation_model = eval(self.algorithm) - evaluation_model.fit(X_train, y_train) + if self.algorithm == 'torch': + evaluation_model = self.TorchNetwork(self) + else: + evaluation_model = eval(self.algorithm) + evaluation_model.fit(x_train, y_train) with open('performance_rf.txt', 'a+') as data_file: - data_file.truncate(); + data_file.truncate() - for rec in X_test: + for rec in x_test: start1 = time.time() - pred = evaluation_model.predict(rec.reshape(1,-1)) + pred = 
evaluation_model.predict(rec.reshape(1, -1)) end = time.time() latency = end - start1 # print(latency) data_file.write("{}\n".format(latency)) - # tesing predictions + # testing predictions true = y_test - pred = evaluation_model.predict(X_test) + pred = evaluation_model.predict(x_test) # calculate predictability fitness = sklearn.metrics.r2_score(true, pred) @@ -108,18 +150,54 @@ def fit(self, filename): # calculate evaluation scores output = {} for metrics in self.err_metrics: - error_name = metrics['short'] - if error_name == 'rmse': - output[error_name] = math.sqrt(sklearn.metrics.mean_squared_error(true, pred)) - else: - output[error_name] = metrics['function'](true, pred) + output[metrics['short']] = metrics['function'](true, pred) return output + class TorchNetwork: + + def __init__(self, parent): + self.parent = parent + self.ann = None + + def fit(self, x, y): + loss_function = torch.nn.MSELoss() # error/cost function f(ANN(x),y), i.e. degree of misprediction + dimensions = [np.shape(x)[1], (np.shape(x)[1] + 1) // 2, 1] + layers = (2 * len(dimensions) - 3) * [torch.nn.ReLU()] # layers of the ann + layers[::2] = [torch.nn.Linear(dimensions[k], dimensions[k + 1]) for k in range(len(dimensions) - 1)] + self.ann = torch.nn.Sequential(*layers) + + opt = torch.optim.Adam(self.ann.parameters(), + lr=self.parent.learning_rate) # F will adjust the ANN's weights to minimize f(ANN(x),y) + + train_batches = torch.utils.data.DataLoader( + # partition 3k-tuples randomly into batches of 64 for nt-thread parallelization + [[x[i], y[i]] for i in range(len(x))], batch_size=self.parent.batch_size, shuffle=True, + num_workers=self.parent.num_workers) + losses = [] + for training_round in range( + self.parent.training_rounds): # if the number of rounds is too small/large, we get underfitting/overfitting + i, loss = 0, 0 + print('Train epoch \t' + str(training_round), end=' ') + for x, y in train_batches: # randomly select a batch of 64 pairs x,y, each of length 3k,3l + 
opt.zero_grad() # Training pass; set the gradients to 0 before each loss calculation. + loss = loss_function(self.ann(x.float()), + y.unsqueeze(1).float()) # calculate the error/cost/loss, this is a 0-dim tensor (number) + loss.backward() # backpropagation: compute gradients (this is where the ANN learns) + if i % 1000 == 0: + print(round(loss.item(), 3), end='\t') + opt.step() # apply gradients to improve weights ann[k].weight.grad to minimize f + i, loss = i + 1, loss + loss.item() # item() converts a 0-dim tensor to a number + losses.append(loss) + print('loss:', losses[-1]) + + def predict(self, x): + return self.ann(torch.tensor(x).float()).detach().numpy() + def predict(self, ftr_vector, timestamp): prediction = self.model.predict(ftr_vector) # Retrain stuff - if(self.retrain_period is not None): + if self.retrain_period is not None: # Add current ftr_vector to file with open(self.train_file_path, 'r') as data_r: # get all lines @@ -127,22 +205,21 @@ def predict(self, ftr_vector, timestamp): # Create new line and append it new_line = "{\"timestamp\": " + str(timestamp) + ", \"ftr_vector\": " + str(ftr_vector[0]) + "}" # If not the first line add \n at the beginning - if(len(lines)!=0): + if len(lines) != 0: new_line = "\n" + new_line lines.append(new_line) # Truncate arrays to correct size - if(self.samples_for_retrain is not None and self.samples_for_retrain < len(lines)): - lines = lines[-self.samples_for_retrain:] - + if self.samples_for_retrain is not None and self.samples_for_retrain < len(lines): + lines = lines[-self.samples_for_retrain:] with open(self.train_file_path, 'w') as data_w: data_w.writelines(lines) - + self.samples_from_retrain += 1 # If conditions are satisfied retrain the model - if(self.samples_from_retrain%self.retrain_period == 0 and - (self.samples_for_retrain is None or self.samples_for_retrain == len(lines))): + if (self.samples_from_retrain % self.retrain_period == 0 and + (self.samples_for_retrain is None or 
self.samples_for_retrain == len(lines))): self.samples_from_retrain = 0 self.fit(filename=self.train_file_path) @@ -155,7 +232,8 @@ def evaluate(self, output, measurement): # check if buffers are full if len(self.predictions) < self.predictions.maxlen: - warn_text = "Warning: Not enough predictions for evaluation yet ({}/{})".format(len(self.predictions), self.predictions.maxlen) + warn_text = "Warning: Not enough predictions for evaluation yet ({}/{})".format(len(self.predictions), + self.predictions.maxlen) warnings.warn(warn_text) return output @@ -173,8 +251,8 @@ def evaluate(self, output, measurement): def save(self, filename): joblib.dump(self.model, filename, compress=3) - #print "Saved model to", filename + # print "Saved model to", filename def load(self, filename): self.model = joblib.load(filename) - #print "Loaded model from", filename + # print "Loaded model from", filename diff --git a/src/lib/regression_metrics.py b/src/lib/regression_metrics.py index fc727c5..374e967 100644 --- a/src/lib/regression_metrics.py +++ b/src/lib/regression_metrics.py @@ -1,8 +1,9 @@ import numpy as np import warnings + # MAPE - Mean Absolute Percentage Error -def mean_absolute_percentage_error(y_true, y_pred): +def mean_absolute_percentage_error(y_true, y_pred): """ Use of this metric is not recommended; for illustration only. See other regression metrics on sklearn docs: @@ -13,21 +14,22 @@ def mean_absolute_percentage_error(y_true, y_pred): >>> mean_absolute_percentage_error(y_true, y_pred) Out[]: 24.791666666666668 """ - + y_true = np.array(y_true) y_pred = np.array(y_pred) - + # Check if y_pred has any zero elements. If yes, remove them, and raise warning zero_indices = np.flatnonzero(y_true == 0) if (zero_indices.size != 0): y_true = np.delete(y_true, zero_indices) y_pred = np.delete(y_pred, zero_indices) - - #warning_msg = "Found {0} zero elements in y_pred. 
Removing {0} zero elements".format(len(zero_indices)) - #warnings.warn(warning_msg, RuntimeWarning) + + # warning_msg = "Found {0} zero elements in y_pred. Removing {0} zero elements".format(len(zero_indices)) + # warnings.warn(warning_msg, RuntimeWarning) return np.mean(np.abs((y_true - y_pred) / y_true)) * 100 + import numpy as np @@ -47,7 +49,7 @@ def mean_absolute_scaled_error(training_series, naive_training_series, testing_s testing_series = np.array(testing_series) prediction_series = np.array(prediction_series) n = training_series.shape[0] - #d = np.abs(np.diff(training_series)).sum() / (n - 1) + # d = np.abs(np.diff(training_series)).sum() / (n - 1) d = np.abs(training_series - naive_training_series).sum() / (n - 1) errors = np.abs(testing_series - prediction_series) diff --git a/src/main.py b/src/main.py index ac1499a..9788555 100644 --- a/src/main.py +++ b/src/main.py @@ -9,19 +9,20 @@ import threading import requests -# adding lib subdirectory -sys.path.insert(0,'./lib') - from sklearn.ensemble import RandomForestRegressor from sklearn.linear_model import Ridge import joblib -import regression_metrics as additional_metrics +from lib.regression_metrics import * from kafka import KafkaConsumer from kafka import KafkaProducer import numpy as np import pandas as pd -from predictive_model import PredictiveModel +from lib.predictive_model import PredictiveModel + +# adding lib subdirectory +sys.path.insert(0, './lib') + def get_model_file_name(sensor, horizon): subdir = 'models' @@ -33,6 +34,7 @@ def get_model_file_name(sensor, horizon): return filepath + def get_data_file_name(sensor, horizon): subdir = '../../data/fused' if not os.path.isdir(subdir): @@ -43,6 +45,7 @@ def get_data_file_name(sensor, horizon): return filepath + def get_input_data_topics(sensors, horizons): topics = [] for sensor in sensors: @@ -51,11 +54,12 @@ def get_input_data_topics(sensors, horizons): return topics + def ping_watchdog(): - interval = 60 # ping interval in seconds + 
interval = 60 # ping interval in seconds url = "localhost" - port= 3001 - path= "/ping?id=5&secret=b9347c25aba4d3ba6e8f61d05fd1c011" + port = 3001 + path = "/ping?id=5&secret=b9347c25aba4d3ba6e8f61d05fd1c011" try: r = requests.get("http://{}:{}{}".format(url, port, path)) @@ -66,6 +70,7 @@ def ping_watchdog(): threading.Timer(interval, ping_watchdog).start() + def main(): parser = argparse.ArgumentParser(description="Modeling component") @@ -81,9 +86,18 @@ def main(): "-f", "--fit", action='store_true', + dest="fit", help=u"Learning the model from dataset in subfolder '../../data/fused'", ) + parser.add_argument( + "-a", + "--alternate_it", + action='store_true', + dest="alternate_fit", + help=u"Learning the model from csv file in subfolder '../../data/fused'", + ) + parser.add_argument( "-s", "--save", @@ -115,14 +129,14 @@ def main(): ) # Display help if no arguments are defined - if len(sys.argv)==1: + if len(sys.argv) == 1: parser.print_help() sys.exit(1) # Parse input arguments args = parser.parse_args() - #Read config file + # Read config file with open("config/" + args.config) as data_file: conf = json.load(data_file) @@ -130,60 +144,44 @@ def main(): print("\n=== Init phase ===") models = {} - algorithm = conf['algorithm'] - sensors = conf['sensors'] - horizons = conf['prediction_horizons'] - evaluation_period = conf['evaluation_period'] - evaluation_split_point = conf['evaluation_split_point'] - error_metrics = [ - {'name': "R2 Score", 'short': "r2", 'function': sklearn.metrics.r2_score}, - {'name': "Mean Absolute Error", 'short': "mae", 'function': sklearn.metrics.mean_absolute_error}, - {'name': "Mean Squared Error", 'short': "mse", 'function': sklearn.metrics.mean_squared_error}, - {'name': "Root Mean Squared Error", 'short': "rmse", 'function': None}, - {'name': "Mean Absolute Percentage Error", 'short': "mape", 'function': additional_metrics.mean_absolute_percentage_error} - ] - - # Ifretrain period is defined read it from conf otherwise use None - 
if("retrain_period" in conf): - retrain_period = conf["retrain_period"] - if("samples_for_retrain" in conf): - samples_for_retrain = conf["samples_for_retrain"] + kwargs = dict() + sensors, horizons = None, None + + for key in conf: + if 'sensors' == key: + sensors = conf[key] + elif 'prediction_horizons' == key: + horizons = conf[key] else: - samples_for_retrain = None - else: - retrain_period = None - samples_for_retrain = None + kwargs[key] = conf[key] for sensor in sensors: models[sensor] = {} for horizon in horizons: - models[sensor][horizon] = PredictiveModel(algorithm, sensor, + models[sensor][horizon] = PredictiveModel(sensor, horizon, - evaluation_period, - error_metrics, - evaluation_split_point, - retrain_period, - samples_for_retrain, - os.path.join('.', 'test', 'retrain_data')) + **kwargs) print("Initializing model_{}_{}h".format(sensor, horizon)) # Model learning - if (args.fit): + if args.fit: print("\n=== Learning phase ===") for sensor in sensors: for horizon in horizons: start = time.time() data = get_data_file_name(sensor, horizon) - try: - score = models[sensor][horizon].fit(data) - except Exception as e: - print(e) + # try: + score = models[sensor][horizon].fit(data) end = time.time() - print("Model[{0}_{1}h] training time: {2:.1f}s, evaluations: {3})".format(sensor, horizon, end-start, str(score))) + print("Model[{0}_{1}h] training time: {2:.1f}s, evaluations: {3})".format(sensor, horizon, + end - start, + str(score))) + # except Exception as e: + # print(e) # Model saving - if (args.save): + if args.save: print("\n=== Saving phase ===") for sensor in sensors: @@ -194,7 +192,7 @@ def main(): print("Saved model", filename) # Model loading - if (args.load): + if args.load: print("\n=== Loading phase ===") for sensor in sensors: @@ -204,12 +202,12 @@ def main(): model.load(filename) print("Loaded model", filename) - if (args.watchdog): + if args.watchdog: print("\n=== Watchdog started ===") ping_watchdog() # Live predictions - if (args.predict): 
+ if args.predict: print("\n=== Predictions phase ===") # Start Kafka consumer @@ -227,7 +225,7 @@ def main(): rec = eval(msg.value) timestamp = rec['timestamp'] ftr_vector = rec['ftr_vector'] - measurement = ftr_vector[0] # first feature is the target measurement + measurement = ftr_vector[0] # first feature is the target measurement topic = msg.topic @@ -247,7 +245,7 @@ def main(): 'predictability': model.predictability} # evaluation - output = model.evaluate(output, measurement) # appends evaluations to output + output = model.evaluate(output, measurement) # appends evaluations to output # send result to kafka topic output_topic = "predictions_{}".format(sensor) @@ -263,5 +261,6 @@ def main(): except Exception as e: print('Consumer error: ' + str(e)) + if __name__ == '__main__': main() diff --git a/src/tests/test.py b/src/tests/test.py index 8100e8e..9131a9f 100644 --- a/src/tests/test.py +++ b/src/tests/test.py @@ -2,16 +2,16 @@ # -*- coding: utf-8 -*- import sys import time -sys.path.insert(0,'../lib') - import unittest import sklearn.metrics -from predictive_model import PredictiveModel -from sklearn.ensemble import RandomForestRegressor +from lib.predictive_model import PredictiveModel import os import warnings +sys.path.insert(0, '../lib') + + def create_testing_file(): testset = """ {"timestamp": 1459926000, "ftr_vector": [1, 2, 3]} @@ -23,12 +23,12 @@ def create_testing_file(): {"timestamp": 1459947600, "ftr_vector": [1, 3, 2]} """ - #subdir = '../nrgStream-fusion/data' - #subdir = './test/data' + # subdir = '../nrgStream-fusion/data' + # subdir = './test/data' subdir = os.path.join('.', 'test', 'data') if not os.path.isdir(subdir): os.makedirs(subdir) - + filename = "N1_1h.json" filepath = os.path.join(subdir, filename) @@ -37,6 +37,7 @@ def create_testing_file(): return filepath + def create_testing_file_for_retrain(): testset = """ {"timestamp": 1459926000, "ftr_vector": [0]} @@ -51,7 +52,7 @@ def create_testing_file_for_retrain(): subdir = 
os.path.join('.', 'test', 'retrain_data') if not os.path.isdir(subdir): os.makedirs(subdir) - + filename = "N1_1h.json" filepath = os.path.join(subdir, filename) @@ -60,44 +61,47 @@ def create_testing_file_for_retrain(): return filepath -def create_model_instance(model_string, retrain_period = None, samples_for_retrain = None): - algorithm = model_string - sensor = "N1" - horizon = 1 - evaluation_period = 72 - evaluation_split_point = 0.8 - error_metrics = [ - {'name': "R2 Score", 'short': "r2", 'function': sklearn.metrics.r2_score}, - {'name': "Mean Absolute Error", 'short': "mae", 'function': sklearn.metrics.mean_absolute_error}, - {'name': "Mean Squared Error", 'short': "mse", 'function': sklearn.metrics.mean_squared_error}, - {'name': "Root Mean Squared Error", 'short': "rmse", 'function': None} - ] - model = PredictiveModel(algorithm, sensor, horizon, evaluation_period, - error_metrics, evaluation_split_point, retrain_period, - samples_for_retrain, os.path.join('.', 'test', 'retrain_data')) - - return model + +def create_model_instance(model_string, retrain_period=None, samples_for_retrain=None): + algorithm = model_string + sensor = "N1" + horizon = 1 + evaluation_period = 72 + evaluation_split_point = 0.8 + error_metrics = [ + {'name': "R2 Score", 'short': "r2", 'function': sklearn.metrics.r2_score}, + {'name': "Mean Absolute Error", 'short': "mae", 'function': sklearn.metrics.mean_absolute_error}, + {'name': "Mean Squared Error", 'short': "mse", 'function': sklearn.metrics.mean_squared_error}, + {'name': "Root Mean Squared Error", 'short': "rmse", 'function': None} + ] + model = PredictiveModel(algorithm, sensor, horizon, evaluation_period, + error_metrics, evaluation_split_point, retrain_period, + samples_for_retrain, os.path.join('.', 'test', 'retrain_data')) + + return model class SimpleWidgetTestCase(unittest.TestCase): def setUp(self): - self.model = create_model_instance("sklearn.ensemble.RandomForestRegressor(n_estimators=100, n_jobs=16, 
random_state=0)", - retrain_period=None, samples_for_retrain=None) + self.model = create_model_instance( + "sklearn.ensemble.RandomForestRegressor(n_estimators=100, n_jobs=16, random_state=0)", + retrain_period=None, samples_for_retrain=None) + class TestClassProperties(SimpleWidgetTestCase): - + def test_sensor(self): self.assertEqual(self.model.sensor, "N1") def test_horizon(self): self.assertEqual(self.model.horizon, 1) - def test_eval_periode(self): - self.assertEqual(self.model.eval_periode, 72) + def test_eval_period(self): + self.assertEqual(self.model.eval_period, 72) def test_split_point(self): - self.assertEqual(self.model.split_point, 0.8) + self.assertEqual(self.model.split_point, 0.8) class TestModelFunctionality(SimpleWidgetTestCase): @@ -127,7 +131,7 @@ def test_predict(self): prediction = self.model.predict([[1, 1, 1]], timestamp=time.time()) # check if prediction is valid - self.assertEqual(prediction[0], 1.96) + self.assertEqual(prediction[0], 1.96) os.remove(f) def test_retrain(self): @@ -135,20 +139,21 @@ def test_retrain(self): f = create_testing_file_for_retrain() start_timestamp = 1459951200 - model_for_retrain = create_model_instance("sklearn.ensemble.RandomForestRegressor(n_estimators=100, n_jobs=16, random_state=0)", - retrain_period=10, samples_for_retrain=10) + model_for_retrain = create_model_instance( + "sklearn.ensemble.RandomForestRegressor(n_estimators=100, n_jobs=16, random_state=0)", + retrain_period=10, samples_for_retrain=10) # Fit the model model_for_retrain.fit(f) # Run predict 5 times (as specified to trigger the retrain) for i in range(10): - timestamp = start_timestamp + i*60*60 + timestamp = start_timestamp + i * 60 * 60 p = model_for_retrain.predict([[1]], timestamp=timestamp) self.assertEqual(p[0], 0.) - + # If retrain was triggered 1. should be predicted - p = model_for_retrain.predict([[1]], start_timestamp + 10*60*60) + p = model_for_retrain.predict([[1]], start_timestamp + 10 * 60 * 60) self.assertEqual(p[0], 1.) 
os.remove(f) @@ -157,11 +162,12 @@ def test_retrain(self): def test_unlimited_retrain_file(self): # create train file and retrain file location f = create_testing_file_for_retrain() - # 1h past the lasti timestamp in the train file + # 1h past the last i timestamp in the train file start_timestamp = 1459951200 - model_for_retrain = create_model_instance("sklearn.ensemble.RandomForestRegressor(n_estimators=100, n_jobs=16, random_state=0)", - retrain_period=10, samples_for_retrain=None) + model_for_retrain = create_model_instance( + "sklearn.ensemble.RandomForestRegressor(n_estimators=100, n_jobs=16, random_state=0)", + retrain_period=10, samples_for_retrain=None) # Fit the model model_for_retrain.fit(f) @@ -172,9 +178,9 @@ def test_unlimited_retrain_file(self): with open(model_for_retrain.train_file_path, "r") as retrain_file: self.assertEqual(len(retrain_file.readlines()), i) - timestamp = start_timestamp + i*60*60 + timestamp = start_timestamp + i * 60 * 60 p = model_for_retrain.predict([[1]], timestamp=timestamp) - if(i<10): + if (i < 10): self.assertEqual(p[0], 0.) else: self.assertEqual(p[0], 1.) @@ -189,8 +195,9 @@ def test_retrain_not_enough_samples(self): f = create_testing_file_for_retrain() start_timestamp = 1459951200 - model_for_retrain = create_model_instance("sklearn.ensemble.RandomForestRegressor(n_estimators=100, n_jobs=16, random_state=0)", - retrain_period=5, samples_for_retrain=7) + model_for_retrain = create_model_instance( + "sklearn.ensemble.RandomForestRegressor(n_estimators=100, n_jobs=16, random_state=0)", + retrain_period=5, samples_for_retrain=7) # Fit the model model_for_retrain.fit(f) @@ -198,12 +205,12 @@ def test_retrain_not_enough_samples(self): # Run predict 10 times (at 5 retrain should not be triggered since # there is not onough data) for i in range(10): - timestamp = start_timestamp + i*60*60 + timestamp = start_timestamp + i * 60 * 60 p = model_for_retrain.predict([[1]], timestamp=timestamp) self.assertEqual(p[0], 0.) 
- + # If retrain was triggered 1. should be predicted - timestamp = start_timestamp + 10*60*60 + timestamp = start_timestamp + 10 * 60 * 60 p = model_for_retrain.predict([[1]], timestamp=timestamp) self.assertEqual(p[0], 1.) @@ -211,15 +218,13 @@ def test_retrain_not_enough_samples(self): os.remove(model_for_retrain.train_file_path) - class TestModelSerialization(SimpleWidgetTestCase): def test_save(self): - # file names model_file = os.path.join('.', 'test', 'data', 'model') dataset_file = create_testing_file() - + # first test if we get exception when trying to use unfitted model with self.assertRaises(Exception) as context: self.model.predict([[1, 1, 1]], timestamp=time.time()) @@ -243,13 +248,13 @@ def test_save(self): os.remove(dataset_file) def test_load(self): - # file names model_file = os.path.join('.', 'test', 'data', 'model') dataset_file = create_testing_file() - + # create saved file of dummy model - dummy_model = create_model_instance("sklearn.ensemble.RandomForestRegressor(n_estimators=100, n_jobs=16, random_state=0)") + dummy_model = create_model_instance( + "sklearn.ensemble.RandomForestRegressor(n_estimators=100, n_jobs=16, random_state=0)") dummy_model.fit(dataset_file) dummy_model.save(model_file) @@ -290,7 +295,7 @@ def test_evaluation_score(self): with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") - for a in range(1,80): + for a in range(1, 80): output = self.model.evaluate({'value': 0}, 1) # check evaluation results @@ -298,14 +303,14 @@ def test_evaluation_score(self): self.assertEqual(output['rmse'], 1) self.assertEqual(output['mae'], 1) self.assertEqual(output['r2'], 0) - + def test_perfect_score(self): # send model enough predictions to fill the buffers with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") - for a in range(1,80): + for a in range(1, 80): output = self.model.evaluate({'value': 1}, 1) # check evaluation results @@ -320,25 +325,26 @@ def test_evaluation_buffers(self): 
with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") - for a in range(1,100): + for a in range(1, 100): self.model.evaluate({'value': 1}, 1) # check buffers - self.assertEqual(len(self.model.measurements), self.model.eval_periode) + self.assertEqual(len(self.model.measurements), self.model.eval_period) self.assertEqual(len(self.model.predictions), self.model.eval_periode + self.model.horizon) - + def test_predictability_index(self): # fit the model - f = create_testing_file() + f = create_testing_file() score = self.model.fit(f) # test predictability index - self.assertAlmostEqual(self.model.predictability, score['r2']*100, 0) + self.assertAlmostEqual(self.model.predictability, score['r2'] * 100, 0) # clean up os.remove(f) + class LGBMTestCase(unittest.TestCase): def setUp(self): @@ -346,18 +352,19 @@ def setUp(self): class TestClassLGBMProperties(LGBMTestCase): - + def test_sensor(self): self.assertEqual(self.model.sensor, "N1") def test_horizon(self): self.assertEqual(self.model.horizon, 1) - def test_eval_periode(self): - self.assertEqual(self.model.eval_periode, 72) + def test_eval_period(self): + self.assertEqual(self.model.eval_period, 72) def test_split_point(self): - self.assertEqual(self.model.split_point, 0.8) - + self.assertEqual(self.model.split_point, 0.8) + + if __name__ == '__main__': unittest.main(verbosity=2) From e81edc9f393286b651af82b57315e02ab2e1ccf8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=20=C5=A0ircelj?= Date: Mon, 27 Sep 2021 14:44:59 +0200 Subject: [PATCH 2/5] Updated model saving --- README.md | 55 ++++++++++----- src/lib/predictive_model.py | 126 ++++++++++++++++++---------------- src/lib/regression_metrics.py | 5 +- src/main.py | 120 +++++++++++++++----------------- src/requirements.txt | 18 ++--- src/tests/test.py | 65 +++++++++++------- 6 files changed, 210 insertions(+), 179 deletions(-) diff --git a/README.md b/README.md index a1cacaa..bfeb559 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # 
Batch Learning Forecasting Component -The component enables using external predictive models from [Scikit Learn](http://scikit-learn.org/stable/index.html) library (for example [Random Forest Regressor](http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestRegressor.html)) implementation in a streaming scenario. Fitting, saving, loading and live prediction are enabled. Live predictions work via Kafka streams (reading feature vectors from Kafka and writing predictions to Kafka). +The component enables using external predictive models from [Scikit Learn](http://scikit-learn.org/stable/index.html) and [PyTorch](https://pytorch.org/) library (for example [Random Forest Regressor](http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestRegressor.html)) implementation in a streaming scenario. Fitting, saving, loading and live prediction are enabled. Live predictions work via Kafka streams (reading feature vectors from Kafka and writing predictions to Kafka). -The predictive model is designed in an decentralized fashion, meaning that several instances (submodels) will be created and used for each specific sensor and horizon (`#submodels = #sensors * #horiozons`). Decentralized architecture enables parallelization. +The predictive model is designed in an decentralized fashion, meaning that several instances (submodels) will be created and used for each specific sensor and horizon (`#submodels = #sensors * #horizons`). Decentralized architecture enables parallelization. The code is available in the `src/` directory. @@ -14,24 +14,32 @@ The code is available in the `src/` directory. 
|----------|-------------|------| | `-h` | `--help` | show help | | `-c CONFIG` | `--config CONFIG` | path to config file (example: `config.json`) | -| `-f` | `--fit` | Learning the model from dataset (in `/data/fused`)| +| `-f` | `--fit` | learning the model from dataset (in `/data/fused`)| | `-s` | `--save` | save model to file | | `-l` | `--load` | load model from file | -| `-p` | `--predict` | Start live predictions (via Kafka) | +| `-p` | `--predict` | start live predictions (via Kafka) | #### Config file: Config file specifies the Kafka server address, which scikit algorithm to use, prediction horizons and senssors for which the model will be learned/loaded/saved/predicted. Config files are stored in `src/config/`. Parameters: -- **bootstrap_servers**: string (or list of `host[:port]` strings) that the consumer should contact to bootstrap initial cluster metadata -- **algorithm**: string as scikit-learn model constructor with initialization parameters -- **evaluation_period**: define time period (in hours) for which the model will be evaluated during live predictions (evaluations metrics added to output record) -- **evaluation_split_point**: define training and testing splitting point in the dataset, for model evaluation during learning phase (fit takes twice as long time) -- **prediction_horizons**: list of prediction horizons (in hours) for which the model will be trained to predict for. -- **sensors**: list of sensors for which this specific instance will train the models and will be making predictions. -- **retrain_period**: A number of received samples after which the model will be re-trained. This is an optional parameter. If it is not specified no re-training will be done. -- **samples_for_retrain**: A number of samples that will be used for re-training. If retrain_period is not specified this parameter will be ignored. This is an optional parameter. 
If it is not specified (and retrain_period is) the re-train will be done on all samples received since the component was started. +| Name | Type | Default | Description | +| --- | --- | --- | --- | +| **prediction_horizons**| list(integer) | | List of prediction horizons (in hours) for which the model will be trained to predict for.| +| **sensors**| list(string) | | List of sensors for which this specific instance will train the models and will be making predictions.| +| **bootstrap_servers**| string or list(string)| | String (or list of `host[:port]` strings) that the consumer should contact to bootstrap initial cluster metadata.| +| **algorithm**| string | `torch` | String as either a scikit-learn model constructor with initialization parameters or a string `torch` to train using a pre defined neural network using PyTorch with architecture: \[torch.nn.Linear, torch.nn.ReLU, torch.nn.Linear\],| +| **evaluation_period**| integer | 512 | Define time period (in defined time offset that is hours by default) for which the model will be evaluated during live predictions (evaluations metrics added to output record).| +| **evaluation_split_point**| float | 0.8 | Define training and testing splitting point in the dataset, for model evaluation during learning phase (fit takes twice as long time).| +| **retrain_period**| integer | None | A number of received samples after which the model will be re-trained. This is an optional parameter. If it is not specified no re-training will be done.| +| **samples_for_retrain**| integer | None | A number of samples that will be used for re-training. If retrain_period is not specified this parameter will be ignored. This is an optional parameter. 
If it is not specified (and retrain_period is) the re-train will be done on all samples received since the component was started.| +| **time_offset**| string | H | [String alias](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases) to define the data time offsets. The aliases used in training and topic names are lowercase for backwards compatibility.| +| **learning_rate**| float| 4E-5 | Learning rate for the torch model.| +| **batch_size**| integer | 64 | Size of training batches for torch model.| +| **training_rounds**| integer | 100 | Training rounds for torch model.| +| **num_workers**| integer| 1 | Number of workers for torch model.| + Example of config file: ```json { @@ -66,10 +74,10 @@ Example of cluster config file: Alternatively, process managers like `PM2` or `pman` would be a better fit for the task than `tmux`. ## Assumptions: -- **Training data**: all the training files should be stored in a subfolder called `/data/fused`. Data should be stored as json objects per line (e.g. `{ "timestamp": 1459926000, "ftr_vector": [1, 2, 3]}`). Separate file for each sensor and prediction horizon. Files should be named the same as input kafka topics, that is `{sensor}_{horizon}h` (e.g. `sensor1_3h.json`) -- **Re-training data**: all the re-training data (if re-training is specified) will be stored in a subfolder called `/data/retrain_data` in the same form as training data. Separate files will be made for each sensor and prediction horizon. The names of the files will be in the following form: `{sensor}_{horizon}h_retrain.json` (eg. `sensor1_3h_retrain.json`). -- **Models**: all the models are stored in a subfolder called `/models`. Each sensor and horizon has its own model. The name of the models is composed of sensor name and prediction horizon, `model_{sensor}_{horizon}h` (e.g. 
`model_sensor1_3h`) -- **Input kafka topic**: The names of input kafka topics on which the prototype is listening for live data should be in the same format as training data file names, that is `features_{sensor}_{horizon}h`. +- **Training data**: all the training files should be stored in a subfolder called `/data/fused`. Data should be stored as json objects per line (e.g. `{ "timestamp": 1459926000, "ftr_vector": [1, 2, 3]}`). Separate file for each sensor and prediction horizon. Files should be named the same as input kafka topics, that is `{sensor}_{horizon}{time_offset.lower()}` (e.g. `sensor1_3h.json`). The target sensor is the first element of `ftr_vector`. +- **Re-training data**: all the re-training data (if re-training is specified) will be stored in a subfolder called `/data/retrain_data` in the same form as training data. Separate files will be made for each sensor and prediction horizon. The names of the files will be in the following form: `{sensor}_{horizon}{time_offset.lower()}_retrain.json` (eg. `sensor1_3h_retrain.json`). +- **Models**: all the models are stored in a subfolder called `/models`. Each sensor and horizon has its own model. The name of the models is composed of sensor name and prediction horizon, `model_{sensor}_{horizon}{time_offset.lower()}` (e.g. `model_sensor1_3h`) +- **Input kafka topic**: The names of input kafka topics on which the prototype is listening for live data should be in the same format as training data file names, that is `features_{sensor}_{horizon}{time_offset.lower()}`. - **Output kafka topic**: Predictions are sent on different topics based on a sensor names, that is `{sensor}` (e.g. `sensor1`). ## Examples: @@ -94,7 +102,7 @@ Alternatively, process managers like `PM2` or `pman` would be a better fit for t ## Requirements -* Python 3.6+ +* Python 3.9+ You can use `pip install -r requirements.txt` to install all the packages. @@ -103,7 +111,11 @@ Unit tests are available in `src/tests`. 
They are invoked with: `python test.py` ``` codespace:~/workspace/forecasting/src/tests$ python test.py -test_eval_periode (__main__.TestClassProperties) ... ok +test_eval_period (__main__.TestClassLGBMProperties) ... ok +test_horizon (__main__.TestClassLGBMProperties) ... ok +test_sensor (__main__.TestClassLGBMProperties) ... ok +test_split_point (__main__.TestClassLGBMProperties) ... ok +test_eval_period (__main__.TestClassProperties) ... ok test_horizon (__main__.TestClassProperties) ... ok test_sensor (__main__.TestClassProperties) ... ok test_split_point (__main__.TestClassProperties) ... ok @@ -114,11 +126,16 @@ test_perfect_score (__main__.TestModelEvaluation) ... ok test_predictability_index (__main__.TestModelEvaluation) ... ok test_fit (__main__.TestModelFunctionality) ... ok test_predict (__main__.TestModelFunctionality) ... ok +test_retrain (__main__.TestModelFunctionality) ... ok +test_retrain_not_enough_samples (__main__.TestModelFunctionality) ... ok +test_unlimited_retrain_file (__main__.TestModelFunctionality) ... ok test_load (__main__.TestModelSerialization) ... ok test_save (__main__.TestModelSerialization) ... ok +test_fit (__main__.TestPyTorchEvaluation) ... ok +test_predict (__main__.TestPyTorchEvaluation) ... 
ok ---------------------------------------------------------------------- -Ran 13 tests in 1.096s +Ran 22 tests in 5.482s OK ``` \ No newline at end of file diff --git a/src/lib/predictive_model.py b/src/lib/predictive_model.py index 541a7cb..570518c 100644 --- a/src/lib/predictive_model.py +++ b/src/lib/predictive_model.py @@ -1,6 +1,4 @@ import sklearn -import lightgbm -# from sklearn.externals import joblib import joblib import pandas as pd import collections @@ -8,44 +6,51 @@ import warnings import time import os -import json -from datetime import datetime import torch import numpy as np -from lib.regression_metrics import * +import sys +import lightgbm + +sys.path.insert(0, './lib') +from regression_metrics import mean_absolute_percentage_error class PredictiveModel: """ - Predictive model class is a wrapper for scikit learn regression models + Predictive model class is a wrapper for scikit learn regression models and PyTorch. ref: http://scikit-learn.org/stable/supervised_learning.html#supervised-learning + ref: https://pytorch.org/ """ + def rmse(self, true, pred): + return math.sqrt(sklearn.metrics.mean_squared_error(true, pred)) + def __init__(self, sensor, prediction_horizon, evaluation_period=512, - error_metrics=None, + err_metrics=None, split_point=0.8, algorithm='torch', - encoder_length=None, retrain_period=None, samples_for_retrain=None, retrain_file_location=None, time_offset='H', - learning_rate=0.001, + learning_rate=4 * 10 ** -5, batch_size=64, training_rounds=100, num_workers=1, **kwargs): - if not error_metrics: + self.err_metrics = err_metrics + if not err_metrics: + self.err_metrics = [ {'name': "R2 Score", 'short': "r2", 'function': sklearn.metrics.r2_score}, {'name': "Mean Absolute Error", 'short': "mae", 'function': sklearn.metrics.mean_absolute_error}, {'name': "Mean Squared Error", 'short': "mse", 'function': sklearn.metrics.mean_squared_error}, {'name': "Root Mean Squared Error", 'short': "rmse", - 'function': lambda true, pred: 
math.sqrt(sklearn.metrics.mean_squared_error(true, pred))}, + 'function': self.rmse}, {'name': "Mean Absolute Percentage Error", 'short': "mape", 'function': mean_absolute_percentage_error} ] @@ -59,16 +64,13 @@ def __init__(self, self.horizon = prediction_horizon self.eval_period = evaluation_period self.split_point = split_point - self.encoder_length = encoder_length self.measurements = collections.deque(maxlen=self.eval_period) self.predictions = collections.deque(maxlen=(self.eval_period + self.horizon)) self.predictability = None self.time_offset = time_offset - # Alternate fit arguments for Torch networks + # Torch model arguments self.encoder_length = 1 - if encoder_length is not None: - self.encoder_length = encoder_length self.learning_rate = learning_rate self.batch_size = batch_size self.training_rounds = training_rounds @@ -83,7 +85,7 @@ def __init__(self, if self.retrain_period is not None: # Initialize file - filename = "{}_{}h_retrain.json".format(sensor, prediction_horizon) + filename = "{}_{}{}_retrain.json".format(sensor, prediction_horizon, time_offset) self.train_file_path = os.path.join(retrain_file_location, filename) open(self.train_file_path, "w").close() @@ -93,11 +95,9 @@ def fit(self, filename): data = pd.read_json(data_file, lines=True) # if not valid json # set datetime as index data.set_index('timestamp', inplace=True) - # data.index = [datetime.fromtimestamp(i) for i in data.index] # transform ftr_vector from array to separate fields data = data['ftr_vector'].apply(pd.Series) - # print(data) # get features all_features = list(data) @@ -133,10 +133,9 @@ def fit(self, filename): for rec in x_test: start1 = time.time() - pred = evaluation_model.predict(rec.reshape(1, -1)) + evaluation_model.predict(rec.reshape(1, -1)) end = time.time() latency = end - start1 - # print(latency) data_file.write("{}\n".format(latency)) # testing predictions @@ -153,50 +152,11 @@ def fit(self, filename): output[metrics['short']] = metrics['function'](true, 
pred) return output - class TorchNetwork: - - def __init__(self, parent): - self.parent = parent - self.ann = None - - def fit(self, x, y): - loss_function = torch.nn.MSELoss() # error/cost function f(ANN(x),y), i.e. degree of misprediction - dimensions = [np.shape(x)[1], (np.shape(x)[1] + 1) // 2, 1] - layers = (2 * len(dimensions) - 3) * [torch.nn.ReLU()] # layers of the ann - layers[::2] = [torch.nn.Linear(dimensions[k], dimensions[k + 1]) for k in range(len(dimensions) - 1)] - self.ann = torch.nn.Sequential(*layers) - - opt = torch.optim.Adam(self.ann.parameters(), - lr=self.parent.learning_rate) # F will adjust the ANN's weights to minimize f(ANN(x),y) - - train_batches = torch.utils.data.DataLoader( - # partition 3k-tuples randomly into batches of 64 for nt-thread parallelization - [[x[i], y[i]] for i in range(len(x))], batch_size=self.parent.batch_size, shuffle=True, - num_workers=self.parent.num_workers) - losses = [] - for training_round in range( - self.parent.training_rounds): # if the number of rounds is too small/large, we get underfitting/overfitting - i, loss = 0, 0 - print('Train epoch \t' + str(training_round), end=' ') - for x, y in train_batches: # randomly select a batch of 64 pairs x,y, each of length 3k,3l - opt.zero_grad() # Training pass; set the gradients to 0 before each loss calculation. 
- loss = loss_function(self.ann(x.float()), - y.unsqueeze(1).float()) # calculate the error/cost/loss, this is a 0-dim tensor (number) - loss.backward() # backpropagation: compute gradients (this is where the ANN learns) - if i % 1000 == 0: - print(round(loss.item(), 3), end='\t') - opt.step() # apply gradients to improve weights ann[k].weight.grad to minimize f - i, loss = i + 1, loss + loss.item() # item() converts a 0-dim tensor to a number - losses.append(loss) - print('loss:', losses[-1]) - - def predict(self, x): - return self.ann(torch.tensor(x).float()).detach().numpy() - def predict(self, ftr_vector, timestamp): prediction = self.model.predict(ftr_vector) - # Retrain stuff + # TODO retraining should be done in asynchronous fashion to prevent long training time to freeze up the whole + # process of predictions. if self.retrain_period is not None: # Add current ftr_vector to file with open(self.train_file_path, 'r') as data_r: @@ -256,3 +216,47 @@ def save(self, filename): def load(self, filename): self.model = joblib.load(filename) # print "Loaded model from", filename + + class TorchNetwork: + """ + Wrapper for PyTorch. The class is designed to behave as a sklearn model with fit and predict methods. 
+ """ + + def __init__(self, parent): + self.parent = parent + self.ann = None + + def fit(self, x, y): + loss_function = torch.nn.MSELoss() + dimensions = [np.shape(x)[1], (np.shape(x)[1] + 1) // 2, 1] + layers = (2 * len(dimensions) - 3) * [torch.nn.ReLU()] + layers[::2] = [torch.nn.Linear(dimensions[k], dimensions[k + 1]) for k in range(len(dimensions) - 1)] + self.ann = torch.nn.Sequential(*layers) + + opt = torch.optim.Adam(self.ann.parameters(), lr=self.parent.learning_rate) + + # partition 3k-tuples randomly into batches of 64 for nt-thread parallelization + train_batches = torch.utils.data.DataLoader( + [[x[i], y[i]] for i in range(len(x))], batch_size=self.parent.batch_size, shuffle=True, + num_workers=self.parent.num_workers) + + losses = [] + for training_round in range(self.parent.training_rounds): + i, loss = 0, 0 + + # randomly select a batch of 64 pairs x,y, each of length 3k,3l + for x, y in train_batches: + # Training pass; set the gradients to 0 before each loss calculation. + opt.zero_grad() + loss = loss_function(self.ann(x.float()), y.unsqueeze(1).float()) + + # backpropagation: compute gradients + loss.backward() + + # apply gradients to improve weights ann[k].weight.grad to minimize f + opt.step() + i, loss = i + 1, loss + loss.item() + losses.append(loss) + + def predict(self, x): + return self.ann(torch.tensor(x).float()).detach().tolist() diff --git a/src/lib/regression_metrics.py b/src/lib/regression_metrics.py index 374e967..3f134a0 100644 --- a/src/lib/regression_metrics.py +++ b/src/lib/regression_metrics.py @@ -20,7 +20,7 @@ def mean_absolute_percentage_error(y_true, y_pred): # Check if y_pred has any zero elements. 
If yes, remove them, and raise warning zero_indices = np.flatnonzero(y_true == 0) - if (zero_indices.size != 0): + if zero_indices.size != 0: y_true = np.delete(y_true, zero_indices) y_pred = np.delete(y_pred, zero_indices) @@ -30,9 +30,6 @@ def mean_absolute_percentage_error(y_true, y_pred): return np.mean(np.abs((y_true - y_pred) / y_true)) * 100 -import numpy as np - - # MASE - Mean Absolute Scaled Error def mean_absolute_scaled_error(training_series, naive_training_series, testing_series, prediction_series): """ diff --git a/src/main.py b/src/main.py index 9788555..993f3e7 100644 --- a/src/main.py +++ b/src/main.py @@ -5,52 +5,46 @@ import json import time import os.path -import sklearn import threading import requests - from sklearn.ensemble import RandomForestRegressor from sklearn.linear_model import Ridge -import joblib -from lib.regression_metrics import * +import sklearn.metrics from kafka import KafkaConsumer from kafka import KafkaProducer -import numpy as np -import pandas as pd -from lib.predictive_model import PredictiveModel -# adding lib subdirectory sys.path.insert(0, './lib') +from predictive_model import PredictiveModel -def get_model_file_name(sensor, horizon): +def get_model_file_name(sensor, horizon, time_period='h'): subdir = 'models' if not os.path.isdir(subdir): os.makedirs(subdir) - filename = "model_{}_{}h".format(sensor, horizon) + filename = "model_{}_{}{}".format(sensor, horizon, time_period) filepath = os.path.join(subdir, filename) return filepath -def get_data_file_name(sensor, horizon): +def get_data_file_name(sensor, horizon, time_period='h'): subdir = '../../data/fused' if not os.path.isdir(subdir): os.makedirs(subdir) - filename = "{}_{}h.json".format(sensor, horizon) + filename = "{}_{}{}.json".format(sensor, horizon, time_period) filepath = os.path.join(subdir, filename) return filepath -def get_input_data_topics(sensors, horizons): +def get_input_data_topics(sensors, horizons, time_period): topics = [] for sensor in sensors: 
for horizon in horizons: - topics.append("features_{}_{}h".format(sensor, horizon)) + topics.append("features_{}_{}{}".format(sensor, horizon, time_period)) return topics @@ -90,14 +84,6 @@ def main(): help=u"Learning the model from dataset in subfolder '../../data/fused'", ) - parser.add_argument( - "-a", - "--alternate_it", - action='store_true', - dest="alternate_fit", - help=u"Learning the model from csv file in subfolder '../../data/fused'", - ) - parser.add_argument( "-s", "--save", @@ -143,9 +129,13 @@ def main(): # Initialize models print("\n=== Init phase ===") - models = {} + models = dict() kwargs = dict() sensors, horizons = None, None + time_offset = 'h' + + if 'time_offset' in conf: + time_offset = conf['time_offset'].lower() for key in conf: if 'sensors' == key: @@ -154,6 +144,8 @@ def main(): horizons = conf[key] else: kwargs[key] = conf[key] + if 'time_offset' == key: + time_offset = conf['time_offset'].lower() for sensor in sensors: models[sensor] = {} @@ -161,7 +153,7 @@ def main(): models[sensor][horizon] = PredictiveModel(sensor, horizon, **kwargs) - print("Initializing model_{}_{}h".format(sensor, horizon)) + print("Initializing model_{}_{}{}".format(sensor, horizon, time_offset)) # Model learning if args.fit: @@ -171,14 +163,16 @@ def main(): for horizon in horizons: start = time.time() data = get_data_file_name(sensor, horizon) - # try: - score = models[sensor][horizon].fit(data) - end = time.time() - print("Model[{0}_{1}h] training time: {2:.1f}s, evaluations: {3})".format(sensor, horizon, - end - start, - str(score))) - # except Exception as e: - # print(e) + try: + score = models[sensor][horizon].fit(data) + end = time.time() + print("Model[{0}_{1}{2}] training time: {3:.1f}s, evaluations: {4})".format(sensor, + horizon, + time_offset, + end - start, + str(score))) + except Exception as e: + print(e) # Model saving if args.save: @@ -211,7 +205,7 @@ def main(): print("\n=== Predictions phase ===") # Start Kafka consumer - topics = 
get_input_data_topics(sensors, horizons) + topics = get_input_data_topics(sensors, horizons, time_offset) consumer = KafkaConsumer(bootstrap_servers=conf['bootstrap_servers']) consumer.subscribe(topics) print("Subscribed to topics: ", topics) @@ -221,45 +215,45 @@ def main(): value_serializer=lambda v: json.dumps(v).encode('utf-8')) for msg in consumer: - try: - rec = eval(msg.value) - timestamp = rec['timestamp'] - ftr_vector = rec['ftr_vector'] - measurement = ftr_vector[0] # first feature is the target measurement + # try: + rec = eval(msg.value) + timestamp = rec['timestamp'] + ftr_vector = rec['ftr_vector'] + measurement = ftr_vector[0] # first feature is the target measurement - topic = msg.topic + topic = msg.topic - # extract sensor and horizon info from topic name - horizon = int(topic.split("_")[-1][:-1]) - sensor = topic.split("_")[-2] + # extract sensor and horizon info from topic name + horizon = int(topic.split("_")[-1][:-len(time_offset)]) + sensor = topic.split("_")[-2] - # predictions - model = models[sensor][horizon] - predictions = model.predict([ftr_vector], timestamp) + # predictions + model = models[sensor][horizon] + predictions = model.predict([ftr_vector], timestamp) - # output record - output = {'stampm': timestamp, - 'value': predictions[0], - 'sensor_id': sensor, - 'horizon': horizon, - 'predictability': model.predictability} + # output record + output = {'stampm': timestamp, + 'value': predictions[0], + 'sensor_id': sensor, + 'horizon': horizon, + 'predictability': model.predictability} - # evaluation - output = model.evaluate(output, measurement) # appends evaluations to output + # evaluation + output = model.evaluate(output, measurement) # appends evaluations to output - # send result to kafka topic - output_topic = "predictions_{}".format(sensor) - future = producer.send(output_topic, output) + # send result to kafka topic + output_topic = "predictions_{}".format(sensor) + future = producer.send(output_topic, output) - 
print(output_topic + ": " + str(output)) - - try: - record_metadata = future.get(timeout=10) - except Exception as e: - print('Producer error: ' + str(e)) + print(output_topic + ": " + str(output)) + try: + future.get(timeout=10) except Exception as e: - print('Consumer error: ' + str(e)) + print('Producer error: ' + str(e)) + + # except Exception as e: + # print('Consumer error: ' + str(e)) if __name__ == '__main__': diff --git a/src/requirements.txt b/src/requirements.txt index 1c076ce..3c96233 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -1,16 +1,18 @@ certifi==2020.6.20 chardet==3.0.4 idna==2.10 -joblib==0.16.0 -kafka-python==2.0.1 -numpy==1.16.3 -pandas==1.1.2 +joblib == 1.0.1 +kafka_python == 2.0.2 +lightgbm == 3.2.1 +numpy == 1.21.2 +pandas == 1.3.2 python-dateutil==2.8.0 pytz==2019.1 -requests==2.24.0 -scikit-learn==0.23.2 -scipy==1.5.2 +requests == 2.26.0 +scikit_learn == 0.24.2 +scipy==1.7.1 six==1.12.0 sklearn==0.0 threadpoolctl==2.1.0 -urllib3==1.25.10 +torch == 1.9.0 +urllib3==1.25.10 \ No newline at end of file diff --git a/src/tests/test.py b/src/tests/test.py index 9131a9f..487c124 100644 --- a/src/tests/test.py +++ b/src/tests/test.py @@ -4,12 +4,14 @@ import time import unittest import sklearn.metrics -from lib.predictive_model import PredictiveModel +from sklearn.ensemble import RandomForestRegressor import os - import warnings +import math +import numpy as np sys.path.insert(0, '../lib') +from predictive_model import PredictiveModel def create_testing_file(): @@ -62,31 +64,21 @@ def create_testing_file_for_retrain(): return filepath -def create_model_instance(model_string, retrain_period=None, samples_for_retrain=None): - algorithm = model_string - sensor = "N1" - horizon = 1 - evaluation_period = 72 - evaluation_split_point = 0.8 - error_metrics = [ - {'name': "R2 Score", 'short': "r2", 'function': sklearn.metrics.r2_score}, - {'name': "Mean Absolute Error", 'short': "mae", 'function': sklearn.metrics.mean_absolute_error}, - 
{'name': "Mean Squared Error", 'short': "mse", 'function': sklearn.metrics.mean_squared_error}, - {'name': "Root Mean Squared Error", 'short': "rmse", 'function': None} - ] - model = PredictiveModel(algorithm, sensor, horizon, evaluation_period, - error_metrics, evaluation_split_point, retrain_period, - samples_for_retrain, os.path.join('.', 'test', 'retrain_data')) - +def create_model_instance(model_string, **kwargs): + model = PredictiveModel(algorithm=model_string, + sensor="N1", + prediction_horizon=1, + evaluation_period=72, + retrain_file_location=os.path.join('.', 'test', 'retrain_data'), + **kwargs) return model class SimpleWidgetTestCase(unittest.TestCase): def setUp(self): - self.model = create_model_instance( - "sklearn.ensemble.RandomForestRegressor(n_estimators=100, n_jobs=16, random_state=0)", - retrain_period=None, samples_for_retrain=None) + self.model = create_model_instance(model_string="sklearn.ensemble.RandomForestRegressor(n_estimators=100, n" + + "_jobs=16, random_state=0)") class TestClassProperties(SimpleWidgetTestCase): @@ -167,7 +159,7 @@ def test_unlimited_retrain_file(self): model_for_retrain = create_model_instance( "sklearn.ensemble.RandomForestRegressor(n_estimators=100, n_jobs=16, random_state=0)", - retrain_period=10, samples_for_retrain=None) + retrain_period=10) # Fit the model model_for_retrain.fit(f) @@ -180,7 +172,7 @@ def test_unlimited_retrain_file(self): timestamp = start_timestamp + i * 60 * 60 p = model_for_retrain.predict([[1]], timestamp=timestamp) - if (i < 10): + if i < 10: self.assertEqual(p[0], 0.) else: self.assertEqual(p[0], 1.) 
@@ -330,7 +322,7 @@ def test_evaluation_buffers(self): # check buffers self.assertEqual(len(self.model.measurements), self.model.eval_period) - self.assertEqual(len(self.model.predictions), self.model.eval_periode + self.model.horizon) + self.assertEqual(len(self.model.predictions), self.model.eval_period + self.model.horizon) def test_predictability_index(self): @@ -366,5 +358,30 @@ def test_split_point(self): self.assertEqual(self.model.split_point, 0.8) +class PyTorchTestCase(unittest.TestCase): + + def setUp(self): + self.model = create_model_instance( + "torch", + learning_rate=0.001, + batch_size=10, + training_rounds=2 + ) + + +class TestPyTorchEvaluation(PyTorchTestCase): + + def test_fit(self): + f = create_testing_file() + score = self.model.fit(f) + self.assertIsInstance(score, dict) + + def test_predict(self): + f = create_testing_file() + self.model.fit(f) + prediction = self.model.predict([[1, 1, 1]], timestamp=time.time()) + self.assertEqual(np.shape(prediction), (1, 1)) + + if __name__ == '__main__': unittest.main(verbosity=2) From b3a2f9ee4f9ddd28078514d14d4918d825c58ab8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=20=C5=A0ircelj?= Date: Tue, 12 Oct 2021 12:06:11 +0200 Subject: [PATCH 3/5] GMM imputer layer added. 
--- README.md | 48 +++++++++++-- src/lib/gmm_linear_layer.py | 122 ++++++++++++++++++++++++++++++++++ src/lib/predictive_model.py | 85 +++++++++-------------- src/lib/regression_metrics.py | 9 +++ src/lib/torch_network.py | 61 +++++++++++++++++ src/main.py | 87 ++++++++++++------------ 6 files changed, 306 insertions(+), 106 deletions(-) create mode 100644 src/lib/gmm_linear_layer.py create mode 100644 src/lib/torch_network.py diff --git a/README.md b/README.md index bfeb559..ccb6aeb 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,20 @@ # Batch Learning Forecasting Component -The component enables using external predictive models from [Scikit Learn](http://scikit-learn.org/stable/index.html) and [PyTorch](https://pytorch.org/) library (for example [Random Forest Regressor](http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestRegressor.html)) implementation in a streaming scenario. Fitting, saving, loading and live prediction are enabled. Live predictions work via Kafka streams (reading feature vectors from Kafka and writing predictions to Kafka). - -The predictive model is designed in an decentralized fashion, meaning that several instances (submodels) will be created and used for each specific sensor and horizon (`#submodels = #sensors * #horizons`). Decentralized architecture enables parallelization. +The component enables using external predictive models from [PyTorch](https://pytorch.org/) +and [Scikit Learn](http://scikit-learn.org/stable/index.html) library +(for example [Random Forest Regressor](http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestRegressor.html)) +implementation in a streaming scenario. Fitting, saving, loading and live prediction are enabled. Live predictions work +via Kafka streams (reading feature vectors from Kafka and writing predictions to Kafka). 
+
+PyTorch models can have an additional hidden layer that can process missing
+data by replacing a typical neuron's response by its expected value using a Gaussian mixture model (GMM). The method is an
+implementation of the paper
+[Processing of missing data by neural networks](https://arxiv.org/abs/1805.07405).
+The original implementation in TensorFlow is available in [this repository](https://github.com/lstruski/Processing-of-missing-data-by-neural-networks).
+
+The predictive model is designed in a decentralized fashion, meaning that several instances (submodels)
+will be created and used for each specific sensor and horizon (`#submodels = #sensors * #horizons`).
+Decentralized architecture enables parallelization.
 
 The code is available in the `src/` directory.
 
@@ -18,15 +30,17 @@ The code is available in the `src/` directory.
 | `-s` | `--save` | save model to file |
 | `-l` | `--load` | load model from file |
 | `-p` | `--predict` | start live predictions (via Kafka) |
+| `-w` | `--watchdog` | start watchdog pinging |
 
 #### Config file:
-Config file specifies the Kafka server address, which scikit algorithm to use, prediction horizons and senssors for which the model will be learned/loaded/saved/predicted. Config files are stored in `src/config/`.
+Config file specifies the Kafka server address, which algorithm to use, prediction horizons and sensors for which the model will be learned/loaded/saved/predicted. Config files are stored in `src/config/`.
-Parameters: +General Parameters: | Name | Type | Default | Description | | --- | --- | --- | --- | -| **prediction_horizons**| list(integer) | | List of prediction horizons (in hours) for which the model will be trained to predict for.| +| **prediction_horizons**| list(integer) | | List of prediction horizons (in units specified in time_offset) for which the model will be trained to predict for.| +| **time_offset**| string | H | [String alias](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases) to define the data time offsets. The aliases used in training and topic names are lowercase for backwards compatibility.| | **sensors**| list(string) | | List of sensors for which this specific instance will train the models and will be making predictions.| | **bootstrap_servers**| string or list(string)| | String (or list of `host[:port]` strings) that the consumer should contact to bootstrap initial cluster metadata.| | **algorithm**| string | `torch` | String as either a scikit-learn model constructor with initialization parameters or a string `torch` to train using a pre defined neural network using PyTorch with architecture: \[torch.nn.Linear, torch.nn.ReLU, torch.nn.Linear\],| @@ -34,11 +48,31 @@ Parameters: | **evaluation_split_point**| float | 0.8 | Define training and testing splitting point in the dataset, for model evaluation during learning phase (fit takes twice as long time).| | **retrain_period**| integer | None | A number of received samples after which the model will be re-trained. This is an optional parameter. If it is not specified no re-training will be done.| | **samples_for_retrain**| integer | None | A number of samples that will be used for re-training. If retrain_period is not specified this parameter will be ignored. This is an optional parameter. 
If it is not specified (and retrain_period is) the re-train will be done on all samples received since the component was started.| -| **time_offset**| string | H | [String alias](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases) to define the data time offsets. The aliases used in training and topic names are lowercase for backwards compatibility.| +| **watchdog_path**| string | None | Watchdog path. | +| **watchdog_interval**| integer | 60 | Delay in seconds between each Watchdog ping | +| **watchdog_url**| string | `localhost` | Watchdog url. | +| **watchdog_port**| integer | 3001 | Watchdog port. | +PyTorch parameters: + +| Name | Type | Default | Description | +| --- | --- | --- | --- | | **learning_rate**| float| 4E-5 | Learning rate for the torch model.| | **batch_size**| integer | 64 | Size of training batches for torch model.| | **training_rounds**| integer | 100 | Training rounds for torch model.| | **num_workers**| integer| 1 | Number of workers for torch model.| + +GMM Layer parameters: + +| Name | Type | Default | Description | +| --- | --- | --- | --- | +| **gmm_layer**| boolean| False | If `true` the gmm layer is added to the model. | +| **initial_imputer**| string | `simple` | Options are `simple` or `iterative` which uses either sklearn [SimpleImputer](https://scikit-learn.org/stable/modules/generated/sklearn.impute.SimpleImputer.html) or [IterativeImputer](https://scikit-learn.org/stable/modules/generated/sklearn.impute.IterativeImputer.html) | +| **max_iter**| integer| 15 | If the iterative imputer is chosen, this arguments defines maximum number of iterations for it.| +| **n_gmm**| integer| 5 | Number of components of GaussianMixture. If n_gmm is set to -1, then all values between min_n_gmm and max_n_gmm are checked and the one with the best BIC score is chosen. | +| **min_n_gmm**| integer| 1 | Minimum number of components for GMM if search is enabled. 
| +| **max_n_gmm**| integer| 10 | Maximum number of components for GMM if search is enabled.| +| **gmm_seed**| integer| None | Random state seed for GMM. | +| **verbose**| boolean| False | If set to `True` the progress and results of n_gmm parameter search is displayed.| Example of config file: ```json diff --git a/src/lib/gmm_linear_layer.py b/src/lib/gmm_linear_layer.py new file mode 100644 index 0000000..99a8f98 --- /dev/null +++ b/src/lib/gmm_linear_layer.py @@ -0,0 +1,122 @@ +import torch +from torch import nn +import numpy as np +from sklearn.mixture import GaussianMixture + + +def nr(x): + """ Helper function for GMM Linear layer. Works for matrix x of shape (n_samples, n_features). """ + return 1 / (np.sqrt(2 * np.pi)) * torch.exp(-torch.square(x) / 2) + x / 2 * (1 + torch.erf(x / np.sqrt(2))) + + +def linear_relu_missing_values(W, b, x, p, mean, cov): + """ Helper function for GMM Linear layer. It can take all samples at once, but it applies only one Gaussian. """ + m = torch.where(torch.isnan(x), mean, x) + sigma = torch.where(torch.isnan(x), torch.abs(cov), torch.tensor(0.0)) + return p * nr((m @ W + b) / torch.sqrt( + (torch.square(W) * torch.abs(sigma.view([sigma.shape[0], sigma.shape[1], 1]))).sum(axis=1))) + + +class GMMLinear(nn.Module): + """ Layer for processing missing data from the paper Processing of missing data by neural networks. """ + + def __init__(self, in_features, out_features, gmm_weights, gmm_means, gmm_covariances): + """ + in_features and out_features are number of input and output features, + other parameters are outputs of GaussianMixture. 
+ """ + super(GMMLinear, self).__init__() + + self.in_features = in_features + self.out_features = out_features + + self.gmm_weights = nn.Parameter(torch.tensor(np.log(gmm_weights)).float()) + self.gmm_means = nn.Parameter(torch.tensor(gmm_means).float()) + self.gmm_covariances = nn.Parameter(torch.tensor(np.abs(gmm_covariances)).float()) + self.n_gmm = len(gmm_weights) + + if not self.gmm_means.shape == (self.n_gmm, self.in_features): + raise Exception('gmm_means does not match correct shape (n_components, n_features)') + if not self.gmm_covariances.shape == (self.n_gmm, self.in_features): + raise Exception("gmm_covariances does not match correct shape (n_components, n_features). \ + GaussianMixture must be called with parameter covariance_type='diag'.") + + # weight matrix and bias of this layer + self.W = nn.Parameter(torch.randn([self.in_features, self.out_features])) + self.b = nn.Parameter(torch.randn([self.out_features])) + + def forward(self, x): + indices_full = torch.logical_not(torch.isnan(x).any(axis=1)) + indices_missing = torch.isnan(x).any(axis=1) + x_full = x[indices_full] + x_missing = x[indices_missing] + p = nn.functional.softmax(self.gmm_weights, dim=0) + out_missing = linear_relu_missing_values(self.W, self.b, x_missing, p[0], self.gmm_means[0], + self.gmm_covariances[0]) + + for i in range(1, self.n_gmm): + out_missing += linear_relu_missing_values(self.W, self.b, x_missing, p[i], self.gmm_means[i], + self.gmm_covariances[i]) + + out_full = nn.functional.relu(x_full @ self.W + self.b) + + out = torch.zeros(size=(x.shape[0], self.out_features)) + out[indices_full] = out_full + out[indices_missing] = out_missing + + assert torch.logical_not(torch.any(torch.isnan(out))) + return out + + +def create_gmm_linear_layer(X, in_features, out_features, initial_imputer, n_gmm, min_n_gmm=1, max_n_gmm=10, + verbose=True, gmm_seed=None): + """ + Returns object of class GMMLinear. 
X is input data with missing values, which are imputed with initial_imputer + and then used as input to GaussianMixture. If initial_imputer is None, then we assume X is already imputed data + without missing values. n_gmm is number of components of GaussianMixture. If n_gmm is set to -1, + then all values between min_n_gmm and max_n_gmm are checked and the one with the best BIC score is chosen. + """ + if initial_imputer is not None: + x_imputed = initial_imputer.fit_transform(X) + else: + x_imputed = X + + if n_gmm == -1: + n_gmm = best_gmm_n_components(x_imputed, min_n_gmm, max_n_gmm, verbose) + if verbose: + print('Best n_components =', n_gmm) + + gmm = GaussianMixture(n_components=n_gmm, covariance_type='diag', random_state=gmm_seed).fit(x_imputed) + return GMMLinear(in_features, out_features, gmm.weights_, gmm.means_, gmm.covariances_) + + +def best_gmm_n_components(X, n_min, n_max, verbose=True, gmm_seed=None): + """ Returns best number of components for GaussianMixture (between n_min and n_max) based on BIC score. """ + min_n = -1 + min_bic = np.infty + for n_components in range(n_min, n_max + 1): + gmm = GaussianMixture(n_components=n_components, covariance_type='diag', random_state=gmm_seed).fit(X) + bic = gmm.bic(X) + if verbose: + print(f'n_components={n_components},\tBIC={bic}') + if min_bic > bic: + min_bic = bic + min_n = n_components + return min_n + + +def build_multilayer_model(X, dimensions, initial_imputer, n_gmm, gmm_seed): + """ + Returns neural network with first layer GMMLinear and the rest normal linear layers with ReLU activation. + Number and dimensions of layers are defined with array dimensions. 
+ """ + gmm_layer = create_gmm_linear_layer(X, dimensions[0], dimensions[1], initial_imputer, n_gmm, min_n_gmm=1, + max_n_gmm=10, verbose=True, gmm_seed=gmm_seed) + + layers_list = [gmm_layer] + for i in range(1, len(dimensions) - 1): + if i > 1: + layers_list.append(nn.ReLU()) + layers_list.append(nn.Linear(dimensions[i], dimensions[i + 1])) + + return nn.Sequential(*layers_list) diff --git a/src/lib/predictive_model.py b/src/lib/predictive_model.py index 570518c..4837fe8 100644 --- a/src/lib/predictive_model.py +++ b/src/lib/predictive_model.py @@ -6,13 +6,14 @@ import warnings import time import os -import torch -import numpy as np import sys import lightgbm +from sklearn.experimental import enable_iterative_imputer +from sklearn.impute import IterativeImputer, SimpleImputer sys.path.insert(0, './lib') -from regression_metrics import mean_absolute_percentage_error +from regression_metrics import mean_absolute_percentage_error, rmse +from torch_network import TorchNetwork class PredictiveModel: @@ -22,9 +23,6 @@ class PredictiveModel: ref: https://pytorch.org/ """ - def rmse(self, true, pred): - return math.sqrt(sklearn.metrics.mean_squared_error(true, pred)) - def __init__(self, sensor, prediction_horizon, @@ -40,23 +38,30 @@ def __init__(self, batch_size=64, training_rounds=100, num_workers=1, + gmm_layer="False", + initial_imputer="simple", + max_iter=15, + n_gmm=5, + min_n_gmm=1, + max_n_gmm=10, + gmm_seed=None, + verbose=False, **kwargs): self.err_metrics = err_metrics if not err_metrics: - self.err_metrics = [ {'name': "R2 Score", 'short': "r2", 'function': sklearn.metrics.r2_score}, {'name': "Mean Absolute Error", 'short': "mae", 'function': sklearn.metrics.mean_absolute_error}, {'name': "Mean Squared Error", 'short': "mse", 'function': sklearn.metrics.mean_squared_error}, {'name': "Root Mean Squared Error", 'short': "rmse", - 'function': self.rmse}, + 'function': rmse}, {'name': "Mean Absolute Percentage Error", 'short': "mape", 'function': 
mean_absolute_percentage_error} ] self.algorithm = algorithm - if "torch" == algorithm: + if algorithm == "torch": self.model = self.TorchNetwork(self) else: self.model = eval(self.algorithm) @@ -76,6 +81,22 @@ def __init__(self, self.training_rounds = training_rounds self.num_workers = num_workers + # GMM layer arguments + self.gmm_layer = True + if gmm_layer.lower() == "false": + self.gmm_layer = False + + if initial_imputer.lower() == 'iterative': + self.initial_imputer = IterativeImputer(max_iter=max_iter) + else: + self.initial_imputer = SimpleImputer() + + self.n_gmm = n_gmm + self.min_n_gmm = min_n_gmm + self.max_n_gmm = max_n_gmm + self.gmm_seed = gmm_seed + self.verbose = verbose + # Retrain configurations self.samples_for_retrain = samples_for_retrain self.retrain_period = retrain_period @@ -211,52 +232,6 @@ def evaluate(self, output, measurement): def save(self, filename): joblib.dump(self.model, filename, compress=3) - # print "Saved model to", filename def load(self, filename): self.model = joblib.load(filename) - # print "Loaded model from", filename - - class TorchNetwork: - """ - Wrapper for PyTorch. The class is designed to behave as a sklearn model with fit and predict methods. 
- """ - - def __init__(self, parent): - self.parent = parent - self.ann = None - - def fit(self, x, y): - loss_function = torch.nn.MSELoss() - dimensions = [np.shape(x)[1], (np.shape(x)[1] + 1) // 2, 1] - layers = (2 * len(dimensions) - 3) * [torch.nn.ReLU()] - layers[::2] = [torch.nn.Linear(dimensions[k], dimensions[k + 1]) for k in range(len(dimensions) - 1)] - self.ann = torch.nn.Sequential(*layers) - - opt = torch.optim.Adam(self.ann.parameters(), lr=self.parent.learning_rate) - - # partition 3k-tuples randomly into batches of 64 for nt-thread parallelization - train_batches = torch.utils.data.DataLoader( - [[x[i], y[i]] for i in range(len(x))], batch_size=self.parent.batch_size, shuffle=True, - num_workers=self.parent.num_workers) - - losses = [] - for training_round in range(self.parent.training_rounds): - i, loss = 0, 0 - - # randomly select a batch of 64 pairs x,y, each of length 3k,3l - for x, y in train_batches: - # Training pass; set the gradients to 0 before each loss calculation. 
- opt.zero_grad() - loss = loss_function(self.ann(x.float()), y.unsqueeze(1).float()) - - # backpropagation: compute gradients - loss.backward() - - # apply gradients to improve weights ann[k].weight.grad to minimize f - opt.step() - i, loss = i + 1, loss + loss.item() - losses.append(loss) - - def predict(self, x): - return self.ann(torch.tensor(x).float()).detach().tolist() diff --git a/src/lib/regression_metrics.py b/src/lib/regression_metrics.py index 3f134a0..f4916f3 100644 --- a/src/lib/regression_metrics.py +++ b/src/lib/regression_metrics.py @@ -1,4 +1,6 @@ import numpy as np +import math +import sklearn import warnings @@ -51,3 +53,10 @@ def mean_absolute_scaled_error(training_series, naive_training_series, testing_s errors = np.abs(testing_series - prediction_series) return errors.mean() / d + + +def rmse(true, pred): + """ + Computes square of mean squared error metric on true and predicted inputs + """ + return math.sqrt(sklearn.metrics.mean_squared_error(true, pred)) diff --git a/src/lib/torch_network.py b/src/lib/torch_network.py new file mode 100644 index 0000000..4c355c0 --- /dev/null +++ b/src/lib/torch_network.py @@ -0,0 +1,61 @@ +import torch +import numpy as np +from sklearn.experimental import enable_iterative_imputer +from sklearn.impute import IterativeImputer, SimpleImputer + +sys.path.insert(0, './lib') +from gmm_linear_layer import create_gmm_linear_layer + + +class TorchNetwork: + """ + Wrapper for PyTorch. The class is designed to behave as a sklearn model with fit and predict methods. 
+ """ + + def __init__(self, parent): + self.parent = parent + self.ann = None + + def fit(self, x, y): + loss_function = torch.nn.MSELoss() + dimensions = [np.shape(x)[1], (np.shape(x)[1] + 1) // 2, 1] + layers = (2 * len(dimensions) - 3) * [torch.nn.ReLU()] + layers[::2] = [torch.nn.Linear(dimensions[k], dimensions[k + 1]) for k in range(len(dimensions) - 1)] + + if self.parent.gmm_layer: + gmm_layer = create_gmm_linear_layer(x, np.shape(x)[1], np.shape(x)[1], self.parent.initial_imputer, + n_gmm=self.parent.n_gmm, + min_n_gmm=self.parent.min_n_gmm, + max_n_gmm=self.parent.max_n_gmm, + gmm_seed=self.parent.gmm_seed, + verbose=self.parent.verbose) + self.ann = torch.nn.Sequential(gmm_layer, *layers) + else: + self.ann = torch.nn.Sequential(*layers) + + opt = torch.optim.Adam(self.ann.parameters(), lr=self.parent.learning_rate) + + # partition 3k-tuples randomly into batches of 64 for nt-thread parallelization + train_batches = torch.utils.data.DataLoader( + [[x[i], y[i]] for i in range(len(x))], batch_size=self.parent.batch_size, shuffle=True, + num_workers=self.parent.num_workers) + + for training_round in range(self.parent.training_rounds): + print(f'Train epoch {training_round:3}', end="\t") + + loss = 0 + # randomly select a batch of 64 pairs x,y, each of length 3k,3l + for x, y in train_batches: + # Training pass; set the gradients to 0 before each loss calculation. 
+ opt.zero_grad() + loss = loss_function(self.ann(x.float()), y.unsqueeze(1).float()) + + # backpropagation: compute gradients + loss.backward() + + # apply gradients to improve weights ann[k].weight.grad to minimize f + opt.step() + print(f'loss:{loss.item():12.3f}') + + def predict(self, x): + return self.ann(torch.tensor(x).float()).detach().tolist() diff --git a/src/main.py b/src/main.py index 993f3e7..6a81821 100644 --- a/src/main.py +++ b/src/main.py @@ -10,6 +10,7 @@ from sklearn.ensemble import RandomForestRegressor from sklearn.linear_model import Ridge import sklearn.metrics +import traceback from kafka import KafkaConsumer from kafka import KafkaProducer @@ -49,20 +50,15 @@ def get_input_data_topics(sensors, horizons, time_period): return topics -def ping_watchdog(): - interval = 60 # ping interval in seconds - url = "localhost" - port = 3001 - path = "/ping?id=5&secret=b9347c25aba4d3ba6e8f61d05fd1c011" - +def ping_watchdog(path, watchdog_interval=60, watchdog_url='localhost', watchdog_port=3001, **kwargs): try: - r = requests.get("http://{}:{}{}".format(url, port, path)) - except requests.exceptions.RequestException as e: # This is the correct syntax - print(e) + requests.get("http://{}:{}{}".format(watchdog_url, watchdog_port, path)) + except requests.exceptions.RequestException: + traceback.print_exc() else: print('Successful ping at ' + time.ctime()) - threading.Timer(interval, ping_watchdog).start() + threading.Timer(watchdog_interval, ping_watchdog).start() def main(): @@ -171,8 +167,8 @@ def main(): time_offset, end - start, str(score))) - except Exception as e: - print(e) + except Exception: + traceback.print_exc() # Model saving if args.save: @@ -197,8 +193,11 @@ def main(): print("Loaded model", filename) if args.watchdog: - print("\n=== Watchdog started ===") - ping_watchdog() + if "watchdog_path" in conf.keys: + print("\n=== Watchdog started ===") + ping_watchdog(conf.watchdog_path, **conf) + else: + print("Watchdog path missing") # Live 
predictions if args.predict: @@ -215,45 +214,45 @@ def main(): value_serializer=lambda v: json.dumps(v).encode('utf-8')) for msg in consumer: - # try: - rec = eval(msg.value) - timestamp = rec['timestamp'] - ftr_vector = rec['ftr_vector'] - measurement = ftr_vector[0] # first feature is the target measurement + try: + rec = eval(msg.value) + timestamp = rec['timestamp'] + ftr_vector = rec['ftr_vector'] + measurement = ftr_vector[0] # first feature is the target measurement + + topic = msg.topic - topic = msg.topic + # extract sensor and horizon info from topic name + horizon = int(topic.split("_")[-1][:-len(time_offset)]) + sensor = topic.split("_")[-2] - # extract sensor and horizon info from topic name - horizon = int(topic.split("_")[-1][:-len(time_offset)]) - sensor = topic.split("_")[-2] + # predictions + model = models[sensor][horizon] + predictions = model.predict([ftr_vector], timestamp) - # predictions - model = models[sensor][horizon] - predictions = model.predict([ftr_vector], timestamp) + # output record + output = {'stampm': timestamp, + 'value': predictions[0], + 'sensor_id': sensor, + 'horizon': horizon, + 'predictability': model.predictability} - # output record - output = {'stampm': timestamp, - 'value': predictions[0], - 'sensor_id': sensor, - 'horizon': horizon, - 'predictability': model.predictability} + # evaluation + output = model.evaluate(output, measurement) # appends evaluations to output - # evaluation - output = model.evaluate(output, measurement) # appends evaluations to output + # send result to kafka topic + output_topic = "predictions_{}".format(sensor) + future = producer.send(output_topic, output) - # send result to kafka topic - output_topic = "predictions_{}".format(sensor) - future = producer.send(output_topic, output) + print(output_topic + ": " + str(output)) - print(output_topic + ": " + str(output)) + try: + future.get(timeout=10) + except Exception as e: + print('Producer error: ' + str(e)) - try: - future.get(timeout=10) 
except Exception as e: - print('Producer error: ' + str(e)) - - # except Exception as e: - # print('Consumer error: ' + str(e)) + print('Consumer error: ' + str(e)) if __name__ == '__main__': From 47f712ddd1f7dcd3d8a6fb82f98024200dca5e85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=20=C5=A0ircelj?= Date: Wed, 13 Oct 2021 08:32:04 +0200 Subject: [PATCH 4/5] Update README.md --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index ccb6aeb..1f28def 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,7 @@ General Parameters: | **watchdog_interval**| integer | 60 | Delay in seconds between each Watchdog ping | | **watchdog_url**| string | `localhost` | Watchdog url. | | **watchdog_port**| integer | 3001 | Watchdog port. | + PyTorch parameters: | Name | Type | Default | Description | @@ -172,4 +173,4 @@ test_predict (__main__.TestPyTorchEvaluation) ... ok Ran 22 tests in 5.482s OK -``` \ No newline at end of file +``` From 2360a5b6e517bcacfdf3bae32b3286572662c3d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=20=C5=A0ircelj?= Date: Fri, 29 Oct 2021 12:04:01 +0200 Subject: [PATCH 5/5] Small import fix --- src/lib/predictive_model.py | 2 +- src/lib/torch_network.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lib/predictive_model.py b/src/lib/predictive_model.py index 4837fe8..2bdce6d 100644 --- a/src/lib/predictive_model.py +++ b/src/lib/predictive_model.py @@ -62,7 +62,7 @@ def __init__(self, self.algorithm = algorithm if algorithm == "torch": - self.model = self.TorchNetwork(self) + self.model = TorchNetwork(self) else: self.model = eval(self.algorithm) self.sensor = sensor diff --git a/src/lib/torch_network.py b/src/lib/torch_network.py index 4c355c0..4917ecc 100644 --- a/src/lib/torch_network.py +++ b/src/lib/torch_network.py @@ -1,5 +1,6 @@ import torch import numpy as np +import sys from sklearn.experimental import enable_iterative_imputer from sklearn.impute import IterativeImputer, 
SimpleImputer