Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion simulation_bridge/docs/performance.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ A new row is appended to the CSV each time the bridge processes a client request
| **Timestamp** | Float (epoch seconds) | Time when `start_operation()` is called. Serves as the reference point for all time deltas. |
| **Client ID** | String | Identifier of the requesting client. |
| **Client Protocol** | String | Protocol used to communicate with the bridge (e.g., REST, MQTT). |
| **Operation ID** | String (UUID) | Unique identifier for each request. Used for cross-referencing logs and traces. |
| **Operation ID** | String (UUID) | Unique identifier for each request. Used for cross-referencing logs and traces. Internally, metrics are keyed by the pair (**Operation ID**, **Client Protocol**), so identical IDs arriving over different protocols do not collide. |
| **Simulation Type** | String | Type of simulation requested (`batch`, `streaming`, or `interactive`). |
| **Request Received Time** | Float (seconds) | When the bridge's event loop first registers the incoming API call. |
| **Core Received Input Time** | Float (seconds) | The moment when the simulation core acknowledged and buffered the input payload (i.e., when it consumed the data from the bridge). The difference between `Core Received Input Time` and `Request Received Time` quantifies the signal overhead. |
Expand Down
8 changes: 4 additions & 4 deletions simulation_bridge/src/core/bridge_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def handle_input_message(self, sender, **kwargs): # pylint: disable=unused-argu
operation_id = message_dict.get(
'simulation', {}).get(
'request_id', 'unknown')
performance_monitor.record_core_received_input(operation_id)
performance_monitor.record_core_received_input(operation_id, protocol)
try:
message = MessageModel.model_validate(message_dict)
except Exception as e: # pylint: disable=broad-exception-caught
Expand Down Expand Up @@ -188,9 +188,9 @@ def handle_result_rabbitmq_message(self, sender, **kwargs): # pylint: disable=u
protocol='rabbitmq',
operation_id=operation_id)
status = message.get('status', 'unknown')
performance_monitor.record_result_sent(operation_id)
performance_monitor.record_result_sent(operation_id, 'rabbitmq')
if status == 'completed':
performance_monitor.finalize_operation(operation_id)
performance_monitor.finalize_operation(operation_id, 'rabbitmq')

