diff --git a/src/mx_bluesky/beamlines/i04/callbacks/murko_callback.py b/src/mx_bluesky/beamlines/i04/callbacks/murko_callback.py index ade2bb9b86..782a43e27c 100644 --- a/src/mx_bluesky/beamlines/i04/callbacks/murko_callback.py +++ b/src/mx_bluesky/beamlines/i04/callbacks/murko_callback.py @@ -7,6 +7,7 @@ from dodal.log import LOGGER from event_model.documents import Event, RunStart, RunStop from redis import StrictRedis +from redis.exceptions import ConnectionError FORWARDING_COMPLETE_MESSAGE = "image_forwarding_complete" @@ -57,7 +58,20 @@ def __init__(self, redis_host: str, redis_password: str, redis_db: int = 0): self.last_uuid = None self.previous_omegas: list[OmegaReading] = [] + def _check_redis_connection(self): + try: + self.redis_client.ping() + return True + except ConnectionError: + LOGGER.warning( + f"Failed to connect to redis: {self.redis_client}. Murko callback will not run" + ) + return False + def start(self, doc: RunStart) -> RunStart | None: + self.redis_connected = self._check_redis_connection() + if not self.redis_connected: + return doc self.murko_metadata: dict = {"sample_id": doc.get("sample_id")} self.last_uuid = None self.previous_omegas = [] @@ -67,6 +81,8 @@ def start(self, doc: RunStart) -> RunStart | None: return doc def event(self, doc: Event) -> Event: + if not self.redis_connected: + return doc data = doc["data"] for prefix in ("oav", "oav_full_screen"): if f"{prefix}-beam_centre_j" in data: @@ -114,6 +130,8 @@ def call_murko(self, uuid: str, omega: float): self.redis_client.publish("murko", json.dumps(metadata)) def stop(self, doc: RunStop) -> RunStop | None: + if not self.redis_connected: + return doc LOGGER.info(f"Finished streaming {self.murko_metadata['sample_id']} to murko") LOGGER.info( f"Publishing forwarding complete message: {FORWARDING_COMPLETE_MESSAGE}" diff --git a/src/mx_bluesky/beamlines/i04/thawing_plan.py b/src/mx_bluesky/beamlines/i04/thawing_plan.py index b0c9973afa..3a3cd6bc21 100644 --- a/src/mx_bluesky/beamlines/i04/thawing_plan.py +++ b/src/mx_bluesky/beamlines/i04/thawing_plan.py @@ -88,6 +88,7 @@ def thaw_and_murko_centre( initial_zoom_level = yield from bps.rd(oav_fs.zoom_controller.level) initial_velocity = yield from bps.rd(smargon.omega.velocity) new_velocity = abs(rotation / time_to_thaw) * 2.0 + murko_callback = MurkoCallback( RedisConstants.REDIS_HOST, RedisConstants.REDIS_PASSWORD, diff --git a/tests/unit_tests/beamlines/i04/callbacks/test_murko_callback.py b/tests/unit_tests/beamlines/i04/callbacks/test_murko_callback.py index 71d71f7158..717cf649d8 100644 --- a/tests/unit_tests/beamlines/i04/callbacks/test_murko_callback.py +++ b/tests/unit_tests/beamlines/i04/callbacks/test_murko_callback.py @@ -1,5 +1,5 @@ import json -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest from event_model import Event @@ -275,3 +275,26 @@ def test_when_murko_called_with_full_screen_and_roi_event_then_metadata_updates_ ) def test_extrapolate_omega(latest_omega, previous_omega, now, expected): assert extrapolate_omega(latest_omega, previous_omega, now) == expected + + +@patch( + "mx_bluesky.beamlines.i04.callbacks.murko_callback.MurkoCallback._check_redis_connection" +) +def test_if_redis_connection_fails_then_there_is_no_error( + mock_check_redis_connection: MagicMock, +): + mock_check_redis_connection.return_value = False + callback = MurkoCallback("", "") + doc = {} + callback.start(doc) + callback.event(doc) + callback.stop(doc) + + +def test_warning_is_logged_if_redis_connection_fails(caplog): + callback = MurkoCallback("", "") + doc = {} + callback.start(doc) + log_message = caplog.records[-1] + assert log_message.levelname == "WARNING" + assert "Failed to connect to redis: " in log_message.message diff --git a/tests/unit_tests/beamlines/i04/conftest.py b/tests/unit_tests/beamlines/i04/conftest.py index 51b4ec363e..88bd66d847 100644 --- a/tests/unit_tests/beamlines/i04/conftest.py +++ b/tests/unit_tests/beamlines/i04/conftest.py @@ -9,4 +9,5 @@ def murko_callback() -> MurkoCallback: callback = MurkoCallback("", "") callback.redis_client = MagicMock() + callback.redis_connected = True return callback diff --git a/tests/unit_tests/beamlines/i04/test_thawing.py b/tests/unit_tests/beamlines/i04/test_thawing.py index 9b1fb7eb14..2ce510b133 100644 --- a/tests/unit_tests/beamlines/i04/test_thawing.py +++ b/tests/unit_tests/beamlines/i04/test_thawing.py @@ -263,7 +263,9 @@ def test_thaw_and_stream_adds_murko_callback_and_produces_expected_messages( @patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback.stop") @patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback.call_murko") +@patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback._check_redis_connection") def test_thaw_and_stream_will_produce_events_that_call_murko( + patch_check_redis_connection: MagicMock, patch_murko_call: MagicMock, patch_stop_call: MagicMock, smargon: Smargon, @@ -272,6 +274,8 @@ def test_thaw_and_stream_will_produce_events_that_call_murko( oav_forwarder: OAVToRedisForwarder, run_engine: RunEngine, ): + patch_check_redis_connection.return_value = True + class StopPlanError(Exception): pass @@ -367,7 +371,7 @@ def _run_thaw_and_stream_and_assert_zoom_changes( @patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback") def test_given_thaw_succeeds_then_thaw_and_stream_sets_zoom_to_1_and_back( - patch_murko_callback, + patch_murko_callback: MagicMock, smargon: Smargon, thawer: Thawer, oav_forwarder: OAVToRedisForwarder, @@ -383,8 +387,8 @@ def test_given_thaw_succeeds_then_thaw_and_stream_sets_zoom_to_1_and_back( @patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback") @patch("mx_bluesky.beamlines.i04.thawing_plan.bps.monitor") def test_given_thaw_fails_then_thaw_and_stream_sets_zoom_to_1_and_back( - mock__thaw, - patch_murko_callback, + mock__thaw: MagicMock, + patch_murko_callback: MagicMock, smargon: Smargon, thawer: Thawer, oav_forwarder: OAVToRedisForwarder, @@ -403,8 +407,8 @@ def test_given_thaw_fails_then_thaw_and_stream_sets_zoom_to_1_and_back( "mx_bluesky.beamlines.i04.thawing_plan._rotate_in_one_direction_and_stream_to_redis" ) def test_thaw_and_murko_centre_stages_and_unstages_murko_results_twice( - mock_rotate_and_stream, - patch_murko_callback, + mock_rotate_and_stream: MagicMock, + patch_murko_callback: MagicMock, smargon: Smargon, thawer: Thawer, oav_forwarder: OAVToRedisForwarder, @@ -424,8 +428,8 @@ def test_thaw_and_murko_centre_stages_and_unstages_murko_results_twice( @patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback") @patch("mx_bluesky.beamlines.i04.thawing_plan.bps.monitor") def test_given_thaw_and_murko_centre_errors_then_murko_results_still_unstaged( - mock__thaw, - patch_murko_callback, + mock__thaw: MagicMock, + patch_murko_callback: MagicMock, smargon: Smargon, thawer: Thawer, oav_forwarder: OAVToRedisForwarder, @@ -583,3 +587,46 @@ def test_thawing_plan_with_murko_callback_puts_correct_metadata_into_redis( assert publish_call_args_list[1].args[1] == json.dumps(FORWARDING_COMPLETE_MESSAGE) assert publish_call_args_list[2].args[1] == json.dumps(expected_roi_md) assert publish_call_args_list[3].args[1] == json.dumps(FORWARDING_COMPLETE_MESSAGE) + + +@patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback._check_redis_connection") +def test_plans_carry_on_thaw_if_redis_connection_check_fails( + patch_callback_check_redis_connection: MagicMock, + smargon: Smargon, + thawer: Thawer, + robot: BartRobot, + oav_forwarder: OAVToRedisForwarder, + run_engine: RunEngine, +): + patch_callback_check_redis_connection.return_value = False + murko_results = MurkoResultsDevice() + murko_results._check_redis_connection = AsyncMock(return_value=False) + for plan in ( + thaw_and_murko_centre( + 10, + 360, + thawer=thawer, + smargon=smargon, + robot=robot, + oav_to_redis_forwarder=oav_forwarder, + murko_results=murko_results, + ), + thaw_and_stream_to_redis( + 10, + 360, + thawer=thawer, + smargon=smargon, + robot=robot, + oav_to_redis_forwarder=oav_forwarder, + ), + ): + run_engine(plan) + + omega_put = get_mock_put(smargon.omega.user_setpoint) + + assert omega_put.call_args_list == [ + call(360.0, wait=True), + call(0.0, wait=True), + ] + + omega_put.reset_mock()