From 8f5992818c892ebcc9227ae1cfc643e23f37eb63 Mon Sep 17 00:00:00 2001
From: Jeff Omhover
Date: Tue, 23 Nov 2021 21:59:34 -0800
Subject: [PATCH 01/16] add batch+percentiles to treelite python

---
 .../inferencing/treelite_python/score.py  | 92 ++++++++++++++++---
 .../inferencing/treelite_python/spec.yaml |  8 +-
 2 files changed, 84 insertions(+), 16 deletions(-)

diff --git a/src/scripts/inferencing/treelite_python/score.py b/src/scripts/inferencing/treelite_python/score.py
index 26fef4bc..75e12286 100644
--- a/src/scripts/inferencing/treelite_python/score.py
+++ b/src/scripts/inferencing/treelite_python/score.py
@@ -8,9 +8,11 @@
 import sys
 import argparse
 import logging
-import numpy
+import numpy as np
+import matplotlib.pyplot as plt
+import time
 from distutils.util import strtobool
-import pandas as pd
+import csv
 import treelite, treelite_runtime
 
 # Add the right path to PYTHONPATH
@@ -61,6 +63,8 @@ def get_arg_parser(cls, parser=None):
         group_params = parser.add_argument_group("Scoring parameters")
         group_params.add_argument("--num_threads",
             required=False, default=1, type=int, help="number of threads")
+        group_params.add_argument("--batch_size",
+            required=False, default=1, type=int, help="size of batches for predict call")
 
         return parser
 
@@ -87,20 +91,80 @@ def run(self, args, logger, metrics_logger, unknown_args):
 
             args.output = os.path.join(args.output, "predictions.txt")
 
