diff --git a/src/common/perf.py b/src/common/perf.py
index 8900b6e4..906b51dc 100644
--- a/src/common/perf.py
+++ b/src/common/perf.py
@@ -9,6 +9,7 @@
 import threading
 import time
 import psutil
+import numpy as np
 import tempfile
 import json

@@ -189,10 +190,14 @@ def append_perf_metrics(self, perf_metrics):


 class PerfReportPlotter():
+    PERF_DISK_NET_PLOT_BINS = 50
+    PERF_DISK_NET_PLOT_FIGSIZE = (16,8)
+
     """Once collected all perf reports from all nodes"""
     def __init__(self, metrics_logger):
         self.all_reports = {}
         self.metrics_logger = metrics_logger
+        self.logger = logging.getLogger(__name__)

     def save_to(self, perf_report_file_path=None):
         """Saves all reports into a json file"""
@@ -210,107 +215,217 @@ def add_perf_reports(self, perf_reports, node):
         self.all_reports[node] = perf_reports

     def report_nodes_perf(self):
+        """Report performance via metrics/plots for all nodes (loop)"""
         # Currently reporting one metric per node
         for node in self.all_reports:
-            # CPU UTILIZATION
-            cpu_avg_utilization = [ report["cpu_pct_per_cpu_avg"] for report in self.all_reports[node] ]
-
-            self.metrics_logger.log_metric(
-                "max_t_(cpu_pct_per_cpu_avg)",
-                max(cpu_avg_utilization),
-                step=node
-            )
-            self.metrics_logger.log_metric(
-                "cpu_avg_utilization_pct",
-                sum(cpu_avg_utilization)/len(cpu_avg_utilization),
-                step=node
-            )
-            self.metrics_logger.log_metric(
-                "cpu_avg_utilization_at100_pct",
-                sum( [ utilization >= 100.0 for utilization in cpu_avg_utilization])/len(cpu_avg_utilization)*100.0,
-                step=node
-            )
-            self.metrics_logger.log_metric(
-                "cpu_avg_utilization_over80_pct",
-                sum( [ utilization >= 80.0 for utilization in cpu_avg_utilization])/len(cpu_avg_utilization)*100.0,
-                step=node
-            )
-            self.metrics_logger.log_metric(
-                "cpu_avg_utilization_over40_pct",
-                sum( [ utilization >= 40.0 for utilization in cpu_avg_utilization])/len(cpu_avg_utilization)*100.0,
-                step=node
-            )
-            self.metrics_logger.log_metric(
-                "cpu_avg_utilization_over20_pct",
-                sum( [ utilization >= 20.0 for utilization in cpu_avg_utilization])/len(cpu_avg_utilization)*100.0,
-                step=node
-            )
-            self.metrics_logger.log_metric(
-                "max_t_(cpu_pct_per_cpu_min)",
-                max([ report["cpu_pct_per_cpu_min"] for report in self.all_reports[node] ]),
-                step=node
-            )
-            self.metrics_logger.log_metric(
-                "max_t_(cpu_pct_per_cpu_max)",
-                max([ report["cpu_pct_per_cpu_max"] for report in self.all_reports[node] ]),
-                step=node
-            )
-
-            # "CPU HOURS"
-            job_internal_cpu_hours = (time.time() - self.all_reports[node][0]["timestamp"]) * psutil.cpu_count() / 60 / 60
-            self.metrics_logger.log_metric(
-                "node_cpu_hours",
-                job_internal_cpu_hours,
-                step=node
-            )
-            self.metrics_logger.log_metric(
-                "node_unused_cpu_hours",
-                job_internal_cpu_hours * (100.0 - sum(cpu_avg_utilization)/len(cpu_avg_utilization)) / 100.0,
-                step=node
-            )
-
-            # MEM
-            self.metrics_logger.log_metric(
-                "max_t_(mem_percent)",
-                max([ report["mem_percent"] for report in self.all_reports[node] ]),
-                step=node
-            )
-
-            # DISK
-            self.metrics_logger.log_metric(
-                "max_t_disk_usage_percent",
-                max([ report["disk_usage_percent"] for report in self.all_reports[node] ]),
-                step=node
-            )
-            self.metrics_logger.log_metric(
-                "total_disk_io_read_mb",
-                max([ report["disk_io_read_mb"] for report in self.all_reports[node] ]),
-                step=node
-            )
-            self.metrics_logger.log_metric(
-                "total_disk_io_write_mb",
-                max([ report["disk_io_write_mb"] for report in self.all_reports[node] ]),
-                step=node
-            )
-
-            # NET I/O
-            self.metrics_logger.log_metric(
-                "total_net_io_lo_sent_mb",
-                max([ report["net_io_lo_sent_mb"] for report in self.all_reports[node] ]),
-                step=node
-            )
-            self.metrics_logger.log_metric(
-                "total_net_io_ext_sent_mb",
-                max([ report["net_io_ext_sent_mb"] for report in self.all_reports[node] ]),
-                step=node
-            )
-            self.metrics_logger.log_metric(
-                "total_net_io_lo_recv_mb",
-                max([ report["net_io_lo_recv_mb"] for report in self.all_reports[node] ]),
-                step=node
-            )
-            self.metrics_logger.log_metric(
-                "total_net_io_ext_recv_mb",
-                max([ report["net_io_ext_recv_mb"] for report in self.all_reports[node] ]),
-                step=node
-            )
+            self.report_node_perf(node)
+
+    def diff_at_partitions(self, a, p, t):
+        """Compute the difference end-begin for all partitions in an array.
+
+        Args:
+            a (np.array): a np array of increasing values
+            p (int): a given number of partitions
+            t (np.array): np array of timestamps for a
+
+        Returns:
+            diff_part, time_part (np.array, np.array): per-partition diffs and partition start timestamps (dimension p)
+        """
+        if len(a) > (2*p):
+            partitioned_diff = []
+            partitioned_time = []
+            index_partitions = np.array_split(np.arange(len(a)), p)
+            for part in index_partitions:
+                partitioned_diff.append(a[part[-1]] - a[part[0]]) # diff between last and first value of partition
+                partitioned_time.append(t[part[0]])
+            return np.array(partitioned_diff), np.array(partitioned_time)
+        else:
+            return np.ediff1d(a), t[1::]
+
+    def report_node_perf(self, node):
+        """Report performance via metrics/plots for one given node"""
+        # CPU UTILIZATION
+        cpu_avg_utilization = [ report["cpu_pct_per_cpu_avg"] for report in self.all_reports[node] ]
+        self.metrics_logger.log_metric(
+            "max_t_(cpu_pct_per_cpu_avg)",
+            max(cpu_avg_utilization),
+            step=node
+        )
+        self.metrics_logger.log_metric(
+            "cpu_avg_utilization_pct",
+            sum(cpu_avg_utilization)/len(cpu_avg_utilization),
+            step=node
+        )
+        self.metrics_logger.log_metric(
+            "cpu_avg_utilization_at100_pct",
+            sum( [ utilization >= 100.0 for utilization in cpu_avg_utilization])/len(cpu_avg_utilization)*100.0,
+            step=node
+        )
+        self.metrics_logger.log_metric(
+            "cpu_avg_utilization_over80_pct",
+            sum( [ utilization >= 80.0 for utilization in cpu_avg_utilization])/len(cpu_avg_utilization)*100.0,
+            step=node
+        )
+        self.metrics_logger.log_metric(
+            "cpu_avg_utilization_over40_pct",
+            sum( [ utilization >= 40.0 for utilization in cpu_avg_utilization])/len(cpu_avg_utilization)*100.0,
+            step=node
+        )
+        self.metrics_logger.log_metric(
+            "cpu_avg_utilization_over20_pct",
+            sum( [ utilization >= 20.0 for utilization in cpu_avg_utilization])/len(cpu_avg_utilization)*100.0,
+            step=node
+        )
+        self.metrics_logger.log_metric(
+            "max_t_(cpu_pct_per_cpu_min)",
+            max([ report["cpu_pct_per_cpu_min"] for report in self.all_reports[node] ]),
+            step=node
+        )
+        self.metrics_logger.log_metric(
+            "max_t_(cpu_pct_per_cpu_max)",
+            max([ report["cpu_pct_per_cpu_max"] for report in self.all_reports[node] ]),
+            step=node
+        )
+
+        # "CPU HOURS"
+        job_internal_cpu_hours = (time.time() - self.all_reports[node][0]["timestamp"]) * psutil.cpu_count() / 60 / 60
+        self.metrics_logger.log_metric(
+            "node_cpu_hours",
+            job_internal_cpu_hours,
+            step=node
+        )
+        self.metrics_logger.log_metric(
+            "node_unused_cpu_hours",
+            job_internal_cpu_hours * (100.0 - sum(cpu_avg_utilization)/len(cpu_avg_utilization)) / 100.0,
+            step=node
+        )
+
+        # MEM
+        mem_utilization = [ report["mem_percent"] for report in self.all_reports[node] ]
+        self.metrics_logger.log_metric(
+            "max_t_(mem_percent)",
+            max([ report["mem_percent"] for report in self.all_reports[node] ]),
+            step=node
+        )
+
+        # DISK
+        self.metrics_logger.log_metric(
+            "max_t_disk_usage_percent",
+            max([ report["disk_usage_percent"] for report in self.all_reports[node] ]),
+            step=node
+        )
+
+        disk_io_read = np.array([ report["disk_io_read_mb"] for report in self.all_reports[node] ])
+        self.metrics_logger.log_metric(
+            "total_disk_io_read_mb",
+            np.max(disk_io_read),
+            step=node
+        )
+        disk_io_write = np.array([ report["disk_io_write_mb"] for report in self.all_reports[node] ])
+        self.metrics_logger.log_metric(
+            "total_disk_io_write_mb",
+            np.max(disk_io_write),
+            step=node
+        )
+
+        # NET I/O
+        self.metrics_logger.log_metric(
+            "total_net_io_lo_sent_mb",
+            max([ report["net_io_lo_sent_mb"] for report in self.all_reports[node] ]),
+            step=node
+        )
+        net_io_ext_sent = np.array([ report["net_io_ext_sent_mb"] for report in self.all_reports[node] ])
+        self.metrics_logger.log_metric(
+            "total_net_io_ext_sent_mb",
+            np.max(net_io_ext_sent),
+            step=node
+        )
+        self.metrics_logger.log_metric(
+            "total_net_io_lo_recv_mb",
+            max([ report["net_io_lo_recv_mb"] for report in self.all_reports[node] ]),
+            step=node
+        )
+        net_io_ext_recv = np.array([ report["net_io_ext_recv_mb"] for report in self.all_reports[node] ])
+        self.metrics_logger.log_metric(
+            "total_net_io_ext_recv_mb",
+            np.max(net_io_ext_recv),
+            step=node
+        )
+
+        # then plot everything
+        timestamps = np.array([ report["timestamp"] for report in self.all_reports[node] ])
+        self.plot_all_perf(node, timestamps, disk_io_read, disk_io_write, net_io_ext_sent, net_io_ext_recv, cpu_avg_utilization, mem_utilization)
+
+    def plot_all_perf(self, node, timestamps, disk_io_read, disk_io_write, net_io_ext_sent, net_io_ext_recv, cpu_utilization, mem_utilization):
+        """Plots mem/cpu/disk/net using matplotlib.
+
+        Args:
+            node (int): node id
+            timestamps (np.array): timestamps for all following arrays
+            disk_io_read (np.array): disk read cumulative sum (increasing)
+            disk_io_write (np.array): disk write cumulative sum (increasing)
+            net_io_ext_sent (np.array): net sent cumulative sum (increasing)
+            net_io_ext_recv (np.array): net recv cumulative sum (increasing)
+            cpu_utilization (np.array): cpu utilization
+            mem_utilization (np.array): mem utilization
+
+        Returns:
+            None
+
+        NOTE: all arrays are expected to have the same length.
+        """
+        if len(timestamps) < 3:
+            self.logger.warning(f"Not enough perf data points ({len(timestamps)}) to draw the perf plot, skipping.")
+            return
+
+        timestamps = timestamps - timestamps[0]
+        disk_io_read = disk_io_read - disk_io_read[0] # cumsum starting at 0
+        disk_io_write = disk_io_write - disk_io_write[0] # cumsum starting at 0
+        net_io_ext_sent = net_io_ext_sent - net_io_ext_sent[0] # cumsum starting at 0
+        net_io_ext_recv = net_io_ext_recv - net_io_ext_recv[0] # cumsum starting at 0
+
+        # import matplotlib just-in-time
+        import matplotlib.pyplot as plt
+        plt.switch_backend('agg') # to enable it in thread
+
+        fig, ax = plt.subplots(
+            nrows=1,
+            ncols=1,
+            sharex=False, # both mem+cpu and disk+net will share the same x axis (job time)
+            figsize=PerfReportPlotter.PERF_DISK_NET_PLOT_FIGSIZE
+        )
+
+        reduced_disk_io_read, reduced_timestamps = self.diff_at_partitions(disk_io_read, PerfReportPlotter.PERF_DISK_NET_PLOT_BINS, timestamps)
+        reduced_disk_io_write, _ = self.diff_at_partitions(disk_io_write, PerfReportPlotter.PERF_DISK_NET_PLOT_BINS, timestamps)
+        reduced_net_io_ext_sent, _ = self.diff_at_partitions(net_io_ext_sent, PerfReportPlotter.PERF_DISK_NET_PLOT_BINS, timestamps)
+        reduced_net_io_ext_recv, _ = self.diff_at_partitions(net_io_ext_recv, PerfReportPlotter.PERF_DISK_NET_PLOT_BINS, timestamps)
+
+        # identify a good normalization value for disk/net bar plots
+        max_disk_net_normalization_value = max(
+            max(reduced_disk_io_read),
+            max(reduced_disk_io_write),
+            max(reduced_net_io_ext_sent),
+            max(reduced_net_io_ext_recv),
+        ) * 1.05 # 5% more to keep some visual margin on the plot
+
+        width = max(reduced_timestamps) / len(reduced_timestamps) / 5 # barplot width = 1/5 of timestamp unit
+        ax.bar(reduced_timestamps, reduced_disk_io_read, width=width, label="disk read", color="w", hatch='/', edgecolor="dimgray")
+        ax.bar(reduced_timestamps + width, reduced_disk_io_write, width=width, label="disk write", color="w", hatch='-', edgecolor="dimgray")
+        ax.bar(reduced_timestamps + 2*width, reduced_net_io_ext_sent, width=width, label="net sent", color="silver")
+        ax.bar(reduced_timestamps + 3*width, reduced_net_io_ext_recv, width=width, label="net recv", color="gray")
+        ax.set_ylim([0.0, max_disk_net_normalization_value])
+        ax.legend(loc='upper left')
+        ax.set_ylabel('MB')
+
+        ax2 = ax.twinx() # superimposed on ax, with its own y axis shown on the right
+        ax2.plot(timestamps, cpu_utilization, label="cpu", color="r")
+        ax2.plot(timestamps, mem_utilization, label="mem", color="b", linestyle='dotted')
+        ax2.set_ylim([0.0, max(max(cpu_utilization), max(mem_utilization), 100.0)])
+        ax2.legend(loc='upper right')
+        ax2.set_ylabel('%')
+
+        ax.set_xlabel("time in seconds (job duration={:.2f}s)".format(timestamps[-1]-timestamps[0]))
+        plt.title(f"Perf plot for node {node}")
+
+        # record in mlflow
+        self.metrics_logger.log_figure(fig, f"perf_plot_node{node}.png")
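For readers skimming the perf.py changes above: the disk and network counters in each report are cumulative, so diff_at_partitions turns them into per-bin deltas before the bar plot is drawn. A minimal standalone sketch of that computation (the sample values and the names a, t, p are made up for illustration; only numpy is needed):

    import numpy as np

    # a cumulative counter (e.g. MB read since job start) sampled 10 times, with timestamps
    a = np.array([0, 2, 5, 5, 9, 12, 20, 21, 25, 30], dtype=float)
    t = np.arange(10, dtype=float)
    p = 3  # number of partitions (PERF_DISK_NET_PLOT_BINS is 50 in perf.py)

    # same logic as the len(a) > 2*p branch of diff_at_partitions:
    # split the indices into p chunks, take last-minus-first per chunk,
    # and stamp each delta with the chunk's first timestamp
    parts = np.array_split(np.arange(len(a)), p)
    diffs = np.array([a[idx[-1]] - a[idx[0]] for idx in parts])
    times = np.array([t[idx[0]] for idx in parts])

    print(diffs)  # [ 5. 11.  9.]
    print(times)  # [0. 4. 7.]

Note that the delta between the last sample of one chunk and the first sample of the next is not attributed to any bar; that is acceptable here because the bars only aim to show the shape of the I/O activity over time.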
diff --git a/src/scripts/data_processing/generate_data/conda_env.yaml b/src/scripts/data_processing/generate_data/conda_env.yaml
index 223c34f7..9480a920 100644
--- a/src/scripts/data_processing/generate_data/conda_env.yaml
+++ b/src/scripts/data_processing/generate_data/conda_env.yaml
@@ -10,3 +10,4 @@ dependencies:
     - azureml-defaults==1.35.0
     - azureml-mlflow==1.35.0
     - psutil==5.8.0
+    - matplotlib==3.4.3
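The same matplotlib==3.4.3 pin is repeated in each of the component environments below, because PerfReportPlotter now imports matplotlib lazily on whatever compute the component runs on. A quick headless smoke test of the kind of rendering perf.py performs (the output file name smoke.png is only an example) can be run in any of these environments; it uses the same 'agg' backend, so no display server is required:

    import matplotlib
    matplotlib.use("agg")  # same backend perf.py switches to; works on headless compute
    import matplotlib.pyplot as plt

    fig, ax = plt.subplots(figsize=(16, 8))
    ax.plot([0, 1, 2], [0.0, 50.0, 100.0], label="cpu")
    ax.legend()
    fig.savefig("smoke.png")  # no X server needed under the agg backend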
diff --git a/src/scripts/data_processing/lightgbm_data2bin/conda_env.yml b/src/scripts/data_processing/lightgbm_data2bin/conda_env.yml
index 2c403f37..3245a774 100644
--- a/src/scripts/data_processing/lightgbm_data2bin/conda_env.yml
+++ b/src/scripts/data_processing/lightgbm_data2bin/conda_env.yml
@@ -11,3 +11,4 @@ dependencies:
     - azureml-mlflow==1.35.0
     - psutil==5.8.0
     - lightgbm==3.2.1
+    - matplotlib==3.4.3
diff --git a/src/scripts/data_processing/partition_data/conda_env.yml b/src/scripts/data_processing/partition_data/conda_env.yml
index 39dabefc..f15a18f7 100644
--- a/src/scripts/data_processing/partition_data/conda_env.yml
+++ b/src/scripts/data_processing/partition_data/conda_env.yml
@@ -10,4 +10,5 @@ dependencies:
     - azureml-defaults==1.35.0
     - azureml-mlflow==1.35.0
     - psutil==5.8.0
+    - matplotlib==3.4.3

diff --git a/src/scripts/inferencing/custom_win_cli/conda_env.yaml b/src/scripts/inferencing/custom_win_cli/conda_env.yaml
index 78eed94f..a15650c6 100644
--- a/src/scripts/inferencing/custom_win_cli/conda_env.yaml
+++ b/src/scripts/inferencing/custom_win_cli/conda_env.yaml
@@ -8,3 +8,4 @@ dependencies:
     - azureml-defaults==1.35.0
     - azureml-mlflow==1.35.0
     - psutil==5.8.0
+    - matplotlib==3.4.3
diff --git a/src/scripts/inferencing/treelite_python/conda_env.yaml b/src/scripts/inferencing/treelite_python/conda_env.yaml
index b31a7368..0d08ea4c 100644
--- a/src/scripts/inferencing/treelite_python/conda_env.yaml
+++ b/src/scripts/inferencing/treelite_python/conda_env.yaml
@@ -12,3 +12,4 @@ dependencies:
     - treelite_runtime==2.1.0
     - pandas>=1.1,<1.2
     - numpy>=1.10,<1.20
+    - matplotlib==3.4.3
diff --git a/src/scripts/model_transformation/treelite_compile/conda_env.yaml b/src/scripts/model_transformation/treelite_compile/conda_env.yaml
index b31a7368..0d08ea4c 100644
--- a/src/scripts/model_transformation/treelite_compile/conda_env.yaml
+++ b/src/scripts/model_transformation/treelite_compile/conda_env.yaml
@@ -12,3 +12,4 @@ dependencies:
     - treelite_runtime==2.1.0
     - pandas>=1.1,<1.2
     - numpy>=1.10,<1.20
+    - matplotlib==3.4.3
diff --git a/src/scripts/sample/conda_env.yaml b/src/scripts/sample/conda_env.yaml
index 0201788d..2b11a210 100644
--- a/src/scripts/sample/conda_env.yaml
+++ b/src/scripts/sample/conda_env.yaml
@@ -8,3 +8,4 @@ dependencies:
     - azureml-defaults==1.35.0
     - azureml-mlflow==1.35.0
     - psutil==5.8.0
+    - matplotlib==3.4.3
diff --git a/tests/common/test_component.py b/tests/common/test_component.py
index 2865f9e3..5c7655cb 100644
--- a/tests/common/test_component.py
+++ b/tests/common/test_component.py
@@ -99,14 +99,15 @@ def __init__(self):
     def run(self, args, logger, metrics_logger, unknown_args):
         # don't do anything
         with metrics_logger.log_time_block("fake_time_block", step=1):
-            time.sleep(1)
+            time.sleep(5)

 @patch('mlflow.end_run')
+@patch('mlflow.log_figure')
 @patch('mlflow.log_artifact')
 @patch('mlflow.log_metric')
 @patch('mlflow.set_tags')
 @patch('mlflow.start_run')
-def test_single_node_script_metrics(mlflow_start_run_mock, mlflow_set_tags_mock, mlflow_log_metric_mock, mlflow_log_artifact_mock, mlflow_end_run_mock):
+def test_single_node_script_metrics(mlflow_start_run_mock, mlflow_set_tags_mock, mlflow_log_metric_mock, mlflow_log_artifact_mock, mlflow_log_figure_mock, mlflow_end_run_mock):
     # just run main
     test_component = FakeSingleNodeScript.main(
         [
@@ -132,7 +133,8 @@ def test_single_node_script_metrics(mlflow_start_run_mock, mlflow_set_tags_mock,
         mlflow_log_metric_mock
     )

-    mlflow_log_artifact_mock.assert_called_once()
+    mlflow_log_artifact_mock.assert_called_once() # perf data exported in json
+    mlflow_log_figure_mock.assert_called_once() # perf plot


 class FailingSingleNodeScript(SingleNodeScript):
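The test change above (test_component.py) and the one below (test_distributed.py) get the same two adjustments: the fake component now sleeps 5 seconds instead of 1, presumably so the perf collector gathers the three or more samples that plot_all_perf requires before drawing anything, and mlflow.log_figure is patched so the new plot call can be asserted. As a reminder of how the extra @patch maps onto the test signature (decorators apply bottom-up, so the new mock lands between log_artifact and end_run), here is a minimal sketch with a hypothetical test name:

    from unittest.mock import patch

    @patch('mlflow.end_run')      # outermost decorator -> last mock argument
    @patch('mlflow.log_figure')   # the newly patched call -> second-to-last argument
    @patch('mlflow.start_run')    # innermost decorator -> first mock argument
    def test_patch_order(start_run_mock, log_figure_mock, end_run_mock):
        import mlflow
        mlflow.log_figure(None, "perf_plot_node0.png")
        log_figure_mock.assert_called_once()

    test_patch_order()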
diff --git a/tests/common/test_distributed.py b/tests/common/test_distributed.py
index 23424af1..80662093 100644
--- a/tests/common/test_distributed.py
+++ b/tests/common/test_distributed.py
@@ -22,16 +22,17 @@ def __init__(self):
     def run(self, args, logger, metrics_logger, unknown_args):
         # don't do anything
         with metrics_logger.log_time_block("fake_time_block", step=1):
-            time.sleep(1)
+            time.sleep(5)


 @patch('mlflow.end_run')
+@patch('mlflow.log_figure')
 @patch('mlflow.log_artifact')
 @patch('mlflow.log_metric')
 @patch('mlflow.set_tags')
 @patch('mlflow.start_run')
 @patch('common.distributed.MultiNodeMPIDriver')
-def test_multi_node_script(mpi_driver_mock, mlflow_start_run_mock, mlflow_set_tags_mock, mlflow_log_metric_mock, mlflow_log_artifact_mock, mlflow_end_run_mock):
+def test_multi_node_script(mpi_driver_mock, mlflow_start_run_mock, mlflow_set_tags_mock, mlflow_log_metric_mock, mlflow_log_artifact_mock, mlflow_log_figure_mock, mlflow_end_run_mock):
     # fake mpi initialization + config
     mpi_driver_mock().get_multinode_config.return_value = multinode_config_class(
         1, # world_size
@@ -65,7 +66,8 @@ def test_multi_node_script(mpi_driver_mock, mlflow_start_run_mock, mlflow_set_ta
         mlflow_log_metric_mock
     )

-    mlflow_log_artifact_mock.assert_called_once()
+    mlflow_log_artifact_mock.assert_called_once() # perf data exported in json
+    mlflow_log_figure_mock.assert_called_once() # perf plot


 class FailingMultiNodeScript(MultiNodeScript):
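For reference, a rough end-to-end sketch of how the plotter introduced in this patch behaves outside the test harness. The report dictionaries and the MagicMock metrics logger are invented here; only the keys that report_node_perf actually reads are filled in, the import path assumes src/ is on PYTHONPATH, and numpy, psutil and matplotlib must be installed:

    import time
    from unittest.mock import MagicMock
    from common.perf import PerfReportPlotter  # assumed import path for src/common/perf.py

    metrics_logger = MagicMock()  # stands in for the repo's metrics logger (log_metric/log_figure)
    plotter = PerfReportPlotter(metrics_logger)

    # three fake reports for node 0 -- the minimum plot_all_perf accepts
    reports = []
    for i in range(3):
        reports.append({
            "timestamp": time.time() + i,
            "cpu_pct_per_cpu_avg": 50.0 + i, "cpu_pct_per_cpu_min": 10.0, "cpu_pct_per_cpu_max": 90.0,
            "mem_percent": 40.0, "disk_usage_percent": 55.0,
            "disk_io_read_mb": 10.0 * i, "disk_io_write_mb": 5.0 * i,
            "net_io_lo_sent_mb": 1.0 * i, "net_io_ext_sent_mb": 2.0 * i,
            "net_io_lo_recv_mb": 1.0 * i, "net_io_ext_recv_mb": 2.0 * i,
        })

    plotter.add_perf_reports(reports, node=0)
    plotter.report_nodes_perf()

    metrics_logger.log_figure.assert_called_once()  # one perf plot logged per node

With fewer than three reports per node the figure is skipped and only a warning is logged, which is what the longer time.sleep(5) in the tests above guards against.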