From 980f01a007297f8e0226b63fe399e7458294d547 Mon Sep 17 00:00:00 2001
From: Marco Melloni <98281551+marcomelloni@users.noreply.github.com>
Date: Fri, 11 Jul 2025 11:35:34 +0200
Subject: [PATCH] Refactor PerformanceMonitor using dataclasses

---
 .../src/utils/performance_monitor.py      | 148 ++++++++++++------
 .../test/unit/test_performance_monitor.py |  20 +--
 2 files changed, 112 insertions(+), 56 deletions(-)

diff --git a/simulation_bridge/src/utils/performance_monitor.py b/simulation_bridge/src/utils/performance_monitor.py
index 0555e1d0..9803934b 100644
--- a/simulation_bridge/src/utils/performance_monitor.py
+++ b/simulation_bridge/src/utils/performance_monitor.py
@@ -5,6 +5,7 @@
 from dataclasses import dataclass, field
 from pathlib import Path
 from typing import Dict, Optional, Any, List
+from enum import Enum
 
 import psutil
 
@@ -16,6 +17,7 @@
 
 @dataclass
 class PerformanceMetrics:
+    """Metrics collected for a single operation."""
     client_id: str
     client_protocol: str
     operation_id: str
@@ -34,6 +36,31 @@ class PerformanceMetrics:
     output_overheads: List[float] = field(default_factory=list)
     total_overheads: List[float] = field(default_factory=list)
 
+
+@dataclass
+class _Paths:
+    """Internal storage for output paths."""
+    output_dir: Path = Path("performance_logs")
+    csv_path: Path = field(
+        default_factory=lambda: Path("performance_logs/performance_metrics.csv"))
+
+
+@dataclass
+class _RuntimeState:
+    """Internal runtime state for the monitor."""
+    metrics_by_operation_id: Dict[str, PerformanceMetrics] = field(
+        default_factory=dict)
+    metrics_history: List[PerformanceMetrics] = field(default_factory=list)
+    process: Optional[psutil.Process] = None
+
+
+class EventType(Enum):
+    """Supported performance events."""
+    CORE_RECEIVED_INPUT = "core_received_input"
+    CORE_SENT_INPUT = "core_sent_input"
+    CORE_RECEIVED_RESULT = "core_received_result"
+    RESULT_SENT = "result_sent"
+
 
 class PerformanceMonitor:
     _instance: Optional[PerformanceMonitor] = None
     _initialized: bool = False
@@ -47,41 +74,52 @@ def __init__(self, config: Optional[Dict[str, Any]] = None):
         if self._initialized:
             return
 
-        self.enabled: bool = False
-        self.output_dir: Path = Path("performance_logs")
-        self.csv_path: Path = self.output_dir / "performance_metrics.csv"
-        self.metrics_by_operation_id: Dict[str, PerformanceMetrics] = {}
-        self.metrics_history: List[PerformanceMetrics] = []
-        self.process: Optional[psutil.Process] = None
+        self._enabled: bool = False
+        self.paths = _Paths()
+        self._state = _RuntimeState()
 
         if config:
             perf_cfg = config.get("performance", {})
-            self.enabled = perf_cfg.get("enabled", False)
-            log_file = perf_cfg.get("file", str(self.csv_path))
-            self.output_dir = Path(log_file).parent
-            self.csv_path = Path(log_file)
+            self._enabled = perf_cfg.get("enabled", False)
+            log_file = perf_cfg.get("file", str(self.paths.csv_path))
+            self.paths.output_dir = Path(log_file).parent
+            self.paths.csv_path = Path(log_file)
 
-        if self.enabled:
+        if self._enabled:
             try:
-                self.output_dir.mkdir(parents=True, exist_ok=True)
-                self.process = psutil.Process()
-                if not self.csv_path.exists():
+                self.paths.output_dir.mkdir(parents=True, exist_ok=True)
+                self._state.process = psutil.Process()
+                if not self.paths.csv_path.exists():
                     self._write_csv_headers()
-                logger.debug("PERFORMANCE - Logging to %s", self.csv_path)
+                logger.debug("PERFORMANCE - Logging to %s", self.paths.csv_path)
             except Exception as exc:
                 logger.error(
                     "Failed to initialize performance monitoring: %s", exc)
-                self.enabled = False
+                self._enabled = False
         else:
logger.debug("PERFORMANCE - Monitoring disabled") self._initialized = True + @property + def enabled(self) -> bool: + """Return whether performance monitoring is active.""" + return self._enabled + + @property + def history(self) -> List[PerformanceMetrics]: + """Return the list of completed metrics.""" + return list(self._state.metrics_history) + + def get_metric(self, operation_id: str) -> Optional[PerformanceMetrics]: + """Return metrics for an ongoing operation.""" + return self._state.metrics_by_operation_id.get(operation_id) + def _write_csv_headers(self) -> None: - if not self.enabled: + if not self._enabled: return try: - with self.csv_path.open("w", newline="", encoding="utf-8") as fh: + with self.paths.csv_path.open("w", newline="", encoding="utf-8") as fh: writer = csv.writer(fh) writer.writerow([ "Timestamp", @@ -106,24 +144,30 @@ def _write_csv_headers(self) -> None: ]) except Exception as exc: logger.error("Failed to write CSV headers: %s", exc) - self.enabled = False + self._enabled = False def _update_system_metrics(self, metric: PerformanceMetrics) -> None: - if self.process: - metric.cpu_percent = self.process.cpu_percent() - metric.memory_rss_mb = self.process.memory_info().rss / (1024 * 1024) + if self._state.process: + metric.cpu_percent = self._state.process.cpu_percent() + metric.memory_rss_mb = ( + self._state.process.memory_info().rss / (1024 * 1024) + ) def _is_valid_operation(self, operation_id: str) -> bool: - return self.enabled and operation_id in self.metrics_by_operation_id + return self._enabled and operation_id in self._state.metrics_by_operation_id - def start_operation(self, operation_id: str, client_id: str = "unknown", - protocol: str = "unknown", - simulation_type: str = "unknown") -> None: - if not self.enabled or operation_id in self.metrics_by_operation_id: + def start_operation( + self, + operation_id: str, + client_id: str = "unknown", + protocol: str = "unknown", + simulation_type: str = "unknown", + ) -> None: + if not self._enabled or operation_id in self._state.metrics_by_operation_id: return now = time.time() - self.metrics_by_operation_id[operation_id] = PerformanceMetrics( + self._state.metrics_by_operation_id[operation_id] = PerformanceMetrics( client_id=client_id, client_protocol=protocol, operation_id=operation_id, @@ -135,34 +179,42 @@ def start_operation(self, operation_id: str, client_id: str = "unknown", ) def record_core_received_input(self, operation_id: str) -> None: - self._update_timestamp(operation_id, "core_received_input_time") + self.record_event(operation_id, EventType.CORE_RECEIVED_INPUT) def record_core_sent_input(self, operation_id: str) -> None: - self._update_timestamp(operation_id, "core_sent_input_time") + self.record_event(operation_id, EventType.CORE_SENT_INPUT) def record_core_received_result(self, operation_id: str) -> None: - if not self._is_valid_operation(operation_id): - return - now = time.time() - metric = self.metrics_by_operation_id[operation_id] - metric.result_times.append(now) - self._update_system_metrics(metric) + self.record_event(operation_id, EventType.CORE_RECEIVED_RESULT) def record_result_sent(self, operation_id: str) -> None: + self.record_event(operation_id, EventType.RESULT_SENT) + + def record_event(self, operation_id: str, event: EventType) -> None: + """Generic event recorder used by the specialized methods.""" if not self._is_valid_operation(operation_id): return + now = time.time() - metric = self.metrics_by_operation_id[operation_id] - metric.result_sent_times.append(now) - if 
-            overhead = now - metric.result_times[-1]
-            metric.output_overheads.append(overhead)
+        metric = self._state.metrics_by_operation_id[operation_id]
+
+        if event == EventType.CORE_RECEIVED_INPUT:
+            metric.core_received_input_time = now
+        elif event == EventType.CORE_SENT_INPUT:
+            metric.core_sent_input_time = now
+        elif event == EventType.CORE_RECEIVED_RESULT:
+            metric.result_times.append(now)
+        elif event == EventType.RESULT_SENT:
+            metric.result_sent_times.append(now)
+            if metric.result_times:
+                metric.output_overheads.append(now - metric.result_times[-1])
+
         self._update_system_metrics(metric)
 
     def finalize_operation(self, operation_id: str) -> None:
         if not self._is_valid_operation(operation_id):
             return
-        metric = self.metrics_by_operation_id.pop(operation_id)
+        metric = self._state.metrics_by_operation_id.pop(operation_id)
         metric.result_completed_time = time.time()
         metric.total_duration = metric.result_completed_time - metric.request_received_time
         if metric.core_sent_input_time:
@@ -171,18 +223,20 @@ def finalize_operation(self, operation_id: str) -> None:
             metric.input_overhead + o for o in metric.output_overheads]
 
         self._update_system_metrics(metric)
-        self.metrics_history.append(metric)
+        self._state.metrics_history.append(metric)
         self._save_metrics_to_csv(metric)
 
     def _update_timestamp(self, operation_id: str, field_name: str) -> None:
         if not self._is_valid_operation(operation_id):
             return
         now = time.time()
-        setattr(self.metrics_by_operation_id[operation_id], field_name, now)
-        self._update_system_metrics(self.metrics_by_operation_id[operation_id])
+        setattr(self._state.metrics_by_operation_id[operation_id], field_name, now)
+        self._update_system_metrics(
+            self._state.metrics_by_operation_id[operation_id]
+        )
 
     def _save_metrics_to_csv(self, metric: PerformanceMetrics) -> None:
-        if not self.enabled:
+        if not self._enabled:
             return
 
         avg_interval = 0.0
@@ -193,7 +247,7 @@ def _save_metrics_to_csv(self, metric: PerformanceMetrics) -> None:
             avg_interval = sum(intervals) / len(intervals)
 
         try:
-            with self.csv_path.open("a", newline="", encoding="utf-8") as fh:
+            with self.paths.csv_path.open("a", newline="", encoding="utf-8") as fh:
                 writer = csv.writer(fh)
                 writer.writerow([
                     metric.timestamp,
diff --git a/simulation_bridge/test/unit/test_performance_monitor.py b/simulation_bridge/test/unit/test_performance_monitor.py
index 50b5f9f4..be0f8bcc 100644
--- a/simulation_bridge/test/unit/test_performance_monitor.py
+++ b/simulation_bridge/test/unit/test_performance_monitor.py
@@ -74,8 +74,8 @@ def test_start_operation_creates_metric(monitor_enabled):
     monitor_enabled.start_operation(
         op_id, client_id="c1", protocol="rest", simulation_type="batch")
 
-    assert op_id in monitor_enabled.metrics_by_operation_id
-    metric = monitor_enabled.metrics_by_operation_id[op_id]
+    metric = monitor_enabled.get_metric(op_id)
+    assert metric is not None
     assert metric.operation_id == op_id
     assert metric.timestamp > 0
     assert metric.client_id == "c1"
@@ -90,10 +90,12 @@ def test_record_timestamps_update_fields(monitor_enabled):
     with patch("time.time", return_value=1234.5):
         monitor_enabled.record_core_received_input(op_id)
-        assert monitor_enabled.metrics_by_operation_id[op_id].core_received_input_time == pytest.approx(1234.5)
+        metric = monitor_enabled.get_metric(op_id)
+        assert metric and metric.core_received_input_time == pytest.approx(1234.5)
 
         monitor_enabled.record_core_sent_input(op_id)
-        assert monitor_enabled.metrics_by_operation_id[op_id].core_sent_input_time == pytest.approx(1234.5)
+        metric = monitor_enabled.get_metric(op_id)
+        assert metric and metric.core_sent_input_time == pytest.approx(1234.5)
 
 
 def test_record_core_received_result_appends_time_and_updates_metrics(monitor_enabled):
@@ -105,7 +107,7 @@ def test_record_core_received_result_appends_time_and_updates_metrics(monitor_en
         patch.object(monitor_enabled, "_update_system_metrics") as mock_update,
     ):
         monitor_enabled.record_core_received_result(op_id)
 
-    metric = monitor_enabled.metrics_by_operation_id[op_id]
+    metric = monitor_enabled.get_metric(op_id)
     assert metric.result_times[-1] == pytest.approx(1000.0)
     mock_update.assert_called_once_with(metric)
 
@@ -114,7 +116,7 @@ def test_finalize_operation_calculates_metrics_and_saves(monitor_enabled):
     op_id = "op4"
     monitor_enabled.start_operation(
         op_id, client_id="c1", protocol="rest", simulation_type="batch")
-    metric = monitor_enabled.metrics_by_operation_id[op_id]
+    metric = monitor_enabled.get_metric(op_id)
 
     metric.request_received_time = 1.0
     metric.core_sent_input_time = 2.0
@@ -128,7 +130,7 @@ def test_finalize_operation_calculates_metrics_and_saves(monitor_enabled):
     ):
         monitor_enabled.finalize_operation(op_id)
 
-    assert op_id not in monitor_enabled.metrics_by_operation_id
+    assert monitor_enabled.get_metric(op_id) is None
     assert metric.total_duration == pytest.approx(9.0, abs=0.1)
     assert metric.input_overhead == pytest.approx(1.0, abs=0.1)
    assert metric.total_overheads == [2.0, 2.0, 2.0]
@@ -146,5 +148,5 @@ def test_disabled_monitor_skips_methods(monitor_disabled):
     monitor_disabled.record_core_received_result(op_id)
     monitor_disabled.finalize_operation(op_id)
 
-    assert op_id not in monitor_disabled.metrics_by_operation_id
-    assert len(monitor_disabled.metrics_history) == 0
+    assert monitor_disabled.get_metric(op_id) is None
+    assert len(monitor_disabled.history) == 0
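
Note (not part of the patch): a minimal usage sketch of the refactored monitor API, assuming the module is importable as simulation_bridge.src.utils.performance_monitor (inferred from the file path) and that the "performance" config section shown here is representative; the operation id and config values are hypothetical.

    from simulation_bridge.src.utils.performance_monitor import (
        EventType,
        PerformanceMonitor,
    )

    # Enable monitoring via the same config shape __init__ reads ("performance" -> "enabled"/"file").
    monitor = PerformanceMonitor(
        {"performance": {"enabled": True, "file": "performance_logs/performance_metrics.csv"}}
    )

    monitor.start_operation("op-42", client_id="c1", protocol="rest", simulation_type="batch")

    # The specialized record_* wrappers still exist; they now delegate to record_event.
    monitor.record_event("op-42", EventType.CORE_RECEIVED_INPUT)
    monitor.record_event("op-42", EventType.CORE_SENT_INPUT)
    monitor.record_event("op-42", EventType.CORE_RECEIVED_RESULT)
    monitor.record_event("op-42", EventType.RESULT_SENT)

    monitor.finalize_operation("op-42")

    # Finalized operations leave the in-flight map and appear in the read-only history property.
    assert monitor.get_metric("op-42") is None
    print(monitor.history[-1].total_duration)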