From de7cddf1d47e52c9ab9db56885d4898c479a118e Mon Sep 17 00:00:00 2001 From: Marco Melloni <98281551+marcomelloni@users.noreply.github.com> Date: Mon, 14 Jul 2025 11:31:00 +0200 Subject: [PATCH] Use protocol-operation tuple for PerformanceMonitor --- simulation_bridge/docs/performance.md | 2 +- simulation_bridge/src/core/bridge_core.py | 8 +-- .../protocol_adapters/mqtt/mqtt_adapter.py | 4 +- .../rabbitmq/rabbitmq_adapter.py | 2 +- .../protocol_adapters/rest/rest_adapter.py | 4 +- .../src/utils/performance_monitor.py | 53 ++++++++++--------- .../test/unit/test_performance_monitor.py | 32 +++++------ 7 files changed, 53 insertions(+), 52 deletions(-) diff --git a/simulation_bridge/docs/performance.md b/simulation_bridge/docs/performance.md index fda53018..b746eb27 100644 --- a/simulation_bridge/docs/performance.md +++ b/simulation_bridge/docs/performance.md @@ -23,7 +23,7 @@ A new row is appended to the CSV each time the bridge processes a client request | **Timestamp** | Float (epoch seconds) | Time when `start_operation()` is called. Serves as the reference point for all time deltas. | | **Client ID** | String | Identifier of the requesting client. | | **Client Protocol** | String | Protocol used to communicate with the bridge (e.g., REST, MQTT). | -| **Operation ID** | String (UUID) | Unique identifier for each request. Used for cross-referencing logs and traces. | +| **Operation ID** | String (UUID) | Unique identifier for each request. Used for cross-referencing logs and traces. Internally paired with **Client Protocol** to avoid collisions when identical IDs originate from different clients. | | **Simulation Type** | String | Type of simulation requested (`batch`, `streaming`, or `interactive`). | | **Request Received Time** | Float (seconds) | When the bridge's event loop first registers the incoming API call. | | **Core Received Input Time** | Float (seconds) | The moment when the simulation core acknowledged and buffered the input payload (i.e., when it consumed the data from the bridge). The difference between `Core Received Input Time` and `Request Received Time` quantifies the signal overhead. | diff --git a/simulation_bridge/src/core/bridge_core.py b/simulation_bridge/src/core/bridge_core.py index 676c6050..9a597d85 100644 --- a/simulation_bridge/src/core/bridge_core.py +++ b/simulation_bridge/src/core/bridge_core.py @@ -145,7 +145,7 @@ def handle_input_message(self, sender, **kwargs): # pylint: disable=unused-argu operation_id = message_dict.get( 'simulation', {}).get( 'request_id', 'unknown') - performance_monitor.record_core_received_input(operation_id) + performance_monitor.record_core_received_input(operation_id, protocol) try: message = MessageModel.model_validate(message_dict) except Exception as e: # pylint: disable=broad-exception-caught @@ -188,9 +188,9 @@ def handle_result_rabbitmq_message(self, sender, **kwargs): # pylint: disable=u protocol='rabbitmq', operation_id=operation_id) status = message.get('status', 'unknown') - performance_monitor.record_result_sent(operation_id) + performance_monitor.record_result_sent(operation_id, 'rabbitmq') if status == 'completed': - performance_monitor.finalize_operation(operation_id) + performance_monitor.finalize_operation(operation_id, 'rabbitmq') def handle_result_unknown_message(self, sender, **kwargs): # pylint: disable=unused-argument """ @@ -241,7 +241,7 @@ def _publish_message(self, producer, consumer, message, # pylint: disable=too-m exchange, producer, consumer, protocol) # Record sent input time in performance monitor if exchange == 'ex.bridge.output': - performance_monitor.record_core_sent_input(operation_id) + performance_monitor.record_core_sent_input(operation_id, protocol) except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError) as e: logger.error("RabbitMQ connection error: %s", e) diff --git a/simulation_bridge/src/protocol_adapters/mqtt/mqtt_adapter.py b/simulation_bridge/src/protocol_adapters/mqtt/mqtt_adapter.py index dcdda0de..8f0a181b 100644 --- a/simulation_bridge/src/protocol_adapters/mqtt/mqtt_adapter.py +++ b/simulation_bridge/src/protocol_adapters/mqtt/mqtt_adapter.py @@ -237,9 +237,9 @@ def send_result(self, message): logger.debug( "Message published to MQTT topic '%s': %s", output_topic, message) status = message.get('status', 'unknown') - performance_monitor.record_result_sent(operation_id) + performance_monitor.record_result_sent(operation_id, 'mqtt') if status == 'completed': - performance_monitor.finalize_operation(operation_id) + performance_monitor.finalize_operation(operation_id, 'mqtt') except (ConnectionError, TimeoutError) as e: logger.error("Error publishing MQTT message: %s", e) diff --git a/simulation_bridge/src/protocol_adapters/rabbitmq/rabbitmq_adapter.py b/simulation_bridge/src/protocol_adapters/rabbitmq/rabbitmq_adapter.py index 0c1b0e0d..c4523554 100644 --- a/simulation_bridge/src/protocol_adapters/rabbitmq/rabbitmq_adapter.py +++ b/simulation_bridge/src/protocol_adapters/rabbitmq/rabbitmq_adapter.py @@ -154,7 +154,7 @@ def _process_message(self, ch, method, properties, body, queue_name): ) elif queue_name == 'Q.bridge.result': operation_id = message.get('request_id', 'unknown') - performance_monitor.record_core_received_result(operation_id) + performance_monitor.record_core_received_result(operation_id, 'rabbitmq') bridge_meta = message.get('bridge_meta', {}) if isinstance(bridge_meta, str): if bridge_meta.strip().startswith('{'): diff --git a/simulation_bridge/src/protocol_adapters/rest/rest_adapter.py b/simulation_bridge/src/protocol_adapters/rest/rest_adapter.py index dd5b3eaa..f2e216b4 100644 --- a/simulation_bridge/src/protocol_adapters/rest/rest_adapter.py +++ b/simulation_bridge/src/protocol_adapters/rest/rest_adapter.py @@ -228,9 +228,9 @@ def publish_result_message_rest(self, sender, **kwargs): destination = message.get('destinations', [])[0] self.send_result_sync(destination, message) status = message.get('status', 'unknown') - performance_monitor.record_result_sent(operation_id) + performance_monitor.record_result_sent(operation_id, 'rest') if status == 'completed': - performance_monitor.finalize_operation(operation_id) + performance_monitor.finalize_operation(operation_id, 'rest') logger.debug( "Successfully scheduled result message for REST client: %s", destination) diff --git a/simulation_bridge/src/utils/performance_monitor.py b/simulation_bridge/src/utils/performance_monitor.py index 80257be7..bf1151e3 100644 --- a/simulation_bridge/src/utils/performance_monitor.py +++ b/simulation_bridge/src/utils/performance_monitor.py @@ -4,7 +4,7 @@ import time from dataclasses import dataclass, field from pathlib import Path -from typing import Dict, Optional, Any, List +from typing import Dict, Optional, Any, List, Tuple from enum import Enum import psutil @@ -49,7 +49,7 @@ class _Paths: @dataclass class _RuntimeState: """Internal runtime state for the monitor.""" - metrics_by_operation_id: Dict[str, PerformanceMetrics] = field( + metrics_by_operation_id: Dict[Tuple[str, str], PerformanceMetrics] = field( default_factory=dict) metrics_history: List[PerformanceMetrics] = field(default_factory=list) process: Optional[psutil.Process] = None @@ -113,9 +113,9 @@ def history(self) -> List[PerformanceMetrics]: """Return the list of completed metrics.""" return list(self._state.metrics_history) - def get_metric(self, operation_id: str) -> Optional[PerformanceMetrics]: + def get_metric(self, operation_id: str, protocol: str) -> Optional[PerformanceMetrics]: """Return metrics for an ongoing operation.""" - return self._state.metrics_by_operation_id.get(operation_id) + return self._state.metrics_by_operation_id.get((operation_id, protocol)) def _write_csv_headers(self) -> None: if not self._enabled: @@ -155,8 +155,8 @@ def _update_system_metrics(self, metric: PerformanceMetrics) -> None: self._state.process.memory_info().rss / (1024 * 1024) ) - def _is_valid_operation(self, operation_id: str) -> bool: - return self._enabled and operation_id in self._state.metrics_by_operation_id + def _is_valid_operation(self, operation_id: str, protocol: str) -> bool: + return self._enabled and (operation_id, protocol) in self._state.metrics_by_operation_id def start_operation( self, @@ -165,11 +165,12 @@ def start_operation( protocol: str = "unknown", simulation_type: str = "unknown", ) -> None: - if not self._enabled or operation_id in self._state.metrics_by_operation_id: + key = (operation_id, protocol) + if not self._enabled or key in self._state.metrics_by_operation_id: return now = time.time() - self._state.metrics_by_operation_id[operation_id] = PerformanceMetrics( + self._state.metrics_by_operation_id[key] = PerformanceMetrics( client_id=client_id, client_protocol=protocol, operation_id=operation_id, @@ -180,25 +181,25 @@ def start_operation( core_sent_input_time=0.0, ) - def record_core_received_input(self, operation_id: str) -> None: - self.record_event(operation_id, EventType.CORE_RECEIVED_INPUT) + def record_core_received_input(self, operation_id: str, protocol: str) -> None: + self.record_event(operation_id, protocol, EventType.CORE_RECEIVED_INPUT) - def record_core_sent_input(self, operation_id: str) -> None: - self.record_event(operation_id, EventType.CORE_SENT_INPUT) + def record_core_sent_input(self, operation_id: str, protocol: str) -> None: + self.record_event(operation_id, protocol, EventType.CORE_SENT_INPUT) - def record_core_received_result(self, operation_id: str) -> None: - self.record_event(operation_id, EventType.CORE_RECEIVED_RESULT) + def record_core_received_result(self, operation_id: str, protocol: str) -> None: + self.record_event(operation_id, protocol, EventType.CORE_RECEIVED_RESULT) - def record_result_sent(self, operation_id: str) -> None: - self.record_event(operation_id, EventType.RESULT_SENT) + def record_result_sent(self, operation_id: str, protocol: str) -> None: + self.record_event(operation_id, protocol, EventType.RESULT_SENT) - def record_event(self, operation_id: str, event: EventType) -> None: + def record_event(self, operation_id: str, protocol: str, event: EventType) -> None: """Generic event recorder used by the specialized methods.""" - if not self._is_valid_operation(operation_id): + if not self._is_valid_operation(operation_id, protocol): return now = time.time() - metric = self._state.metrics_by_operation_id[operation_id] + metric = self._state.metrics_by_operation_id[(operation_id, protocol)] if event == EventType.CORE_RECEIVED_INPUT: metric.core_received_input_time = now @@ -213,10 +214,10 @@ def record_event(self, operation_id: str, event: EventType) -> None: self._update_system_metrics(metric) - def finalize_operation(self, operation_id: str) -> None: - if not self._is_valid_operation(operation_id): + def finalize_operation(self, operation_id: str, protocol: str) -> None: + if not self._is_valid_operation(operation_id, protocol): return - metric = self._state.metrics_by_operation_id.pop(operation_id) + metric = self._state.metrics_by_operation_id.pop((operation_id, protocol)) metric.result_completed_time = time.time() metric.total_duration = metric.result_completed_time - metric.request_received_time if metric.core_sent_input_time: @@ -228,16 +229,16 @@ def finalize_operation(self, operation_id: str) -> None: self._state.metrics_history.append(metric) self._save_metrics_to_csv(metric) - def _update_timestamp(self, operation_id: str, field_name: str) -> None: - if not self._is_valid_operation(operation_id): + def _update_timestamp(self, operation_id: str, protocol: str, field_name: str) -> None: + if not self._is_valid_operation(operation_id, protocol): return now = time.time() setattr( - self._state.metrics_by_operation_id[operation_id], + self._state.metrics_by_operation_id[(operation_id, protocol)], field_name, now) self._update_system_metrics( - self._state.metrics_by_operation_id[operation_id] + self._state.metrics_by_operation_id[(operation_id, protocol)] ) def _save_metrics_to_csv(self, metric: PerformanceMetrics) -> None: diff --git a/simulation_bridge/test/unit/test_performance_monitor.py b/simulation_bridge/test/unit/test_performance_monitor.py index 8a9a30f8..ebf06527 100644 --- a/simulation_bridge/test/unit/test_performance_monitor.py +++ b/simulation_bridge/test/unit/test_performance_monitor.py @@ -75,7 +75,7 @@ def test_start_operation_creates_metric(monitor_enabled): monitor_enabled.start_operation( op_id, client_id="c1", protocol="rest", simulation_type="batch") - metric = monitor_enabled.get_metric(op_id) + metric = monitor_enabled.get_metric(op_id, "rest") assert metric is not None assert metric.operation_id == op_id assert metric.timestamp > 0 @@ -90,13 +90,13 @@ def test_record_timestamps_update_fields(monitor_enabled): op_id, client_id="c1", protocol="rest", simulation_type="batch") with patch("time.time", return_value=1234.5): - monitor_enabled.record_core_received_input(op_id) - metric = monitor_enabled.get_metric(op_id) + monitor_enabled.record_core_received_input(op_id, "rest") + metric = monitor_enabled.get_metric(op_id, "rest") assert metric and metric.core_received_input_time == pytest.approx( 1234.5) - monitor_enabled.record_core_sent_input(op_id) - metric = monitor_enabled.get_metric(op_id) + monitor_enabled.record_core_sent_input(op_id, "rest") + metric = monitor_enabled.get_metric(op_id, "rest") assert metric and metric.core_sent_input_time == pytest.approx(1234.5) @@ -109,8 +109,8 @@ def test_record_core_received_result_appends_time_and_updates_metrics( patch("time.time", return_value=1000.0), patch.object(monitor_enabled, "_update_system_metrics") as mock_update, ): - monitor_enabled.record_core_received_result(op_id) - metric = monitor_enabled.get_metric(op_id) + monitor_enabled.record_core_received_result(op_id, "rest") + metric = monitor_enabled.get_metric(op_id, "rest") assert metric.result_times[-1] == pytest.approx(1000.0) mock_update.assert_called_once_with(metric) @@ -119,7 +119,7 @@ def test_finalize_operation_calculates_metrics_and_saves(monitor_enabled): op_id = "op4" monitor_enabled.start_operation( op_id, client_id="c1", protocol="rest", simulation_type="batch") - metric = monitor_enabled.get_metric(op_id) + metric = monitor_enabled.get_metric(op_id, "rest") metric.request_received_time = 1.0 metric.core_sent_input_time = 2.0 @@ -131,9 +131,9 @@ def test_finalize_operation_calculates_metrics_and_saves(monitor_enabled): patch.object(monitor_enabled, "_save_metrics_to_csv") as mock_save, patch.object(monitor_enabled, "_update_system_metrics") as mock_update, ): - monitor_enabled.finalize_operation(op_id) + monitor_enabled.finalize_operation(op_id, "rest") - assert monitor_enabled.get_metric(op_id) is None + assert monitor_enabled.get_metric(op_id, "rest") is None assert metric.total_duration == pytest.approx(9.0, abs=0.1) assert metric.input_overhead == pytest.approx(1.0, abs=0.1) assert metric.total_overheads == [2.0, 2.0, 2.0] @@ -146,11 +146,11 @@ def test_disabled_monitor_skips_methods(monitor_disabled): op_id = "op_disabled" monitor_disabled.start_operation( op_id, client_id="c1", protocol="rest", simulation_type="batch") - monitor_disabled.record_core_received_input(op_id) - monitor_disabled.record_core_sent_input(op_id) - monitor_disabled.record_result_sent(op_id) - monitor_disabled.record_core_received_result(op_id) - monitor_disabled.finalize_operation(op_id) + monitor_disabled.record_core_received_input(op_id, "rest") + monitor_disabled.record_core_sent_input(op_id, "rest") + monitor_disabled.record_result_sent(op_id, "rest") + monitor_disabled.record_core_received_result(op_id, "rest") + monitor_disabled.finalize_operation(op_id, "rest") - assert monitor_disabled.get_metric(op_id) is None + assert monitor_disabled.get_metric(op_id, "rest") is None assert len(monitor_disabled.history) == 0