-        logger.info(f"Loading data for inferencing")
-        with metrics_logger.log_time_block("time_data_loading"):
-            my_data = pd.read_csv(args.data).to_numpy()
-
-        predictor = treelite_runtime.Predictor(
-            args.so_path,
-            verbose=True,
-            nthread=args.num_threads
+        def batch_iterate(csv_data_file, batch_size):
+            with open(csv_data_file, "r") as i_file:
+                reader = csv.reader(i_file)
+
+                batch = []
+                for row in reader:
+                    batch.append(row)
+                    if len(batch) >= batch_size:
+                        yield batch
+                        batch = []
+                if len(batch) >= batch_size:
+                    yield batch
+
+        # loading model
+        predictor = treelite_runtime.Predictor(
+            args.so_path,
+            verbose=True,
+            nthread=args.num_threads
+        )
+
+        # accumulate predictions and latencies
+        predictions = []
+        time_inferencing_per_batch = []
+        batch_lengths = []
+
+        # loop through batches
+        for batch in batch_iterate(args.data, batch_size=args.batch_size):
+            if len(batch) == 0:
+                break
+            batch_lengths.append(len(batch))
+
+            # transform into dense matrix
+            batch_data = np.array(batch)
+            batch_dmat = treelite_runtime.DMatrix(batch_data)
+
+            # run prediction on batch
+            batch_start_time = time.time()
+            predictions.extend(predictor.predict(batch_dmat))
+            time_inferencing_per_batch.append((time.time() - batch_start_time) * 1000000) # usecs
+
+        if len(time_inferencing_per_batch) > 1:
+            batch_run_times = np.array(time_inferencing_per_batch) / np.array(batch_lengths)
+            metrics_logger.log_metric("batch_time_inferencing_p50_usecs", np.percentile(batch_run_times, 50))
+            metrics_logger.log_metric("batch_time_inferencing_p75_usecs", np.percentile(batch_run_times, 75))
+            metrics_logger.log_metric("batch_time_inferencing_p90_usecs", np.percentile(batch_run_times, 90))
+            metrics_logger.log_metric("batch_time_inferencing_p95_usecs", np.percentile(batch_run_times, 95))
+            metrics_logger.log_metric("batch_time_inferencing_p99_usecs", np.percentile(batch_run_times, 99))
+
+            # show the distribution prediction latencies
+            fig, ax = plt.subplots(1)
+            ax.hist(batch_run_times, bins=100)
+            ax.set_title("Latency-per-query histogram (log scale)")
+            plt.xlabel("usecs")
+            plt.ylabel("occurence")
+            plt.yscale('log')
+
+            # record in mlflow
metrics_logger.log_figure(fig, "latency_log_histogram.png") + + if args.output: + np.savetxt( + args.output, + predictions, + fmt='%f', + delimiter=',', + newline='\n', + header='', + footer='', + comments='# ', + encoding=None ) - dmat = treelite_runtime.DMatrix(my_data) - logger.info(f"Running .predict()") - with metrics_logger.log_time_block("time_inferencing"): - predictor.predict(dmat) + + def get_arg_parser(parser=None): diff --git a/src/scripts/inferencing/treelite_python/spec.yaml b/src/scripts/inferencing/treelite_python/spec.yaml index 5e9b18f6..0f8ccf71 100644 --- a/src/scripts/inferencing/treelite_python/spec.yaml +++ b/src/scripts/inferencing/treelite_python/spec.yaml @@ -17,7 +17,10 @@ inputs: optional: false n_threads: type: Integer - optional: true + default: 1 + batch_size: + type: Integer + default: 1 verbose: type: Boolean default: False @@ -31,7 +34,8 @@ command: >- python score.py --data {inputs.data} --so_path {inputs.compiled_model} - [--num_threads {inputs.n_threads}] + --num_threads {inputs.n_threads} + --batch_size {inputs.batch_size} --verbose {inputs.verbose} [--custom_properties {inputs.custom_properties}] From 3a7bdcdeab9195fd1cf89247c9879f15cb7f2c9b Mon Sep 17 00:00:00 2001 From: Jeff Omhover Date: Tue, 23 Nov 2021 22:44:27 -0800 Subject: [PATCH 02/16] fix batch process --- .../inferencing/treelite_python/score.py | 33 +++++++++++-------- .../inferencing/treelite_python/spec.yaml | 2 +- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/src/scripts/inferencing/treelite_python/score.py b/src/scripts/inferencing/treelite_python/score.py index 75e12286..f7529d49 100644 --- a/src/scripts/inferencing/treelite_python/score.py +++ b/src/scripts/inferencing/treelite_python/score.py @@ -97,11 +97,14 @@ def batch_iterate(csv_data_file, batch_size): batch = [] for row in reader: - batch.append(row) + cast_row = [ + float(col) for col in row + ] + batch.append(cast_row) if len(batch) >= batch_size: yield batch batch = [] - if len(batch) >= batch_size: + if len(batch) >= 0: yield batch # loading model @@ -124,15 +127,19 @@ def batch_iterate(csv_data_file, batch_size): # transform into dense matrix batch_data = np.array(batch) + batch_dmat = treelite_runtime.DMatrix(batch_data) # run prediction on batch - batch_start_time = time.time() + batch_start_time = time.monotonic() predictions.extend(predictor.predict(batch_dmat)) - time_inferencing_per_batch.append((time.time() - batch_start_time) * 1000000) # usecs + print(time.monotonic(), batch_start_time) + time_inferencing_per_batch.append((time.monotonic() - batch_start_time) * 1000000) # usecs + # compute metrics if len(time_inferencing_per_batch) > 1: batch_run_times = np.array(time_inferencing_per_batch) / np.array(batch_lengths) + print(time_inferencing_per_batch) metrics_logger.log_metric("batch_time_inferencing_p50_usecs", np.percentile(batch_run_times, 50)) metrics_logger.log_metric("batch_time_inferencing_p75_usecs", np.percentile(batch_run_times, 75)) metrics_logger.log_metric("batch_time_inferencing_p90_usecs", np.percentile(batch_run_times, 90)) @@ -140,15 +147,15 @@ def batch_iterate(csv_data_file, batch_size): metrics_logger.log_metric("batch_time_inferencing_p99_usecs", np.percentile(batch_run_times, 99)) # show the distribution prediction latencies - fig, ax = plt.subplots(1) - ax.hist(batch_run_times, bins=100) - ax.set_title("Latency-per-query histogram (log scale)") - plt.xlabel("usecs") - plt.ylabel("occurence") - plt.yscale('log') - - # record in mlflow - metrics_logger.log_figure(fig, 
"latency_log_histogram.png") + # fig, ax = plt.subplots(1) + # ax.hist(batch_run_times, bins=100) + # ax.set_title("Latency-per-query histogram (log scale)") + # plt.xlabel("usecs") + # plt.ylabel("occurence") + # plt.yscale('log') + + # # record in mlflow + # metrics_logger.log_figure(fig, "latency_log_histogram.png") if args.output: np.savetxt( diff --git a/src/scripts/inferencing/treelite_python/spec.yaml b/src/scripts/inferencing/treelite_python/spec.yaml index 0f8ccf71..d461fb95 100644 --- a/src/scripts/inferencing/treelite_python/spec.yaml +++ b/src/scripts/inferencing/treelite_python/spec.yaml @@ -20,7 +20,7 @@ inputs: default: 1 batch_size: type: Integer - default: 1 + default: 2 verbose: type: Boolean default: False From 13d53fec3c06b5d424b424ab306a536c985171e1 Mon Sep 17 00:00:00 2001 From: Jeff Omhover Date: Tue, 23 Nov 2021 23:11:28 -0800 Subject: [PATCH 03/16] add sum --- .../inferencing/treelite_python/score.py | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/scripts/inferencing/treelite_python/score.py b/src/scripts/inferencing/treelite_python/score.py index f7529d49..194d6b12 100644 --- a/src/scripts/inferencing/treelite_python/score.py +++ b/src/scripts/inferencing/treelite_python/score.py @@ -134,12 +134,12 @@ def batch_iterate(csv_data_file, batch_size): batch_start_time = time.monotonic() predictions.extend(predictor.predict(batch_dmat)) print(time.monotonic(), batch_start_time) - time_inferencing_per_batch.append((time.monotonic() - batch_start_time) * 1000000) # usecs + time_inferencing_per_batch.append((time.monotonic() - batch_start_time)) # usecs # compute metrics - if len(time_inferencing_per_batch) > 1: - batch_run_times = np.array(time_inferencing_per_batch) / np.array(batch_lengths) - print(time_inferencing_per_batch) + if len(time_inferencing_per_batch) > 0: + batch_run_times = np.array(time_inferencing_per_batch) * 1000000 / np.array(batch_lengths) + metrics_logger.log_metric("time_inferencing", np.sum(batch_run_times)) metrics_logger.log_metric("batch_time_inferencing_p50_usecs", np.percentile(batch_run_times, 50)) metrics_logger.log_metric("batch_time_inferencing_p75_usecs", np.percentile(batch_run_times, 75)) metrics_logger.log_metric("batch_time_inferencing_p90_usecs", np.percentile(batch_run_times, 90)) @@ -147,15 +147,15 @@ def batch_iterate(csv_data_file, batch_size): metrics_logger.log_metric("batch_time_inferencing_p99_usecs", np.percentile(batch_run_times, 99)) # show the distribution prediction latencies - # fig, ax = plt.subplots(1) - # ax.hist(batch_run_times, bins=100) - # ax.set_title("Latency-per-query histogram (log scale)") - # plt.xlabel("usecs") - # plt.ylabel("occurence") - # plt.yscale('log') - - # # record in mlflow - # metrics_logger.log_figure(fig, "latency_log_histogram.png") + fig, ax = plt.subplots(1) + ax.hist(batch_run_times, bins=100) + ax.set_title("Latency-per-query histogram (log scale)") + plt.xlabel("usecs") + plt.ylabel("occurence") + plt.yscale('log') + + # record in mlflow + metrics_logger.log_figure(fig, "latency_log_histogram.png") if args.output: np.savetxt( From 44cca17b2d57ee6fb213c503f900907a53b00368 Mon Sep 17 00:00:00 2001 From: Jeff Omhover Date: Tue, 23 Nov 2021 23:14:05 -0800 Subject: [PATCH 04/16] fix sum --- src/scripts/inferencing/treelite_python/score.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scripts/inferencing/treelite_python/score.py b/src/scripts/inferencing/treelite_python/score.py index 194d6b12..36be840b 100644 --- 
--- a/src/scripts/inferencing/treelite_python/score.py
+++ b/src/scripts/inferencing/treelite_python/score.py
@@ -139,7 +139,7 @@ def batch_iterate(csv_data_file, batch_size):
         # compute metrics
         if len(time_inferencing_per_batch) > 0:
             batch_run_times = np.array(time_inferencing_per_batch) * 1000000 / np.array(batch_lengths)
