Skip to content
This repository was archived by the owner on Apr 8, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
319 changes: 217 additions & 102 deletions src/common/perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import threading
import time
import psutil
import numpy as np
import tempfile
import json

Expand Down Expand Up @@ -189,10 +190,14 @@ def append_perf_metrics(self, perf_metrics):


class PerfReportPlotter():
    """Aggregates perf reports collected from all nodes and reports them as metrics/plots.

    NOTE: this docstring was previously placed after the class attributes,
    making it a no-op string statement instead of the class __doc__.
    """

    # number of bars (index partitions) used for the disk/net barplot
    PERF_DISK_NET_PLOT_BINS = 50
    # matplotlib figure size (inches) for the combined perf plot
    PERF_DISK_NET_PLOT_FIGSIZE = (16, 8)

    def __init__(self, metrics_logger):
        """
        Args:
            metrics_logger: logger object exposing log_metric() and log_figure()
        """
        # maps node id -> list of perf report dicts for that node
        self.all_reports = {}
        self.metrics_logger = metrics_logger
        self.logger = logging.getLogger(__name__)

def save_to(self, perf_report_file_path=None):
"""Saves all reports into a json file"""
Expand All @@ -210,107 +215,217 @@ def add_perf_reports(self, perf_reports, node):
self.all_reports[node] = perf_reports

def report_nodes_perf(self):
    """Report performance via metrics/plots for all nodes (loop).

    All per-node metric computation and logging is delegated to
    report_node_perf() so the logic lives in a single place.
    The previous inline copy of the metric logging emitted every
    metric twice per node (once inline, once via report_node_perf).
    """
    # Currently reporting one metric per node (node id used as metric step)
    for node in self.all_reports:
        self.report_node_perf(node)

def diff_at_partitions(self, a, p, t):
    """Compute per-partition increments of a cumulative (increasing) array.

    Splits the indices of `a` into `p` contiguous partitions and, for each
    partition, returns last-first (the amount accumulated during that
    partition). When `a` is too short to partition meaningfully
    (len(a) <= 2*p), falls back to plain consecutive differences.

    Args:
        a (np.array): a np array of increasing (cumulative) values
        p (int): a given number of partitions
        t (np.array): np array of timestamps aligned with `a`

    Returns:
        tuple(np.array, np.array): (diffs, timestamps) — both of length p
        in the partitioned case, or length len(a)-1 in the fallback case.
    """
    if len(a) > (2 * p):
        partitioned_diff = []
        partitioned_time = []
        # split [0, len(a)) into p roughly-equal contiguous index chunks
        index_partitions = np.array_split(np.arange(len(a)), p)
        for part in index_partitions:
            # increment over the partition = last value - first value
            partitioned_diff.append(a[part[-1]] - a[part[0]])
            # timestamp of the partition start marks the bar position
            partitioned_time.append(t[part[0]])
        return np.array(partitioned_diff), np.array(partitioned_time)
    else:
        # too few points: element-wise diffs, timestamps shifted by one
        return np.ediff1d(a), t[1:]

