148 changes: 101 additions & 47 deletions simulation_bridge/src/utils/performance_monitor.py
@@ -5,6 +5,7 @@
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Optional, Any, List
from enum import Enum

import psutil

@@ -16,6 +17,7 @@

@dataclass
class PerformanceMetrics:
"""Metrics collected for a single operation."""
client_id: str
client_protocol: str
operation_id: str
@@ -34,6 +36,31 @@ class PerformanceMetrics:
output_overheads: List[float] = field(default_factory=list)
total_overheads: List[float] = field(default_factory=list)


@dataclass
class _Paths:
"""Internal storage for output paths."""
output_dir: Path = Path("performance_logs")
csv_path: Path = field(
default_factory=lambda: Path("performance_logs/performance_metrics.csv"))


@dataclass
class _RuntimeState:
"""Internal runtime state for the monitor."""
metrics_by_operation_id: Dict[str, PerformanceMetrics] = field(
default_factory=dict)
metrics_history: List[PerformanceMetrics] = field(default_factory=list)
process: Optional[psutil.Process] = None


class EventType(Enum):
"""Supported performance events."""
CORE_RECEIVED_INPUT = "core_received_input"
CORE_SENT_INPUT = "core_sent_input"
CORE_RECEIVED_RESULT = "core_received_result"
RESULT_SENT = "result_sent"

class PerformanceMonitor:
_instance: Optional[PerformanceMonitor] = None
_initialized: bool = False
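Splitting the monitor's flat attributes into `_Paths`, `_RuntimeState`, and `EventType` gives the output locations, the mutable bookkeeping, and the event names each a single home. A scratch-script look at the new helpers' defaults; the import path is inferred from the file layout, and the leading-underscore classes are internal, so importing them like this is for illustration only:

```python
# Illustration only: the import path and direct use of the internal helpers are assumptions.
from simulation_bridge.src.utils.performance_monitor import EventType, _Paths, _RuntimeState

print(list(EventType))          # the four supported events
print(_Paths().csv_path)        # performance_logs/performance_metrics.csv by default
print(_RuntimeState().process)  # None until monitoring is enabled and psutil.Process() is attached
```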
@@ -47,41 +74,52 @@ def __init__(self, config: Optional[Dict[str, Any]] = None):
if self._initialized:
return

self.enabled: bool = False
self.output_dir: Path = Path("performance_logs")
self.csv_path: Path = self.output_dir / "performance_metrics.csv"
self.metrics_by_operation_id: Dict[str, PerformanceMetrics] = {}
self.metrics_history: List[PerformanceMetrics] = []
self.process: Optional[psutil.Process] = None
self._enabled: bool = False
self.paths = _Paths()
self._state = _RuntimeState()

if config:
perf_cfg = config.get("performance", {})
self.enabled = perf_cfg.get("enabled", False)
log_file = perf_cfg.get("file", str(self.csv_path))
self.output_dir = Path(log_file).parent
self.csv_path = Path(log_file)
self._enabled = perf_cfg.get("enabled", False)
log_file = perf_cfg.get("file", str(self.paths.csv_path))
self.paths.output_dir = Path(log_file).parent
self.paths.csv_path = Path(log_file)

if self.enabled:
if self._enabled:
try:
self.output_dir.mkdir(parents=True, exist_ok=True)
self.process = psutil.Process()
if not self.csv_path.exists():
self.paths.output_dir.mkdir(parents=True, exist_ok=True)
self._state.process = psutil.Process()
if not self.paths.csv_path.exists():
self._write_csv_headers()
logger.debug("PERFORMANCE - Logging to %s", self.csv_path)
logger.debug("PERFORMANCE - Logging to %s", self.paths.csv_path)
except Exception as exc:
logger.error(
"Failed to initialize performance monitoring: %s", exc)
self.enabled = False
self._enabled = False
else:
logger.debug("PERFORMANCE - Monitoring disabled")

self._initialized = True

@property
def enabled(self) -> bool:
"""Return whether performance monitoring is active."""
return self._enabled

@property
def history(self) -> List[PerformanceMetrics]:
"""Return the list of completed metrics."""
return list(self._state.metrics_history)

def get_metric(self, operation_id: str) -> Optional[PerformanceMetrics]:
"""Return metrics for an ongoing operation."""
return self._state.metrics_by_operation_id.get(operation_id)

def _write_csv_headers(self) -> None:
if not self.enabled:
if not self._enabled:
return
try:
with self.csv_path.open("w", newline="", encoding="utf-8") as fh:
with self.paths.csv_path.open("w", newline="", encoding="utf-8") as fh:
writer = csv.writer(fh)
writer.writerow([
"Timestamp",
@@ -106,24 +144,30 @@ def _write_csv_headers(self) -> None:
])
except Exception as exc:
logger.error("Failed to write CSV headers: %s", exc)
self.enabled = False
self._enabled = False

def _update_system_metrics(self, metric: PerformanceMetrics) -> None:
if self.process:
metric.cpu_percent = self.process.cpu_percent()
metric.memory_rss_mb = self.process.memory_info().rss / (1024 * 1024)
if self._state.process:
metric.cpu_percent = self._state.process.cpu_percent()
metric.memory_rss_mb = (
self._state.process.memory_info().rss / (1024 * 1024)
)

def _is_valid_operation(self, operation_id: str) -> bool:
return self.enabled and operation_id in self.metrics_by_operation_id
return self._enabled and operation_id in self._state.metrics_by_operation_id

def start_operation(self, operation_id: str, client_id: str = "unknown",
protocol: str = "unknown",
simulation_type: str = "unknown") -> None:
if not self.enabled or operation_id in self.metrics_by_operation_id:
def start_operation(
self,
operation_id: str,
client_id: str = "unknown",
protocol: str = "unknown",
simulation_type: str = "unknown",
) -> None:
if not self._enabled or operation_id in self._state.metrics_by_operation_id:
return

now = time.time()
self.metrics_by_operation_id[operation_id] = PerformanceMetrics(
self._state.metrics_by_operation_id[operation_id] = PerformanceMetrics(
client_id=client_id,
client_protocol=protocol,
operation_id=operation_id,
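`_update_system_metrics` is unchanged in spirit: it samples the current process via psutil and stores CPU and RSS on the metric. The sampling it relies on, in isolation:

```python
# Standalone psutil sampling, mirroring _update_system_metrics above.
import psutil

proc = psutil.Process()                          # handle to the current process
cpu = proc.cpu_percent()                         # % CPU since the previous call (0.0 on the first call)
rss_mb = proc.memory_info().rss / (1024 * 1024)  # resident set size in MiB
print(f"cpu={cpu:.1f}% rss={rss_mb:.1f} MiB")
```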
@@ -135,34 +179,42 @@ def start_operation(self, operation_id: str, client_id: str = "unknown",
)

def record_core_received_input(self, operation_id: str) -> None:
self._update_timestamp(operation_id, "core_received_input_time")
self.record_event(operation_id, EventType.CORE_RECEIVED_INPUT)

def record_core_sent_input(self, operation_id: str) -> None:
self._update_timestamp(operation_id, "core_sent_input_time")
self.record_event(operation_id, EventType.CORE_SENT_INPUT)

def record_core_received_result(self, operation_id: str) -> None:
if not self._is_valid_operation(operation_id):
return
now = time.time()
metric = self.metrics_by_operation_id[operation_id]
metric.result_times.append(now)
self._update_system_metrics(metric)
self.record_event(operation_id, EventType.CORE_RECEIVED_RESULT)

def record_result_sent(self, operation_id: str) -> None:
self.record_event(operation_id, EventType.RESULT_SENT)

def record_event(self, operation_id: str, event: EventType) -> None:
"""Generic event recorder used by the specialized methods."""
if not self._is_valid_operation(operation_id):
return

now = time.time()
metric = self.metrics_by_operation_id[operation_id]
metric.result_sent_times.append(now)
if metric.result_times:
overhead = now - metric.result_times[-1]
metric.output_overheads.append(overhead)
metric = self._state.metrics_by_operation_id[operation_id]

if event == EventType.CORE_RECEIVED_INPUT:
metric.core_received_input_time = now
elif event == EventType.CORE_SENT_INPUT:
metric.core_sent_input_time = now
elif event == EventType.CORE_RECEIVED_RESULT:
metric.result_times.append(now)
elif event == EventType.RESULT_SENT:
metric.result_sent_times.append(now)
if metric.result_times:
metric.output_overheads.append(now - metric.result_times[-1])

self._update_system_metrics(metric)

def finalize_operation(self, operation_id: str) -> None:
if not self._is_valid_operation(operation_id):
return
metric = self.metrics_by_operation_id.pop(operation_id)
metric = self._state.metrics_by_operation_id.pop(operation_id)
metric.result_completed_time = time.time()
metric.total_duration = metric.result_completed_time - metric.request_received_time
if metric.core_sent_input_time:
@@ -171,18 +223,20 @@ def finalize_operation(self, operation_id: str) -> None:
metric.input_overhead +
o for o in metric.output_overheads]
self._update_system_metrics(metric)
self.metrics_history.append(metric)
self._state.metrics_history.append(metric)
self._save_metrics_to_csv(metric)

def _update_timestamp(self, operation_id: str, field_name: str) -> None:
if not self._is_valid_operation(operation_id):
return
now = time.time()
setattr(self.metrics_by_operation_id[operation_id], field_name, now)
self._update_system_metrics(self.metrics_by_operation_id[operation_id])
setattr(self._state.metrics_by_operation_id[operation_id], field_name, now)
self._update_system_metrics(
self._state.metrics_by_operation_id[operation_id]
)

def _save_metrics_to_csv(self, metric: PerformanceMetrics) -> None:
if not self.enabled:
if not self._enabled:
return

avg_interval = 0.0
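The overhead arithmetic in `finalize_operation` is easiest to check with the numbers used by `test_finalize_operation_calculates_metrics_and_saves` below. The `input_overhead` formula is inferred from that test (the assignment itself sits in lines this hunk does not show), and the three 1.0 output overheads are assumed from the asserted result:

```python
# Worked example of the finalize arithmetic; values mirror the test below.
request_received_time = 1.0
core_sent_input_time = 2.0
result_completed_time = 10.0            # assumed: time.time() patched to 10.0 at finalize
output_overheads = [1.0, 1.0, 1.0]      # assumed from the asserted total_overheads

total_duration = result_completed_time - request_received_time    # 9.0
input_overhead = core_sent_input_time - request_received_time     # 1.0 (inferred formula)
total_overheads = [input_overhead + o for o in output_overheads]  # [2.0, 2.0, 2.0]
```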
@@ -193,7 +247,7 @@ def _save_metrics_to_csv(self, metric: PerformanceMetrics) -> None:
avg_interval = sum(intervals) / len(intervals)

try:
with self.csv_path.open("a", newline="", encoding="utf-8") as fh:
with self.paths.csv_path.open("a", newline="", encoding="utf-8") as fh:
writer = csv.writer(fh)
writer.writerow([
metric.timestamp,
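Taken together, the public surface is unchanged: callers still use the named `record_*` helpers, which now all funnel into `record_event`. A hedged end-to-end sketch of one operation (the operation id and client metadata are illustrative):

```python
from simulation_bridge.src.utils.performance_monitor import PerformanceMonitor

monitor = PerformanceMonitor({"performance": {"enabled": True}})
monitor.start_operation("op-1", client_id="c1", protocol="rest", simulation_type="batch")

monitor.record_core_received_input("op-1")   # -> record_event(..., EventType.CORE_RECEIVED_INPUT)
monitor.record_core_sent_input("op-1")       # -> record_event(..., EventType.CORE_SENT_INPUT)
monitor.record_core_received_result("op-1")  # appends to result_times
monitor.record_result_sent("op-1")           # appends to result_sent_times and an output overhead

monitor.finalize_operation("op-1")           # computes durations and writes the CSV row
print(monitor.history[-1].total_duration)
```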
20 changes: 11 additions & 9 deletions simulation_bridge/test/unit/test_performance_monitor.py
@@ -74,8 +74,8 @@ def test_start_operation_creates_metric(monitor_enabled):
monitor_enabled.start_operation(
op_id, client_id="c1", protocol="rest", simulation_type="batch")

assert op_id in monitor_enabled.metrics_by_operation_id
metric = monitor_enabled.metrics_by_operation_id[op_id]
metric = monitor_enabled.get_metric(op_id)
assert metric is not None
assert metric.operation_id == op_id
assert metric.timestamp > 0
assert metric.client_id == "c1"
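One behavioural detail worth pinning down in the tests: `get_metric` returns the live `PerformanceMetrics` object (a plain `.get` on the internal dict), which is what lets later tests adjust timestamps in place, whereas `history` returns a fresh list on every access. A hypothetical extra test, assuming the `monitor_enabled` fixture yields a fresh, enabled monitor:

```python
def test_get_metric_returns_live_object(monitor_enabled):
    # hypothetical test illustrating the aliasing behaviour described above
    monitor_enabled.start_operation("op-alias")
    metric = monitor_enabled.get_metric("op-alias")
    metric.client_id = "changed"                        # mutates the monitor's own record
    assert monitor_enabled.get_metric("op-alias").client_id == "changed"

    snapshot = monitor_enabled.history                  # new list object on every access
    snapshot.append("bogus")
    assert monitor_enabled.history == []                # internal history untouched
```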
@@ -90,10 +90,12 @@ def test_record_timestamps_update_fields(monitor_enabled):

with patch("time.time", return_value=1234.5):
monitor_enabled.record_core_received_input(op_id)
assert monitor_enabled.metrics_by_operation_id[op_id].core_received_input_time == pytest.approx(1234.5)
metric = monitor_enabled.get_metric(op_id)
assert metric and metric.core_received_input_time == pytest.approx(1234.5)

monitor_enabled.record_core_sent_input(op_id)
assert monitor_enabled.metrics_by_operation_id[op_id].core_sent_input_time == pytest.approx(1234.5)
metric = monitor_enabled.get_metric(op_id)
assert metric and metric.core_sent_input_time == pytest.approx(1234.5)


def test_record_core_received_result_appends_time_and_updates_metrics(monitor_enabled):
Expand All @@ -105,7 +107,7 @@ def test_record_core_received_result_appends_time_and_updates_metrics(monitor_en
patch.object(monitor_enabled, "_update_system_metrics") as mock_update,
):
monitor_enabled.record_core_received_result(op_id)
metric = monitor_enabled.metrics_by_operation_id[op_id]
metric = monitor_enabled.get_metric(op_id)
assert metric.result_times[-1] == pytest.approx(1000.0)
mock_update.assert_called_once_with(metric)

@@ -114,7 +116,7 @@ def test_finalize_operation_calculates_metrics_and_saves(monitor_enabled):
op_id = "op4"
monitor_enabled.start_operation(
op_id, client_id="c1", protocol="rest", simulation_type="batch")
metric = monitor_enabled.metrics_by_operation_id[op_id]
metric = monitor_enabled.get_metric(op_id)

metric.request_received_time = 1.0
metric.core_sent_input_time = 2.0
@@ -128,7 +130,7 @@ ):
):
monitor_enabled.finalize_operation(op_id)

assert op_id not in monitor_enabled.metrics_by_operation_id
assert monitor_enabled.get_metric(op_id) is None
assert metric.total_duration == pytest.approx(9.0, abs=0.1)
assert metric.input_overhead == pytest.approx(1.0, abs=0.1)
assert metric.total_overheads == [2.0, 2.0, 2.0]
@@ -146,5 +148,5 @@ def test_disabled_monitor_skips_methods(monitor_disabled):
monitor_disabled.record_core_received_result(op_id)
monitor_disabled.finalize_operation(op_id)

assert op_id not in monitor_disabled.metrics_by_operation_id
assert len(monitor_disabled.metrics_history) == 0
assert monitor_disabled.get_metric(op_id) is None
assert len(monitor_disabled.history) == 0
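The `monitor_enabled` / `monitor_disabled` fixtures themselves are outside this diff. A plausible shape, assuming they reset the singleton's class-level `_instance`/`_initialized` between tests and route CSV output to a temp directory (import path inferred from the file layout):

```python
# Hypothetical fixtures - the real ones are not part of this diff.
import pytest
from simulation_bridge.src.utils.performance_monitor import PerformanceMonitor


def _reset_singleton():
    PerformanceMonitor._instance = None
    PerformanceMonitor._initialized = False


@pytest.fixture
def monitor_enabled(tmp_path):
    _reset_singleton()
    yield PerformanceMonitor(
        {"performance": {"enabled": True, "file": str(tmp_path / "perf.csv")}})
    _reset_singleton()


@pytest.fixture
def monitor_disabled():
    _reset_singleton()
    yield PerformanceMonitor({"performance": {"enabled": False}})
    _reset_singleton()
```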