-            metrics_logger.log_metric("time_inferencing", np.sum(batch_run_times))
+            metrics_logger.log_metric("time_inferencing", sum(time_inferencing_per_batch))
             metrics_logger.log_metric("batch_time_inferencing_p50_usecs", np.percentile(batch_run_times, 50))
             metrics_logger.log_metric("batch_time_inferencing_p75_usecs", np.percentile(batch_run_times, 75))
             metrics_logger.log_metric("batch_time_inferencing_p90_usecs", np.percentile(batch_run_times, 90))

From cd49a139c8146e38a39f8026821129d40658699b Mon Sep 17 00:00:00 2001
From: Jeff Omhover
Date: Wed, 24 Nov 2021 09:07:02 -0800
Subject: [PATCH 05/16] add matplotlib in deps

---
 src/scripts/inferencing/treelite_python/conda_env.yaml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/scripts/inferencing/treelite_python/conda_env.yaml b/src/scripts/inferencing/treelite_python/conda_env.yaml
index f61c752b..3ffb9ba8 100644
--- a/src/scripts/inferencing/treelite_python/conda_env.yaml
+++ b/src/scripts/inferencing/treelite_python/conda_env.yaml
@@ -11,3 +11,4 @@
   - treelite_runtime==2.1.0
   - pandas>=1.1,<1.2
   - numpy>=1.10,<1.20
+  - matplotlib==3.4.3

From d898a85b63a0e05d870a49a652fe7ec6955fc2d8 Mon Sep 17 00:00:00 2001
From: Jeff Omhover
Date: Wed, 24 Nov 2021 10:21:54 -0800
Subject: [PATCH 06/16] implement common code for the routines

---
 src/common/io.py                              | 41 ++++++++++
 src/common/metrics.py                         | 66 ++++++++++++++++
 .../inferencing/lightgbm_c_api/score.py       | 25 ++-----
 .../inferencing/lightgbm_python/score.py      | 29 ++++---
 .../inferencing/treelite_python/score.py      | 75 +++++--------------
 .../inferencing/treelite_python/spec.yaml     |  2 +-
 6 files changed, 153 insertions(+), 85 deletions(-)

diff --git a/src/common/io.py b/src/common/io.py
index 84898249..8378a6d6 100644
--- a/src/common/io.py
+++ b/src/common/io.py
@@ -8,6 +8,7 @@
 import os
 import argparse
 import logging
