diff --git a/docs/developer/hyperion/hyperion-blueapi.rst b/docs/developer/hyperion/hyperion-blueapi.rst index b4fe1e082..04706e7f9 100644 --- a/docs/developer/hyperion/hyperion-blueapi.rst +++ b/docs/developer/hyperion/hyperion-blueapi.rst @@ -23,7 +23,7 @@ Hyperion on BlueAPI consists of two components: Deployment ---------- -``hyperion-blueapi`` is automatically available in a standard Hyperion deployment. +``hyperion-blueapi`` and ``hyperion-supervisor`` are automatically available in a standard Hyperion deployment. Launching --------- @@ -33,3 +33,33 @@ Launching :: ./run_hyperion.sh --beamline=i03 --dev --blueapi + + +``hyperion-supervisor`` can be launched using the ``run_hyperion.sh`` script, using the ``--supervisor`` option: + +:: + + ./run_hyperion.sh --beamline=i03 --dev --supervisor + +Configuration +------------- + +Configuration of ``hyperion-blueapi`` and ``hyperion-supervisor`` is done via standard BlueAPI .yaml configuration files. +Basic configuration files for i03 are supplied as follows in ``src/mx_bluesky/hyperion``. + +.. csv-table:: hyperion-blueapi configuration files + :widths: auto + :header: "File", "Description" + + "blueapi_config.yaml", "Defines beamline device module and blueapi plans to be exported, BlueAPI REST to listen on, Stomp and graylog servers." + + +.. csv-table:: hyperion-supervisor configuration files + :widths: auto + :header: "File", "Description" + + "supervisor/client_config.yaml", "Tells the supervisor how to communicate with hyperion-blueapi, specifying the REST endpoint and stomp server." + "supervisor/supervisor_config.yaml", "Configures the internal blueapi context with a minimal beamline module containing the baton device and the graylog endpoint." + +When these are deployed in kubernetes it is anticipated that these will be provided by specifying +directly in the values.yaml which will be expanded by the base helmcharts at deployment time. diff --git a/pyproject.toml b/pyproject.toml index 1f6a97673..afc5bd152 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,7 @@ dependencies = [ "ophyd >= 1.10.5", "ophyd-async >= 0.14.0", "bluesky >= 1.14.6", - "dls-dodal @ git+https://github.com/DiamondLightSource/dodal.git@a946a46ffd418ea25fbae9134954659cfa223d6f", + "dls-dodal @ git+https://github.com/DiamondLightSource/dodal.git@a6d784617a42cb5fc2139d361fe0323532fffb66", ] diff --git a/run_hyperion.sh b/run_hyperion.sh index 80ce1b9b0..dbbe419d2 100755 --- a/run_hyperion.sh +++ b/run_hyperion.sh @@ -9,6 +9,11 @@ MODE=gda CONFIG_DIR=`dirname $0`/src/mx_bluesky/hyperion BLUEAPI_CONFIG=$CONFIG_DIR/blueapi_config.yaml +SUPERVISOR_CONFIG=$CONFIG_DIR/supervisor/supervisor_config.yaml +CLIENT_CONFIG=$CONFIG_DIR/supervisor/client_config.yaml +DO_CALLBACKS=1 +HEALTHCHECK_PORT=5005 +CALLBACK_WATCHDOG_PORT=5005 for option in "$@"; do case $option in @@ -26,12 +31,19 @@ for option in "$@"; do --dev) IN_DEV=true BLUEAPI_CONFIG=$CONFIG_DIR/blueapi_dev_config.yaml + SUPERVISOR_CONFIG=$CONFIG_DIR/supervisor/supervisor_dev_config.yaml ;; --udc) MODE=udc ;; --blueapi) MODE=blueapi + CALLBACK_WATCHDOG_PORT=5006 + ;; + --supervisor) + MODE=supervisor + DO_CALLBACKS=0 + HEALTHCHECK_PORT=5006 ;; --help|--info|--h) source .venv/bin/activate @@ -48,6 +60,8 @@ Options: --dev Enable dev mode to run from a local workspace on a development machine. --udc Start hyperion in UDC mode instead of taking commands from GDA --blueapi Start hyperion in blueapi mode instead of taking commands from GDA + --supervisor Start hyperion in supervisor mode, taking commands from Agamemnon and feeding them to + an instance running in blueapi mode. --help This help By default this script will start an Hyperion server unless the --no-start flag is specified. @@ -62,11 +76,18 @@ END done kill_active_apps () { - echo "Killing active instances of hyperion and hyperion-callbacks..." - pkill -e -f "python.*hyperion" - pkill -e -f "SCREEN.*hyperion" - blueapi controller stop 2>/dev/null - echo "done." + if [ $MODE = "supervisor" ]; then + # supervisor mode kills only supervisor + echo "Killing active instances of hyperion supervisor..." + pkill -e -f "mx-bluesky/.venv/bin/python .*--mode supervisor" + else + echo "Killing active instances of hyperion-blueapi" + pkill -e -f "python .*mx-bluesky/.venv/bin/blueapi .*serve" + echo "Killing vanilla hyperion instances" + pkill -e -f "mx-bluesky/.venv/bin/python .*--mode (gda|udc)" + echo "Killing hyperion-callbacks" + pkill -e -f "mx-bluesky/.venv/bin/python .*hyperion-callbacks" + fi } check_user () { @@ -99,7 +120,7 @@ if [[ $START == 1 ]]; then check_user ISPYB_CONFIG_PATH="/dls_sw/dasc/mariadb/credentials/ispyb-hyperion-${BEAMLINE}.cfg" else - ISPYB_CONFIG_PATH="$RELATIVE_SCRIPT_DIR/tests/test_data/ispyb_test_credentials.cfg" + ISPYB_CONFIG_PATH="$RELATIVE_SCRIPT_DIR/tests/test_data/ispyb-test-credentials.cfg" ZOCALO_CONFIG="$RELATIVE_SCRIPT_DIR/tests/test_data/zocalo-test-configuration.yaml" export ZOCALO_CONFIG fi @@ -121,8 +142,12 @@ if [[ $START == 1 ]]; then fi echo "$(date) Logging to $LOG_DIR" export LOG_DIR - mkdir -p $LOG_DIR - start_log_path=$LOG_DIR/start_log.log + mkdir -p "$LOG_DIR" + if [ $MODE = "supervisor" ]; then + start_log_path=$LOG_DIR/supervisor_start_log.log + else + start_log_path=$LOG_DIR/start_log.log + fi callback_start_log_path=$LOG_DIR/callback_start_log.log source .venv/bin/activate @@ -130,8 +155,11 @@ if [[ $START == 1 ]]; then declare -A h_and_cb_args=( ["IN_DEV"]="$IN_DEV" ) declare -A h_and_cb_arg_strings=( ["IN_DEV"]="--dev" ) - h_commands="--mode $MODE" - cb_commands=() + h_commands="--mode $MODE " + cb_commands="--watchdog-port $CALLBACK_WATCHDOG_PORT " + if [ $MODE = "supervisor" ]; then + h_commands+="--client-config ${CLIENT_CONFIG} --supervisor-config ${SUPERVISOR_CONFIG} " + fi for i in "${!h_and_cb_args[@]}" do if [ "${h_and_cb_args[$i]}" != false ]; then @@ -146,18 +174,20 @@ if [[ $START == 1 ]]; then blueapi --config $BLUEAPI_CONFIG serve > $start_log_path 2>&1 & HEALTHCHECK_ENDPOINT="healthz" else - echo "Starting hyperion with hyperion $h_commands, start_log is $start_log_path" + echo "Starting hyperion in mode $MODE with hyperion $h_commands, start_log is $start_log_path" hyperion `echo $h_commands;`>$start_log_path 2>&1 & HEALTHCHECK_ENDPOINT="status" fi - echo "Starting hyperion-callbacks with hyperion-callbacks $cb_commands, start_log is $callback_start_log_path" - hyperion-callbacks `echo $cb_commands;`>$callback_start_log_path 2>&1 & + if [[ $DO_CALLBACKS == 1 ]]; then + echo "Starting hyperion-callbacks with hyperion-callbacks $cb_commands, start_log is $callback_start_log_path" + hyperion-callbacks `echo $cb_commands;`>$callback_start_log_path 2>&1 & + fi echo "$(date) Waiting for Hyperion to start" for i in {1..30} do echo "$(date)" - curl --head -X GET http://localhost:5005/$HEALTHCHECK_ENDPOINT >/dev/null + curl --head -X GET http://localhost:$HEALTHCHECK_PORT/$HEALTHCHECK_ENDPOINT >/dev/null ret_value=$? if [ $ret_value -ne 0 ]; then sleep 1 diff --git a/src/mx_bluesky/Getting started.ipynb b/src/mx_bluesky/Getting started.ipynb index 6b539f072..2c2d0f716 100644 --- a/src/mx_bluesky/Getting started.ipynb +++ b/src/mx_bluesky/Getting started.ipynb @@ -52,6 +52,7 @@ "import importlib\n", "\n", "from dodal.utils import collect_factories\n", + "\n", "beamline = \"i02_2\"\n", "module_name = f\"dodal.beamlines.{beamline}\"\n", "beamline_module = importlib.import_module(module_name)\n", diff --git a/src/mx_bluesky/hyperion/__main__.py b/src/mx_bluesky/hyperion/__main__.py index 0277d352e..4c29d8180 100755 --- a/src/mx_bluesky/hyperion/__main__.py +++ b/src/mx_bluesky/hyperion/__main__.py @@ -2,9 +2,11 @@ import signal import threading from dataclasses import asdict +from pathlib import Path from sys import argv from traceback import format_exception +from blueapi.config import ApplicationConfig, ConfigLoader from blueapi.core import BlueskyContext from flask import Flask, request from flask_restful import Api, Resource @@ -43,6 +45,7 @@ StatusAndMessage, make_error_status_and_message, ) +from mx_bluesky.hyperion.supervisor import SupervisorRunner from mx_bluesky.hyperion.utils.context import setup_context @@ -153,7 +156,11 @@ def create_app(runner: GDARunner, test_config=None) -> Flask: def initialise_globals(args: HyperionArgs): """Do all early main low-level application initialisation.""" do_default_logging_setup( - CONST.LOG_FILE_NAME, CONST.GRAYLOG_PORT, dev_mode=args.dev_mode + CONST.SUPERVISOR_LOG_FILE_NAME + if args.mode == HyperionMode.SUPERVISOR + else CONST.LOG_FILE_NAME, + CONST.GRAYLOG_PORT, + dev_mode=args.dev_mode, ) LOGGER.info(f"Hyperion launched with args:{argv}") alerting.set_alerting_service(LoggingAlertService(CONST.GRAYLOG_STREAM_ID)) @@ -184,13 +191,32 @@ def main(): case HyperionMode.UDC: context = setup_context(dev_mode=args.dev_mode) plan_runner = InProcessRunner(context, args.dev_mode) - create_server_for_udc(plan_runner) + create_server_for_udc(plan_runner, HyperionConstants.HYPERION_PORT) _register_sigterm_handler(plan_runner) run_forever(plan_runner) case HyperionMode.SUPERVISOR: - raise RuntimeError( - "Supervisor mode not supported yet see https://github.com/DiamondLightSource/mx-bluesky/issues/1365" - ) + if not args.client_config: + raise RuntimeError( + "BlueAPI client configuration file must be specified in supervisor mode." + ) + if not args.supervisor_config: + raise RuntimeError( + "BlueAPI supervisor configuration file must be specified in supervisor mode." + ) + + client_config = _load_config_from_yaml(Path(args.client_config)) + supervisor_config = _load_config_from_yaml(Path(args.supervisor_config)) + context = BlueskyContext(configuration=supervisor_config) + plan_runner = SupervisorRunner(context, client_config, args.dev_mode) + create_server_for_udc(plan_runner, HyperionConstants.SUPERVISOR_PORT) + _register_sigterm_handler(plan_runner) + run_forever(plan_runner) + + +def _load_config_from_yaml(config_path: Path): + loader = ConfigLoader(ApplicationConfig) + loader.use_values_from_yaml(config_path) + return loader.load() def _register_sigterm_handler(runner: PlanRunner): diff --git a/src/mx_bluesky/hyperion/blueapi_config.yaml b/src/mx_bluesky/hyperion/blueapi_config.yaml index da8bfcf23..67c8f43dd 100644 --- a/src/mx_bluesky/hyperion/blueapi_config.yaml +++ b/src/mx_bluesky/hyperion/blueapi_config.yaml @@ -8,10 +8,9 @@ env: broadcast_status_events: false api: url: http://localhost:5005 - cors: - allow_credentials: True - origins: - - "*" +stomp: + enabled: true + url: tcp://localhost:61613 logging: graylog: url: "tcp://graylog-log-target.diamond.ac.uk:12232" diff --git a/src/mx_bluesky/hyperion/blueapi_dev_config.yaml b/src/mx_bluesky/hyperion/blueapi_dev_config.yaml index 16ac4e3eb..f02f83307 100644 --- a/src/mx_bluesky/hyperion/blueapi_dev_config.yaml +++ b/src/mx_bluesky/hyperion/blueapi_dev_config.yaml @@ -9,9 +9,8 @@ env: broadcast_status_events: false api: url: http://localhost:5005 - cors: - allow_credentials: True - origins: - - "*" +stomp: + enabled: true + url: tcp://localhost:61613 logging: level: DEBUG diff --git a/src/mx_bluesky/hyperion/blueapi_plans/__init__.py b/src/mx_bluesky/hyperion/blueapi_plans/__init__.py index ce45acd9f..93eb4b5c8 100644 --- a/src/mx_bluesky/hyperion/blueapi_plans/__init__.py +++ b/src/mx_bluesky/hyperion/blueapi_plans/__init__.py @@ -73,13 +73,13 @@ def robot_unload( def clean_up_udc( visit: str, - cleanup_group="cleanup", + cleanup_group: str = "cleanup", robot: BartRobot = inject("robot"), smargon: Smargon = inject("smargon"), aperture_scatterguard: ApertureScatterguard = inject("aperture_scatterguard"), lower_gonio: XYZStage = inject("lower_gonio"), detector_motion: DetectorMotion = inject("detector_motion"), -): +) -> MsgGenerator: yield from bps.abs_set( detector_motion.shutter, ShutterState.CLOSED, group=cleanup_group ) diff --git a/src/mx_bluesky/hyperion/external_interaction/callbacks/__main__.py b/src/mx_bluesky/hyperion/external_interaction/callbacks/__main__.py index c32a58e3f..26cd1ccd3 100644 --- a/src/mx_bluesky/hyperion/external_interaction/callbacks/__main__.py +++ b/src/mx_bluesky/hyperion/external_interaction/callbacks/__main__.py @@ -52,8 +52,8 @@ from mx_bluesky.hyperion.external_interaction.callbacks.snapshot_callback import ( BeamDrawingCallback, ) -from mx_bluesky.hyperion.parameters.cli import parse_callback_dev_mode_arg -from mx_bluesky.hyperion.parameters.constants import CONST, HyperionConstants +from mx_bluesky.hyperion.parameters.cli import CallbackArgs, parse_callback_args +from mx_bluesky.hyperion.parameters.constants import CONST from mx_bluesky.hyperion.parameters.gridscan import ( GridCommonWithHyperionDetectorParams, HyperionSpecifiedThreeDGridScan, @@ -159,8 +159,8 @@ def wait_for_threads_forever(threads: Sequence[Thread]): class HyperionCallbackRunner: """Runs Nexus, ISPyB and Zocalo callbacks in their own process.""" - def __init__(self, dev_mode) -> None: - setup_logging(dev_mode) + def __init__(self, callback_args: CallbackArgs) -> None: + setup_logging(callback_args.dev_mode) log_info("Hyperion callback process started.") set_alerting_service(LoggingAlertService(CONST.GRAYLOG_STREAM_ID)) @@ -187,7 +187,12 @@ def start_dispatcher(callbacks: list[Callable]): name="0MQ Dispatcher", ) - self.watchdog_thread = Thread(target=run_watchdog, daemon=True, name="Watchdog") + self.watchdog_thread = Thread( + target=run_watchdog, + daemon=True, + name="Watchdog", + args=[callback_args.watchdog_port], + ) log_info("Created 0MQ proxy and local RemoteDispatcher.") def start(self): @@ -201,12 +206,12 @@ def start(self): ) -def run_watchdog(): +def run_watchdog(watchdog_port: int): log_info("Hyperion watchdog keepalive running") while True: try: with request.urlopen( - f"http://localhost:{HyperionConstants.HYPERION_PORT}/callbackPing", + f"http://localhost:{watchdog_port}/callbackPing", timeout=PING_TIMEOUT_S, ) as response: if response.status != 200: @@ -219,9 +224,10 @@ def run_watchdog(): def main(dev_mode=False) -> None: - dev_mode = dev_mode or parse_callback_dev_mode_arg() + callback_args = parse_callback_args() + callback_args.dev_mode = dev_mode or callback_args.dev_mode print(f"In dev mode: {dev_mode}") - runner = HyperionCallbackRunner(dev_mode) + runner = HyperionCallbackRunner(callback_args) runner.start() diff --git a/src/mx_bluesky/hyperion/parameters/cli.py b/src/mx_bluesky/hyperion/parameters/cli.py index 18c9bf572..c3306e1b7 100644 --- a/src/mx_bluesky/hyperion/parameters/cli.py +++ b/src/mx_bluesky/hyperion/parameters/cli.py @@ -4,6 +4,7 @@ from pydantic.dataclasses import dataclass from mx_bluesky._version import version +from mx_bluesky.hyperion.parameters.constants import HyperionConstants class HyperionMode(StrEnum): @@ -16,6 +17,14 @@ class HyperionMode(StrEnum): class HyperionArgs: mode: HyperionMode dev_mode: bool = False + client_config: str | None = None + supervisor_config: str | None = None + + +@dataclass +class CallbackArgs: + dev_mode: bool = False + watchdog_port: int = HyperionConstants.HYPERION_PORT def _add_callback_relevant_args(parser: argparse.ArgumentParser) -> None: @@ -27,12 +36,17 @@ def _add_callback_relevant_args(parser: argparse.ArgumentParser) -> None: ) -def parse_callback_dev_mode_arg() -> bool: +def parse_callback_args() -> CallbackArgs: """Returns the bool representing the 'dev_mode' argument.""" parser = argparse.ArgumentParser() _add_callback_relevant_args(parser) + parser.add_argument( + "--watchdog-port", + type=int, + help="Liveness port for callbacks to ping regularly", + ) args = parser.parse_args() - return args.dev + return CallbackArgs(dev_mode=args.dev, watchdog_port=args.watchdog_port) def parse_cli_args() -> HyperionArgs: @@ -54,5 +68,17 @@ def parse_cli_args() -> HyperionArgs: type=HyperionMode, choices=HyperionMode.__members__.values(), ) + parser.add_argument( + "--client-config", help="Specify the blueapi client configuration file." + ) + parser.add_argument( + "--supervisor-config", + help="Specify the supervisor bluesky context configuration file.", + ) args = parser.parse_args() - return HyperionArgs(dev_mode=args.dev or False, mode=args.mode) + return HyperionArgs( + dev_mode=args.dev or False, + mode=args.mode, + supervisor_config=args.supervisor_config, + client_config=args.client_config, + ) diff --git a/src/mx_bluesky/hyperion/parameters/constants.py b/src/mx_bluesky/hyperion/parameters/constants.py index 0a39d06fc..122e16339 100644 --- a/src/mx_bluesky/hyperion/parameters/constants.py +++ b/src/mx_bluesky/hyperion/parameters/constants.py @@ -64,6 +64,7 @@ class HyperionConstants: PLAN = PlanNameConstants() WAIT = PlanGroupCheckpointConstants() HYPERION_PORT = 5005 + SUPERVISOR_PORT = 5006 CALLBACK_0MQ_PROXY_PORTS = (5577, 5578) DESCRIPTORS = DocDescriptorNames() CONFIG_SERVER_URL = ( @@ -75,6 +76,7 @@ class HyperionConstants: GRAYLOG_STREAM_ID = "66264f5519ccca6d1c9e4e03" PARAMETER_SCHEMA_DIRECTORY = "src/hyperion/parameters/schemas/" LOG_FILE_NAME = "hyperion.log" + SUPERVISOR_LOG_FILE_NAME = "hyperion-supervisor.log" DEVICE_SETTINGS_CONSTANTS = DeviceSettingsConstants() diff --git a/src/mx_bluesky/hyperion/plan_runner_api.py b/src/mx_bluesky/hyperion/plan_runner_api.py index b79c187b5..927c3a70d 100644 --- a/src/mx_bluesky/hyperion/plan_runner_api.py +++ b/src/mx_bluesky/hyperion/plan_runner_api.py @@ -4,23 +4,22 @@ from flask_restful import Api, Resource from mx_bluesky.common.utils.log import LOGGER -from mx_bluesky.hyperion.parameters.constants import HyperionConstants from mx_bluesky.hyperion.plan_runner import PlanRunner # Ignore this function for code coverage as there is no way to shut down # a server once it is started. -def create_server_for_udc(runner: PlanRunner) -> Thread: # pragma: no cover +def create_server_for_udc(runner: PlanRunner, port: int) -> Thread: # pragma: no cover """Create a minimal API for Hyperion UDC mode""" app = create_app_for_udc(runner) flask_thread = Thread( target=app.run, - kwargs={"host": "0.0.0.0", "port": HyperionConstants.HYPERION_PORT}, + kwargs={"host": "0.0.0.0", "port": port}, daemon=True, ) flask_thread.start() - LOGGER.info(f"Hyperion now listening on {HyperionConstants.HYPERION_PORT}") + LOGGER.info(f"Hyperion now listening on {port}") return flask_thread diff --git a/src/mx_bluesky/hyperion/supervisor/__init__.py b/src/mx_bluesky/hyperion/supervisor/__init__.py new file mode 100644 index 000000000..9d3fc2f72 --- /dev/null +++ b/src/mx_bluesky/hyperion/supervisor/__init__.py @@ -0,0 +1,3 @@ +from mx_bluesky.hyperion.supervisor._supervisor import SupervisorRunner + +__all__ = ["SupervisorRunner"] diff --git a/src/mx_bluesky/hyperion/supervisor/_supervisor.py b/src/mx_bluesky/hyperion/supervisor/_supervisor.py new file mode 100644 index 000000000..ca191b9f6 --- /dev/null +++ b/src/mx_bluesky/hyperion/supervisor/_supervisor.py @@ -0,0 +1,116 @@ +from collections.abc import Sequence + +from blueapi.client.client import BlueapiClient +from blueapi.client.event_bus import BlueskyStreamingError +from blueapi.config import ApplicationConfig +from blueapi.core import BlueskyContext +from blueapi.service.model import TaskRequest +from bluesky import plan_stubs as bps +from bluesky.utils import MsgGenerator + +from mx_bluesky.common.parameters.components import MxBlueskyParameters +from mx_bluesky.common.parameters.constants import Status +from mx_bluesky.common.utils.log import LOGGER +from mx_bluesky.hyperion.parameters.components import UDCCleanup, UDCDefaultState, Wait +from mx_bluesky.hyperion.parameters.load_centre_collect import LoadCentreCollect +from mx_bluesky.hyperion.plan_runner import PlanError, PlanRunner + + +class SupervisorRunner(PlanRunner): + """Runner that executes plans by delegating to a remote blueapi instance""" + + def __init__( + self, + bluesky_context: BlueskyContext, + client_config: ApplicationConfig, + dev_mode: bool, + ): + super().__init__(bluesky_context, dev_mode) + self.blueapi_client = BlueapiClient.from_config(client_config) + self._current_status = Status.IDLE + + def decode_and_execute( + self, current_visit: str | None, parameter_list: Sequence[MxBlueskyParameters] + ) -> MsgGenerator: + try: + yield from self.check_external_callbacks_are_alive() + except Exception as e: + raise PlanError(f"Exception raised during plan execution: {e}") from e + instrument_session = current_visit or "NO_VISIT" + try: + if self._current_status == Status.ABORTING: + raise PlanError("Plan execution cancelled, supervisor is shutting down") + self._current_status = Status.BUSY + for parameters in parameter_list: + LOGGER.info( + f"Executing plan with parameters: {parameters.model_dump_json(indent=2)}" + ) + match parameters: + case LoadCentreCollect(): + task_request = TaskRequest( + name="load_centre_collect", + params={"parameters": parameters}, + instrument_session=instrument_session, + ) + self._run_task_remotely(task_request) + case Wait(): + yield from bps.sleep(parameters.duration_s) + case UDCDefaultState(): + task_request = TaskRequest( + name="move_to_udc_default_state", + params={}, + instrument_session=instrument_session, + ) + self._run_task_remotely(task_request) + case UDCCleanup(): + task_request = TaskRequest( + name="clean_up_udc", + params={"visit": current_visit}, + instrument_session=instrument_session, + ) + self._run_task_remotely(task_request) + case _: + raise AssertionError( + f"Unsupported instruction decoded from agamemnon {type(parameters)}" + ) + except: + self._current_status = Status.FAILED + raise + else: + self._current_status = Status.IDLE + return current_visit + + @property + def current_status(self) -> Status: + return self._current_status + + def is_connected(self) -> bool: + try: + self.blueapi_client.get_state() + except Exception as e: + LOGGER.debug(f"Failed to get worker state: {e}") + return False + return True + + def shutdown(self): + LOGGER.info( + "Hyperion supervisor received shutdown request, signalling abort to BlueAPI server..." + ) + if self.current_status != Status.BUSY: + self.request_run_engine_abort() + else: + self._current_status = Status.ABORTING + self.blueapi_client.abort() + + def _run_task_remotely(self, task_request: TaskRequest): + try: + self.blueapi_client.run_task(task_request) + except BlueskyStreamingError as e: + # We will receive a BlueskyStreamingError if the remote server + # processed an abort during plan execution, but this is not + # the only possible cause. + if self.current_status == Status.ABORTING: + LOGGER.info("Aborting local runner...") + self.request_run_engine_abort() + else: + raise PlanError(f"Exception raised during plan execution: {e}") from e diff --git a/src/mx_bluesky/hyperion/supervisor/client_config.yaml b/src/mx_bluesky/hyperion/supervisor/client_config.yaml new file mode 100644 index 000000000..a32a27969 --- /dev/null +++ b/src/mx_bluesky/hyperion/supervisor/client_config.yaml @@ -0,0 +1,6 @@ +# Configuration for the BlueAPI client running in the hyperion supervisor +api: + url: http://localhost:5005 +stomp: + enabled: true + url: tcp://localhost:61613 diff --git a/src/mx_bluesky/hyperion/supervisor/supervisor_config.yaml b/src/mx_bluesky/hyperion/supervisor/supervisor_config.yaml new file mode 100644 index 000000000..f8771b889 --- /dev/null +++ b/src/mx_bluesky/hyperion/supervisor/supervisor_config.yaml @@ -0,0 +1,10 @@ +# Configuration for the supervisor BlueAPI context +# to access the beamline baton device +env: + sources: + - kind: deviceManager + module: dodal.beamlines.i03_supervisor +logging: + graylog: + url: "tcp://graylog-log-target.diamond.ac.uk:12232" + enabled: true diff --git a/src/mx_bluesky/hyperion/supervisor/supervisor_dev_config.yaml b/src/mx_bluesky/hyperion/supervisor/supervisor_dev_config.yaml new file mode 100644 index 000000000..b6b0b6d6f --- /dev/null +++ b/src/mx_bluesky/hyperion/supervisor/supervisor_dev_config.yaml @@ -0,0 +1,9 @@ +# Configuration for the supervisor BlueAPI context +# to access the beamline baton device +env: + sources: + - kind: deviceManager + module: dodal.beamlines.i03_supervisor + mock: true + events: + broadcast_status_events: false diff --git a/tests/system_tests/hyperion/external_interaction/callbacks/test_external_callbacks.py b/tests/system_tests/hyperion/external_interaction/callbacks/test_external_callbacks.py index 9c2a7b1d1..6227b2273 100644 --- a/tests/system_tests/hyperion/external_interaction/callbacks/test_external_callbacks.py +++ b/tests/system_tests/hyperion/external_interaction/callbacks/test_external_callbacks.py @@ -34,7 +34,7 @@ from mx_bluesky.hyperion.experiment_plans.rotation_scan_plan import ( rotation_scan, ) -from mx_bluesky.hyperion.parameters.constants import CONST +from mx_bluesky.hyperion.parameters.constants import CONST, HyperionConstants from mx_bluesky.hyperion.parameters.device_composites import ( HyperionFlyScanXRayCentreComposite, ) @@ -86,6 +86,8 @@ def external_callbacks(): "python", "src/mx_bluesky/hyperion/external_interaction/callbacks/__main__.py", "--dev", + "--watchdog-port", + str(HyperionConstants.HYPERION_PORT), ], env=process_env, ) diff --git a/tests/system_tests/hyperion/supervisor/__init__.py b/tests/system_tests/hyperion/supervisor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/system_tests/hyperion/supervisor/client_config.yaml b/tests/system_tests/hyperion/supervisor/client_config.yaml new file mode 100644 index 000000000..42d3ac9e6 --- /dev/null +++ b/tests/system_tests/hyperion/supervisor/client_config.yaml @@ -0,0 +1,7 @@ +# Configuration for the BlueAPI client running in the hyperion supervisor +# NOT TO BE USED IN PRODUCTION +api: + url: http://localhost:5005 +stomp: + enabled: true + url: tcp://localhost:61613 diff --git a/tests/system_tests/hyperion/supervisor/dummy_plans.py b/tests/system_tests/hyperion/supervisor/dummy_plans.py new file mode 100644 index 000000000..ee3f4da13 --- /dev/null +++ b/tests/system_tests/hyperion/supervisor/dummy_plans.py @@ -0,0 +1,35 @@ +from bluesky import plan_stubs as bps +from bluesky.utils import MsgGenerator +from dodal.common import inject +from dodal.devices.aperturescatterguard import ApertureScatterguard +from dodal.devices.detector.detector_motion import DetectorMotion +from dodal.devices.motors import XYZStage +from dodal.devices.robot import BartRobot +from dodal.devices.smargon import Smargon + +from mx_bluesky.common.utils.exceptions import WarningError + + +def publish_event(plan_name: str): + yield from bps.open_run(md={"plan_name": plan_name}) + yield from bps.close_run() + + +def clean_up_udc( + visit: str, + cleanup_group: str = "cleanup", + robot: BartRobot = inject("robot"), + smargon: Smargon = inject("smargon"), + aperture_scatterguard: ApertureScatterguard = inject("aperture_scatterguard"), + lower_gonio: XYZStage = inject("lower_gonio"), + detector_motion: DetectorMotion = inject("detector_motion"), +) -> MsgGenerator: + yield from publish_event("clean_up_udc") + match visit: + case "raise_warning_error": + raise WarningError("Test warning error") + case "raise_other_error": + raise RuntimeError("Test unexpected error") + case "wait_for_abort": + while True: + yield from bps.sleep(1) diff --git a/tests/system_tests/hyperion/supervisor/supervisor_config.yaml b/tests/system_tests/hyperion/supervisor/supervisor_config.yaml new file mode 100644 index 000000000..e22144e64 --- /dev/null +++ b/tests/system_tests/hyperion/supervisor/supervisor_config.yaml @@ -0,0 +1,18 @@ +# Configuration for the supervisor BlueAPI context +# to access the beamline baton device +env: + sources: + - kind: deviceManager + module: dodal.beamlines.i03_supervisor + events: + broadcast_status_events: false +api: + url: http://localhost:5006 + cors: + allow_credentials: True + origins: + - "*" +logging: + graylog: + url: "tcp://graylog-log-target.diamond.ac.uk:12232" + enabled: true diff --git a/tests/system_tests/hyperion/supervisor/system_test_blueapi.yaml b/tests/system_tests/hyperion/supervisor/system_test_blueapi.yaml new file mode 100644 index 000000000..282e0a4dc --- /dev/null +++ b/tests/system_tests/hyperion/supervisor/system_test_blueapi.yaml @@ -0,0 +1,20 @@ +env: + sources: + - kind: deviceManager + module: dodal.beamlines.i03 + mock: true + - kind: planFunctions + module: system_tests.hyperion.supervisor.dummy_plans + events: + broadcast_status_events: false +api: + url: http://localhost:5005 + cors: + allow_credentials: True + origins: + - "*" +stomp: + enabled: true + url: tcp://localhost:61613 +logging: + level: DEBUG diff --git a/tests/system_tests/hyperion/supervisor/test_supervisor.py b/tests/system_tests/hyperion/supervisor/test_supervisor.py new file mode 100644 index 000000000..395f669a4 --- /dev/null +++ b/tests/system_tests/hyperion/supervisor/test_supervisor.py @@ -0,0 +1,245 @@ +import subprocess +import time +from concurrent.futures.thread import ThreadPoolExecutor +from functools import partial +from os import environ, getcwd +from pathlib import Path +from threading import Event +from time import sleep + +import pytest +from blueapi.client.event_bus import AnyEvent, EventBusClient +from blueapi.config import ApplicationConfig, ConfigLoader +from blueapi.core import BlueskyContext, DataEvent +from blueapi.worker import WorkerEvent, WorkerState +from bluesky import RunEngine, RunEngineInterrupted +from bluesky import plan_stubs as bps +from bluesky_stomp.messaging import MessageContext + +from mx_bluesky.common.parameters.components import get_param_version +from mx_bluesky.common.parameters.constants import Status +from mx_bluesky.hyperion.parameters.components import UDCCleanup +from mx_bluesky.hyperion.plan_runner import PlanError +from mx_bluesky.hyperion.supervisor import SupervisorRunner + +from ....unit_tests.hyperion.external_interaction.callbacks.test_alert_on_container_change import ( + TEST_VISIT, +) + +BLUEAPI_SERVER_CONFIG = ( + "tests/system_tests/hyperion/supervisor/system_test_blueapi.yaml" +) + + +@pytest.fixture(scope="module") +def tpe(): + return ThreadPoolExecutor(max_workers=1) + + +@pytest.fixture(scope="module") +def mock_blueapi_server(): + with subprocess.Popen( + [ + "blueapi", + "--config", + BLUEAPI_SERVER_CONFIG, + "serve", + ], + env=environ | {"PYTHONPATH": getcwd() + "/tests"}, + ) as blueapi_server: + try: + yield blueapi_server + finally: + blueapi_server.terminate() + + +@pytest.fixture +def mock_bluesky_context(run_engine: RunEngine): + loader = ConfigLoader(ApplicationConfig) + loader.use_values_from_yaml( + Path("tests/system_tests/hyperion/supervisor/supervisor_config.yaml") + ) + supervisor_config = loader.load() + yield BlueskyContext(configuration=supervisor_config, run_engine=run_engine) + + +@pytest.fixture +def client_config() -> ApplicationConfig: + loader = ConfigLoader(ApplicationConfig) + loader.use_values_from_yaml( + Path("tests/system_tests/hyperion/supervisor/client_config.yaml") + ) + return loader.load() + + +def get_event_bus_client(supervisor: SupervisorRunner) -> EventBusClient: + return supervisor.blueapi_client._events # type: ignore + + +@pytest.fixture +def supervisor_runner(supervisor_runner_no_ping: SupervisorRunner): + supervisor_runner_no_ping.reset_callback_watchdog_timer() + yield supervisor_runner_no_ping + + +@pytest.fixture +def supervisor_runner_no_ping( + mock_bluesky_context: BlueskyContext, client_config: ApplicationConfig +): + runner = SupervisorRunner(mock_bluesky_context, client_config, True) + timeout = time.monotonic() + 30 + while time.monotonic() < timeout: + if runner.is_connected(): + return runner + sleep(1) + else: + raise AssertionError("Failed to connect to blueapi") + + +def handle_event(plan_started: Event, event_payload: AnyEvent, context: MessageContext): + match event_payload: + case DataEvent() as data_event: + if ( + data_event.name == "start" + and data_event.doc["plan_name"] == "clean_up_udc" + ): + plan_started.set() + + +@pytest.mark.system_test +def test_supervisor_connects_to_blueapi_and_stomp( + mock_blueapi_server, + mock_bluesky_context: BlueskyContext, + client_config: ApplicationConfig, + supervisor_runner: SupervisorRunner, +): + params = UDCCleanup.model_validate({"parameter_model_version": get_param_version()}) + ebc = get_event_bus_client(supervisor_runner) + + received_message_event = Event() + + ebc.subscribe_to_all_events(partial(handle_event, received_message_event)) + assert supervisor_runner.current_status == Status.IDLE + supervisor_runner.run_engine( + supervisor_runner.decode_and_execute(TEST_VISIT, [params]) + ) + received_message_event.wait() + + +@pytest.mark.skip(reason="https://github.com/DiamondLightSource/blueapi/issues/1312") +def test_supervisor_continues_to_next_instruction_on_warning_error( + supervisor_runner: SupervisorRunner, +): + params = UDCCleanup.model_validate({"parameter_model_version": get_param_version()}) + supervisor_runner.run_engine( + supervisor_runner.decode_and_execute("raise_warning_error", [params]) + ) + assert supervisor_runner.current_status == Status.FAILED + + +def test_supervisor_raises_request_abort_when_shutdown_requested( + supervisor_runner: SupervisorRunner, tpe: ThreadPoolExecutor +): + params = UDCCleanup.model_validate({"parameter_model_version": get_param_version()}) + ebc = get_event_bus_client(supervisor_runner) + plan_aborted = Event() + plan_called = Event() + + def handle_abort(event_payload: AnyEvent, context: MessageContext): + match event_payload: + case WorkerEvent() as worker_event: + if ( + worker_event.state == WorkerState.IDLE + and worker_event.task_status + and worker_event.task_status.task_complete + and worker_event.task_status.task_failed + ): + plan_aborted.set() + + ebc.subscribe_to_all_events(partial(handle_event, plan_called)) + ebc.subscribe_to_all_events(handle_abort) + + def shutdown_in_background(): + plan_called.wait(10) + assert supervisor_runner.current_status == Status.BUSY + assert supervisor_runner.blueapi_client.get_state() == WorkerState.RUNNING + supervisor_runner.shutdown() + assert supervisor_runner.current_status == Status.ABORTING + + fut = tpe.submit(shutdown_in_background) + + with pytest.raises(RunEngineInterrupted): + supervisor_runner.run_engine( + supervisor_runner.decode_and_execute("wait_for_abort", [params]) + ) + + assert supervisor_runner.blueapi_client.get_state() == WorkerState.IDLE + assert plan_aborted.wait(10) + fut.result() + + +def test_supervisor_raises_plan_error_when_plan_fails_with_other_exception( + supervisor_runner: SupervisorRunner, +): + params = UDCCleanup.model_validate({"parameter_model_version": get_param_version()}) + with pytest.raises(PlanError, match="Exception raised during plan execution:"): + supervisor_runner.run_engine( + supervisor_runner.decode_and_execute("raise_other_error", [params]) + ) + assert supervisor_runner.current_status == Status.FAILED + + +def test_shutdown_raises_run_engine_interrupted_when_idle( + supervisor_runner: SupervisorRunner, tpe: ThreadPoolExecutor +): + plan_started = Event() + + def idle_plan(): + plan_started.set() + while True: + yield from bps.sleep(1) + + def shutdown_in_background(): + plan_started.wait(10) + supervisor_runner.shutdown() + + fut = tpe.submit(shutdown_in_background) + + with pytest.raises(RunEngineInterrupted): + supervisor_runner.run_engine(idle_plan()) + + fut.result() + + +async def test_supervisor_checks_for_external_callback_ping( + supervisor_runner_no_ping: SupervisorRunner, tpe: ThreadPoolExecutor +): + ebc = get_event_bus_client(supervisor_runner_no_ping) + received_message_event = Event() + ebc.subscribe_to_all_events(partial(handle_event, received_message_event)) + params = UDCCleanup.model_validate({"parameter_model_version": get_param_version()}) + + def run_test_in_background(): + sleep(1) + assert not received_message_event.is_set() + supervisor_runner_no_ping.reset_callback_watchdog_timer() + received_message_event.wait(1) + + fut = tpe.submit(run_test_in_background) + supervisor_runner_no_ping.run_engine( + supervisor_runner_no_ping.decode_and_execute(TEST_VISIT, [params]) + ) + fut.result() + + +def test_supervisor_raises_plan_error_when_external_callbacks_watchdog_expired( + supervisor_runner_no_ping: SupervisorRunner, +): + runner = supervisor_runner_no_ping + runner.EXTERNAL_CALLBACK_WATCHDOG_TIMER_S = 0.5 # type: ignore + runner.reset_callback_watchdog_timer() + params = UDCCleanup.model_validate({"parameter_model_version": get_param_version()}) + # Allow callback watchdog to expire + sleep(1) + with pytest.raises(PlanError, match="External callback watchdog timer expired.*"): + runner.run_engine(runner.decode_and_execute(TEST_VISIT, [params])) diff --git a/tests/unit_tests/hyperion/external_interaction/callbacks/test_external_callbacks.py b/tests/unit_tests/hyperion/external_interaction/callbacks/test_external_callbacks.py index e2a5c2b84..51259d7c7 100644 --- a/tests/unit_tests/hyperion/external_interaction/callbacks/test_external_callbacks.py +++ b/tests/unit_tests/hyperion/external_interaction/callbacks/test_external_callbacks.py @@ -20,12 +20,14 @@ setup_logging, wait_for_threads_forever, ) +from mx_bluesky.hyperion.parameters.cli import CallbackArgs +from mx_bluesky.hyperion.parameters.constants import HyperionConstants @patch("mx_bluesky.hyperion.external_interaction.callbacks.__main__.run_watchdog") @patch( - "mx_bluesky.hyperion.external_interaction.callbacks.__main__.parse_callback_dev_mode_arg", - return_value=("DEBUG", True), + "mx_bluesky.hyperion.external_interaction.callbacks.__main__.parse_callback_args", + return_value=CallbackArgs(True, HyperionConstants.SUPERVISOR_PORT), ) @patch("mx_bluesky.hyperion.external_interaction.callbacks.__main__.setup_callbacks") @patch("mx_bluesky.hyperion.external_interaction.callbacks.__main__.setup_logging") @@ -40,7 +42,7 @@ def test_main_function( setup_alerting: MagicMock, setup_logging: MagicMock, setup_callbacks: MagicMock, - parse_callback_dev_mode_arg: MagicMock, + parse_callback_args: MagicMock, mock_run_watchdog: MagicMock, ): proxy_started = Event() @@ -48,7 +50,7 @@ def test_main_function( watchdog_started = Event() mock_proxy.return_value.start.side_effect = proxy_started.set mock_dispatcher.return_value.start.side_effect = dispatcher_started.set - mock_run_watchdog.side_effect = watchdog_started.set + mock_run_watchdog.side_effect = lambda _: watchdog_started.set() or None main() @@ -71,8 +73,8 @@ def test_setup_callbacks(): @pytest.mark.skip_log_setup @patch( - "mx_bluesky.hyperion.external_interaction.callbacks.__main__.parse_callback_dev_mode_arg", - return_value=True, + "mx_bluesky.hyperion.external_interaction.callbacks.__main__.parse_callback_args", + return_value=CallbackArgs(True, HyperionConstants.SUPERVISOR_PORT), ) def test_setup_logging(parse_callback_cli_args): assert DODAL_LOGGER.parent != ISPYB_ZOCALO_CALLBACK_LOGGER @@ -118,8 +120,20 @@ def test_launching_external_callbacks_pings_regularly( ) with pytest.raises(RuntimeError, match="Exit this thread"): - run_watchdog() + run_watchdog(5005) mock_request.urlopen.assert_called_with( "http://localhost:5005/callbackPing", timeout=PING_TIMEOUT_S ) + + +@patch("sys.argv", new=["hyperion-callbacks", "--watchdog-port", "1234"]) +@patch( + "mx_bluesky.hyperion.external_interaction.callbacks.__main__.HyperionCallbackRunner" +) +def test_launch_with_watchdog_port_arg_applies_port(mock_callback_runner: MagicMock): + main(dev_mode=True) + mock_callback_runner.assert_called_once() + callback_args = mock_callback_runner.mock_calls[0].args[0] + assert callback_args.dev_mode + assert callback_args.watchdog_port == 1234 diff --git a/tests/unit_tests/hyperion/supervisor/__init__.py b/tests/unit_tests/hyperion/supervisor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit_tests/hyperion/supervisor/test_supervisor.py b/tests/unit_tests/hyperion/supervisor/test_supervisor.py new file mode 100644 index 000000000..9efb9aa91 --- /dev/null +++ b/tests/unit_tests/hyperion/supervisor/test_supervisor.py @@ -0,0 +1,284 @@ +from concurrent.futures import Executor +from threading import Event +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from blueapi.client.event_bus import BlueskyStreamingError +from blueapi.core import BlueskyContext +from blueapi.service.model import TaskRequest +from bluesky import RunEngine, RunEngineInterrupted +from bluesky import plan_stubs as bps + +from mx_bluesky.common.parameters.components import ( + MxBlueskyParameters, + get_param_version, +) +from mx_bluesky.common.parameters.constants import Status +from mx_bluesky.hyperion.parameters.components import UDCCleanup, UDCDefaultState, Wait +from mx_bluesky.hyperion.parameters.load_centre_collect import LoadCentreCollect +from mx_bluesky.hyperion.plan_runner import PlanError +from mx_bluesky.hyperion.supervisor import SupervisorRunner + +TEST_VISIT = "cm12345-67" + + +@pytest.fixture +def mock_bluesky_context(run_engine: RunEngine): + return BlueskyContext(run_engine=run_engine) + + +@pytest.fixture(autouse=True) +def mock_blueapi_client(): + with patch( + "mx_bluesky.hyperion.supervisor._supervisor.BlueapiClient" + ) as mock_class: + yield mock_class.from_config.return_value + + +@pytest.fixture +def blueapi_config(): + return MagicMock() + + +@pytest.fixture +def runner(mock_bluesky_context, blueapi_config): + runner = SupervisorRunner(mock_bluesky_context, blueapi_config, True) + with patch.object(runner, "check_external_callbacks_are_alive"): + yield runner + + +def test_decode_and_execute_load_centre_collect( + mock_blueapi_client: MagicMock, runner: SupervisorRunner, load_centre_collect_params +): + runner.context.run_engine( + runner.decode_and_execute(TEST_VISIT, [load_centre_collect_params]) + ) + + mock_blueapi_client.run_task.assert_called_once_with( + TaskRequest( + name="load_centre_collect", + params={"parameters": load_centre_collect_params}, + instrument_session=TEST_VISIT, + ) + ) + + +def test_decode_and_execute_wait( + mock_blueapi_client: MagicMock, runner: SupervisorRunner +): + mock_sleep = AsyncMock() + runner.context.run_engine.register_command("sleep", mock_sleep) + try: + runner.context.run_engine( + runner.decode_and_execute( + TEST_VISIT, + [ + Wait.model_validate( + { + "parameter_model_version": get_param_version(), + "duration_s": 10, + } + ) + ], + ) + ) + finally: + runner.context.run_engine.register_command( + "sleep", runner.context.run_engine._sleep + ) + + mock_blueapi_client.run_task.assert_not_called() + mock_sleep.assert_awaited_once() + + +def test_decode_and_execute_default_state( + mock_blueapi_client: MagicMock, runner: SupervisorRunner +): + runner.context.run_engine( + runner.decode_and_execute( + TEST_VISIT, + [ + UDCDefaultState.model_validate( + {"parameter_model_version": get_param_version()} + ) + ], + ) + ) + + mock_blueapi_client.run_task.assert_called_once_with( + TaskRequest( + name="move_to_udc_default_state", params={}, instrument_session=TEST_VISIT + ) + ) + + +def test_decode_and_execute_udc_cleanup( + mock_blueapi_client: MagicMock, runner: SupervisorRunner +): + runner.context.run_engine( + runner.decode_and_execute( + TEST_VISIT, + [ + UDCCleanup.model_validate( + {"parameter_model_version": get_param_version()} + ) + ], + ) + ) + + mock_blueapi_client.run_task.assert_called_once_with( + TaskRequest( + name="clean_up_udc", + params={"visit": TEST_VISIT}, + instrument_session=TEST_VISIT, + ) + ) + + +def test_current_status_set_to_busy_during_execution( + mock_blueapi_client: MagicMock, runner: SupervisorRunner, executor: Executor +): + task_executing = Event() + check_complete = Event() + + def check_busy(): + task_executing.wait(timeout=1) + assert runner.current_status == Status.BUSY + check_complete.set() + + def wait_for_check(_): + task_executing.set() + check_complete.wait(1) + + mock_blueapi_client.run_task.side_effect = wait_for_check + + fut = executor.submit(check_busy) + try: + assert runner.current_status == Status.IDLE + runner.context.run_engine( + runner.decode_and_execute( + TEST_VISIT, + [ + UDCDefaultState.model_validate( + {"parameter_model_version": get_param_version()} + ) + ], + ) + ) + finally: + fut.result(1) + + +def test_current_status_set_to_failed_on_exception_and_raise_plan_error( + mock_blueapi_client: MagicMock, runner: SupervisorRunner +): + mock_blueapi_client.run_task.side_effect = BlueskyStreamingError( + "Simulated exception" + ) + + with pytest.raises(PlanError, match="Exception raised.*: Simulated exception"): + runner.context.run_engine( + runner.decode_and_execute( + TEST_VISIT, + [ + UDCDefaultState.model_validate( + {"parameter_model_version": get_param_version()} + ) + ], + ) + ) + assert runner.current_status == Status.FAILED + + +def test_is_connected_queries_blueapi_client( + runner: SupervisorRunner, mock_blueapi_client: MagicMock +): + assert runner.is_connected() + mock_blueapi_client.get_state.assert_called_once() + + +def test_is_connected_returns_false_on_exception( + runner: SupervisorRunner, mock_blueapi_client: MagicMock +): + mock_blueapi_client.get_state.side_effect = RuntimeError("Simulated exception") + assert not runner.is_connected() + + +def test_shutdown_sends_abort_to_run_engine_when_idle( + runner: SupervisorRunner, executor: Executor +): + runner.run_engine = MagicMock(spec=RunEngine) + + def request_shutdown(): + runner.shutdown() + + fut = executor.submit(request_shutdown) + + fut.result(1) + runner.run_engine.abort.assert_called_once() + + +def test_shutdown_sends_abort_to_blueapi_client_when_running_then_aborts( + runner: SupervisorRunner, executor: Executor +): + task_running = Event() + remote_abort_requested = Event() + + def request_shutdown(): + task_running.wait(1) + runner.shutdown() + + def mock_run_task(_): + task_running.set() + remote_abort_requested.wait(1) + raise BlueskyStreamingError("Simulated abort exception") + + def mock_baton_handler_loop(): + yield from runner.decode_and_execute( + TEST_VISIT, + [ + UDCDefaultState.model_validate( + {"parameter_model_version": get_param_version()} + ) + ], + ) + while True: + yield from bps.sleep(0.1) + + runner.blueapi_client.run_task.side_effect = mock_run_task # type: ignore + runner.blueapi_client.abort.side_effect = remote_abort_requested.set # type: ignore + + fut = executor.submit(request_shutdown) + try: + with pytest.raises(RunEngineInterrupted): + runner.context.run_engine(mock_baton_handler_loop()) + finally: + fut.result(1) + + +def test_exception_during_callback_check_raises_plan_error( + runner: SupervisorRunner, load_centre_collect_params: LoadCentreCollect +): + with patch.object( + runner, + "check_external_callbacks_are_alive", + side_effect=RuntimeError("Simulated exception"), + ): + with pytest.raises(PlanError, match="Exception raised.*: Simulated exception"): + runner.context.run_engine( + runner.decode_and_execute(TEST_VISIT, [load_centre_collect_params]) + ) + + +def test_unrecognised_instruction_raises_assertion_error(runner: SupervisorRunner): + with pytest.raises(AssertionError, match="Unsupported instruction"): + runner.context.run_engine( + runner.decode_and_execute( + TEST_VISIT, + [ + MxBlueskyParameters.model_validate( + {"parameter_model_version": get_param_version()} + ) + ], + ) + ) diff --git a/tests/unit_tests/hyperion/test_main_system.py b/tests/unit_tests/hyperion/test_main_system.py index d74b4fca6..8abdcc464 100644 --- a/tests/unit_tests/hyperion/test_main_system.py +++ b/tests/unit_tests/hyperion/test_main_system.py @@ -11,9 +11,10 @@ from sys import argv from time import sleep from typing import Any -from unittest.mock import ANY, MagicMock, patch +from unittest.mock import ANY, MagicMock, call, patch import pytest +from blueapi.config import ApplicationConfig from blueapi.core import BlueskyContext from dodal.devices.attenuator.attenuator import BinaryFilterAttenuator from dodal.devices.baton import Baton @@ -46,7 +47,7 @@ HyperionMode, parse_cli_args, ) -from mx_bluesky.hyperion.parameters.constants import CONST +from mx_bluesky.hyperion.parameters.constants import CONST, HyperionConstants from mx_bluesky.hyperion.parameters.gridscan import HyperionSpecifiedThreeDGridScan from mx_bluesky.hyperion.plan_runner import PlanRunner from mx_bluesky.hyperion.runner import GDARunner @@ -649,6 +650,83 @@ def test_hyperion_in_gda_mode_doesnt_start_udc_loop( mock_gda_runner.assert_called_once() +@patch( + "sys.argv", + new=["hyperion", "--mode", "supervisor", "--supervisor-config", "test_config"], +) +def test_hyperion_in_supervisor_mode_requires_client_config_option(): + with pytest.raises( + RuntimeError, + match="BlueAPI client configuration file must be specified in supervisor mode.", + ): + main() + + +@patch( + "sys.argv", + new=["hyperion", "--mode", "supervisor", "--client-config", "test_config"], +) +def test_hyperion_in_supervisor_mode_requires_supervisor_config_option(): + with pytest.raises( + RuntimeError, + match="BlueAPI supervisor configuration file must be specified in supervisor mode.", + ): + main() + + +@pytest.fixture +def mock_supervisor_mode(): + parent = MagicMock() + with patch.multiple( + "mx_bluesky.hyperion.__main__", + ConfigLoader=parent.ConfigLoader, + BlueskyContext=parent.BlueskyContext, + run_forever=parent.run_forever, + signal=parent.signal, + SupervisorRunner=parent.SupervisorRunner, + ): + yield parent + + +@patch( + "sys.argv", + new=[ + "hyperion", + "--mode", + "supervisor", + "--client-config", + "test_client_config", + "--supervisor-config", + "test_supervisor_config", + ], +) +@patch("mx_bluesky.hyperion.__main__.run_forever", MagicMock()) +def test_hyperion_in_supervisor_mode_creates_rest_server_on_supervisor_port( + mock_supervisor_mode: MagicMock, + mock_create_udc_server: MagicMock, +): + mock_supervisor_mode.ConfigLoader.return_value.load.side_effect = [ + "client_config", + "supervisor_config", + ] + main() + mock_supervisor_mode.assert_has_calls( + [ + call.ConfigLoader(ApplicationConfig), + call.ConfigLoader().use_values_from_yaml(Path("test_client_config")), + call.ConfigLoader().load(), + call.ConfigLoader(ApplicationConfig), + call.ConfigLoader().use_values_from_yaml(Path("test_supervisor_config")), + call.ConfigLoader().load(), + call.BlueskyContext(configuration="supervisor_config"), + call.SupervisorRunner(ANY, "client_config", False), + ] + ) + mock_create_udc_server.assert_called_once_with( + ANY, HyperionConstants.SUPERVISOR_PORT + ) + + @patch("mx_bluesky.hyperion.__main__.Api") @patch("mx_bluesky.hyperion.__main__.setup_context", MagicMock()) @patch("mx_bluesky.hyperion.baton_handler.find_device_in_context", MagicMock())