diff --git a/agents/matlab/matlab_agent/resources/README.md b/agents/matlab/matlab_agent/resources/README.md index bf2490be..50339caf 100644 --- a/agents/matlab/matlab_agent/resources/README.md +++ b/agents/matlab/matlab_agent/resources/README.md @@ -25,13 +25,12 @@ pip install pika pyyaml ## Clients -Two Python scripts are provided: +Three Python scripts are provided: - **use_matlab_agent.py** – Simple client that sends a simulation request and waits for results. Use this for _batch_ and _streaming_ simulations. -- **use_matlab_agent_interactive.py** – Async client for _interactive_ - simulations. It streams input frames to MATLAB and prints outputs as they - arrive. +- **use_matlab_agent_interactive.py** – Async client for _interactive_ simulations. It streams input frames to MATLAB and prints outputs as they arrive. +- **use_matlab_agent_command.py** – Send `STOP`, `RUN`, or `CHECK` commands to a running MATLAB agent. ## Configuration @@ -119,3 +118,26 @@ To run the batch simulation example, specify the full absolute path to the paylo ```bash python use_matlab_agent.py --api-payload "/Users/foo/simulation-bridge/agents/matlab/matlab_agent/docs/examples/batch-simulation/api/simulation_batch.yaml.example" ``` + +## Control Commands + +During a simulation you can remotely control the MATLAB agent by sending simple +command messages via RabbitMQ. Supported commands are: + +- `RUN` – clear any previous stop request and allow the simulation to run. +- `STOP` – request the currently running simulation to terminate gracefully. +- `CHECK` – query the agent status (returns `running` or `stopped`). + +To issue a command, publish a YAML message containing a `command` field to the +agent input exchange: + +```yaml +command: STOP +``` + +You can use `use_matlab_agent_command.py` to send these commands from the command line. For example: + +```bash +python use_matlab_agent_command.py STOP +``` + diff --git a/agents/matlab/matlab_agent/resources/use_matlab_agent_command.py b/agents/matlab/matlab_agent/resources/use_matlab_agent_command.py new file mode 100644 index 00000000..6071b522 --- /dev/null +++ b/agents/matlab/matlab_agent/resources/use_matlab_agent_command.py @@ -0,0 +1,126 @@ +"""use_matlab_agent_command.py + +Simple RabbitMQ client to send control commands (STOP, RUN, CHECK) + to a MATLAB agent. +""" + +import argparse +import ssl +import uuid +from typing import Any, Dict + +import pika +import yaml + + +class MatlabAgentCommandClient: + """Client to send control commands to the MATLAB agent.""" + + def __init__( + self, + agent_identifier: str = "dt", + destination_identifier: str = "matlab", + config_path: str = "use.yaml", + ) -> None: + self.agent_id = agent_identifier + self.destination_id = destination_identifier + + self.config = self._load_yaml(config_path) + rabbit_cfg: Dict[str, Any] = self.config.get("rabbitmq", {}) + + credentials = pika.PlainCredentials( + rabbit_cfg.get("username", "guest"), + rabbit_cfg.get("password", "guest"), + ) + + tls_enabled = bool(rabbit_cfg.get("tls", False)) + ssl_options = None + port = rabbit_cfg.get("port", 5671 if tls_enabled else 5672) + if tls_enabled: + context = ssl.create_default_context() + context.minimum_version = ssl.TLSVersion.TLSv1_2 + ssl_options = pika.SSLOptions(context, rabbit_cfg.get("host", "localhost")) + + params = pika.ConnectionParameters( + host=rabbit_cfg.get("host", "localhost"), + port=port, + virtual_host=rabbit_cfg.get("vhost", "/"), + credentials=credentials, + heartbeat=rabbit_cfg.get("heartbeat", 600), + ssl_options=ssl_options, + ) + + self.connection = pika.BlockingConnection(params) + self.channel = self.connection.channel() + + # Exchanges and result queue (used for CHECK) + self.channel.exchange_declare( + exchange="ex.bridge.output", exchange_type="topic", durable=True + ) + self.channel.exchange_declare( + exchange="ex.sim.result", exchange_type="topic", durable=True + ) + self.result_queue = f"Q.{self.agent_id}.matlab.result" + self.channel.queue_declare(queue=self.result_queue, durable=True) + self.channel.queue_bind( + exchange="ex.sim.result", + queue=self.result_queue, + routing_key=f"{self.destination_id}.result.{self.agent_id}", + ) + + def send_command(self, command: str, wait_response: bool = False) -> None: + """Publish the command message and optionally wait for a response.""" + cmd = command.upper() + self.channel.basic_publish( + exchange="ex.bridge.output", + routing_key=f"{self.agent_id}.{self.destination_id}", + body=yaml.dump({"command": cmd}), + properties=pika.BasicProperties( + delivery_mode=2, + content_type="application/x-yaml", + message_id=str(uuid.uuid4()), + ), + ) + print(f"[{self.agent_id.upper()}] Sent command '{cmd}'.") + + if wait_response: + method, _props, body = self.channel.basic_get( + queue=self.result_queue, auto_ack=True + ) + if method and body: + try: + response = yaml.safe_load(body) + print("Response:", response) + except yaml.YAMLError: + print("Invalid response received") + else: + print("No response received.") + + @staticmethod + def _load_yaml(file_path: str) -> Dict[str, Any]: + with open(file_path, "r", encoding="utf-8") as fh: + return yaml.safe_load(fh) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Send control commands to the MATLAB agent" + ) + parser.add_argument("command", help="Command to send: STOP, RUN, or CHECK") + parser.add_argument( + "--config", default="use.yaml", help="YAML configuration file" + ) + args = parser.parse_args() + + AGENT_ID = "dt" + DESTINATION = "matlab" + + client = MatlabAgentCommandClient( + agent_identifier=AGENT_ID, + destination_identifier=DESTINATION, + config_path=args.config, + ) + + wait_resp = args.command.upper() == "CHECK" + client.send_command(args.command, wait_response=wait_resp) + client.connection.close() diff --git a/agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py b/agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py index 658cacde..6e692dd4 100644 --- a/agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py +++ b/agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py @@ -18,6 +18,7 @@ from ...core.batch import handle_batch_simulation from ...core.streaming import handle_streaming_simulation from ...core.interactive import handle_interactive_simulation +from ...utils.commands import CommandRegistry logger = get_logger() @@ -139,6 +140,29 @@ def handle_message( msg_dict = {} msg_dict = yaml.safe_load(body) logger.debug("Parsed message: %s", msg_dict) + # Handle simple command messages + if isinstance(msg_dict, dict) and 'command' in msg_dict: + cmd = msg_dict['command'].upper() + if cmd == 'STOP': + CommandRegistry.stop() + elif cmd == 'RUN': + CommandRegistry.reset() + elif cmd == 'CHECK': + status = 'stopped' if CommandRegistry.should_stop() else 'running' + self.rabbitmq_manager.send_result( + source, + create_response( + 'success', + '', + '', + {}, + outputs={'status': status}, + bridge_meta='control', + request_id='control' + ) + ) + ch.basic_ack(delivery_tag=method.delivery_tag) + return except yaml.YAMLError as e: logger.error("YAML parsing error: %s", e) error_response = create_response( diff --git a/agents/matlab/matlab_agent/src/core/interactive.py b/agents/matlab/matlab_agent/src/core/interactive.py index 954b3f87..8ef71e69 100644 --- a/agents/matlab/matlab_agent/src/core/interactive.py +++ b/agents/matlab/matlab_agent/src/core/interactive.py @@ -17,6 +17,7 @@ from ..utils.create_response import create_response from ..utils.logger import get_logger from ..utils.performance_monitor import PerformanceMonitor +from ..utils.commands import CommandRegistry from ..utils.constants import ( ACCEPT_TIMEOUT, BUFFER_SIZE, @@ -240,6 +241,9 @@ def run(self, pm: PerformanceMonitor, msg_dict: Dict[str, Any]) -> None: try: while True: + if CommandRegistry.should_stop(): + logger.info("[INTERACTIVE] Stop command received") + return if self.out_srv.matlab_proc and self.out_srv.matlab_proc.poll() is not None: logger.debug("[INTERACTIVE] MATLAB process ended, stopping loop") break @@ -265,6 +269,7 @@ def run(self, pm: PerformanceMonitor, msg_dict: Dict[str, Any]) -> None: logger.info("[INTERACTIVE] Interrupted by user") finally: pm.record_simulation_complete() + CommandRegistry.reset() def close(self) -> None: """Close the TCP servers""" diff --git a/agents/matlab/matlab_agent/src/core/streaming.py b/agents/matlab/matlab_agent/src/core/streaming.py index 476b7340..154e6a62 100644 --- a/agents/matlab/matlab_agent/src/core/streaming.py +++ b/agents/matlab/matlab_agent/src/core/streaming.py @@ -20,6 +20,7 @@ from ..utils.create_response import create_response from ..utils.logger import get_logger from ..utils.performance_monitor import PerformanceMonitor +from ..utils.commands import CommandRegistry # Configure logger logger = get_logger() @@ -300,6 +301,9 @@ def run(self, inputs: Dict[str, Any], performance_monitor) -> None: buffer = b"" sequence = 0 while True: + if CommandRegistry.should_stop(): + logger.info("Stopping streaming simulation on command") + break chunk = self.connection.connection.recv(4096) if not chunk: logger.debug("Connection closed") @@ -316,6 +320,7 @@ def run(self, inputs: Dict[str, Any], performance_monitor) -> None: except json.JSONDecodeError as e: logger.warning("Invalid JSON: %s", str(e)) performance_monitor.record_simulation_complete() + CommandRegistry.reset() except socket.timeout as e: logger.error("Connection timeout: %s", str(e)) raise MatlabStreamingError("Connection timeout") from e diff --git a/agents/matlab/matlab_agent/src/utils/commands.py b/agents/matlab/matlab_agent/src/utils/commands.py new file mode 100644 index 00000000..6f22b42b --- /dev/null +++ b/agents/matlab/matlab_agent/src/utils/commands.py @@ -0,0 +1,21 @@ +import threading + +class CommandRegistry: + """Global command registry for simulation control.""" + + _stop_event = threading.Event() + + @classmethod + def stop(cls) -> None: + """Signal that the current simulation should stop.""" + cls._stop_event.set() + + @classmethod + def reset(cls) -> None: + """Clear the stop flag, allowing simulations to run.""" + cls._stop_event.clear() + + @classmethod + def should_stop(cls) -> bool: + """Check whether a stop command was issued.""" + return cls._stop_event.is_set()