+import csv
 
 def input_file_path(path):
     """ Argparse type to resolve input path as single file from directory.
@@ -225,3 +226,43 @@ def run(self, input_path, output_path):
             self.split_by_append(input_files, output_path, self.number)
         else:
             raise NotImplementedError(f"Mode {self.mode} not implemented.")
+
+
+class CustomLightGBMDataBatchIterator():
+    def __init__(self, file_path, batch_size=0, file_format="csv", **kwargs):
+        self.file_path = file_path
+        self.batch_size = batch_size
+        self.file_format = file_format
+        self.reader_options = kwargs
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        if self.file_format == "csv":
+            with open(self.file_path, "r") as i_file:
+                reader = csv.reader(i_file, **self.reader_options)
+
+                batch = []
+                if self.batch_size == 0:
+                    # use the entire file as a batch
+                    batch = [
+                        [
+                            float(col) for col in row # convert all values to float for lightgbm
+                        ] for row in reader
+                    ]
+                elif self.batch_size > 1:
+                    # create batches
+                    for row in reader:
+                        batch.append(
+                            [ float(col) for col in row ] # convert all values to float for lightgbm
+                        )
+                        if len(batch) >= self.batch_size:
+                            yield batch
+                            batch = [] # reset batch
+                else:
+                    raise ValueError("batch_size must be >= 0")
+
+                # any remaining batch, or whole file
+                if len(batch) >= 0:
+                    yield batch
diff --git a/src/common/metrics.py b/src/common/metrics.py
index 89cca615..3276a7ce 100644
--- a/src/common/metrics.py
+++ b/src/common/metrics.py
@@ -160,6 +160,72 @@ def log_time_block(self, metric_name):
         # see class below with proper __enter__ and __exit__
         return LogTimeBlock(metric_name)
 
+    def log_inferencing_latencies(self, time_per_batch, batch_length=1, factor_to_usecs=1000000.0):
+        """Logs prediction latencies (for inferencing) with lots of fancy metrics and plots.
+
+        Args:
+            time_per_batch_list (List[float]): time per inferencing batch
+            batch_lengths (Union[List[int],int]): length of each batch (List or constant)
+            factor_to_usecs (float): factor to apply to time_per_batch to convert to microseconds
+        """
+        if isinstance(batch_length, list):
+            sum_batch_lengths = sum(batch_length)
+        else:
+            sum_batch_lengths = batch_length*len(time_per_batch)
+
+        # log metadata
+        self.log_metric("prediction_batches", len(time_per_batch))
+        self.log_metric("prediction_queries", sum_batch_lengths)
+
+        if len(time_per_batch) > 0:
+            self.log_metric("prediction_latency_avg", (sum(time_per_batch) * factor_to_usecs)/sum_batch_lengths) # usecs
+
+        # if there's more than 1 batch, compute percentiles
+        if len(time_per_batch) > 1:
+            import numpy as np
+            import matplotlib.pyplot as plt
+
+            # latency per batch
+            batch_run_times = np.array(time_per_batch) * factor_to_usecs
+            self.log_metric("batch_latency_p50_usecs", np.percentile(batch_run_times, 50))
+            self.log_metric("batch_latency_p75_usecs", np.percentile(batch_run_times, 75))
+            self.log_metric("batch_latency_p90_usecs", np.percentile(batch_run_times, 90))
+            self.log_metric("batch_latency_p95_usecs", np.percentile(batch_run_times, 95))
+            self.log_metric("batch_latency_p99_usecs", np.percentile(batch_run_times, 99))
+
+            # show the distribution prediction latencies
+            fig, ax = plt.subplots(1)
+            ax.hist(batch_run_times, bins=100)
+            ax.set_title("Latency-per-batch histogram (log scale)")
+            plt.xlabel("usecs")
+            plt.ylabel("occurence")
+            plt.yscale('log')
+
+            # record in mlflow
+            self.log_figure(fig, "batch_latency_log_histogram.png")
+
+            # latency per query
+            if isinstance(batch_length, list):
+                prediction_latencies = np.array(time_per_batch) * factor_to_usecs / np.array(batch_length)
+            else:
+                prediction_latencies = np.array(time_per_batch) * factor_to_usecs / batch_length
+
self.log_metric("prediction_latency_p50_usecs", np.percentile(prediction_latencies, 50)) + self.log_metric("prediction_latency_p75_usecs", np.percentile(prediction_latencies, 75)) + self.log_metric("prediction_latency_p90_usecs", np.percentile(prediction_latencies, 90)) + self.log_metric("prediction_latency_p95_usecs", np.percentile(prediction_latencies, 95)) + self.log_metric("prediction_latency_p99_usecs", np.percentile(prediction_latencies, 99)) + + # show the distribution prediction latencies + fig, ax = plt.subplots(1) + ax.hist(prediction_latencies, bins=100) + ax.set_title("Latency-per-prediction histogram (log scale)") + plt.xlabel("usecs") + plt.ylabel("occurence") + plt.yscale('log') + + # record in mlflow + self.log_figure(fig, "prediction_latency_log_histogram.png") ######################## diff --git a/src/scripts/inferencing/lightgbm_c_api/score.py b/src/scripts/inferencing/lightgbm_c_api/score.py index 1353daa6..32a6ecb7 100644 --- a/src/scripts/inferencing/lightgbm_c_api/score.py +++ b/src/scripts/inferencing/lightgbm_c_api/score.py @@ -210,25 +210,12 @@ def run(self, args, logger, metrics_logger, unknown_args): else: logger.warning(f"log metric {line} does not match expected pattern {row_pattern}") - - if len(time_inferencing_per_query) > 1: - batch_run_times = np.array(time_inferencing_per_query) - metrics_logger.log_metric("batch_time_inferencing_p50_usecs", np.percentile(batch_run_times, 50)) - metrics_logger.log_metric("batch_time_inferencing_p75_usecs", np.percentile(batch_run_times, 75)) - metrics_logger.log_metric("batch_time_inferencing_p90_usecs", np.percentile(batch_run_times, 90)) - metrics_logger.log_metric("batch_time_inferencing_p95_usecs", np.percentile(batch_run_times, 95)) - metrics_logger.log_metric("batch_time_inferencing_p99_usecs", np.percentile(batch_run_times, 99)) - - # show the distribution prediction latencies - fig, ax = plt.subplots(1) - ax.hist(batch_run_times, bins=100) - ax.set_title("Latency-per-query histogram (log scale)") - plt.xlabel("usecs") - plt.ylabel("occurence") - plt.yscale('log') - - # record in mlflow - metrics_logger.log_figure(fig, "latency_log_histogram.png") + # use helper to log latency with the right metric names + metrics_logger.log_inferencing_latencies( + time_inferencing_per_query, + batch_length=1, # in this exec, each row is just 1 prediction call + factor_to_usecs=1.0 # values are already in usecs + ) if args.output: np.savetxt( diff --git a/src/scripts/inferencing/lightgbm_python/score.py b/src/scripts/inferencing/lightgbm_python/score.py index bc69dbab..2f1d20b7 100644 --- a/src/scripts/inferencing/lightgbm_python/score.py +++ b/src/scripts/inferencing/lightgbm_python/score.py @@ -8,9 +8,10 @@ import sys import argparse import logging +import time +import numpy as np from distutils.util import strtobool import lightgbm -import numpy # Add the right path to PYTHONPATH # so that you can import from common.* @@ -104,15 +105,24 @@ def run(self, args, logger, metrics_logger, unknown_args): ) logger.info(f"Running .predict()") - with metrics_logger.log_time_block("time_inferencing"): - predictions_array = booster.predict( - data=inference_raw_data, - num_threads=args.num_threads, - predict_disable_shape_check=bool(args.predict_disable_shape_check) - ) - + batch_start_time = time.monotonic() + predictions_array = booster.predict( + data=inference_raw_data, + num_threads=args.num_threads, + predict_disable_shape_check=bool(args.predict_disable_shape_check) + ) + prediction_time = (time.monotonic() - batch_start_time) + 
metrics_logger.log_metric("time_inferencing", prediction_time) + + # use helper to log latency with the right metric names + metrics_logger.log_inferencing_latencies( + [prediction_time], # only one big batch + batch_length=inference_data.num_data(), + factor_to_usecs=1000000.0 # values are in seconds + ) + if args.output: - numpy.savetxt( + np.savetxt( args.output, predictions_array, fmt='%f', @@ -135,4 +145,3 @@ def main(cli_args=None): if __name__ == "__main__": main() - diff --git a/src/scripts/inferencing/treelite_python/score.py b/src/scripts/inferencing/treelite_python/score.py index 36be840b..e95fb8a9 100644 --- a/src/scripts/inferencing/treelite_python/score.py +++ b/src/scripts/inferencing/treelite_python/score.py @@ -8,11 +8,9 @@ import sys import argparse import logging -import numpy as np -import matplotlib.pyplot as plt import time +import numpy as np from distutils.util import strtobool -import csv import treelite, treelite_runtime # Add the right path to PYTHONPATH @@ -20,20 +18,20 @@ COMMON_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..")) if COMMON_ROOT not in sys.path: - print(f"Adding {COMMON_ROOT} to PYTHONPATH") + logging.info(f"Adding {COMMON_ROOT} to PYTHONPATH") sys.path.append(str(COMMON_ROOT)) # useful imports from common from common.components import RunnableScript -from common.io import input_file_path +from common.io import input_file_path, CustomLightGBMDataBatchIterator class TreeLightInferencingScript(RunnableScript): def __init__(self): super().__init__( - task = 'score', + task = "score", framework = 'treelite_python', - framework_version = treelite.__version__ + framework_version = "PYTHON_API."+str(treelite.__version__) ) @classmethod @@ -64,7 +62,7 @@ def get_arg_parser(cls, parser=None): group_params.add_argument("--num_threads", required=False, default=1, type=int, help="number of threads") group_params.add_argument("--batch_size", - required=False, default=1, type=int, help="size of batches for predict call") + required=False, default=0, type=int, help="size of batches for predict call") return parser @@ -83,31 +81,14 @@ def run(self, args, logger, metrics_logger, unknown_args): num_threads=args.num_threads ) + # make sure the output argument exists if args.output: - # make sure the output argument exists os.makedirs(args.output, exist_ok=True) # and create your own file inside the output args.output = os.path.join(args.output, "predictions.txt") - - def batch_iterate(csv_data_file, batch_size): - with open(csv_data_file, "r") as i_file: - reader = csv.reader(i_file) - - batch = [] - for row in reader: - cast_row = [ - float(col) for col in row - ] - batch.append(cast_row) - if len(batch) >= batch_size: - yield batch - batch = [] - if len(batch) >= 0: - yield batch - - # loading model + logger.info(f"Loading model from {args.model}") predictor = treelite_runtime.Predictor( args.so_path, verbose=True, @@ -120,42 +101,29 @@ def batch_iterate(csv_data_file, batch_size): batch_lengths = [] # loop through batches - for batch in batch_iterate(args.data, batch_size=args.batch_size): + for batch in CustomLightGBMDataBatchIterator(args.data, batch_size=args.batch_size, file_format="csv"): if len(batch) == 0: break batch_lengths.append(len(batch)) - # transform into dense matrix + # transform into dense matrix for treelite batch_data = np.array(batch) - batch_dmat = treelite_runtime.DMatrix(batch_data) # run prediction on batch batch_start_time = time.monotonic() predictions.extend(predictor.predict(batch_dmat)) - 
-            print(time.monotonic(), batch_start_time)
             time_inferencing_per_batch.append((time.monotonic() - batch_start_time)) # usecs
 
-        # compute metrics
-        if len(time_inferencing_per_batch) > 0:
-            batch_run_times = np.array(time_inferencing_per_batch) * 1000000 / np.array(batch_lengths)
-            metrics_logger.log_metric("time_inferencing", sum(time_inferencing_per_batch))
-            metrics_logger.log_metric("batch_time_inferencing_p50_usecs", np.percentile(batch_run_times, 50))
-            metrics_logger.log_metric("batch_time_inferencing_p75_usecs", np.percentile(batch_run_times, 75))
-            metrics_logger.log_metric("batch_time_inferencing_p90_usecs", np.percentile(batch_run_times, 90))
-            metrics_logger.log_metric("batch_time_inferencing_p95_usecs", np.percentile(batch_run_times, 95))
-            metrics_logger.log_metric("batch_time_inferencing_p99_usecs", np.percentile(batch_run_times, 99))
-
-            # show the distribution prediction latencies
-            fig, ax = plt.subplots(1)
-            ax.hist(batch_run_times, bins=100)
-            ax.set_title("Latency-per-query histogram (log scale)")
-            plt.xlabel("usecs")
-            plt.ylabel("occurence")
-            plt.yscale('log')
-
-            # record in mlflow
-            metrics_logger.log_figure(fig, "latency_log_histogram.png")
+
+        # log overall time
+        metrics_logger.log_metrics("time_inferencing", sum(time_inferencing_per_batch))
+
+        # use helper to log latency with the right metric names
+        metrics_logger.log_inferencing_latencies(
+            time_inferencing_per_batch,
+            batch_length=batch_lengths,
+            factor_to_usecs=1000000.0 # values are in seconds
+        )
 
         if args.output:
             np.savetxt(
@@ -171,9 +139,6 @@ def batch_iterate(csv_data_file, batch_size):
             )
 
 
-
-
-
 def get_arg_parser(parser=None):
     """ To ensure compatibility with shrike unit tests """
     return TreeLightInferencingScript.get_arg_parser(parser)
diff --git a/src/scripts/inferencing/treelite_python/spec.yaml b/src/scripts/inferencing/treelite_python/spec.yaml
index d461fb95..24ec53c5 100644
--- a/src/scripts/inferencing/treelite_python/spec.yaml
+++ b/src/scripts/inferencing/treelite_python/spec.yaml
@@ -20,7 +20,7 @@ inputs:
     default: 1
   batch_size:
     type: Integer
-    default: 2
+    default: 0 # default: whole file as a batch
   verbose:
     type: Boolean
     default: False

From af501b5edb117ab23126336c86fdefb63986922c Mon Sep 17 00:00:00 2001
From: Jeff Omhover
Date: Wed, 24 Nov 2021 10:23:47 -0800
Subject: [PATCH 07/16] fix typo

---
 src/scripts/inferencing/treelite_python/score.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/scripts/inferencing/treelite_python/score.py b/src/scripts/inferencing/treelite_python/score.py
index e95fb8a9..0c6bc52f 100644
--- a/src/scripts/inferencing/treelite_python/score.py
+++ b/src/scripts/inferencing/treelite_python/score.py
@@ -88,7 +88,7 @@ def run(self, args, logger, metrics_logger, unknown_args):
             # and create your own file inside the output
             args.output = os.path.join(args.output, "predictions.txt")
 
-        logger.info(f"Loading model from {args.model}")
+        logger.info(f"Loading model from {args.so_path}")
         predictor = treelite_runtime.Predictor(
             args.so_path,
             verbose=True,

From a789ceea86aeac4928d36700f567569a635285f4 Mon Sep 17 00:00:00 2001
From: Jeff Omhover
Date: Wed, 24 Nov 2021 10:30:57 -0800
Subject: [PATCH 08/16] fix iterable

---
 src/common/io.py                                 | 7 +++----
 src/scripts/inferencing/treelite_python/score.py | 2 +-
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/src/common/io.py b/src/common/io.py
index 8378a6d6..5f1409a4 100644
--- a/src/common/io.py
+++ b/src/common/io.py
@@ -235,10 +235,7 @@ def __init__(self, file_path, batch_size=0, file_format="csv", **kwargs):
file_format="csv", **kwargs): self.file_format = file_format self.reader_options = kwargs - def __iter__(self): - return self - - def __next__(self): + def iter(self): if self.file_format == "csv": with open(self.file_path, "r") as i_file: reader = csv.reader(i_file, **self.reader_options) @@ -266,3 +263,5 @@ def __next__(self): # any remaining batch, or whole file if len(batch) >= 0: yield batch + elif: + raise NotImplementedError("file_format={self.file_format} is not implemented yet.") diff --git a/src/scripts/inferencing/treelite_python/score.py b/src/scripts/inferencing/treelite_python/score.py index 0c6bc52f..51f0d2bd 100644 --- a/src/scripts/inferencing/treelite_python/score.py +++ b/src/scripts/inferencing/treelite_python/score.py @@ -101,7 +101,7 @@ def run(self, args, logger, metrics_logger, unknown_args): batch_lengths = [] # loop through batches - for batch in CustomLightGBMDataBatchIterator(args.data, batch_size=args.batch_size, file_format="csv"): + for batch in CustomLightGBMDataBatchIterator(args.data, batch_size=args.batch_size, file_format="csv").iter(): if len(batch) == 0: break batch_lengths.append(len(batch)) From 8c47266befffeccb062c6dab2c43bb8013b1aa51 Mon Sep 17 00:00:00 2001 From: Jeff Omhover Date: Wed, 24 Nov 2021 10:42:09 -0800 Subject: [PATCH 09/16] update specs with num_threads and batch_size --- src/common/tasks.py | 5 +++++ src/scripts/inferencing/lightgbm_c_api/spec.yaml | 4 ++-- src/scripts/inferencing/lightgbm_cli/spec.yaml | 6 +++--- src/scripts/inferencing/lightgbm_python/spec.yaml | 6 +++--- src/scripts/inferencing/treelite_python/spec.yaml | 6 +++--- 5 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/common/tasks.py b/src/common/tasks.py index 616f1172..d8b5d39b 100644 --- a/src/common/tasks.py +++ b/src/common/tasks.py @@ -24,10 +24,15 @@ class inferencing_task: @dataclass class inferencing_variants: + # framework framework: str = MISSING build: Optional[str] = None os: str = "Linux" # linux or windows, linux by default + # parameters + batch_size: int = 0 # use whole file as batch + num_threads: int = 1 # use only one thread + @dataclass class data_generation_task: task: str = MISSING diff --git a/src/scripts/inferencing/lightgbm_c_api/spec.yaml b/src/scripts/inferencing/lightgbm_c_api/spec.yaml index 59d9bfad..fd664142 100644 --- a/src/scripts/inferencing/lightgbm_c_api/spec.yaml +++ b/src/scripts/inferencing/lightgbm_c_api/spec.yaml @@ -18,7 +18,7 @@ inputs: type: Boolean description: "control whether or not LightGBM raises an error when you try to predict on data with a different number of features than the training data" default: False - n_threads: + num_threads: type: Integer default: 1 verbose: @@ -37,7 +37,7 @@ command: >- python score.py --data {inputs.data} --model {inputs.model} - --num_threads {inputs.n_threads} + --num_threads {inputs.num_threads} --output {outputs.predictions} --predict_disable_shape_check {inputs.predict_disable_shape_check} --verbose {inputs.verbose} diff --git a/src/scripts/inferencing/lightgbm_cli/spec.yaml b/src/scripts/inferencing/lightgbm_cli/spec.yaml index 9b51d5ab..99c82466 100644 --- a/src/scripts/inferencing/lightgbm_cli/spec.yaml +++ b/src/scripts/inferencing/lightgbm_cli/spec.yaml @@ -18,9 +18,9 @@ inputs: type: Boolean description: "control whether or not LightGBM raises an error when you try to predict on data with a different number of features than the training data" optional: true - n_threads: + num_threads: type: Integer - optional: true + default: 1 lightgbm_exec_path: type: String 
     optional: true
@@ -37,7 +37,7 @@ command: >-
   python score.py
   --data {inputs.data}
   --model {inputs.model}
-  [--num_threads {inputs.n_threads}]
+  --num_threads {inputs.n_threads}
   [--lightgbm_exec_path {inputs.lightgbm_exec_path}]
   [--predict_disable_shape_check {inputs.predict_disable_shape_check}]
   [--verbose {inputs.verbose}]
diff --git a/src/scripts/inferencing/lightgbm_python/spec.yaml b/src/scripts/inferencing/lightgbm_python/spec.yaml
index 7fcbebca..cf40e260 100644
--- a/src/scripts/inferencing/lightgbm_python/spec.yaml
+++ b/src/scripts/inferencing/lightgbm_python/spec.yaml
@@ -18,9 +18,9 @@ inputs:
     type: Boolean
     description: "control whether or not LightGBM raises an error when you try to predict on data with a different number of features than the training data"
    default: False
-  n_threads:
+  num_threads:
     type: Integer
-    optional: true
+    default: 1
   verbose:
     type: Boolean
     default: False
@@ -38,7 +38,7 @@ command: >-
   --data {inputs.data}
   --model {inputs.model}
   --output {outputs.predictions}
-  [--num_threads {inputs.n_threads}]
+  --num_threads {inputs.num_threads}
   --predict_disable_shape_check {inputs.predict_disable_shape_check}
   --verbose {inputs.verbose}
   [--custom_properties {inputs.custom_properties}]
diff --git a/src/scripts/inferencing/treelite_python/spec.yaml b/src/scripts/inferencing/treelite_python/spec.yaml
index 24ec53c5..d1e7a804 100644
--- a/src/scripts/inferencing/treelite_python/spec.yaml
+++ b/src/scripts/inferencing/treelite_python/spec.yaml
@@ -15,12 +15,12 @@ inputs:
     type: AnyDirectory
     description: directory to the model
     optional: false
-  n_threads:
+  num_threads:
     type: Integer
     default: 1
   batch_size:
     type: Integer
-    default: 0 # default: whole file as a batch
+    default: 0 # default: use whole file as a batch
   verbose:
     type: Boolean
     default: False
@@ -34,7 +34,7 @@ command: >-
   python score.py
   --data {inputs.data}
   --so_path {inputs.compiled_model}
-  --num_threads {inputs.n_threads}
+  --num_threads {inputs.num_threads}
   --batch_size {inputs.batch_size}
   --verbose {inputs.verbose}
   [--custom_properties {inputs.custom_properties}]

From 0fb495d9e7d8e2db37d321625b0c1974480ad0fb Mon Sep 17 00:00:00 2001
From: Jeff Omhover
Date: Wed, 24 Nov 2021 10:43:59 -0800
Subject: [PATCH 10/16] add params in pipeline

---
 pipelines/azureml/pipelines/lightgbm_inferencing.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/pipelines/azureml/pipelines/lightgbm_inferencing.py b/pipelines/azureml/pipelines/lightgbm_inferencing.py
index dca94266..e4091d57 100644
--- a/pipelines/azureml/pipelines/lightgbm_inferencing.py
+++ b/pipelines/azureml/pipelines/lightgbm_inferencing.py
@@ -130,6 +130,8 @@ def lightgbm_inferencing_pipeline_function(benchmark_custom_properties, data, mo
         inferencing_step = treelite_score_module(
             data = data,
             compiled_model = treelite_compile_step.outputs.compiled_model,
+            num_threads = variant.num_threads,
+            batch_size = variant.batch_size,
            verbose = False,
             custom_properties = custom_properties
         )
@@ -140,6 +142,8 @@
         inferencing_step = lightgbm_c_api_score_module(
             data = data,
             model = model,
+            num_threads = variant.num_threads,
+            # batch_size = variant.batch_size, # not supported yet
             predict_disable_shape_check = predict_disable_shape_check,
             verbose = False,
             custom_properties = custom_properties
@@ -151,6 +155,8 @@
         inferencing_step = lightgbm_cli_score_module(
             data = data,
             model = model,
+            num_threads = variant.num_threads,
+            # batch_size = variant.batch_size, # not supported yet
             predict_disable_shape_check = predict_disable_shape_check,
             verbose = False,
             custom_properties = custom_properties
@@ -162,6 +168,8 @@
         inferencing_step = lightgbm_python_score_module(
             data = data,
             model = model,
+            num_threads = variant.num_threads,
+            # batch_size = variant.batch_size, # not supported yet
             predict_disable_shape_check = predict_disable_shape_check,
             verbose = False,
             custom_properties = custom_properties

From ab9849ce75828c150f44245cd4d3dc899328a57c Mon Sep 17 00:00:00 2001
From: Jeff Omhover
Date: Wed, 24 Nov 2021 10:44:45 -0800
Subject: [PATCH 11/16] fix typo

---
 src/common/io.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/common/io.py b/src/common/io.py
index 5f1409a4..2e8317ad 100644
--- a/src/common/io.py
+++ b/src/common/io.py
@@ -263,5 +263,5 @@ def iter(self):
                 # any remaining batch, or whole file
                 if len(batch) >= 0:
                     yield batch
-        elif:
+        else:
             raise NotImplementedError("file_format={self.file_format} is not implemented yet.")

From 5a115590d888864cc1ee0605c111d41f85dc8750 Mon Sep 17 00:00:00 2001
From: Jeff Omhover
Date: Wed, 24 Nov 2021 10:46:02 -0800
Subject: [PATCH 12/16] log param

---
 src/scripts/inferencing/treelite_python/score.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/scripts/inferencing/treelite_python/score.py b/src/scripts/inferencing/treelite_python/score.py
index 51f0d2bd..7b97c95b 100644
--- a/src/scripts/inferencing/treelite_python/score.py
+++ b/src/scripts/inferencing/treelite_python/score.py
@@ -78,7 +78,8 @@ def run(self, args, logger, metrics_logger, unknown_args):
         """
         # record relevant parameters
         metrics_logger.log_parameters(
-            num_threads=args.num_threads
+            num_threads=args.num_threads,
+            batch_size=args.batch_size,
         )

