From 633089f5800c3b7472a2d7237daebfc2adf322d8 Mon Sep 17 00:00:00 2001 From: Dhananjoy Das Date: Fri, 1 Mar 2019 16:13:05 -0800 Subject: [PATCH 1/5] [REF-5768] component to Read file into DataFrame --- .../file_to_dataframe/__init__.py | 0 .../file_to_dataframe/component.json | 38 +++++++++++++++ .../fileConnectors/file_to_dataframe/main.py | 47 +++++++++++++++++++ 3 files changed, 85 insertions(+) create mode 100644 components/Python/fileConnectors/file_to_dataframe/__init__.py create mode 100644 components/Python/fileConnectors/file_to_dataframe/component.json create mode 100644 components/Python/fileConnectors/file_to_dataframe/main.py diff --git a/components/Python/fileConnectors/file_to_dataframe/__init__.py b/components/Python/fileConnectors/file_to_dataframe/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/components/Python/fileConnectors/file_to_dataframe/component.json b/components/Python/fileConnectors/file_to_dataframe/component.json new file mode 100644 index 0000000..bf22bdc --- /dev/null +++ b/components/Python/fileConnectors/file_to_dataframe/component.json @@ -0,0 +1,38 @@ +{ + "engineType": "Python", + "language": "Python", + "userStandalone": false, + "name": "file_to_dataframe", + "label": "Source File to DataFrame", + "program": "main.py", + "componentClass": "MCenterComponentAdapter", + "modelBehavior": "Auxiliary", + "useMLOps": true, + "inputInfo": [{ + "description": "File to read contents", + "label": "File-Name", + "defaultComponent": "", + "type": "str", + "group": "data" + }], + "outputInfo": [ + { + "description": "Pandas Dataframe", + "label": "dataframe", + "defaultComponent": "", + "type": "dataframe", + "group": "data" + } + ], + "group": "Connectors", + "arguments": [ + { + "key": "file-path", + "label": "Dataset file to read", + "type": "str", + "description": "File to use for loading DataSet into DataFrame", + "optional": true + } + ], + "version": 1 +} diff --git a/components/Python/fileConnectors/file_to_dataframe/main.py b/components/Python/fileConnectors/file_to_dataframe/main.py new file mode 100644 index 0000000..591d398 --- /dev/null +++ b/components/Python/fileConnectors/file_to_dataframe/main.py @@ -0,0 +1,47 @@ +from __future__ import print_function + +import argparse +import sys +import time +import os +import pandas + +from parallelm.components import ConnectableComponent +from parallelm.mlops.stats.multi_line_graph import MultiLineGraph +from parallelm.mlops import mlops as mlops + +class MCenterComponentAdapter(ConnectableComponent): + """ + Adapter for read_file_to_df + """ + + def __init__(self, engine): + super(self.__class__, self).__init__(engine) + + def _materialize(self, parent_data_objs, user_data): + file_path = str(parent_data_objs[0]) + if file_path is None: + file_path = self._params.get('file_path') + return [read_file_to_df(file_path)] + + +def read_file_to_df(filepath): + """ + Read file and return DataFrame + """ + mlops.init() + if not os.path.exists(filepath): + print("stderr- failed to find {}".format(filepath), file=sys.stderr) + raise Exception("file path does not exist: {}".format(filepath)) + + test_data = pandas.read_csv(filepath) + mlops.done() + return test_data + + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--file-path", default='/tmp/test-data.csv', help="Dataset to read") + options = parser.parse_args() + return options + From 245dd1513044f98a5b3871e58276fbf691d6ed67 Mon Sep 17 00:00:00 2001 From: Dhananjoy Das Date: Fri, 1 Mar 2019 16:20:19 -0800 Subject: [PATCH 2/5] [REF-5768] component: Save DataFrame to file --- .../dataframe_to_file/__init__.py | 0 .../dataframe_to_file/component.json | 40 ++++++++++++++++ .../fileConnectors/dataframe_to_file/main.py | 47 +++++++++++++++++++ 3 files changed, 87 insertions(+) create mode 100644 components/Python/fileConnectors/dataframe_to_file/__init__.py create mode 100644 components/Python/fileConnectors/dataframe_to_file/component.json create mode 100644 components/Python/fileConnectors/dataframe_to_file/main.py diff --git a/components/Python/fileConnectors/dataframe_to_file/__init__.py b/components/Python/fileConnectors/dataframe_to_file/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/components/Python/fileConnectors/dataframe_to_file/component.json b/components/Python/fileConnectors/dataframe_to_file/component.json new file mode 100644 index 0000000..a977283 --- /dev/null +++ b/components/Python/fileConnectors/dataframe_to_file/component.json @@ -0,0 +1,40 @@ +{ + "engineType": "Python", + "language": "Python", + "userStandalone": false, + "name": "dataframe_to_file", + "label": "Sink DataFrame to File", + "program": "main.py", + "componentClass": "MCenterComponentAdapter", + "modelBehavior": "Auxiliary", + "useMLOps": true, + "inputInfo": [ + { + "description": "Pandas DataFrame", + "label": "dataframe", + "defaultComponent": "", + "type": "dataframe", + "group": "data" + } + ], + "outputInfo": [ + { + "description": "File name", + "label": "filename", + "defaultComponent": "", + "type": "str", + "group": "data" + } + ], + "group": "Sinks", + "arguments": [ + { + "key": "file-path", + "label": "save to file", + "type": "str", + "description": "Save DataFrame to file", + "optional": true + } + ], + "version": 1 +} diff --git a/components/Python/fileConnectors/dataframe_to_file/main.py b/components/Python/fileConnectors/dataframe_to_file/main.py new file mode 100644 index 0000000..b0bc22e --- /dev/null +++ b/components/Python/fileConnectors/dataframe_to_file/main.py @@ -0,0 +1,47 @@ +from __future__ import print_function + +import argparse +import sys +import time +import os +import pandas + +from parallelm.components import ConnectableComponent +from parallelm.mlops.stats.multi_line_graph import MultiLineGraph +from parallelm.mlops import mlops as mlops + +class MCenterComponentAdapter(ConnectableComponent): + """ + Adapter for df_to_file + """ + + def __init__(self, engine): + super(self.__class__, self).__init__(engine) + + def _materialize(self, parent_data_objs, user_data): + df_results = parent_data_objs[0] + results_path = self._params.get('file_path') + return [df_to_file(df_results, results_path)] + + +def df_to_file(df_predict_results, filepath): + """ + Save DataFrame to file + """ + prog_start_time = time.time() + mlops.init() + suffix_time_stamp = str(int(time.time())) + save_file = filepath + '.' + suffix_time_stamp + sfile = open(save_file, 'w+') + pandas.DataFrame(df_predict_results).to_csv(save_file) + sfile.close() + mlops.done() + return save_file + + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--file-path", default='/tmp/results', help="Save DataFrame to file") + options = parser.parse_args() + return options + From b197067aef476cf177df2e6fda9cf58366a39fbd Mon Sep 17 00:00:00 2001 From: Dhananjoy Das Date: Fri, 1 Mar 2019 16:34:19 -0800 Subject: [PATCH 3/5] [REF-5768] component: S3 boto3 component to fetch file --- .../fileConnectors/s3_file_source/__init__.py | 0 .../s3_file_source/component.json | 92 +++++++++++++++++++ .../fileConnectors/s3_file_source/main.py | 56 +++++++++++ 3 files changed, 148 insertions(+) create mode 100644 components/Python/fileConnectors/s3_file_source/__init__.py create mode 100644 components/Python/fileConnectors/s3_file_source/component.json create mode 100644 components/Python/fileConnectors/s3_file_source/main.py diff --git a/components/Python/fileConnectors/s3_file_source/__init__.py b/components/Python/fileConnectors/s3_file_source/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/components/Python/fileConnectors/s3_file_source/component.json b/components/Python/fileConnectors/s3_file_source/component.json new file mode 100644 index 0000000..12650a1 --- /dev/null +++ b/components/Python/fileConnectors/s3_file_source/component.json @@ -0,0 +1,92 @@ +{ + "engineType": "Python", + "userStandalone": false, + "language": "Python", + "name": "s3_file_source", + "label": "S3 File Source", + "description": "Import a bucket/key from S3 to a file", + "program": "main.py", + "componentClass": "S3FileSource", + "group": "Connectors", + "useMLOps": true, + "inputInfo": [], + "outputInfo": [ + { + "description": "File name", + "label": "File-Name", + "defaultComponent": "", + "type": "str", + "group": "data" + } + ], + "arguments": [ + { + "key": "aws_access_key_id", + "label": "AWS Access Key", + "description": "AWS Access Key", + "type": "str", + "optional": false + }, + { + "key": "aws_secret_access_key", + "label": "AWS secret access key", + "description": "AWS secret access key", + "type": "str", + "optional": false + }, + { + "key": "region", + "label": "AWS Region", + "description": "AWS Region", + "type": "str", + "optional": false + }, + { + "key": "bucket", + "label": "S3 bucket name", + "description": "S3 bucket name", + "type": "str", + "optional": false + }, + { + "key": "key", + "label": "S3 Key", + "description": "S3 Key", + "type": "str", + "optional": false + }, + { + "key": "get_file_size", + "label": "Get File Size MB", + "description": "Report S3 file size in MB", + "type": "boolean", + "defaultValue": 1, + "optional": true + }, + { + "key": "get_fetch_time", + "label": "Get fetch time", + "description": "Report fetch latency (msec)", + "type": "boolean", + "defaultValue": 1, + "optional": true + }, + { + "key": "get_line_count", + "label": "Get fetch time", + "description": "Report line count", + "type": "boolean", + "defaultValue": 1, + "optional": true + }, + { + "key": "parent_directory", + "label": "Parent Directory", + "description": "Parent directory to use for storing the file", + "type": "str", + "defaultValue": "/tmp", + "optional": false + } + ], + "version": 1 +} diff --git a/components/Python/fileConnectors/s3_file_source/main.py b/components/Python/fileConnectors/s3_file_source/main.py new file mode 100644 index 0000000..75a6f8e --- /dev/null +++ b/components/Python/fileConnectors/s3_file_source/main.py @@ -0,0 +1,56 @@ +from __future__ import print_function + +import argparse +import os +import boto3 +import uuid +import time +from itertools import islice + +from parallelm.components import ConnectableComponent +from parallelm.ml_engine.python_engine import PythonEngine +from parallelm.mlops import mlops as mlops + +class S3FileSource(ConnectableComponent): + + def __init__(self, engine): + super(self.__class__, self).__init__(engine) + + def _materialize(self, parent_data_objs, user_data): + + file_path = self._fetch_file() + return [file_path] + + def _fetch_file(self): + file_size = line_count = 0 + + # Initialize mlops + mlops.init() + + client = boto3.client( + 's3', + aws_access_key_id=self._params["aws_access_key_id"], + aws_secret_access_key=self._params["aws_secret_access_key"], + ) + + if self._params["get_file_size"]: + resp_obj = client.head_object(Bucket=self._params["bucket"], Key=self._params["key"]) + file_size = resp_obj['ContentLength'] / (1024 * 1024) + mlops.set_stat("s3.inputFileSizeMB", file_size) + + file_path = os.path.join(self._params["parent_directory"], "s3_file_" + str(uuid.uuid4())) + + fetch_start_time = time.time() + client.download_file(self._params["bucket"], self._params["key"], file_path) + fetch_elapsed_time = time.time() - fetch_start_time + if self._params["get_fetch_time"]: + mlops.set_stat("s3.inputFetchTimemsec", fetch_elapsed_time) + + # get line-count for the file (loads file in memory) + # should help keep prediction latency NOT be IO-bound + if self._params["get_line_count"]: + line_count = len(open(file_path).readlines()) + if not self._params["get_rand_n_samples"]: + mlops.set_stat("s3.inputFileLineCount", line_count) + + return file_path From d42bfd07ced1d75c78088d61077f1953ee24b1f4 Mon Sep 17 00:00:00 2001 From: Dhananjoy Das Date: Fri, 1 Mar 2019 17:03:37 -0800 Subject: [PATCH 4/5] [REF-5768] component for basic cleanup (feature engineering) --- .../featureEng/df_label_encoding/__init__.py | 0 .../df_label_encoding/component.json | 32 +++++++++++ .../featureEng/df_label_encoding/main.py | 55 +++++++++++++++++++ 3 files changed, 87 insertions(+) create mode 100644 components/Python/featureEng/df_label_encoding/__init__.py create mode 100644 components/Python/featureEng/df_label_encoding/component.json create mode 100644 components/Python/featureEng/df_label_encoding/main.py diff --git a/components/Python/featureEng/df_label_encoding/__init__.py b/components/Python/featureEng/df_label_encoding/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/components/Python/featureEng/df_label_encoding/component.json b/components/Python/featureEng/df_label_encoding/component.json new file mode 100644 index 0000000..c32312f --- /dev/null +++ b/components/Python/featureEng/df_label_encoding/component.json @@ -0,0 +1,32 @@ +{ + "engineType": "Python", + "language": "Python", + "userStandalone": false, + "name": "df_label_encoding", + "label": "Feature Engineering: Label Encoding DataFrame", + "program": "main.py", + "componentClass": "MCenterComponentAdapter", + "modelBehavior": "Auxiliary", + "useMLOps": true, + "inputInfo": [ + { + "description": "Pandas Dataframe", + "label": "dataframe", + "defaultComponent": "", + "type": "dataframe", + "group": "data" + } + ], + "outputInfo": [ + { + "description": "Pandas Dataframe", + "label": "dataframe", + "defaultComponent": "", + "type": "dataframe", + "group": "data" + } + ], + "group": "FeatureEng", + "arguments": [], + "version": 1 +} diff --git a/components/Python/featureEng/df_label_encoding/main.py b/components/Python/featureEng/df_label_encoding/main.py new file mode 100644 index 0000000..79013f4 --- /dev/null +++ b/components/Python/featureEng/df_label_encoding/main.py @@ -0,0 +1,55 @@ +from __future__ import print_function + +import argparse +import sys +import time +import os +import pandas +import numpy as np + +from sklearn.exceptions import NotFittedError +from sklearn import preprocessing +from parallelm.components import ConnectableComponent +from parallelm.mlops import mlops as mlops +from parallelm.mlops.stats.bar_graph import BarGraph +from parallelm.mlops.stats.multi_line_graph import MultiLineGraph +from parallelm.mlops.predefined_stats import PredefinedStats + + +class MCenterComponentAdapter(ConnectableComponent): + """ + Adapter for the do_label_encoding + """ + + def __init__(self, engine): + super(self.__class__, self).__init__(engine) + + def _materialize(self, parent_data_objs, user_data): + df_data = parent_data_objs[0] + return[do_label_encoding(df_data)] + + +def do_label_encoding(df_data): + """ + Cleanup: (Feature Engineering) + a) simple label encoding, convert string to real values + b) remove NaN's drop rows with NaN + """ + for column in df_data.columns: + if df_data[column].dtype == type(object): + le = preprocessing.LabelEncoder() + df_data[column] = le.fit_transform(df_data[column]) + + df_data = df_data.dropna() + + # Initialize MLOps Library + mlops.init() + + # Output Health Statistics to MCenter + # MLOps API to report the distribution statistics of each feature + mlops.set_data_distribution_stat(df_data) + + # Terminate MLOPs + mlops.done() + return df_data + From a38cb1fc1a586353bbde9b771d66445823d4ce35 Mon Sep 17 00:00:00 2001 From: Dhananjoy Das Date: Fri, 1 Mar 2019 17:22:32 -0800 Subject: [PATCH 5/5] [REF-5768] component generic Scikit-learn predictor --- .../scikit_learn_predictor/__init__.py | 0 .../scikit_learn_predictor/component.json | 41 +++++++ .../scikit_learn_predictor/main.py | 105 ++++++++++++++++++ 3 files changed, 146 insertions(+) create mode 100644 components/Python/Scikit-learn/scikit_learn_predictor/__init__.py create mode 100644 components/Python/Scikit-learn/scikit_learn_predictor/component.json create mode 100644 components/Python/Scikit-learn/scikit_learn_predictor/main.py diff --git a/components/Python/Scikit-learn/scikit_learn_predictor/__init__.py b/components/Python/Scikit-learn/scikit_learn_predictor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/components/Python/Scikit-learn/scikit_learn_predictor/component.json b/components/Python/Scikit-learn/scikit_learn_predictor/component.json new file mode 100644 index 0000000..d4ad15e --- /dev/null +++ b/components/Python/Scikit-learn/scikit_learn_predictor/component.json @@ -0,0 +1,41 @@ +{ + "engineType": "Python", + "language": "Python", + "userStandalone": false, + "name": "scikit_learn_predictor", + "label": "Scikit Learn Predictor", + "program": "main.py", + "componentClass": "MCenterComponentAdapter", + "modelBehavior": "ModelConsumer", + "useMLOps": true, + "inputInfo": [ + { + "description": "Pandas Dataframe", + "label": "dataframe", + "defaultComponent": "", + "type": "dataframe", + "group": "data" + } + ], + "outputInfo": [ + { + "description": "Pandas Dataframe", + "label": "dataframe", + "defaultComponent": "", + "type": "dataframe", + "group": "data" + } + ], + "group": "Algorithms", + "arguments": [ + { + "key": "input-model", + "label": "Model input file", + "type": "str", + "description": "File to use for loading the model", + "optional": true, + "tag": "input_model_path" + } + ], + "version": 1 +} diff --git a/components/Python/Scikit-learn/scikit_learn_predictor/main.py b/components/Python/Scikit-learn/scikit_learn_predictor/main.py new file mode 100644 index 0000000..f669eab --- /dev/null +++ b/components/Python/Scikit-learn/scikit_learn_predictor/main.py @@ -0,0 +1,105 @@ +from __future__ import print_function + +import argparse +import sys +import time +import os +import pickle +import pandas as pd +import numpy as np + +from sklearn.exceptions import NotFittedError +from parallelm.components import ConnectableComponent +from parallelm.mlops import mlops as mlops +from parallelm.mlops.stats.bar_graph import BarGraph +from parallelm.mlops.stats.multi_line_graph import MultiLineGraph +from parallelm.mlops.predefined_stats import PredefinedStats + + +class MCenterComponentAdapter(ConnectableComponent): + """ + Adapter for the do_predict + """ + def __init__(self, engine): + super(self.__class__, self).__init__(engine) + + def _materialize(self, parent_data_objs, user_data): + df_infer_data = parent_data_objs[0] + input_model = self._params.get("input-model") + return[do_predict(df_infer_data, input_model)] + + +def do_predict(df_infer, input_model): + """ + Perform predictions: + a) load scikit-learn model + b) on dataset run predict, collect stats assocaited to predictions + c) obtain class probability stats + d) report stats using mlops APIs + """ + prog_start_time = time.time() + mlops.init() + model = pickle.load(open(input_model, "rb")) + data_features = df_infer.values + + # Start timer (inference) + inference_start_time = time.time() + # Predict labels + predict_results = model.predict(df_infer) + # End timer (inference) + inference_elapsed_time = time.time() - inference_start_time + + # Predict probability + class_probability = model.predict_proba(df_infer) + maximum_prob = np.max(class_probability, axis=1) + # Tag samples that are below a certain probability and write to a file + confidence = 0.7 + + low_prob_predictions = predict_results[np.where(maximum_prob < confidence)] + unique_elements_low, counts_elements_low = np.unique(low_prob_predictions, return_counts=True) + unique_elements_low = [str(i) for i in unique_elements_low] + # self._logger.info("Low confidence predictions: \n {0} \n with frequency {1}".format(unique_elements_low, counts_elements_low)) + + # ########## Start of MCenter instrumentation ############## + # # BarGraph showing distribution of low confidence labels + bar = BarGraph().name("Low confidence label distribution").cols(unique_elements_low).data(counts_elements_low.tolist()) + # self._logger.info("Low bar : ", type(bar), "->", bar) + mlops.set_stat(bar) + + # ########## End of MCenter instrumentation ################ + # + # # Samples with high probability + high_prob_predictions = predict_results[np.where(maximum_prob > confidence)] + unique_elements_high, counts_elements_high = np.unique(high_prob_predictions, return_counts=True) + unique_elements_high = [str(i) for i in unique_elements_high] + # self._logger.info("High confidence predictions: \n {0} \n with frequency {1}".format(unique_elements_high, + # counts_elements_low)) + + # ########## Start of MCenter instrumentation ############## + # # BarGraph showing distribution of high confidence labels + bar = BarGraph().name("High confidence label distribution").cols(unique_elements_high).data(counts_elements_high.tolist()) + # self._logger.info("High bar : ", type(bar), "->", bar) + mlops.set_stat(bar) + ########## End of MCenter instrumentation ################ + + ######## Report PM stats ########### + mlops.set_stat(PredefinedStats.PREDICTIONS_COUNT, len(data_features)) + prog_elapsed_time = time.time() - prog_start_time + mlt_time = MultiLineGraph().name("Time Elapsed").labels(["Program Time", "Inference Time"]) + mlt_time.data([prog_elapsed_time, inference_elapsed_time]) + mlops.set_stat(mlt_time) + ### End of PM stats reporting ##### + + mlops.done() + + # return predict_results + return class_probability + + +def parse_args(): + + parser = argparse.ArgumentParser() + parser.add_argument("--input-model", help="Path to load model from") + options = parser.parse_args() + return options +