def report_node_perf(self, node):
    """Report performance via metrics/plots for one given node.

    Logs aggregated cpu/mem/disk/net metrics through self.metrics_logger
    (node id is used as the metric step), then draws the combined perf plot.

    Args:
        node (int): node id (key into self.all_reports)
    """
    reports = self.all_reports[node]

    # CPU UTILIZATION
    cpu_avg_utilization = [report["cpu_pct_per_cpu_avg"] for report in reports]
    self.metrics_logger.log_metric(
        "max_t_(cpu_pct_per_cpu_avg)",
        max(cpu_avg_utilization),
        step=node
    )
    self.metrics_logger.log_metric(
        "cpu_avg_utilization_pct",
        sum(cpu_avg_utilization) / len(cpu_avg_utilization),
        step=node
    )
    # share of samples at/above each utilization threshold, as a percentage
    for threshold, metric_name in [
        (100.0, "cpu_avg_utilization_at100_pct"),
        (80.0, "cpu_avg_utilization_over80_pct"),
        (40.0, "cpu_avg_utilization_over40_pct"),
        (20.0, "cpu_avg_utilization_over20_pct"),
    ]:
        self.metrics_logger.log_metric(
            metric_name,
            sum(utilization >= threshold for utilization in cpu_avg_utilization) / len(cpu_avg_utilization) * 100.0,
            step=node
        )
    self.metrics_logger.log_metric(
        "max_t_(cpu_pct_per_cpu_min)",
        max(report["cpu_pct_per_cpu_min"] for report in reports),
        step=node
    )
    self.metrics_logger.log_metric(
        "max_t_(cpu_pct_per_cpu_max)",
        max(report["cpu_pct_per_cpu_max"] for report in reports),
        step=node
    )

    # "CPU HOURS"
    # NOTE(review): psutil.cpu_count() is the cpu count of the process
    # running the plotter, not necessarily of `node` — confirm intended.
    job_internal_cpu_hours = (time.time() - reports[0]["timestamp"]) * psutil.cpu_count() / 60 / 60
    self.metrics_logger.log_metric(
        "node_cpu_hours",
        job_internal_cpu_hours,
        step=node
    )
    self.metrics_logger.log_metric(
        "node_unused_cpu_hours",
        job_internal_cpu_hours * (100.0 - sum(cpu_avg_utilization) / len(cpu_avg_utilization)) / 100.0,
        step=node
    )

    # MEM
    mem_utilization = [report["mem_percent"] for report in reports]
    self.metrics_logger.log_metric(
        "max_t_(mem_percent)",
        max(mem_utilization),  # reuse the list built above (was recomputed)
        step=node
    )

    # DISK
    self.metrics_logger.log_metric(
        "max_t_disk_usage_percent",
        max(report["disk_usage_percent"] for report in reports),
        step=node
    )
    # disk/net io counters are cumulative, so the max is the job total
    disk_io_read = np.array([report["disk_io_read_mb"] for report in reports])
    self.metrics_logger.log_metric(
        "total_disk_io_read_mb",
        np.max(disk_io_read),
        step=node
    )
    disk_io_write = np.array([report["disk_io_write_mb"] for report in reports])
    self.metrics_logger.log_metric(
        "total_disk_io_write_mb",
        np.max(disk_io_write),
        step=node
    )

    # NET I/O
    self.metrics_logger.log_metric(
        "total_net_io_lo_sent_mb",
        max(report["net_io_lo_sent_mb"] for report in reports),
        step=node
    )
    net_io_ext_sent = np.array([report["net_io_ext_sent_mb"] for report in reports])
    self.metrics_logger.log_metric(
        "total_net_io_ext_sent_mb",
        np.max(net_io_ext_sent),
        step=node
    )
    self.metrics_logger.log_metric(
        "total_net_io_lo_recv_mb",
        max(report["net_io_lo_recv_mb"] for report in reports),
        step=node
    )
    net_io_ext_recv = np.array([report["net_io_ext_recv_mb"] for report in reports])
    self.metrics_logger.log_metric(
        "total_net_io_ext_recv_mb",
        np.max(net_io_ext_recv),
        step=node
    )

    # then plot everything
    timestamps = np.array([report["timestamp"] for report in reports])
    self.plot_all_perf(node, timestamps, disk_io_read, disk_io_write, net_io_ext_sent, net_io_ext_recv, cpu_avg_utilization, mem_utilization)

