Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions agents/matlab/matlab_agent/resources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@ pip install pika pyyaml

## Clients

Two Python scripts are provided:
Three Python scripts are provided:

- **use_matlab_agent.py** – Simple client that sends a simulation request and
waits for results. Use this for _batch_ and _streaming_ simulations.
- **use_matlab_agent_interactive.py** – Async client for _interactive_
simulations. It streams input frames to MATLAB and prints outputs as they
arrive.
- **use_matlab_agent_interactive.py** – Async client for _interactive_ simulations. It streams input frames to MATLAB and prints outputs as they arrive.
- **use_matlab_agent_command.py** – Send `STOP`, `RUN`, or `CHECK` commands to a running MATLAB agent.

## Configuration

Expand Down Expand Up @@ -119,3 +118,26 @@ To run the batch simulation example, specify the full absolute path to the paylo
```bash
python use_matlab_agent.py --api-payload "/Users/foo/simulation-bridge/agents/matlab/matlab_agent/docs/examples/batch-simulation/api/simulation_batch.yaml.example"
```

## Control Commands

During a simulation you can remotely control the MATLAB agent by sending simple
command messages via RabbitMQ. Supported commands are:

- `RUN` – clear any previous stop request and allow the simulation to run.
- `STOP` – request the currently running simulation to terminate gracefully.
- `CHECK` – query the agent status (returns `running` or `stopped`).

To issue a command, publish a YAML message containing a `command` field to the
agent input exchange:

```yaml
command: STOP
```

You can use `use_matlab_agent_command.py` to send these commands from the command line. For example:

```bash
python use_matlab_agent_command.py STOP
```

126 changes: 126 additions & 0 deletions agents/matlab/matlab_agent/resources/use_matlab_agent_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""use_matlab_agent_command.py

