diff --git a/pyproject.toml b/pyproject.toml index 4a2b16698..34f7d5d43 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,7 @@ dependencies = [ "ophyd >= 1.10.5", "ophyd-async >= 0.14.0", "bluesky >= 1.14.6", - "dls-dodal @ git+https://github.com/DiamondLightSource/dodal.git@62960e0e587bf86943ce1b581848fa131ef884d5", + "dls-dodal @ git+https://github.com/DiamondLightSource/dodal.git@1726_beam_centre_device_post_testing", ] diff --git a/src/mx_bluesky/Getting started.ipynb b/src/mx_bluesky/Getting started.ipynb index 6b539f072..2c2d0f716 100644 --- a/src/mx_bluesky/Getting started.ipynb +++ b/src/mx_bluesky/Getting started.ipynb @@ -52,6 +52,7 @@ "import importlib\n", "\n", "from dodal.utils import collect_factories\n", + "\n", "beamline = \"i02_2\"\n", "module_name = f\"dodal.beamlines.{beamline}\"\n", "beamline_module = importlib.import_module(module_name)\n", diff --git a/src/mx_bluesky/beamlines/i04/__init__.py b/src/mx_bluesky/beamlines/i04/__init__.py index 560406a30..4ba33d98e 100644 --- a/src/mx_bluesky/beamlines/i04/__init__.py +++ b/src/mx_bluesky/beamlines/i04/__init__.py @@ -2,6 +2,8 @@ i04_default_grid_detect_and_xray_centre, ) from mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging import ( + find_beam_centres, + optimise_transmission_with_oav, take_oav_image_with_scintillator_in, ) from mx_bluesky.beamlines.i04.thawing_plan import ( @@ -16,4 +18,6 @@ "i04_default_grid_detect_and_xray_centre", "thaw_and_murko_centre", "take_oav_image_with_scintillator_in", + "optimise_transmission_with_oav", + "find_beam_centres", ] diff --git a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py index 0d47b69cc..03f64b934 100644 --- a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py +++ b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py @@ -6,8 +6,10 @@ from dodal.common import inject from dodal.devices.attenuator.attenuator import BinaryFilterAttenuator from dodal.devices.backlight import Backlight +from dodal.devices.i04.beam_centre import CentreEllipseMethod +from dodal.devices.i04.max_pixel import MaxPixel from dodal.devices.mx_phase1.beamstop import Beamstop, BeamstopPositions -from dodal.devices.oav.oav_detector import OAV +from dodal.devices.oav.oav_detector import OAV, ZoomControllerWithBeamCentres from dodal.devices.robot import BartRobot, PinMounted from dodal.devices.scintillator import InOut, Scintillator from dodal.devices.xbpm_feedback import XBPMFeedback @@ -19,6 +21,7 @@ from ophyd_async.core import InOut as core_INOUT from mx_bluesky.common.utils.exceptions import BeamlineStateError +from mx_bluesky.common.utils.log import LOGGER initial_wait_group = "Wait for scint to move in" @@ -48,21 +51,35 @@ def take_oav_image_with_scintillator_in( defaults are always correct. """ + LOGGER.info("prearing beamline") yield from _prepare_beamline_for_scintillator_images( - robot, beamstop, backlight, scintillator, xbpm_feedback, initial_wait_group + robot, + beamstop, + backlight, + scintillator, + xbpm_feedback, + shutter, + initial_wait_group, ) - + LOGGER.info("setting transmission") yield from bps.abs_set(attenuator, transmission, group=initial_wait_group) if image_name is None: image_name = f"{time.time_ns()}ATT{transmission * 100}" - + LOGGER.info(f"using image name {image_name}") + LOGGER.info("Waiting for initial_wait_group...") yield from bps.wait(initial_wait_group) + LOGGER.info("Opening shutter...") + yield from bps.abs_set(shutter.control_mode, ZebraShutterControl.MANUAL, wait=True) yield from bps.abs_set(shutter, ZebraShutterState.OPEN, wait=True) - take_and_save_oav_image(file_path=image_path, file_name=image_name, oav=oav) + LOGGER.info("Taking image...") + + yield from take_and_save_oav_image( + file_path=image_path, file_name=image_name, oav=oav + ) def _prepare_beamline_for_scintillator_images( @@ -71,10 +88,11 @@ def _prepare_beamline_for_scintillator_images( backlight: Backlight, scintillator: Scintillator, xbpm_feedback: XBPMFeedback, + shutter: ZebraShutter, group: str, ) -> MsgGenerator: """ - Prepares the beamline for oav image by making sure the pin is NOT mounted and + Prepares the beamline for oav image by making sure the pin is not mounted and the beam is on (feedback check). Finally, the scintillator is moved in. """ pin_mounted = yield from bps.rd(robot.gonio_pin_sensor) @@ -91,6 +109,9 @@ def _prepare_beamline_for_scintillator_images( yield from bps.abs_set(scintillator.selected_pos, InOut.IN, group=group) + yield from bps.abs_set(shutter.control_mode, ZebraShutterControl.MANUAL, wait=True) + yield from bps.abs_set(shutter, ZebraShutterState.OPEN, wait=True) + def take_and_save_oav_image( file_name: str, @@ -109,7 +130,174 @@ def take_and_save_oav_image( if not os.path.exists(full_file_path): yield from bps.abs_set(oav.snapshot.filename, file_name, group=group) yield from bps.abs_set(oav.snapshot.directory, file_path, group=group) - yield from bps.wait(group) + yield from bps.wait(group, timeout=60) yield from bps.trigger(oav.snapshot, wait=True) else: raise FileExistsError("OAV image file path already exists") + + +def _max_pixel_at_transmission( + max_pixel: MaxPixel, + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + transmission: float, +): + yield from bps.trigger(xbpm_feedback, wait=True) + yield from bps.mv(attenuator, transmission) + yield from bps.trigger(max_pixel, wait=True) + return (yield from bps.rd(max_pixel.max_pixel_val)) + + +def optimise_transmission_with_oav( + upper_bound: float = 100, + lower_bound: float = 0, + target_brightness_fraction: float = 0.75, + tolerance: int = 5, + max_iterations: int = 10, + max_pixel: MaxPixel = inject("max_pixel"), + attenuator: BinaryFilterAttenuator = inject("attenuator"), + xbpm_feedback: XBPMFeedback = inject("xbpm_feedback"), +) -> MsgGenerator: + """ + Plan to find the optimal oav transmission. First the brightest pixel at 100% + transmission is taken. A fraction of this (target_brightness_fraction) is taken + as the target - as in the optimal transmission will have it's max pixel as the set + target. A binary search is used to reach this. + Args: + upper_bound: Maximum transmission which will be searched. In percent. + lower_bound: Minimum transmission which will be searched. In percent. + target_brightness_fraction: Fraction of the brightest pixel at 100% + transmission which should be used as the target max pixel brightness. + tolerance: Amount the brightness can be off by and still find a match. + max_iterations: Maximum amount of iterations. + """ + + if upper_bound < lower_bound: + raise ValueError( + f"Upper bound ({upper_bound}) must be higher than lower bound {lower_bound}" + ) + + brightest_pixel_at_full_beam = yield from _max_pixel_at_transmission( + max_pixel, attenuator, xbpm_feedback, 1 + ) + + if brightest_pixel_at_full_beam == 0: + raise ValueError("No beam found at full transmission") + + target_pixel_brightness = brightest_pixel_at_full_beam * target_brightness_fraction + LOGGER.info( + f"Optimising until max pixel in image has a value of {target_pixel_brightness}" + ) + + iterations = 0 + + while iterations < max_iterations: + mid = round((upper_bound + lower_bound) / 2, 2) # limit to 2 dp + LOGGER.info(f"On iteration {iterations}") + + brightest_pixel = yield from _max_pixel_at_transmission( + max_pixel, attenuator, xbpm_feedback, mid / 100 + ) + + LOGGER.info(f"Upper bound is: {upper_bound}, Lower bound is: {lower_bound}") + LOGGER.info( + f"Testing transmission {mid}, brightest pixel found {brightest_pixel}" + ) + + if ( + target_pixel_brightness - tolerance + <= brightest_pixel + <= target_pixel_brightness + tolerance + ): + mid = round(mid, 0) + LOGGER.info(f"\nOptimal transmission found: {mid}") + return mid + + # condition for too low so want to try higher + elif brightest_pixel < target_pixel_brightness - tolerance: + LOGGER.info("Result: Too low \n") + lower_bound = mid + + # condition for too high so want to try lower + elif brightest_pixel > target_pixel_brightness + tolerance: + LOGGER.info("Result: Too high \n") + upper_bound = mid + iterations += 1 + raise StopIteration("Max iterations reached") + + +def _get_all_zoom_levels( + zoom_controller: ZoomControllerWithBeamCentres, +) -> MsgGenerator[list[str]]: + zoom_levels = [] + level_signals = [ + centring_device.level_name + for centring_device in zoom_controller.beam_centres.values() + ] + for signal in level_signals: + level_name = yield from bps.rd(signal) + if level_name: + zoom_levels.append(level_name) + return zoom_levels + + +def find_beam_centres( + zoom_levels_to_centre: list[str] | None = None, + zoom_levels_to_optimise_transmission: list[str] | None = None, + robot: BartRobot = inject("robot"), + beamstop: Beamstop = inject("beamstop"), + backlight: Backlight = inject("backlight"), + scintillator: Scintillator = inject("scintillator"), + xbpm_feedback: XBPMFeedback = inject("xbpm_feedback"), + max_pixel: MaxPixel = inject("max_pixel"), + centre_ellipse: CentreEllipseMethod = inject("beam_centre"), + attenuator: BinaryFilterAttenuator = inject("attenuator"), + zoom_controller: ZoomControllerWithBeamCentres = inject("zoom_controller"), + shutter: ZebraShutter = inject("sample_shutter"), +) -> MsgGenerator: + """ + zoom_levels: The levels to do centring at, by default runs at all known zoom levels. + """ + if zoom_levels_to_optimise_transmission is None: + zoom_levels_to_optimise_transmission = ["1.0x", "7.5x"] + + if zoom_levels_to_centre is None: + zoom_levels_to_centre = yield from _get_all_zoom_levels(zoom_controller) + + LOGGER.info("Preparing beamline for images...") + yield from _prepare_beamline_for_scintillator_images( + robot, + beamstop, + backlight, + scintillator, + xbpm_feedback, + shutter, + initial_wait_group, + ) + + for centring_device in zoom_controller.beam_centres.values(): + zoom_name = yield from bps.rd(centring_device.level_name) + if zoom_name in zoom_levels_to_centre: + LOGGER.info(f"Moving to zoom level {zoom_name}") + yield from bps.abs_set(zoom_controller, zoom_name, wait=True) + if zoom_name in zoom_levels_to_optimise_transmission: + LOGGER.info(f"Optimising transmission at zoom level {zoom_name}") + yield from optimise_transmission_with_oav( + 100, + 0, + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + + yield from bps.trigger(centre_ellipse, wait=True) + centre_x = yield from bps.rd(centre_ellipse.center_x_val) + centre_y = yield from bps.rd(centre_ellipse.center_y_val) + centre_x = round(centre_x) + centre_y = round(centre_y) + LOGGER.info(f"Writing centre values ({centre_x}, {centre_y}) to OAV PVs") + yield from bps.mv( + centring_device.x_centre, centre_x, centring_device.y_centre, centre_y + ) + + LOGGER.info("Done!") diff --git a/tests/conftest.py b/tests/conftest.py index 6c5ebc5ab..b8b62b5b0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,7 +10,7 @@ from pathlib import Path from types import ModuleType from typing import Any -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import MagicMock, patch import numpy import pydantic @@ -510,13 +510,7 @@ def oav(test_config_files): ) oav = i03.oav.build(mock=True, connect_immediately=True, params=parameters) - zoom_levels_list = ["1.0x", "3.0x", "5.0x", "7.5x", "10.0x", "15.0x"] - oav.zoom_controller._get_allowed_zoom_levels = AsyncMock( - return_value=zoom_levels_list - ) - # Equivalent to previously set values for microns and beam centre set_mock_value(oav.zoom_controller.level, "5.0x") - set_mock_value(oav.grid_snapshot.x_size, 1024) set_mock_value(oav.grid_snapshot.y_size, 768) diff --git a/tests/system_tests/conftest.py b/tests/system_tests/conftest.py index 75d97ff56..12210ec4c 100644 --- a/tests/system_tests/conftest.py +++ b/tests/system_tests/conftest.py @@ -201,10 +201,6 @@ async def trigger_with_test_image(self): ): mock_get.return_value.__aenter__.return_value = empty_response set_mock_value(oav.zoom_controller.level, "1.0x") - zoom_levels_list = ["1.0x", "3.0x", "5.0x", "7.5x", "10.0x", "15.0x"] - oav.zoom_controller._get_allowed_zoom_levels = AsyncMock( - return_value=zoom_levels_list - ) yield oav diff --git a/tests/unit_tests/beamlines/i04/test_oav_imaging.py b/tests/unit_tests/beamlines/i04/test_oav_imaging.py index 8893a3261..9a5428911 100644 --- a/tests/unit_tests/beamlines/i04/test_oav_imaging.py +++ b/tests/unit_tests/beamlines/i04/test_oav_imaging.py @@ -1,12 +1,15 @@ -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, call, patch +import bluesky.plan_stubs as bps import pytest from bluesky.run_engine import RunEngine from bluesky.simulators import RunEngineSimulator, assert_message_and_return_remaining from dodal.devices.attenuator.attenuator import BinaryFilterAttenuator from dodal.devices.backlight import Backlight +from dodal.devices.i04.beam_centre import CentreEllipseMethod +from dodal.devices.i04.max_pixel import MaxPixel from dodal.devices.mx_phase1.beamstop import Beamstop, BeamstopPositions -from dodal.devices.oav.oav_detector import OAV +from dodal.devices.oav.oav_detector import OAV, ZoomControllerWithBeamCentres from dodal.devices.robot import BartRobot, PinMounted from dodal.devices.scintillator import InOut, Scintillator from dodal.devices.xbpm_feedback import XBPMFeedback @@ -15,10 +18,17 @@ ZebraShutterControl, ZebraShutterState, ) -from ophyd_async.core import set_mock_value +from ophyd_async.core import ( + completed_status, + get_mock_put, + init_devices, + set_mock_value, +) from mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging import ( _prepare_beamline_for_scintillator_images, + find_beam_centres, + optimise_transmission_with_oav, take_and_save_oav_image, take_oav_image_with_scintillator_in, ) @@ -56,11 +66,18 @@ def test_prepare_beamline_for_scint_images( backlight: Backlight, scintillator: Scintillator, xbpm_feedback: XBPMFeedback, + sample_shutter: ZebraShutter, ): test_group = "my_group" messages = sim_run_engine.simulate_plan( _prepare_beamline_for_scintillator_images( - robot, beamstop_phase1, backlight, scintillator, xbpm_feedback, test_group + robot, + beamstop_phase1, + backlight, + scintillator, + xbpm_feedback, + sample_shutter, + test_group, ) ) @@ -251,3 +268,629 @@ async def test_take_and_save_oav_image_in_re(run_engine: RunEngine, oav: OAV, tm assert await oav.snapshot.filename.get_value() == expected_filename assert await oav.snapshot.directory.get_value() == str(expected_directory) oav.snapshot.trigger.assert_called_once() # type: ignore + + +@pytest.fixture() +async def max_pixel() -> MaxPixel: + async with init_devices(mock=True): + max_pixel = MaxPixel("TEST: MAX_PIXEL", "max_pixel") + + max_pixel.trigger = MagicMock(return_value=completed_status()) + return max_pixel + + +def test_optimise_transmission_first_gets_max_pixel_at_100_percent( + sim_run_engine: RunEngineSimulator, + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, +): + max_values = [100, 75] + + def return_max_values(_): + return {"readback": {"value": max_values.pop(0)}} + + sim_run_engine.add_handler("read", return_max_values, max_pixel.max_pixel_val.name) + + messages = sim_run_engine.simulate_plan( + optimise_transmission_with_oav( + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + ) + + messages = assert_message_and_return_remaining( + messages, + lambda msg: msg.command == "trigger" and msg.obj == xbpm_feedback, + ) + + messages = assert_message_and_return_remaining( + messages, + lambda msg: msg.command == "set" and msg.obj == attenuator and msg.args[0] == 1, + ) + + messages = assert_message_and_return_remaining( + messages, lambda msg: msg.command == "trigger" and msg.obj == max_pixel + ) + + messages = assert_message_and_return_remaining( + messages, + lambda msg: msg.command == "read" and msg.obj == max_pixel.max_pixel_val, + ) + + +@pytest.mark.parametrize("iterations", [10, 6, 4]) +def test_given_max_pixel_never_changes_then_optimise_transmission_raises_stop_iteration( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, + iterations: int, +): + set_mock_value(max_pixel.max_pixel_val, 100) + + with pytest.raises(RuntimeError) as e: + run_engine( + optimise_transmission_with_oav( + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + max_iterations=iterations, + ) + ) + + # The RE hides the StopIteration behind a RuntimeError but will mention it in the message + assert "StopIteration" in e.value.args[0] + assert max_pixel.trigger.call_count == iterations + 1 # type: ignore + + +def given_max_values(max_pixel: MaxPixel, max_values: list): + def _set_max_value(): + set_mock_value(max_pixel.max_pixel_val, max_values.pop(0)) + return completed_status() + + max_pixel.trigger.side_effect = _set_max_value # type: ignore + + +@pytest.mark.parametrize( + "lower_bound, upper_bound, expected_final_transmission", + [[0, 100, 50], [0, 10, 5], [5, 25, 15]], +) +def test_given_max_pixel_immediately_reaches_target_then_optimise_transmission_returns_half_bounds( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, + lower_bound: int, + upper_bound: int, + expected_final_transmission: int, +): + given_max_values(max_pixel, [100, 75]) + + final_transmission = run_engine( + optimise_transmission_with_oav( + lower_bound=lower_bound, + upper_bound=upper_bound, + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + ).plan_result # type: ignore + + assert final_transmission == expected_final_transmission + + assert attenuator.set.call_args_list == [ # type: ignore + call(1), + call(expected_final_transmission / 100), + ] + + +@pytest.mark.parametrize( + "target_fraction", + [0.75, 0.26, 0.39], +) +def test_optimise_transmission_reaches_different_target_fractions( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, + target_fraction: float, +): + given_max_values(max_pixel, [100, 100 * target_fraction]) + + final_transmission = run_engine( + optimise_transmission_with_oav( + target_brightness_fraction=target_fraction, + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + ).plan_result # type: ignore + + assert final_transmission == 50 + + assert attenuator.set.call_args_list == [call(1), call(0.5)] # type: ignore + + +def test_max_pixel_stays_too_large_then_optimise_transmission_keeps_reducing( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, +): + given_max_values(max_pixel, [100, 100, 100, 100, 100, 75]) + + final_transmission = run_engine( + optimise_transmission_with_oav( + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + ).plan_result # type: ignore + + assert final_transmission == 3.0 + + assert attenuator.set.call_args_list == [ # type: ignore + call(1), + call(0.5), + call(pytest.approx(0.25)), + call(pytest.approx(0.125)), + call(pytest.approx(0.0625)), + call(pytest.approx(0.0312)), + ] + + +def test_max_pixel_stays_too_small_then_optimise_transmission_keeps_increasing( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, +): + given_max_values(max_pixel, [100, 20, 20, 20, 20, 75]) + + final_transmission = run_engine( + optimise_transmission_with_oav( + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + ).plan_result # type: ignore + + assert final_transmission == 97.0 + + assert attenuator.set.call_args_list == [ # type: ignore + call(1), + call(0.5), + call(pytest.approx(0.75)), + call(pytest.approx(0.875)), + call(pytest.approx(0.9375)), + call(pytest.approx(0.9688)), + ] + + +@pytest.mark.parametrize( + "tolerance, expected_final_transmission, expected_calls", + [ + (10, 50.0, [call(1), call(0.5)]), + (3, 75.0, [call(1), call(0.5), call(0.75)]), + ], +) +def test_different_tolerances_change_when_we_accept( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, + tolerance: int, + expected_final_transmission: float, + expected_calls: list, +): + given_max_values(max_pixel, [100, 68, 75]) + + final_transmission = run_engine( + optimise_transmission_with_oav( + tolerance=tolerance, + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + ).plan_result # type: ignore + + assert final_transmission == expected_final_transmission + assert attenuator.set.call_args_list == expected_calls # type: ignore + + +def test_brightness_alternates_above_then_below_target_bounds_shrink_both_sides( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, +): + given_max_values(max_pixel, [100, 90, 60, 85, 65, 75]) + + final_transmission = run_engine( + optimise_transmission_with_oav( + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + ).plan_result # type: ignore + + assert final_transmission == 34.0 + + # Note the 2 dp rounding on set values: + assert attenuator.set.call_args_list == [ # type: ignore + call(1), + call(0.5), + call(0.25), + call(pytest.approx(0.375)), + call(pytest.approx(0.3125)), + call(pytest.approx(0.3438)), + ] + + +@pytest.mark.parametrize( + "edge_value", + [70, 80], +) +def test_equal_to_target_plus_or_minus_tolerance_matches_target( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, + edge_value: int, +): + given_max_values(max_pixel, [100, edge_value]) + + plan = optimise_transmission_with_oav( + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + plan_result = run_engine(plan).plan_result # type: ignore + + assert plan_result == 50 + assert attenuator.set.call_args_list == [call(1), call(0.5)] # type:ignore + + +def test_optimise_transmission_raises_value_error_when_upper_bound_less_than_lower_bound( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, +): + with pytest.raises(ValueError) as excinfo: + run_engine( + optimise_transmission_with_oav( + lower_bound=60, + upper_bound=40, + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + ) + assert "Upper bound (40) must be higher than lower bound 60" in str(excinfo.value) + + # Ensure nothing was moved/triggered since the + assert attenuator.set.call_count == 0 # type: ignore + assert xbpm_feedback.trigger.call_count == 0 # type: ignore + + +def test_optimise_transmission_raises_value_error_when_full_beam_brightness_is_zero( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, +): + given_max_values(max_pixel, [0]) + + with pytest.raises(ValueError) as excinfo: + run_engine( + optimise_transmission_with_oav( + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + ) + + assert "No beam" in str(excinfo.value) + + assert attenuator.set.call_count == 1 # type:ignore + + +@pytest.fixture() +async def centre_ellipse() -> CentreEllipseMethod: + async with init_devices(mock=True): + centre_ellipse = CentreEllipseMethod("", "centre_ellipse") + + centre_ellipse.trigger = MagicMock(return_value=completed_status()) + return centre_ellipse + + +def initialise_zoom_centres( + zoom_controller: ZoomControllerWithBeamCentres, init_values: dict +): + for i, (level, beam_centre) in enumerate(init_values.items()): + centre_device = zoom_controller.beam_centres[i] + set_mock_value(centre_device.level_name, level) + set_mock_value(centre_device.x_centre, beam_centre[0]) + set_mock_value(centre_device.y_centre, beam_centre[1]) + + +@pytest.fixture() +async def zoom_controller_with_centres() -> ZoomControllerWithBeamCentres: + async with init_devices(mock=True): + zoom_controller_with_centres = ZoomControllerWithBeamCentres( + "", "zoom_controller_with_centres" + ) + + level_names = ["1.0x", "2.0x", "3.0x", "7.5x"] + initialise_zoom_centres( + zoom_controller_with_centres, dict.fromkeys(level_names, (0, 0)) + ) + + return zoom_controller_with_centres + + +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging._prepare_beamline_for_scintillator_images" +) +def test_find_beam_centres_starts_by_prepping_scintillator( + mock_prepare_scintillator: AsyncMock, + robot: BartRobot, + beamstop_phase1: Beamstop, + backlight: Backlight, + scintillator: Scintillator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + centre_ellipse: CentreEllipseMethod, + attenuator: BinaryFilterAttenuator, + zoom_controller_with_centres: ZoomControllerWithBeamCentres, + sample_shutter: ZebraShutter, + run_engine: RunEngine, +): + zoom_controller_with_centres.beam_centres = {} # type:ignore + run_engine( + find_beam_centres( + robot=robot, + beamstop=beamstop_phase1, + backlight=backlight, + scintillator=scintillator, + xbpm_feedback=xbpm_feedback, + max_pixel=max_pixel, + centre_ellipse=centre_ellipse, + attenuator=attenuator, + zoom_controller=zoom_controller_with_centres, + shutter=sample_shutter, + ) + ) + mock_prepare_scintillator.assert_called_once() + centre_ellipse.trigger.assert_not_called() # type:ignore + + +def mock_centre_ellipse_with_given_centres( + centre_ellipse: CentreEllipseMethod, given_centres: list[tuple[int, int]] +): + def centre_ellipse_trigger_side_effect(*args): + next_centre = given_centres.pop(0) + set_mock_value(centre_ellipse.center_x_val, next_centre[0]) + set_mock_value(centre_ellipse.center_y_val, next_centre[1]) + return completed_status() + + centre_ellipse.trigger.side_effect = centre_ellipse_trigger_side_effect # type:ignore + + +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging._prepare_beamline_for_scintillator_images" +) +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.optimise_transmission_with_oav" +) +async def test_find_beam_centres_iterates_and_sets_centres( + mock_optimise: AsyncMock, + mock_prepare_scintillator: AsyncMock, + robot: BartRobot, + beamstop_phase1: Beamstop, + backlight: Backlight, + scintillator: Scintillator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + centre_ellipse: CentreEllipseMethod, + attenuator: BinaryFilterAttenuator, + zoom_controller_with_centres: ZoomControllerWithBeamCentres, + sample_shutter: ZebraShutter, + run_engine: RunEngine, +): + level_names = ["1.0x", "2.0x", "3.0x", "7.5x"] + new_centres = [(100, 100), (200, 200), (300, 300), (400, 400)] + expected_centres = new_centres.copy() + + mock_centre_ellipse_with_given_centres(centre_ellipse, new_centres) + + run_engine( + find_beam_centres( + robot=robot, + beamstop=beamstop_phase1, + backlight=backlight, + scintillator=scintillator, + xbpm_feedback=xbpm_feedback, + max_pixel=max_pixel, + centre_ellipse=centre_ellipse, + attenuator=attenuator, + zoom_controller=zoom_controller_with_centres, + shutter=sample_shutter, + ) + ) + + assert get_mock_put(zoom_controller_with_centres.level).call_count == 4 + assert get_mock_put(zoom_controller_with_centres.level).call_args_list == [ + call(level, wait=True) for level in level_names + ] + + for i, centre in zoom_controller_with_centres.beam_centres.items(): + level_name = await centre.level_name.get_value() + if level_name: + assert level_name == level_names[i] + assert (await centre.x_centre.get_value()) == expected_centres[i][0] + assert (await centre.y_centre.get_value()) == expected_centres[i][1] + + +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging._prepare_beamline_for_scintillator_images" +) +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.optimise_transmission_with_oav" +) +async def test_if_only_some_levels_given_then_find_beam_centres_iterates_and_sets_those_centres( + mock_optimise: AsyncMock, + mock_prepare_scintillator: AsyncMock, + robot: BartRobot, + beamstop_phase1: Beamstop, + backlight: Backlight, + scintillator: Scintillator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + centre_ellipse: CentreEllipseMethod, + attenuator: BinaryFilterAttenuator, + zoom_controller_with_centres: ZoomControllerWithBeamCentres, + sample_shutter: ZebraShutter, + run_engine: RunEngine, +): + new_centres = [(100, 100), (200, 200), (300, 300), (400, 400)] + + mock_centre_ellipse_with_given_centres(centre_ellipse, new_centres) + + run_engine( + find_beam_centres( + zoom_levels_to_centre=["1.0x", "7.5x"], + robot=robot, + beamstop=beamstop_phase1, + backlight=backlight, + scintillator=scintillator, + xbpm_feedback=xbpm_feedback, + max_pixel=max_pixel, + centre_ellipse=centre_ellipse, + attenuator=attenuator, + zoom_controller=zoom_controller_with_centres, + shutter=sample_shutter, + ) + ) + + assert get_mock_put(zoom_controller_with_centres.level).call_count == 2 + assert get_mock_put(zoom_controller_with_centres.level).call_args_list == [ + call(level, wait=True) for level in ["1.0x", "7.5x"] + ] + + centres = list(zoom_controller_with_centres.beam_centres.values()) + + assert (await centres[0].level_name.get_value()) == "1.0x" + assert (await centres[0].x_centre.get_value()) == 100 + assert (await centres[0].y_centre.get_value()) == 100 + + for centre in [centres[1], centres[2]]: + assert (await centre.x_centre.get_value()) == 0 + assert (await centre.y_centre.get_value()) == 0 + + assert (await centres[3].level_name.get_value()) == "7.5x" + assert (await centres[3].x_centre.get_value()) == 200 + assert (await centres[3].y_centre.get_value()) == 200 + + +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging._prepare_beamline_for_scintillator_images" +) +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.optimise_transmission_with_oav" +) +async def test_find_beam_centres_optimises_on_default_levels_only( + mock_optimise: MagicMock, + mock_prepare_scintillator: AsyncMock, + robot: BartRobot, + beamstop_phase1: Beamstop, + backlight: Backlight, + scintillator: Scintillator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + centre_ellipse: CentreEllipseMethod, + attenuator: BinaryFilterAttenuator, + zoom_controller_with_centres: ZoomControllerWithBeamCentres, + sample_shutter: ZebraShutter, + run_engine: RunEngine, +): + levels_where_optimised = [] + + def append_current_zoom(*args, **kwargs): + current_zoom = yield from bps.rd(zoom_controller_with_centres.level) + levels_where_optimised.append(current_zoom) + yield from bps.null() + + mock_optimise.side_effect = append_current_zoom + + run_engine( + find_beam_centres( + robot=robot, + beamstop=beamstop_phase1, + backlight=backlight, + scintillator=scintillator, + xbpm_feedback=xbpm_feedback, + max_pixel=max_pixel, + centre_ellipse=centre_ellipse, + attenuator=attenuator, + zoom_controller=zoom_controller_with_centres, + shutter=sample_shutter, + ) + ) + + assert mock_optimise.call_count == 2 + assert levels_where_optimised == ["1.0x", "7.5x"] + + +@pytest.mark.asyncio +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging._prepare_beamline_for_scintillator_images" +) +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.optimise_transmission_with_oav" +) +async def test_find_beam_centres_respects_custom_optimise_list( + mock_optimise: MagicMock, + mock_prepare_scintillator: AsyncMock, + robot: BartRobot, + beamstop_phase1: Beamstop, + backlight: Backlight, + scintillator: Scintillator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + centre_ellipse: CentreEllipseMethod, + attenuator: BinaryFilterAttenuator, + zoom_controller_with_centres: ZoomControllerWithBeamCentres, + sample_shutter: ZebraShutter, + run_engine: RunEngine, +): + levels_where_optimised = [] + + def append_current_zoom(*args, **kwargs): + current_zoom = yield from bps.rd(zoom_controller_with_centres.level) + levels_where_optimised.append(current_zoom) + yield from bps.null() + + mock_optimise.side_effect = append_current_zoom + + run_engine( + find_beam_centres( + zoom_levels_to_optimise_transmission=["2.0x", "3.0x"], + robot=robot, + beamstop=beamstop_phase1, + backlight=backlight, + scintillator=scintillator, + xbpm_feedback=xbpm_feedback, + max_pixel=max_pixel, + centre_ellipse=centre_ellipse, + attenuator=attenuator, + zoom_controller=zoom_controller_with_centres, + shutter=sample_shutter, + ) + ) + + assert mock_optimise.call_count == 2 + assert levels_where_optimised == ["2.0x", "3.0x"] diff --git a/tests/unit_tests/beamlines/i04/test_thawing.py b/tests/unit_tests/beamlines/i04/test_thawing.py index 9b1fb7eb1..409d9fac5 100644 --- a/tests/unit_tests/beamlines/i04/test_thawing.py +++ b/tests/unit_tests/beamlines/i04/test_thawing.py @@ -1,6 +1,6 @@ import json from functools import partial -from unittest.mock import ANY, AsyncMock, MagicMock, call, patch +from unittest.mock import ANY, MagicMock, call, patch import pytest from bluesky.run_engine import RunEngine @@ -46,10 +46,6 @@ async def oav_full_screen() -> OAV: oav = OAVBeamCentrePV( "", config=oav_config, name="oav_full_screen", mjpeg_prefix="XTAL" ) - zoom_levels_list = ["1.0x", "2.0x", "5.0x"] - oav.zoom_controller._get_allowed_zoom_levels = AsyncMock( - return_value=zoom_levels_list - ) set_mock_value(oav.zoom_controller.level, "1.0x") set_mock_value(oav.grid_snapshot.x_size, 1024) set_mock_value(oav.grid_snapshot.y_size, 768) @@ -61,11 +57,6 @@ async def oav_roi() -> OAV: oav_config = OAVConfig(ZOOM_LEVELS_XML) async with init_devices(mock=True, connect=True): oav = OAVBeamCentrePV("", config=oav_config, name="oav") - zoom_levels_list = ["1.0x", "2.0x", "5.0x"] - oav.zoom_controller._get_allowed_zoom_levels = AsyncMock( - return_value=zoom_levels_list - ) - set_mock_value(oav.zoom_controller.level, "5.0x") set_mock_value(oav.grid_snapshot.x_size, 512) set_mock_value(oav.grid_snapshot.y_size, 384) diff --git a/tests/unit_tests/common/experiment_plans/test_grid_detection_plan.py b/tests/unit_tests/common/experiment_plans/test_grid_detection_plan.py index 1892e270f..88ee27981 100644 --- a/tests/unit_tests/common/experiment_plans/test_grid_detection_plan.py +++ b/tests/unit_tests/common/experiment_plans/test_grid_detection_plan.py @@ -60,10 +60,6 @@ def fake_devices( test_config_files["zoom_params_file"], test_config_files["display_config"] ) oav = i03.oav.build(connect_immediately=True, mock=True, params=params) - zoom_levels_list = ["1.0x", "3.0x", "5.0x", "7.5x", "10.0x", "15.0x"] - oav.zoom_controller._get_allowed_zoom_levels = AsyncMock( - return_value=zoom_levels_list - ) set_mock_value(oav.zoom_controller.level, "5.0x") set_mock_value(oav.grid_snapshot.x_size, 1024) set_mock_value(oav.grid_snapshot.y_size, 768)