def plot_all_perf(self, node, timestamps, disk_io_read, disk_io_write, net_io_ext_sent, net_io_ext_recv, cpu_utilization, mem_utilization):
    """Plots mem/cpu/disk/net using matplotlib and logs the figure.

    Args:
        node (int): node id
        timestamps (np.array): timestamps for all following arrays
        disk_io_read (np.array): disk read cumulative sum (increasing)
        disk_io_write (np.array): disk write cumulative sum (increasing)
        net_io_ext_sent (np.array): net sent cumulative sum (increasing)
        net_io_ext_recv (np.array): net recv cumulative sum (increasing)
        cpu_utilization (np.array): cpu utilization
        mem_utilization (np.array): mem utilization

    Returns:
        None

    NOTE: we're expecting all arrays to have same length.
    """
    # diff_at_partitions (and the plot itself) needs at least a few samples
    if len(timestamps) < 3:
        self.logger.warning(f"Number of data points for perf is too low {len(timestamps)}, we will not print perf plot.")
        return

    # rebase everything so the plot starts at time 0 / volume 0
    timestamps = timestamps - timestamps[0]
    disk_io_read = disk_io_read - disk_io_read[0]  # cumsum starting at 0
    disk_io_write = disk_io_write - disk_io_write[0]  # cumsum starting at 0
    net_io_ext_sent = net_io_ext_sent - net_io_ext_sent[0]  # cumsum starting at 0
    net_io_ext_recv = net_io_ext_recv - net_io_ext_recv[0]  # cumsum starting at 0

    # import matplotlib just-in-time
    import matplotlib.pyplot as plt
    plt.switch_backend('agg')  # to enable it in thread

    fig, ax = plt.subplots(
        nrows=1,
        ncols=1,
        sharex=False,  # both mem+cpu and disk+net will share the same x axis (job time)
        figsize=PerfReportPlotter.PERF_DISK_NET_PLOT_FIGSIZE
    )

    # bin the cumulative series into per-partition increments for the barplot
    reduced_disk_io_read, reduced_timestamps = self.diff_at_partitions(disk_io_read, PerfReportPlotter.PERF_DISK_NET_PLOT_BINS, timestamps)
    reduced_disk_io_write, _ = self.diff_at_partitions(disk_io_write, PerfReportPlotter.PERF_DISK_NET_PLOT_BINS, timestamps)
    reduced_net_io_ext_sent, _ = self.diff_at_partitions(net_io_ext_sent, PerfReportPlotter.PERF_DISK_NET_PLOT_BINS, timestamps)
    reduced_net_io_ext_recv, _ = self.diff_at_partitions(net_io_ext_recv, PerfReportPlotter.PERF_DISK_NET_PLOT_BINS, timestamps)

    # identify a good normalization value for disk+net plots
    max_disk_net_normalization_value = max(
        max(reduced_disk_io_read),
        max(reduced_disk_io_write),
        max(reduced_net_io_ext_sent),
        max(reduced_net_io_ext_recv),
    ) * 1.05  # 5% more to keep some visual margin on the plot

    width = max(reduced_timestamps) / len(reduced_timestamps) / 5  # barplot width = 1/5 of timestamp unit
    ax.bar(reduced_timestamps, reduced_disk_io_read, width=width, label="disk read", color="w", hatch='/', edgecolor="dimgray")
    ax.bar(reduced_timestamps + width, reduced_disk_io_write, width=width, label="disk write", color="w", hatch='-', edgecolor="dimgray")
    # BUGFIX: labels were swapped (sent bars labeled "net recv" and vice versa)
    ax.bar(reduced_timestamps + 2 * width, reduced_net_io_ext_sent, width=width, label="net sent", color="silver")
    ax.bar(reduced_timestamps + 3 * width, reduced_net_io_ext_recv, width=width, label="net recv", color="gray")
    ax.set_ylim([0.0, max_disk_net_normalization_value])
    ax.legend(loc='upper left')
    ax.set_ylabel('MB')

    ax2 = ax.twinx()  # will superpose plot to ax, but show axis on right
    ax2.plot(timestamps, cpu_utilization, label="cpu", color="r")
    ax2.plot(timestamps, mem_utilization, label="mem", color="b", linestyle='dotted')
    ax2.set_ylim([0.0, max(max(cpu_utilization), max(mem_utilization), 100.0)])
    ax2.legend(loc='upper right')
    ax2.set_ylabel('%')

    ax.set_xlabel("time in seconds (job duration={:.2f}s)".format(timestamps[-1] - timestamps[0]))
    plt.title(f"Perf plot for node {node}")

    # record in mlflow
    self.metrics_logger.log_figure(fig, f"perf_plot_node{node}.png")
1 change: 1 addition & 0 deletions src/scripts/data_processing/generate_data/conda_env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ dependencies:
- azureml-defaults==1.35.0
- azureml-mlflow==1.35.0
- psutil==5.8.0
- matplotlib==3.4.3
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ dependencies:
- azureml-mlflow==1.35.0
- psutil==5.8.0
- lightgbm==3.2.1
- matplotlib==3.4.3
1 change: 1 addition & 0 deletions src/scripts/data_processing/partition_data/conda_env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ dependencies:
- azureml-defaults==1.35.0
- azureml-mlflow==1.35.0
- psutil==5.8.0
- matplotlib==3.4.3

1 change: 1 addition & 0 deletions src/scripts/inferencing/custom_win_cli/conda_env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ dependencies:
- azureml-defaults==1.35.0
- azureml-mlflow==1.35.0
- psutil==5.8.0
- matplotlib==3.4.3
1 change: 1 addition & 0 deletions src/scripts/inferencing/treelite_python/conda_env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ dependencies:
- treelite_runtime==2.1.0
- pandas>=1.1,<1.2
- numpy>=1.10,<1.20
- matplotlib==3.4.3
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ dependencies:
- treelite_runtime==2.1.0
- pandas>=1.1,<1.2
- numpy>=1.10,<1.20
- matplotlib==3.4.3
1 change: 1 addition & 0 deletions src/scripts/sample/conda_env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ dependencies:
- azureml-defaults==1.35.0
- azureml-mlflow==1.35.0
- psutil==5.8.0
- matplotlib==3.4.3
Loading