Simple RabbitMQ client to send control commands (STOP, RUN, CHECK)
to a MATLAB agent.
"""

import argparse
import ssl
import uuid
from typing import Any, Dict

import pika
import yaml


class MatlabAgentCommandClient:
"""Client to send control commands to the MATLAB agent."""

def __init__(
self,
agent_identifier: str = "dt",
destination_identifier: str = "matlab",
config_path: str = "use.yaml",
) -> None:
self.agent_id = agent_identifier
self.destination_id = destination_identifier

self.config = self._load_yaml(config_path)
rabbit_cfg: Dict[str, Any] = self.config.get("rabbitmq", {})

credentials = pika.PlainCredentials(
rabbit_cfg.get("username", "guest"),
rabbit_cfg.get("password", "guest"),
)

tls_enabled = bool(rabbit_cfg.get("tls", False))
ssl_options = None
port = rabbit_cfg.get("port", 5671 if tls_enabled else 5672)
if tls_enabled:
context = ssl.create_default_context()
context.minimum_version = ssl.TLSVersion.TLSv1_2
ssl_options = pika.SSLOptions(context, rabbit_cfg.get("host", "localhost"))

params = pika.ConnectionParameters(
host=rabbit_cfg.get("host", "localhost"),
port=port,
virtual_host=rabbit_cfg.get("vhost", "/"),
credentials=credentials,
heartbeat=rabbit_cfg.get("heartbeat", 600),
ssl_options=ssl_options,
)

self.connection = pika.BlockingConnection(params)
self.channel = self.connection.channel()

# Exchanges and result queue (used for CHECK)
self.channel.exchange_declare(
exchange="ex.bridge.output", exchange_type="topic", durable=True
)
self.channel.exchange_declare(
exchange="ex.sim.result", exchange_type="topic", durable=True
)
self.result_queue = f"Q.{self.agent_id}.matlab.result"
self.channel.queue_declare(queue=self.result_queue, durable=True)
self.channel.queue_bind(
exchange="ex.sim.result",
queue=self.result_queue,
routing_key=f"{self.destination_id}.result.{self.agent_id}",
)

def send_command(self, command: str, wait_response: bool = False) -> None:
"""Publish the command message and optionally wait for a response."""
cmd = command.upper()
self.channel.basic_publish(
exchange="ex.bridge.output",
routing_key=f"{self.agent_id}.{self.destination_id}",
body=yaml.dump({"command": cmd}),
properties=pika.BasicProperties(
delivery_mode=2,
content_type="application/x-yaml",
message_id=str(uuid.uuid4()),
),
)
print(f"[{self.agent_id.upper()}] Sent command '{cmd}'.")

if wait_response:
method, _props, body = self.channel.basic_get(
queue=self.result_queue, auto_ack=True
)
if method and body:
try:
response = yaml.safe_load(body)
print("Response:", response)
except yaml.YAMLError:
print("Invalid response received")
else:
print("No response received.")

@staticmethod
def _load_yaml(file_path: str) -> Dict[str, Any]:
with open(file_path, "r", encoding="utf-8") as fh:
return yaml.safe_load(fh)


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Send control commands to the MATLAB agent"
)
parser.add_argument("command", help="Command to send: STOP, RUN, or CHECK")
parser.add_argument(
"--config", default="use.yaml", help="YAML configuration file"
)
args = parser.parse_args()

AGENT_ID = "dt"
DESTINATION = "matlab"

client = MatlabAgentCommandClient(
agent_identifier=AGENT_ID,
destination_identifier=DESTINATION,
config_path=args.config,
)

wait_resp = args.command.upper() == "CHECK"
client.send_command(args.command, wait_response=wait_resp)
client.connection.close()
24 changes: 24 additions & 0 deletions agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ...core.batch import handle_batch_simulation
from ...core.streaming import handle_streaming_simulation
from ...core.interactive import handle_interactive_simulation
from ...utils.commands import CommandRegistry

logger = get_logger()

Expand Down Expand Up @@ -139,6 +140,29 @@ def handle_message(
msg_dict = {}
msg_dict = yaml.safe_load(body)
logger.debug("Parsed message: %s", msg_dict)
# Handle simple command messages
if isinstance(msg_dict, dict) and 'command' in msg_dict:
cmd = msg_dict['command'].upper()
if cmd == 'STOP':
CommandRegistry.stop()
elif cmd == 'RUN':
CommandRegistry.reset()
elif cmd == 'CHECK':
status = 'stopped' if CommandRegistry.should_stop() else 'running'
self.rabbitmq_manager.send_result(
source,
create_response(
'success',
'',
'',
{},
outputs={'status': status},
bridge_meta='control',
request_id='control'
)
)
ch.basic_ack(delivery_tag=method.delivery_tag)
return
except yaml.YAMLError as e:
logger.error("YAML parsing error: %s", e)
error_response = create_response(
Expand Down
5 changes: 5 additions & 0 deletions agents/matlab/matlab_agent/src/core/interactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from ..utils.create_response import create_response
from ..utils.logger import get_logger
from ..utils.performance_monitor import PerformanceMonitor
from ..utils.commands import CommandRegistry
from ..utils.constants import (
ACCEPT_TIMEOUT,
BUFFER_SIZE,
Expand Down Expand Up @@ -240,6 +241,9 @@ def run(self, pm: PerformanceMonitor, msg_dict: Dict[str, Any]) -> None:

try:
while True:
if CommandRegistry.should_stop():
logger.info("[INTERACTIVE] Stop command received")
return
if self.out_srv.matlab_proc and self.out_srv.matlab_proc.poll() is not None:
logger.debug("[INTERACTIVE] MATLAB process ended, stopping loop")
break
Expand All @@ -265,6 +269,7 @@ def run(self, pm: PerformanceMonitor, msg_dict: Dict[str, Any]) -> None:
logger.info("[INTERACTIVE] Interrupted by user")
finally:
pm.record_simulation_complete()
CommandRegistry.reset()

def close(self) -> None:
"""Close the TCP servers"""
Expand Down
5 changes: 5 additions & 0 deletions agents/matlab/matlab_agent/src/core/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ..utils.create_response import create_response
from ..utils.logger import get_logger
from ..utils.performance_monitor import PerformanceMonitor
from ..utils.commands import CommandRegistry

# Configure logger
logger = get_logger()
Expand Down Expand Up @@ -300,6 +301,9 @@ def run(self, inputs: Dict[str, Any], performance_monitor) -> None:
buffer = b""
sequence = 0
while True:
if CommandRegistry.should_stop():
logger.info("Stopping streaming simulation on command")
break
chunk = self.connection.connection.recv(4096)
if not chunk:
logger.debug("Connection closed")
Expand All @@ -316,6 +320,7 @@ def run(self, inputs: Dict[str, Any], performance_monitor) -> None:
except json.JSONDecodeError as e:
logger.warning("Invalid JSON: %s", str(e))
performance_monitor.record_simulation_complete()
CommandRegistry.reset()
except socket.timeout as e:
logger.error("Connection timeout: %s", str(e))
raise MatlabStreamingError("Connection timeout") from e
Expand Down
21 changes: 21 additions & 0 deletions agents/matlab/matlab_agent/src/utils/commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import threading

class CommandRegistry:
"""Global command registry for simulation control."""

_stop_event = threading.Event()

@classmethod
def stop(cls) -> None:
"""Signal that the current simulation should stop."""
cls._stop_event.set()

@classmethod
def reset(cls) -> None:
"""Clear the stop flag, allowing simulations to run."""
cls._stop_event.clear()

@classmethod
def should_stop(cls) -> bool:
"""Check whether a stop command was issued."""
return cls._stop_event.is_set()
Loading