def handle_result_unknown_message(self, sender, **kwargs): # pylint: disable=unused-argument
"""
Expand Down Expand Up @@ -241,7 +241,7 @@ def _publish_message(self, producer, consumer, message, # pylint: disable=too-m
exchange, producer, consumer, protocol)
# Record sent input time in performance monitor
if exchange == 'ex.bridge.output':
performance_monitor.record_core_sent_input(operation_id)
performance_monitor.record_core_sent_input(operation_id, protocol)
except (pika.exceptions.AMQPConnectionError,
pika.exceptions.AMQPChannelError) as e:
logger.error("RabbitMQ connection error: %s", e)
Expand Down
4 changes: 2 additions & 2 deletions simulation_bridge/src/protocol_adapters/mqtt/mqtt_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,9 @@ def send_result(self, message):
logger.debug(
"Message published to MQTT topic '%s': %s", output_topic, message)
status = message.get('status', 'unknown')
performance_monitor.record_result_sent(operation_id)
performance_monitor.record_result_sent(operation_id, 'mqtt')
if status == 'completed':
performance_monitor.finalize_operation(operation_id)
performance_monitor.finalize_operation(operation_id, 'mqtt')
except (ConnectionError, TimeoutError) as e:
logger.error("Error publishing MQTT message: %s", e)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def _process_message(self, ch, method, properties, body, queue_name):
)
elif queue_name == 'Q.bridge.result':
operation_id = message.get('request_id', 'unknown')
performance_monitor.record_core_received_result(operation_id)
performance_monitor.record_core_received_result(operation_id, 'rabbitmq')
bridge_meta = message.get('bridge_meta', {})
if isinstance(bridge_meta, str):
if bridge_meta.strip().startswith('{'):
Expand Down
4 changes: 2 additions & 2 deletions simulation_bridge/src/protocol_adapters/rest/rest_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,9 @@ def publish_result_message_rest(self, sender, **kwargs):
destination = message.get('destinations', [])[0]
self.send_result_sync(destination, message)
status = message.get('status', 'unknown')
performance_monitor.record_result_sent(operation_id)
performance_monitor.record_result_sent(operation_id, 'rest')
if status == 'completed':
performance_monitor.finalize_operation(operation_id)
performance_monitor.finalize_operation(operation_id, 'rest')
logger.debug(
"Successfully scheduled result message for REST client: %s",
destination)
Expand Down
53 changes: 27 additions & 26 deletions simulation_bridge/src/utils/performance_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Optional, Any, List
from typing import Dict, Optional, Any, List, Tuple
from enum import Enum

import psutil
Expand Down Expand Up @@ -49,7 +49,7 @@ class _Paths:
@dataclass
class _RuntimeState:
"""Internal runtime state for the monitor."""
metrics_by_operation_id: Dict[str, PerformanceMetrics] = field(
metrics_by_operation_id: Dict[Tuple[str, str], PerformanceMetrics] = field(
default_factory=dict)
metrics_history: List[PerformanceMetrics] = field(default_factory=list)
process: Optional[psutil.Process] = None
Expand Down Expand Up @@ -113,9 +113,9 @@ def history(self) -> List[PerformanceMetrics]:
"""Return the list of completed metrics."""
return list(self._state.metrics_history)

def get_metric(self, operation_id: str) -> Optional[PerformanceMetrics]:
def get_metric(self, operation_id: str, protocol: str) -> Optional[PerformanceMetrics]:
"""Return metrics for an ongoing operation."""
return self._state.metrics_by_operation_id.get(operation_id)
return self._state.metrics_by_operation_id.get((operation_id, protocol))

def _write_csv_headers(self) -> None:
if not self._enabled:
Expand Down Expand Up @@ -155,8 +155,8 @@ def _update_system_metrics(self, metric: PerformanceMetrics) -> None:
self._state.process.memory_info().rss / (1024 * 1024)
)

def _is_valid_operation(self, operation_id: str) -> bool:
return self._enabled and operation_id in self._state.metrics_by_operation_id
def _is_valid_operation(self, operation_id: str, protocol: str) -> bool:
return self._enabled and (operation_id, protocol) in self._state.metrics_by_operation_id

def start_operation(
self,
Expand All @@ -165,11 +165,12 @@ def start_operation(
protocol: str = "unknown",
simulation_type: str = "unknown",
) -> None:
if not self._enabled or operation_id in self._state.metrics_by_operation_id:
key = (operation_id, protocol)
if not self._enabled or key in self._state.metrics_by_operation_id:
return

now = time.time()
self._state.metrics_by_operation_id[operation_id] = PerformanceMetrics(
self._state.metrics_by_operation_id[key] = PerformanceMetrics(
client_id=client_id,
client_protocol=protocol,
operation_id=operation_id,
Expand All @@ -180,25 +181,25 @@ def start_operation(
core_sent_input_time=0.0,
)

def record_core_received_input(self, operation_id: str) -> None:
self.record_event(operation_id, EventType.CORE_RECEIVED_INPUT)
def record_core_received_input(self, operation_id: str, protocol: str) -> None:
self.record_event(operation_id, protocol, EventType.CORE_RECEIVED_INPUT)

def record_core_sent_input(self, operation_id: str) -> None:
self.record_event(operation_id, EventType.CORE_SENT_INPUT)
def record_core_sent_input(self, operation_id: str, protocol: str) -> None:
self.record_event(operation_id, protocol, EventType.CORE_SENT_INPUT)

def record_core_received_result(self, operation_id: str) -> None:
self.record_event(operation_id, EventType.CORE_RECEIVED_RESULT)
def record_core_received_result(self, operation_id: str, protocol: str) -> None:
self.record_event(operation_id, protocol, EventType.CORE_RECEIVED_RESULT)

def record_result_sent(self, operation_id: str) -> None:
self.record_event(operation_id, EventType.RESULT_SENT)
def record_result_sent(self, operation_id: str, protocol: str) -> None:
self.record_event(operation_id, protocol, EventType.RESULT_SENT)

def record_event(self, operation_id: str, event: EventType) -> None:
def record_event(self, operation_id: str, protocol: str, event: EventType) -> None:
"""Generic event recorder used by the specialized methods."""
if not self._is_valid_operation(operation_id):
if not self._is_valid_operation(operation_id, protocol):
return

now = time.time()
metric = self._state.metrics_by_operation_id[operation_id]
metric = self._state.metrics_by_operation_id[(operation_id, protocol)]

if event == EventType.CORE_RECEIVED_INPUT:
metric.core_received_input_time = now
Expand All @@ -213,10 +214,10 @@ def record_event(self, operation_id: str, event: EventType) -> None:

self._update_system_metrics(metric)

def finalize_operation(self, operation_id: str) -> None:
if not self._is_valid_operation(operation_id):
def finalize_operation(self, operation_id: str, protocol: str) -> None:
if not self._is_valid_operation(operation_id, protocol):
return
metric = self._state.metrics_by_operation_id.pop(operation_id)
metric = self._state.metrics_by_operation_id.pop((operation_id, protocol))
metric.result_completed_time = time.time()
metric.total_duration = metric.result_completed_time - metric.request_received_time
if metric.core_sent_input_time:
Expand All @@ -228,16 +229,16 @@ def finalize_operation(self, operation_id: str) -> None:
self._state.metrics_history.append(metric)
self._save_metrics_to_csv(metric)

def _update_timestamp(self, operation_id: str, field_name: str) -> None:
if not self._is_valid_operation(operation_id):
def _update_timestamp(self, operation_id: str, protocol: str, field_name: str) -> None:
if not self._is_valid_operation(operation_id, protocol):
return
now = time.time()
setattr(
self._state.metrics_by_operation_id[operation_id],
self._state.metrics_by_operation_id[(operation_id, protocol)],
field_name,
now)
self._update_system_metrics(
self._state.metrics_by_operation_id[operation_id]
self._state.metrics_by_operation_id[(operation_id, protocol)]
)

def _save_metrics_to_csv(self, metric: PerformanceMetrics) -> None:
Expand Down
32 changes: 16 additions & 16 deletions simulation_bridge/test/unit/test_performance_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_start_operation_creates_metric(monitor_enabled):
monitor_enabled.start_operation(
op_id, client_id="c1", protocol="rest", simulation_type="batch")

metric = monitor_enabled.get_metric(op_id)
metric = monitor_enabled.get_metric(op_id, "rest")
assert metric is not None
assert metric.operation_id == op_id
assert metric.timestamp > 0
Expand All @@ -90,13 +90,13 @@ def test_record_timestamps_update_fields(monitor_enabled):
op_id, client_id="c1", protocol="rest", simulation_type="batch")

with patch("time.time", return_value=1234.5):
monitor_enabled.record_core_received_input(op_id)
metric = monitor_enabled.get_metric(op_id)
monitor_enabled.record_core_received_input(op_id, "rest")
metric = monitor_enabled.get_metric(op_id, "rest")
assert metric and metric.core_received_input_time == pytest.approx(
1234.5)

monitor_enabled.record_core_sent_input(op_id)
metric = monitor_enabled.get_metric(op_id)
monitor_enabled.record_core_sent_input(op_id, "rest")
metric = monitor_enabled.get_metric(op_id, "rest")
assert metric and metric.core_sent_input_time == pytest.approx(1234.5)


Expand All @@ -109,8 +109,8 @@ def test_record_core_received_result_appends_time_and_updates_metrics(
patch("time.time", return_value=1000.0),
patch.object(monitor_enabled, "_update_system_metrics") as mock_update,
):
monitor_enabled.record_core_received_result(op_id)
metric = monitor_enabled.get_metric(op_id)
monitor_enabled.record_core_received_result(op_id, "rest")
metric = monitor_enabled.get_metric(op_id, "rest")
assert metric.result_times[-1] == pytest.approx(1000.0)
mock_update.assert_called_once_with(metric)

Expand All @@ -119,7 +119,7 @@ def test_finalize_operation_calculates_metrics_and_saves(monitor_enabled):
op_id = "op4"
monitor_enabled.start_operation(
op_id, client_id="c1", protocol="rest", simulation_type="batch")
metric = monitor_enabled.get_metric(op_id)
metric = monitor_enabled.get_metric(op_id, "rest")

metric.request_received_time = 1.0
metric.core_sent_input_time = 2.0
Expand All @@ -131,9 +131,9 @@ def test_finalize_operation_calculates_metrics_and_saves(monitor_enabled):
patch.object(monitor_enabled, "_save_metrics_to_csv") as mock_save,
patch.object(monitor_enabled, "_update_system_metrics") as mock_update,
):
monitor_enabled.finalize_operation(op_id)
monitor_enabled.finalize_operation(op_id, "rest")

assert monitor_enabled.get_metric(op_id) is None
assert monitor_enabled.get_metric(op_id, "rest") is None
assert metric.total_duration == pytest.approx(9.0, abs=0.1)
assert metric.input_overhead == pytest.approx(1.0, abs=0.1)
assert metric.total_overheads == [2.0, 2.0, 2.0]
Expand All @@ -146,11 +146,11 @@ def test_disabled_monitor_skips_methods(monitor_disabled):
op_id = "op_disabled"
monitor_disabled.start_operation(
op_id, client_id="c1", protocol="rest", simulation_type="batch")
monitor_disabled.record_core_received_input(op_id)
monitor_disabled.record_core_sent_input(op_id)
monitor_disabled.record_result_sent(op_id)
monitor_disabled.record_core_received_result(op_id)
monitor_disabled.finalize_operation(op_id)
monitor_disabled.record_core_received_input(op_id, "rest")
monitor_disabled.record_core_sent_input(op_id, "rest")
monitor_disabled.record_result_sent(op_id, "rest")
monitor_disabled.record_core_received_result(op_id, "rest")
monitor_disabled.finalize_operation(op_id, "rest")

assert monitor_disabled.get_metric(op_id) is None
assert monitor_disabled.get_metric(op_id, "rest") is None
assert len(monitor_disabled.history) == 0
Loading