From c98241ec55c55a196f0c0d4b0ac482d25eea8153 Mon Sep 17 00:00:00 2001
From: Jeff Omhover
Date: Wed, 24 Nov 2021 10:49:16 -0800
Subject: [PATCH 13/16] log param

---
 src/scripts/inferencing/treelite_python/score.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/scripts/inferencing/treelite_python/score.py b/src/scripts/inferencing/treelite_python/score.py
index 7b97c95b..e846c9a5 100644
--- a/src/scripts/inferencing/treelite_python/score.py
+++ b/src/scripts/inferencing/treelite_python/score.py
@@ -117,7 +117,7 @@ def run(self, args, logger, metrics_logger, unknown_args):
             time_inferencing_per_batch.append((time.monotonic() - batch_start_time)) # usecs
 
         # log overall time
-        metrics_logger.log_metrics("time_inferencing", sum(time_inferencing_per_batch))
+        metrics_logger.log_metric("time_inferencing", sum(time_inferencing_per_batch))
 
         # use helper to log latency with the right metric names
         metrics_logger.log_inferencing_latencies(

From 205adb455d8665e9f89259e8454306a18afb9197 Mon Sep 17 00:00:00 2001
From: Jeff Omhover
Date: Wed, 24 Nov 2021 10:58:55 -0800
Subject: [PATCH 14/16] add params in configs

---
 .../experiments/benchmarks/lightgbm-inferencing.yaml   | 10 +++++++++-
 .../azureml/conf/experiments/lightgbm-inferencing.yaml |  9 +++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/pipelines/azureml/conf/experiments/benchmarks/lightgbm-inferencing.yaml b/pipelines/azureml/conf/experiments/benchmarks/lightgbm-inferencing.yaml
index dfe29347..abd67da4 100644
--- a/pipelines/azureml/conf/experiments/benchmarks/lightgbm-inferencing.yaml
+++ b/pipelines/azureml/conf/experiments/benchmarks/lightgbm-inferencing.yaml
@@ -28,7 +28,7 @@ module_loader: # module loading params
 
 lightgbm_inferencing:
   # name of your particular benchmark
-  benchmark_name: "benchmark-inferencing-20211109.3" # need to be provided at runtime!
+  benchmark_name: "benchmark-inferencing-20211124.1" # need to be provided at runtime!
 
   tasks:
     - data:
@@ -82,11 +82,19 @@ lightgbm_inferencing:
 
   variants:
     - framework: lightgbm_python # v3.3.0 via pypi
+      num_threads: 1
+
     - framework: lightgbm_c_api # v3.3.0 with C API prediction
+
     - framework: lightgbm_c_api # v3.3.0 with C API prediction
       build: docker/lightgbm-custom/v330_patch_cpu_mpi_build.dockerfile
+
     - framework: lightgbm_c_api # v3.2.1 with C API prediction
       build: docker/lightgbm-v3.2.1/linux_cpu_mpi_build.dockerfile
+
     - framework: lightgbm_c_api # v3.2.1 with C API prediction
       build: docker/lightgbm-custom/v321_patch_cpu_mpi_build.dockerfile
+
     - framework: treelite_python # v1.3.0
+      num_threads: 1
+      batch_size: 0 # use whole file as batch
diff --git a/pipelines/azureml/conf/experiments/lightgbm-inferencing.yaml b/pipelines/azureml/conf/experiments/lightgbm-inferencing.yaml
index 79712320..9647e538 100644
--- a/pipelines/azureml/conf/experiments/lightgbm-inferencing.yaml
+++ b/pipelines/azureml/conf/experiments/lightgbm-inferencing.yaml
@@ -43,12 +43,21 @@ lightgbm_inferencing:
   # list all inferencing frameworks and their builds
   variants:
     - framework: lightgbm_python # v3.3.0 via pypi
+      num_threads: 1
+
     - framework: lightgbm_c_api # v3.3.0 with C API prediction
+
     - framework: lightgbm_c_api # v3.3.0 with C API prediction
       build: docker/lightgbm-custom/v330_patch_cpu_mpi_build.dockerfile
+
     - framework: lightgbm_c_api # v3.2.1 with C API prediction
       build: docker/lightgbm-v3.2.1/linux_cpu_mpi_build.dockerfile
+
     - framework: lightgbm_c_api # v3.2.1 with C API prediction
       build: docker/lightgbm-custom/v321_patch_cpu_mpi_build.dockerfile
+
     - framework: treelite_python # v1.3.0
+      num_threads: 1
+      batch_size: 0 # use whole file as batch
+

From e90a98bbb1e904e16e37307535eb77da5730c196 Mon Sep 17 00:00:00 2001
From: Jeff Omhover
Date: Wed, 24 Nov 2021 11:03:27 -0800
Subject: [PATCH 15/16] fix name

---
 src/scripts/inferencing/lightgbm_cli/spec.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/scripts/inferencing/lightgbm_cli/spec.yaml b/src/scripts/inferencing/lightgbm_cli/spec.yaml
index 99c82466..41bfa111 100644
--- a/src/scripts/inferencing/lightgbm_cli/spec.yaml
+++ b/src/scripts/inferencing/lightgbm_cli/spec.yaml
@@ -37,7 +37,7 @@ command: >-
   python score.py
   --data {inputs.data}
   --model {inputs.model}
-  --num_threads {inputs.n_threads}
+  --num_threads {inputs.num_threads}
   [--lightgbm_exec_path {inputs.lightgbm_exec_path}]
   [--predict_disable_shape_check {inputs.predict_disable_shape_check}]
   [--verbose {inputs.verbose}]

From 51c375b1e019e7eeae2a1d7417bf0ae0d5e94296 Mon Sep 17 00:00:00 2001
From: Jeff Omhover
Date: Fri, 3 Dec 2021 11:02:11 -0800
Subject: [PATCH 16/16] merge and resolve

---
 src/common/metrics.py | 64 -------------------------------------------
 1 file changed, 64 deletions(-)

diff --git a/src/common/metrics.py b/src/common/metrics.py
index 55b44ca3..909761f6 100644
--- a/src/common/metrics.py
+++ b/src/common/metrics.py
@@ -172,70 +172,6 @@ def log_time_block(self, metric_name, step=None):
         # see class below with proper __enter__ and __exit__
         return LogTimeBlock(metric_name, step=step)
 
-    def log_inferencing_latencies(self, time_per_batch, batch_length=1, factor_to_usecs=1000000.0):
"""Logs prediction latencies (for inferencing) with lots of fancy metrics and plots. - - Args: - time_per_batch_list (List[float]): time per inferencing batch - batch_lengths (Union[List[int],int]): length of each batch (List or constant) - factor_to_usecs (float): factor to apply to time_per_batch to convert to microseconds - """ - if isinstance(batch_length, list): - sum_batch_lengths = sum(batch_length) - else: - sum_batch_lengths = batch_length*len(time_per_batch) - - # log metadata - self.log_metric("prediction_batches", len(time_per_batch)) - self.log_metric("prediction_queries", sum_batch_lengths) - - if len(time_per_batch) > 0: - self.log_metric("prediction_latency_avg", (sum(time_per_batch) * factor_to_usecs)/sum_batch_lengths) # usecs - - # if there's more than 1 batch, compute percentiles - if len(time_per_batch) > 1: - import numpy as np - import matplotlib.pyplot as plt - - # latency per batch - batch_run_times = np.array(time_per_batch) * factor_to_usecs - self.log_metric("batch_latency_p50_usecs", np.percentile(batch_run_times, 50)) - self.log_metric("batch_latency_p75_usecs", np.percentile(batch_run_times, 75)) - self.log_metric("batch_latency_p90_usecs", np.percentile(batch_run_times, 90)) - self.log_metric("batch_latency_p95_usecs", np.percentile(batch_run_times, 95)) - self.log_metric("batch_latency_p99_usecs", np.percentile(batch_run_times, 99)) - - # show the distribution prediction latencies - fig, ax = plt.subplots(1) - ax.hist(batch_run_times, bins=100) - ax.set_title("Latency-per-batch histogram (log scale)") - plt.xlabel("usecs") - plt.ylabel("occurence") - plt.yscale('log') - - # record in mlflow - self.log_figure(fig, "batch_latency_log_histogram.png") - - # latency per query - if isinstance(batch_length, list): - prediction_latencies = np.array(time_per_batch) * factor_to_usecs / np.array(batch_length) - else: - prediction_latencies = np.array(time_per_batch) * factor_to_usecs / batch_length - - self.log_metric("prediction_latency_p50_usecs", np.percentile(prediction_latencies, 50)) - self.log_metric("prediction_latency_p75_usecs", np.percentile(prediction_latencies, 75)) - self.log_metric("prediction_latency_p90_usecs", np.percentile(prediction_latencies, 90)) - self.log_metric("prediction_latency_p95_usecs", np.percentile(prediction_latencies, 95)) - self.log_metric("prediction_latency_p99_usecs", np.percentile(prediction_latencies, 99)) - - # show the distribution prediction latencies - fig, ax = plt.subplots(1) - ax.hist(prediction_latencies, bins=100) - ax.set_title("Latency-per-prediction histogram (log scale)") - plt.xlabel("usecs") - plt.ylabel("occurence") - plt.yscale('log') - def log_inferencing_latencies(self, time_per_batch, batch_length=1, factor_to_usecs=1000000.0): """Logs prediction latencies (for inferencing) with lots of fancy metrics and plots.