From f28deaa0da4b8f895e38cc5c04a89509be428969 Mon Sep 17 00:00:00 2001 From: Oliver Silvester Date: Thu, 20 Nov 2025 14:57:10 +0000 Subject: [PATCH 01/18] WIP --- src/mx_bluesky/beamlines/i04/__init__.py | 3 +- .../i04/oav_centering_plans/oav_imaging.py | 48 ++++++++++++++++++- 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/src/mx_bluesky/beamlines/i04/__init__.py b/src/mx_bluesky/beamlines/i04/__init__.py index cba5689920..172f319a64 100644 --- a/src/mx_bluesky/beamlines/i04/__init__.py +++ b/src/mx_bluesky/beamlines/i04/__init__.py @@ -2,7 +2,7 @@ i04_grid_detect_then_xray_centre, ) from mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging import ( - take_oav_image_with_scintillator_in, + take_oav_image_with_scintillator_in, optimise_oav_transmission_binary_search ) from mx_bluesky.beamlines.i04.thawing_plan import ( thaw, @@ -16,4 +16,5 @@ "i04_grid_detect_then_xray_centre", "thaw_and_murko_centre", "take_oav_image_with_scintillator_in", + "optimise_oav_transmission_binary_search" ] diff --git a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py index 0d47b69cc4..d0c2a8f058 100644 --- a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py +++ b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py @@ -16,7 +16,10 @@ ZebraShutterControl, ZebraShutterState, ) +from dodal.devices.i04.max_pixel import MaxPixel from ophyd_async.core import InOut as core_INOUT +from mx_bluesky.common.utils.log import LOGGER + from mx_bluesky.common.utils.exceptions import BeamlineStateError @@ -109,7 +112,50 @@ def take_and_save_oav_image( if not os.path.exists(full_file_path): yield from bps.abs_set(oav.snapshot.filename, file_name, group=group) yield from bps.abs_set(oav.snapshot.directory, file_path, group=group) - yield from bps.wait(group) + yield from bps.wait(group, timeout=60) yield from bps.trigger(oav.snapshot, wait=True) else: raise FileExistsError("OAV image file path already exists") + +def optimise_oav_transmission_binary_search( + target_pixel_l: int, # this should be a fraction of the oversaturated luminosity + upper_bound: float, # in percent + lower_bound: float, # in percent + tolerance: int = 1, + max_iterations: int = 5, + max_pixel_lum: MaxPixel = inject("max_pixel"), + attenuator: BinaryFilterAttenuator = inject("attenuator"), +) -> MsgGenerator: + # # first we want to get the max_pixel from an oversaturated transmission and use + # yield from bps.mv(attenuator, upper_bound / 100, wait=True) + # brightest_pixel_sat = yield from bps.trigger(max_pixel_lum, wait=True) + # target_pixel_l = int(brightest_pixel_sat) * 0.5 + # print(f"~~Target luminosity: {target_pixel_l}~~\n") + while max_iterations > tolerance: + # may need to add logic to limit the amount of decimal points + mid = (upper_bound + lower_bound) / 2 + max_iterations -= 1 + + yield from bps.mv(attenuator, mid / 100) + yield from bps.trigger(max_pixel_lum, wait=True) + brightest_pixel = yield from bps.rd(max_pixel_lum.max_pixel_val) + + # brightest_pixel = get_max_pixel_value_from_transmission(transmission=mid) + LOGGER.info(f"Upper bound is: {upper_bound}, Lower bound is: {lower_bound}") + LOGGER.info(f"Testing transmission {mid}, brightest pixel found {brightest_pixel}") + + if target_pixel_l - tolerance < brightest_pixel < target_pixel_l + tolerance: + mid = round(mid, 0) + LOGGER.info(f"\nOptimal transmission found - {mid}") + return mid + + # condition for too low so want to try higher + elif brightest_pixel < target_pixel_l - tolerance: + LOGGER.info("Result: Too low \n") + lower_bound = mid + + # condition for too high so want to try lower + elif brightest_pixel > target_pixel_l + tolerance: + LOGGER.info("Result: Too high \n") + upper_bound = mid + return "Max iterations reached" From 462538c60564333aaf3523462f575e91673187c8 Mon Sep 17 00:00:00 2001 From: Srishty Date: Fri, 21 Nov 2025 17:36:41 +0000 Subject: [PATCH 02/18] Add logic for getting target pixel brightness from 100% transmission and the frac_of_max + add docstrings --- .../i04/oav_centering_plans/oav_imaging.py | 53 +++++++++++++------ 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py index d0c2a8f058..5da87f15f3 100644 --- a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py +++ b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py @@ -6,6 +6,7 @@ from dodal.common import inject from dodal.devices.attenuator.attenuator import BinaryFilterAttenuator from dodal.devices.backlight import Backlight +from dodal.devices.i04.max_pixel import MaxPixel from dodal.devices.mx_phase1.beamstop import Beamstop, BeamstopPositions from dodal.devices.oav.oav_detector import OAV from dodal.devices.robot import BartRobot, PinMounted @@ -16,12 +17,10 @@ ZebraShutterControl, ZebraShutterState, ) -from dodal.devices.i04.max_pixel import MaxPixel from ophyd_async.core import InOut as core_INOUT -from mx_bluesky.common.utils.log import LOGGER - from mx_bluesky.common.utils.exceptions import BeamlineStateError +from mx_bluesky.common.utils.log import LOGGER initial_wait_group = "Wait for scint to move in" @@ -79,6 +78,10 @@ def _prepare_beamline_for_scintillator_images( """ Prepares the beamline for oav image by making sure the pin is NOT mounted and the beam is on (feedback check). Finally, the scintillator is moved in. + + Args: + devices: These are the specific ophyd-devices used for the plan, the + defaults are always correct. """ pin_mounted = yield from bps.rd(robot.gonio_pin_sensor) if pin_mounted == PinMounted.PIN_MOUNTED: @@ -117,32 +120,48 @@ def take_and_save_oav_image( else: raise FileExistsError("OAV image file path already exists") + def optimise_oav_transmission_binary_search( - target_pixel_l: int, # this should be a fraction of the oversaturated luminosity - upper_bound: float, # in percent - lower_bound: float, # in percent + upper_bound: float, # in percent + lower_bound: float, # in percent + frac_of_max: float = 0.5, tolerance: int = 1, max_iterations: int = 5, - max_pixel_lum: MaxPixel = inject("max_pixel"), + max_pixel: MaxPixel = inject("max_pixel"), attenuator: BinaryFilterAttenuator = inject("attenuator"), ) -> MsgGenerator: - # # first we want to get the max_pixel from an oversaturated transmission and use - # yield from bps.mv(attenuator, upper_bound / 100, wait=True) - # brightest_pixel_sat = yield from bps.trigger(max_pixel_lum, wait=True) - # target_pixel_l = int(brightest_pixel_sat) * 0.5 - # print(f"~~Target luminosity: {target_pixel_l}~~\n") + """ + Plan to find the optimal oav transmission. First the brightest pixel at 100% + transmission is taken. A fraction of this (frac_of_max) is taken as the target - + as in the optimal transmission will have it's max pixel as the set target. + A binary search is used to each the target. + Args: + upper_bound: Maximum transmission which will be searched. + lower_bound: Minimum transmission which will be searched. + frac_of_max: Fraction of the brightest pixel at 100% transmission which should be + used as the target max pixel brightness. + tolerance: Amount the search can be off by and still find a match. + max_iterations: Maximum amount of iterations. + """ + yield from bps.mv(attenuator, 100) # 100 % transmission + yield from bps.trigger(max_pixel, wait=True) + brightest_pixel_sat = yield from bps.rd(max_pixel.max_pixel_val) + target_pixel_l = brightest_pixel_sat * frac_of_max + LOGGER.info(f"~~Target luminosity: {target_pixel_l}~~\n") + while max_iterations > tolerance: - # may need to add logic to limit the amount of decimal points - mid = (upper_bound + lower_bound) / 2 + mid = round((upper_bound + lower_bound) / 2, 2) # limit to 2 dp max_iterations -= 1 yield from bps.mv(attenuator, mid / 100) - yield from bps.trigger(max_pixel_lum, wait=True) - brightest_pixel = yield from bps.rd(max_pixel_lum.max_pixel_val) + yield from bps.trigger(max_pixel, wait=True) + brightest_pixel = yield from bps.rd(max_pixel.max_pixel_val) # brightest_pixel = get_max_pixel_value_from_transmission(transmission=mid) LOGGER.info(f"Upper bound is: {upper_bound}, Lower bound is: {lower_bound}") - LOGGER.info(f"Testing transmission {mid}, brightest pixel found {brightest_pixel}") + LOGGER.info( + f"Testing transmission {mid}, brightest pixel found {brightest_pixel}" + ) if target_pixel_l - tolerance < brightest_pixel < target_pixel_l + tolerance: mid = round(mid, 0) From adc209f5dc02a278bd08875ead544c4a89291a18 Mon Sep 17 00:00:00 2001 From: Srishty Date: Fri, 21 Nov 2025 17:57:58 +0000 Subject: [PATCH 03/18] adding comments for tests --- tests/unit_tests/beamlines/i04/test_oav_imaging.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/unit_tests/beamlines/i04/test_oav_imaging.py b/tests/unit_tests/beamlines/i04/test_oav_imaging.py index e8406239b0..6aa08d239e 100644 --- a/tests/unit_tests/beamlines/i04/test_oav_imaging.py +++ b/tests/unit_tests/beamlines/i04/test_oav_imaging.py @@ -251,3 +251,8 @@ async def test_take_and_save_oav_image_in_re(run_engine: RunEngine, oav: OAV, tm assert await oav.snapshot.filename.get_value() == expected_filename assert await oav.snapshot.directory.get_value() == str(expected_directory) oav.snapshot.trigger.assert_called_once() # type: ignore + + +# add test for transmission optimization +# test that everything is called in the correct order. +# mock out the different transmissions and max vals and make sure you reach what's expected. From c37400c4a8d8f285fc51ce39665b5093c88e6dfa Mon Sep 17 00:00:00 2001 From: Srishty Date: Fri, 21 Nov 2025 18:28:57 +0000 Subject: [PATCH 04/18] mock function to get transmission out --- tests/unit_tests/beamlines/i04/test_oav_imaging.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/unit_tests/beamlines/i04/test_oav_imaging.py b/tests/unit_tests/beamlines/i04/test_oav_imaging.py index 6aa08d239e..1a9b703cbb 100644 --- a/tests/unit_tests/beamlines/i04/test_oav_imaging.py +++ b/tests/unit_tests/beamlines/i04/test_oav_imaging.py @@ -256,3 +256,12 @@ async def test_take_and_save_oav_image_in_re(run_engine: RunEngine, oav: OAV, tm # add test for transmission optimization # test that everything is called in the correct order. # mock out the different transmissions and max vals and make sure you reach what's expected. + + +# mock function transmission optimisation +def get_max_pixel_value_from_transmission(transmission) -> float: + return transmission + 10 + + +# def test_optimise_oav_transmission_binary_search(): +# with patch("mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.") From e35e079cf081548dcd7c9896c259927829e7a3c8 Mon Sep 17 00:00:00 2001 From: Srishty Date: Mon, 24 Nov 2025 16:47:18 +0000 Subject: [PATCH 05/18] pull changes from beamline testing --- .../beamlines/i04/test_oav_imaging.py | 45 +++++++++++++++++-- 1 file changed, 41 insertions(+), 4 deletions(-) diff --git a/tests/unit_tests/beamlines/i04/test_oav_imaging.py b/tests/unit_tests/beamlines/i04/test_oav_imaging.py index 1a9b703cbb..8ca0ab35c0 100644 --- a/tests/unit_tests/beamlines/i04/test_oav_imaging.py +++ b/tests/unit_tests/beamlines/i04/test_oav_imaging.py @@ -258,10 +258,47 @@ async def test_take_and_save_oav_image_in_re(run_engine: RunEngine, oav: OAV, tm # mock out the different transmissions and max vals and make sure you reach what's expected. -# mock function transmission optimisation -def get_max_pixel_value_from_transmission(transmission) -> float: - return transmission + 10 +# # mock function transmission optimisation +# @pytest.fixture +# def get_max_pixel_value_from_transmission(transmission) -> float: +# return transmission + 10 + +# @pytest.fixture +# def get_max_pixel_value_from_transmission(max_pixel: MaxPixel) : +# max_pixel.max_pixel_val.read = AsyncMock([100, 3, 2], [200, 3, 90], [255/2, 90, 80]) + +# async def test_optimise_oav_transmission_binary_search( +# get_max_pixel_value_from_transmission, run_engine: RunEngine, max_pixel: MaxPixel, attenuator: BinaryFilterAttenuator +# ): +# pass +# ---------------------------- + +# # if target is set to 50, then we are expecting optimal transmission to be 40 +# def mid_logic(upper, lower): + +# @patch( +# "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.brightest_pixel_sat", 255 +# ) +# @patch( +# "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.brightest_pixel", +# get_max_pixel_value_from_transmission(), +# ) +# def test_optimise_oav_transmission_binary_search( +# get_max_pixel_value_from_transmission, run_engine: RunEngine, max_pixel: MaxPixel, attenuator: BinaryFilterAttenuator +# ): +# upper_bound = 100 +# lower_bound = 0 +# frac_of_max = 0.5 +# run_engine(optimise_oav_transmission_binary_search(upper_bound, lower_bound, max_pixel = max_pixel, attenuator=attenuator)) +# max_pixel_values = [50, 75, 87.5] + +# async def get_max_pixel_val(): +# upper = 50 +# for i in range(5): +# yield + +# max_pixel.max_pixel_val.read = AsyncMock(side_effect=get_max_pixel_val) +# set_mock_value() -# def test_optimise_oav_transmission_binary_search(): # with patch("mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.") From a69428bc2ebee8bb1abd04af96b2fe54db5b6d12 Mon Sep 17 00:00:00 2001 From: Srishty Date: Sun, 30 Nov 2025 21:53:32 +0000 Subject: [PATCH 06/18] Adding my commit --- .../i04/oav_centering_plans/oav_imaging.py | 2 +- .../beamlines/i04/test_oav_imaging.py | 70 +++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py index 5da87f15f3..d6536d920b 100644 --- a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py +++ b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py @@ -134,7 +134,7 @@ def optimise_oav_transmission_binary_search( Plan to find the optimal oav transmission. First the brightest pixel at 100% transmission is taken. A fraction of this (frac_of_max) is taken as the target - as in the optimal transmission will have it's max pixel as the set target. - A binary search is used to each the target. + A binary search is used to reach the target. Args: upper_bound: Maximum transmission which will be searched. lower_bound: Minimum transmission which will be searched. diff --git a/tests/unit_tests/beamlines/i04/test_oav_imaging.py b/tests/unit_tests/beamlines/i04/test_oav_imaging.py index 8ca0ab35c0..dbd4bb3097 100644 --- a/tests/unit_tests/beamlines/i04/test_oav_imaging.py +++ b/tests/unit_tests/beamlines/i04/test_oav_imaging.py @@ -1,3 +1,4 @@ +from collections.abc import AsyncGenerator from unittest.mock import MagicMock, patch import pytest @@ -5,6 +6,7 @@ from bluesky.simulators import RunEngineSimulator, assert_message_and_return_remaining from dodal.devices.attenuator.attenuator import BinaryFilterAttenuator from dodal.devices.backlight import Backlight +from dodal.devices.i04.max_pixel import MaxPixel from dodal.devices.mx_phase1.beamstop import Beamstop, BeamstopPositions from dodal.devices.oav.oav_detector import OAV from dodal.devices.robot import BartRobot, PinMounted @@ -15,6 +17,7 @@ ZebraShutterControl, ZebraShutterState, ) +from ophyd_async.core import init_devices from ophyd_async.testing import set_mock_value from mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging import ( @@ -23,6 +26,7 @@ take_oav_image_with_scintillator_in, ) from mx_bluesky.common.utils.exceptions import BeamlineStateError +from mx_bluesky.common.utils.log import LOGGER async def test_check_exception_raised_if_pin_mounted( @@ -258,6 +262,24 @@ async def test_take_and_save_oav_image_in_re(run_engine: RunEngine, oav: OAV, tm # mock out the different transmissions and max vals and make sure you reach what's expected. +@pytest.fixture() +async def max_pixel() -> AsyncGenerator[MaxPixel]: + async with init_devices(mock=True): + max_pixel = MaxPixel("TEST: MAX_PIXEL") + yield max_pixel + + +@patch("mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.brightest_pixel_sat") +def test_binary_search( + mock_brightest_pixel: MagicMock, + max_pixel: MaxPixel, + attenuator: BinaryFilterAttenuator, + upper_bound=100, + lower_bound=0, +): + mock_brightest_pixel.return_value() + + # # mock function transmission optimisation # @pytest.fixture # def get_max_pixel_value_from_transmission(transmission) -> float: @@ -302,3 +324,51 @@ async def test_take_and_save_oav_image_in_re(run_engine: RunEngine, oav: OAV, tm # with patch("mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.") + + +def tranmission_to_max_pixel(transmission): + max_pixel = transmission + 30 + return max_pixel + # if target is 100 then we expect the optimal transmission to be 100 - 30 = 70 + + +def optimise_oav_transmission_binary_search( + upper_bound: float, # in percent + lower_bound: float, # in percent + frac_of_max: float = 0.5, + tolerance: int = 1, + max_iterations: int = 5, +): + target_pixel_l = 255 * frac_of_max + + while max_iterations > tolerance: + mid = round((upper_bound + lower_bound) / 2, 2) # limit to 2 dp + max_iterations -= 1 + + brightest_pixel = tranmission_to_max_pixel(mid) + # brightest_pixel = get_max_pixel_value_from_transmission(transmission=mid) + LOGGER.info(f"Upper bound is: {upper_bound}, Lower bound is: {lower_bound}") + LOGGER.info( + f"Testing transmission {mid}, brightest pixel found {brightest_pixel}" + ) + + if target_pixel_l - tolerance < brightest_pixel < target_pixel_l + tolerance: + mid = round(mid, 0) + LOGGER.info(f"\nOptimal transmission found - {mid}") + return mid + + # condition for too low so want to try higher + elif brightest_pixel < target_pixel_l - tolerance: + LOGGER.info("Result: Too low \n") + lower_bound = mid + + # condition for too high so want to try lower + elif brightest_pixel > target_pixel_l + tolerance: + LOGGER.info("Result: Too high \n") + upper_bound = mid + return "Max iterations reached" + + +def test_binary_search_logic(): + search = optimise_oav_transmission_binary_search(100, 0) + assert search == 70 From 086ac31998e5a968ef43dd61060cd4da1c7117a6 Mon Sep 17 00:00:00 2001 From: Srishty Date: Mon, 1 Dec 2025 14:14:23 +0000 Subject: [PATCH 07/18] refactoring so that 100 percent transmission is in a diff place --- .../i04/oav_centering_plans/oav_imaging.py | 18 ++-- .../beamlines/i04/test_oav_imaging.py | 90 ++++++++++--------- 2 files changed, 59 insertions(+), 49 deletions(-) diff --git a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py index d6536d920b..da92230bd4 100644 --- a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py +++ b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py @@ -121,6 +121,16 @@ def take_and_save_oav_image( raise FileExistsError("OAV image file path already exists") +def _get_max_pixel_from_100_transmission( + max_pixel: MaxPixel = inject("max_pixel"), + attenuator: BinaryFilterAttenuator = inject("attenuator"), +): + yield from bps.mv(attenuator, 100) # 100 % transmission + yield from bps.trigger(max_pixel, wait=True) + target_brightest_pixel = yield from bps.rd(max_pixel.max_pixel_val) + return target_brightest_pixel + + def optimise_oav_transmission_binary_search( upper_bound: float, # in percent lower_bound: float, # in percent @@ -129,7 +139,7 @@ def optimise_oav_transmission_binary_search( max_iterations: int = 5, max_pixel: MaxPixel = inject("max_pixel"), attenuator: BinaryFilterAttenuator = inject("attenuator"), -) -> MsgGenerator: +): """ Plan to find the optimal oav transmission. First the brightest pixel at 100% transmission is taken. A fraction of this (frac_of_max) is taken as the target - @@ -143,9 +153,7 @@ def optimise_oav_transmission_binary_search( tolerance: Amount the search can be off by and still find a match. max_iterations: Maximum amount of iterations. """ - yield from bps.mv(attenuator, 100) # 100 % transmission - yield from bps.trigger(max_pixel, wait=True) - brightest_pixel_sat = yield from bps.rd(max_pixel.max_pixel_val) + brightest_pixel_sat = yield from _get_max_pixel_from_100_transmission() target_pixel_l = brightest_pixel_sat * frac_of_max LOGGER.info(f"~~Target luminosity: {target_pixel_l}~~\n") @@ -177,4 +185,4 @@ def optimise_oav_transmission_binary_search( elif brightest_pixel > target_pixel_l + tolerance: LOGGER.info("Result: Too high \n") upper_bound = mid - return "Max iterations reached" + raise StopIteration("Max iterations reached") diff --git a/tests/unit_tests/beamlines/i04/test_oav_imaging.py b/tests/unit_tests/beamlines/i04/test_oav_imaging.py index dbd4bb3097..62144dc673 100644 --- a/tests/unit_tests/beamlines/i04/test_oav_imaging.py +++ b/tests/unit_tests/beamlines/i04/test_oav_imaging.py @@ -22,6 +22,7 @@ from mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging import ( _prepare_beamline_for_scintillator_images, + optimise_oav_transmission_binary_search, take_and_save_oav_image, take_oav_image_with_scintillator_in, ) @@ -280,50 +281,51 @@ def test_binary_search( mock_brightest_pixel.return_value() -# # mock function transmission optimisation -# @pytest.fixture -# def get_max_pixel_value_from_transmission(transmission) -> float: -# return transmission + 10 - -# @pytest.fixture -# def get_max_pixel_value_from_transmission(max_pixel: MaxPixel) : -# max_pixel.max_pixel_val.read = AsyncMock([100, 3, 2], [200, 3, 90], [255/2, 90, 80]) - -# async def test_optimise_oav_transmission_binary_search( -# get_max_pixel_value_from_transmission, run_engine: RunEngine, max_pixel: MaxPixel, attenuator: BinaryFilterAttenuator -# ): -# pass -# ---------------------------- - -# # if target is set to 50, then we are expecting optimal transmission to be 40 -# def mid_logic(upper, lower): - -# @patch( -# "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.brightest_pixel_sat", 255 -# ) -# @patch( -# "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.brightest_pixel", -# get_max_pixel_value_from_transmission(), -# ) -# def test_optimise_oav_transmission_binary_search( -# get_max_pixel_value_from_transmission, run_engine: RunEngine, max_pixel: MaxPixel, attenuator: BinaryFilterAttenuator -# ): -# upper_bound = 100 -# lower_bound = 0 -# frac_of_max = 0.5 -# run_engine(optimise_oav_transmission_binary_search(upper_bound, lower_bound, max_pixel = max_pixel, attenuator=attenuator)) -# max_pixel_values = [50, 75, 87.5] - -# async def get_max_pixel_val(): -# upper = 50 -# for i in range(5): -# yield - -# max_pixel.max_pixel_val.read = AsyncMock(side_effect=get_max_pixel_val) -# set_mock_value() - - -# with patch("mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.") +# def test_initial_yield_froms( +# sim_run_engine: RunEngineSimulator, +# max_pixel: MaxPixel, +# attenuator: BinaryFilterAttenuator +# ): +# Messages = sim_run_engine.simulate_plan( +# take_oav_image_with_scintillator_in( + + +def test_optimise_oav_transmission_binary_search( + sim_run_engine: RunEngineSimulator, + max_pixel: MaxPixel, + attenuator: BinaryFilterAttenuator, + done_status, +): + # Simulated transmission-to-pixel mapping + # expecting transmission 18.75 to be correct + + # put this in a mark.parameterize so that you can try different dicts + transmission_map = {100: 255, 50: 180, 25: 160, 18.75: 130, 12.5: 118, 0: 0} + keys_list = list(transmission_map.keys()) + max_pixel.trigger = MagicMock(return_value=done_status) + attenuator.set = MagicMock() + + optimise_oav_transmission_binary_search( + upper_bound=100, lower_bound=0, max_pixel=max_pixel, attenuator=attenuator + ) + + # now defining a mock function + def mock_get_max_pixel_value_from_transmission(transmission): + pixel_val = transmission_map.get(transmission) + + if pixel_val is None: + raise KeyError(f"Transmission {transmission} is not in the mock dictionary") + + return pixel_val + + # AsyncMock(side_effect=[50,75,etc...]) # use one for the transmissions one for the pixel values. + + # with patch( + # "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.get_max_pixel_value_from_transmission", + # side_effect=mock_get_max_pixel_value_from_transmission, + # ): + # result = optimise_oav_transmission_binary_search(255 / 2, 100, 0, 5, 10) + # assert result == 18.75 def tranmission_to_max_pixel(transmission): From a2e699142b8c2943d0dcb732cbd939b9299b9d14 Mon Sep 17 00:00:00 2001 From: Srishty Date: Mon, 1 Dec 2025 16:42:51 +0000 Subject: [PATCH 08/18] attempting tests again --- .../beamlines/i04/test_oav_imaging.py | 51 +++++++++++-------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/tests/unit_tests/beamlines/i04/test_oav_imaging.py b/tests/unit_tests/beamlines/i04/test_oav_imaging.py index 62144dc673..c2f1137f43 100644 --- a/tests/unit_tests/beamlines/i04/test_oav_imaging.py +++ b/tests/unit_tests/beamlines/i04/test_oav_imaging.py @@ -1,5 +1,5 @@ from collections.abc import AsyncGenerator -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest from bluesky.run_engine import RunEngine @@ -270,15 +270,15 @@ async def max_pixel() -> AsyncGenerator[MaxPixel]: yield max_pixel -@patch("mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.brightest_pixel_sat") -def test_binary_search( - mock_brightest_pixel: MagicMock, - max_pixel: MaxPixel, - attenuator: BinaryFilterAttenuator, - upper_bound=100, - lower_bound=0, -): - mock_brightest_pixel.return_value() +# @patch("mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.brightest_pixel_sat") +# def test_binary_search( +# mock_brightest_pixel: MagicMock, +# max_pixel: MaxPixel, +# attenuator: BinaryFilterAttenuator, +# upper_bound=100, +# lower_bound=0, +# ): +# mock_brightest_pixel.return_value() # def test_initial_yield_froms( @@ -290,7 +290,11 @@ def test_binary_search( # take_oav_image_with_scintillator_in( +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging._get_max_pixel_from_100_transmission" +) def test_optimise_oav_transmission_binary_search( + mock_brightest_pixel: MagicMock, sim_run_engine: RunEngineSimulator, max_pixel: MaxPixel, attenuator: BinaryFilterAttenuator, @@ -300,23 +304,28 @@ def test_optimise_oav_transmission_binary_search( # expecting transmission 18.75 to be correct # put this in a mark.parameterize so that you can try different dicts - transmission_map = {100: 255, 50: 180, 25: 160, 18.75: 130, 12.5: 118, 0: 0} - keys_list = list(transmission_map.keys()) + mock_brightest_pixel.return_value = 255 + # transmission_map = {100: 255, 50: 180, 25: 160, 18.75: 130, 12.5: 118, 0: 0} + transmission_map = {50: 220, 25: 200, 18.75: 150, 12.5: 127} + # transmission_list = list(transmission_map.keys()) + + max_pixel_list = list(transmission_map.values()) max_pixel.trigger = MagicMock(return_value=done_status) attenuator.set = MagicMock() + side_effect_input = [{"readback": i} for i in max_pixel_list] + max_pixel.locate = AsyncMock(side_effect=side_effect_input) - optimise_oav_transmission_binary_search( - upper_bound=100, lower_bound=0, max_pixel=max_pixel, attenuator=attenuator - ) + optimise_oav_transmission_binary_search(upper_bound=100, lower_bound=0) + assert max_pixel.trigger.call_count == 4 # now defining a mock function - def mock_get_max_pixel_value_from_transmission(transmission): - pixel_val = transmission_map.get(transmission) + # def mock_get_max_pixel_value_from_transmission(transmission): + # pixel_val = transmission_map.get(transmission) - if pixel_val is None: - raise KeyError(f"Transmission {transmission} is not in the mock dictionary") + # if pixel_val is None: + # raise KeyError(f"Transmission {transmission} is not in the mock dictionary") - return pixel_val + # return pixel_val # AsyncMock(side_effect=[50,75,etc...]) # use one for the transmissions one for the pixel values. @@ -334,7 +343,7 @@ def tranmission_to_max_pixel(transmission): # if target is 100 then we expect the optimal transmission to be 100 - 30 = 70 -def optimise_oav_transmission_binary_search( +def optimise_oav_transmission_binary_search_without_yields( upper_bound: float, # in percent lower_bound: float, # in percent frac_of_max: float = 0.5, From 9a4117f9e339782e44c8f818d31abebb25cbfeb2 Mon Sep 17 00:00:00 2001 From: Srishty Date: Mon, 1 Dec 2025 18:52:39 +0000 Subject: [PATCH 09/18] run plan through run engine --- tests/unit_tests/beamlines/i04/test_oav_imaging.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/unit_tests/beamlines/i04/test_oav_imaging.py b/tests/unit_tests/beamlines/i04/test_oav_imaging.py index c2f1137f43..aad2394d01 100644 --- a/tests/unit_tests/beamlines/i04/test_oav_imaging.py +++ b/tests/unit_tests/beamlines/i04/test_oav_imaging.py @@ -17,8 +17,7 @@ ZebraShutterControl, ZebraShutterState, ) -from ophyd_async.core import init_devices -from ophyd_async.testing import set_mock_value +from ophyd_async.core import init_devices, set_mock_value from mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging import ( _prepare_beamline_for_scintillator_images, @@ -295,7 +294,7 @@ async def max_pixel() -> AsyncGenerator[MaxPixel]: ) def test_optimise_oav_transmission_binary_search( mock_brightest_pixel: MagicMock, - sim_run_engine: RunEngineSimulator, + run_engine: RunEngine, max_pixel: MaxPixel, attenuator: BinaryFilterAttenuator, done_status, @@ -304,7 +303,7 @@ def test_optimise_oav_transmission_binary_search( # expecting transmission 18.75 to be correct # put this in a mark.parameterize so that you can try different dicts - mock_brightest_pixel.return_value = 255 + mock_brightest_pixel.return_value = [255] # transmission_map = {100: 255, 50: 180, 25: 160, 18.75: 130, 12.5: 118, 0: 0} transmission_map = {50: 220, 25: 200, 18.75: 150, 12.5: 127} # transmission_list = list(transmission_map.keys()) @@ -315,7 +314,7 @@ def test_optimise_oav_transmission_binary_search( side_effect_input = [{"readback": i} for i in max_pixel_list] max_pixel.locate = AsyncMock(side_effect=side_effect_input) - optimise_oav_transmission_binary_search(upper_bound=100, lower_bound=0) + run_engine(optimise_oav_transmission_binary_search(upper_bound=100, lower_bound=0)) assert max_pixel.trigger.call_count == 4 # now defining a mock function From b2868bc077999d9064367294f67d566d13534461 Mon Sep 17 00:00:00 2001 From: Srishty Date: Tue, 2 Dec 2025 16:47:23 +0000 Subject: [PATCH 10/18] Initial commit - add plan for comissioning day tomorrow --- .../i04/oav_centering_plans/oav_imaging.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py index da92230bd4..2df9619674 100644 --- a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py +++ b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py @@ -6,6 +6,7 @@ from dodal.common import inject from dodal.devices.attenuator.attenuator import BinaryFilterAttenuator from dodal.devices.backlight import Backlight +from dodal.devices.i04.beam_centre import CentreEllipseMethod from dodal.devices.i04.max_pixel import MaxPixel from dodal.devices.mx_phase1.beamstop import Beamstop, BeamstopPositions from dodal.devices.oav.oav_detector import OAV @@ -186,3 +187,35 @@ def optimise_oav_transmission_binary_search( LOGGER.info("Result: Too high \n") upper_bound = mid raise StopIteration("Max iterations reached") + + +def automated_centring( + robot: BartRobot, + beamstop: Beamstop, + backlight: Backlight, + scintillator: Scintillator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + centre_ellipse: CentreEllipseMethod = inject("centre_ellipse"), +): + # plan to move scintillator in + # call the above plan first to optimise transmission + # then use the centring device to find the centre + yield from _prepare_beamline_for_scintillator_images( + robot, + beamstop, + backlight, + scintillator, + xbpm_feedback, + initial_wait_group, + ) + yield from optimise_oav_transmission_binary_search(100, 0) + + yield from bps.trigger(centre_ellipse, wait=True) + centre_x = bps.rd(centre_ellipse.center_x_val) + centre_y = bps.rd(centre_ellipse.center_y_val) + LOGGER.info(f"Centre X: {centre_x}, Centre Y: {centre_y}") + # now you can add the bit to move it to the centre + + +# A - if I put the injects then I don't think I need to put it here too From a740b564c3d36427883f4180d9f3a6ad5d1b8100 Mon Sep 17 00:00:00 2001 From: Srishty Date: Tue, 2 Dec 2025 17:11:55 +0000 Subject: [PATCH 11/18] add to the init file --- src/mx_bluesky/beamlines/i04/__init__.py | 2 ++ .../beamlines/i04/oav_centering_plans/oav_imaging.py | 3 --- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/mx_bluesky/beamlines/i04/__init__.py b/src/mx_bluesky/beamlines/i04/__init__.py index fb028f11f7..254a84d845 100644 --- a/src/mx_bluesky/beamlines/i04/__init__.py +++ b/src/mx_bluesky/beamlines/i04/__init__.py @@ -2,6 +2,7 @@ i04_default_grid_detect_and_xray_centre, ) from mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging import ( + automated_centring, optimise_oav_transmission_binary_search, take_oav_image_with_scintillator_in, ) @@ -18,4 +19,5 @@ "thaw_and_murko_centre", "take_oav_image_with_scintillator_in", "optimise_oav_transmission_binary_search", + "automated_centring", ] diff --git a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py index 2df9619674..f90736783c 100644 --- a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py +++ b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py @@ -198,9 +198,6 @@ def automated_centring( max_pixel: MaxPixel, centre_ellipse: CentreEllipseMethod = inject("centre_ellipse"), ): - # plan to move scintillator in - # call the above plan first to optimise transmission - # then use the centring device to find the centre yield from _prepare_beamline_for_scintillator_images( robot, beamstop, From 11749261a76e1cceddd7302aca63e78716188efb Mon Sep 17 00:00:00 2001 From: Srishty Date: Wed, 3 Dec 2025 15:40:13 +0000 Subject: [PATCH 12/18] add changes post testing --- .../i04/oav_centering_plans/oav_imaging.py | 122 ++++++++++++------ 1 file changed, 84 insertions(+), 38 deletions(-) diff --git a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py index f90736783c..ecd5fd050b 100644 --- a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py +++ b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py @@ -1,6 +1,6 @@ import os import time - +from time import sleep import bluesky.plan_stubs as bps from bluesky.utils import MsgGenerator from dodal.common import inject @@ -51,21 +51,28 @@ def take_oav_image_with_scintillator_in( defaults are always correct. """ + LOGGER.info("prearing beamline") yield from _prepare_beamline_for_scintillator_images( - robot, beamstop, backlight, scintillator, xbpm_feedback, initial_wait_group + robot, beamstop, backlight, scintillator, xbpm_feedback, initial_wait_group, shutter ) - + LOGGER.info("setting transmission") yield from bps.abs_set(attenuator, transmission, group=initial_wait_group) + if image_name is None: image_name = f"{time.time_ns()}ATT{transmission * 100}" - + LOGGER.info(f"using image name {image_name}") + LOGGER.info("Waiting for initial_wait_group...") yield from bps.wait(initial_wait_group) + LOGGER.info("Opening shutter...") + yield from bps.abs_set(shutter.control_mode, ZebraShutterControl.MANUAL, wait=True) yield from bps.abs_set(shutter, ZebraShutterState.OPEN, wait=True) - take_and_save_oav_image(file_path=image_path, file_name=image_name, oav=oav) + LOGGER.info("Taking image...") + + yield from take_and_save_oav_image(file_path=image_path, file_name=image_name, oav=oav) def _prepare_beamline_for_scintillator_images( @@ -74,6 +81,7 @@ def _prepare_beamline_for_scintillator_images( backlight: Backlight, scintillator: Scintillator, xbpm_feedback: XBPMFeedback, + shutter: ZebraShutter, group: str, ) -> MsgGenerator: """ @@ -98,6 +106,9 @@ def _prepare_beamline_for_scintillator_images( yield from bps.abs_set(scintillator.selected_pos, InOut.IN, group=group) + yield from bps.abs_set(shutter.control_mode, ZebraShutterControl.MANUAL, wait=True) + yield from bps.abs_set(shutter, ZebraShutterState.OPEN, wait=True) + def take_and_save_oav_image( file_name: str, @@ -123,24 +134,25 @@ def take_and_save_oav_image( def _get_max_pixel_from_100_transmission( - max_pixel: MaxPixel = inject("max_pixel"), - attenuator: BinaryFilterAttenuator = inject("attenuator"), + max_pixel: MaxPixel, + attenuator: BinaryFilterAttenuator, ): - yield from bps.mv(attenuator, 100) # 100 % transmission + yield from bps.mv(attenuator, 1) # 100 % transmission yield from bps.trigger(max_pixel, wait=True) target_brightest_pixel = yield from bps.rd(max_pixel.max_pixel_val) return target_brightest_pixel def optimise_oav_transmission_binary_search( - upper_bound: float, # in percent - lower_bound: float, # in percent - frac_of_max: float = 0.5, - tolerance: int = 1, - max_iterations: int = 5, + upper_bound: float = 100, # in percent + lower_bound: float = 0, # in percent + frac_of_max: float = 0.75, + tolerance: int = 5, + max_iterations: int = 10, max_pixel: MaxPixel = inject("max_pixel"), attenuator: BinaryFilterAttenuator = inject("attenuator"), -): + xbpm_feedback: XBPMFeedback = inject("xbpm_feedback") +) -> MsgGenerator: """ Plan to find the optimal oav transmission. First the brightest pixel at 100% transmission is taken. A fraction of this (frac_of_max) is taken as the target - @@ -154,15 +166,18 @@ def optimise_oav_transmission_binary_search( tolerance: Amount the search can be off by and still find a match. max_iterations: Maximum amount of iterations. """ - brightest_pixel_sat = yield from _get_max_pixel_from_100_transmission() + brightest_pixel_sat = yield from _get_max_pixel_from_100_transmission(max_pixel, attenuator) target_pixel_l = brightest_pixel_sat * frac_of_max LOGGER.info(f"~~Target luminosity: {target_pixel_l}~~\n") - while max_iterations > tolerance: + iterations = 0 + + while iterations target_pixel_l + tolerance: LOGGER.info("Result: Too high \n") upper_bound = mid + iterations += 1 raise StopIteration("Max iterations reached") + def automated_centring( - robot: BartRobot, - beamstop: Beamstop, - backlight: Backlight, - scintillator: Scintillator, - xbpm_feedback: XBPMFeedback, - max_pixel: MaxPixel, - centre_ellipse: CentreEllipseMethod = inject("centre_ellipse"), -): + zoom_levels: list[str] = ["1.0x", "1.5x", "2.0x", "2.5x", "3.0x", "5.0x", "7.5x", "10.0x"], + robot: BartRobot= inject("robot"), + beamstop: Beamstop = inject("beamstop"), + backlight: Backlight = inject("backlight"), + scintillator: Scintillator = inject("scintillator"), + xbpm_feedback: XBPMFeedback= inject("xbpm_feedback"), + max_pixel: MaxPixel = inject("max_pixel"), + centre_ellipse: CentreEllipseMethod = inject("beam_centre"), + attenuator: BinaryFilterAttenuator = inject("attenuator"), + oav: OAV = inject("oav"), + shutter: ZebraShutter = inject("sample_shutter"), + +) -> MsgGenerator: + + zoom_level_to_dict = {"7.5x": [oav.zoom_controller.x_placeholder_zoom_7, oav.zoom_controller.y_placeholder_zoom_7], "1.0x": [oav.zoom_controller.x_placeholder_zoom_1, oav.zoom_controller.y_placeholder_zoom_1], "1.5x": [oav.zoom_controller.x_placeholder_zoom_2, oav.zoom_controller.y_placeholder_zoom_2], "2.0x": [oav.zoom_controller.x_placeholder_zoom_3, oav.zoom_controller.y_placeholder_zoom_3], + "2.5x": [oav.zoom_controller.x_placeholder_zoom_4, oav.zoom_controller.y_placeholder_zoom_4], "3.0x": [oav.zoom_controller.x_placeholder_zoom_5, oav.zoom_controller.y_placeholder_zoom_5], "5.0x": [oav.zoom_controller.x_placeholder_zoom_6, oav.zoom_controller.y_placeholder_zoom_6], "10.0x": [oav.zoom_controller.x_placeholder_zoom_8, oav.zoom_controller.y_placeholder_zoom_8],} + + LOGGER.info("Preparing beamline for images...") yield from _prepare_beamline_for_scintillator_images( - robot, - beamstop, - backlight, - scintillator, - xbpm_feedback, - initial_wait_group, - ) - yield from optimise_oav_transmission_binary_search(100, 0) + robot, + beamstop, + backlight, + scintillator, + xbpm_feedback, + shutter, + initial_wait_group, + ) - yield from bps.trigger(centre_ellipse, wait=True) - centre_x = bps.rd(centre_ellipse.center_x_val) - centre_y = bps.rd(centre_ellipse.center_y_val) - LOGGER.info(f"Centre X: {centre_x}, Centre Y: {centre_y}") + for zoom in zoom_levels: + LOGGER.info(f"Moving to zoom level {zoom}") + yield from bps.abs_set(oav.zoom_controller, zoom, wait=True) + yield from bps.sleep(1) + if zoom == "7.5x" or zoom == "1.0x": + LOGGER.info(f"Optimising transmission (zoom level {zoom})") + yield from optimise_oav_transmission_binary_search(100, 0, max_pixel=max_pixel, attenuator=attenuator, xbpm_feedback=xbpm_feedback) + + + yield from bps.trigger(centre_ellipse, wait=True) + centre_x = yield from bps.rd(centre_ellipse.center_x_val) + centre_y = yield from bps.rd(centre_ellipse.center_y_val) + LOGGER.info(f"Centre X: {centre_x}, Centre Y: {centre_y}") + centre_x = round(centre_x) + centre_y = round(centre_y) + x_signal = zoom_level_to_dict[zoom][0] + y_signal = zoom_level_to_dict[zoom][1] + LOGGER.info("Writing centre values to OAV PVs") + yield from bps.mv(x_signal, centre_x, y_signal, centre_y) + + LOGGER.info("Done!") + # now you can add the bit to move it to the centre From d0bf2709a5a7c33070340cbc1edffa4901c598d0 Mon Sep 17 00:00:00 2001 From: Srishty Date: Thu, 4 Dec 2025 15:34:59 +0000 Subject: [PATCH 13/18] small changes --- .../i04/oav_centering_plans/oav_imaging.py | 111 +++++++++++++----- 1 file changed, 79 insertions(+), 32 deletions(-) diff --git a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py index ecd5fd050b..1f0aeb3cec 100644 --- a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py +++ b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py @@ -1,6 +1,6 @@ import os import time -from time import sleep + import bluesky.plan_stubs as bps from bluesky.utils import MsgGenerator from dodal.common import inject @@ -53,12 +53,17 @@ def take_oav_image_with_scintillator_in( LOGGER.info("prearing beamline") yield from _prepare_beamline_for_scintillator_images( - robot, beamstop, backlight, scintillator, xbpm_feedback, initial_wait_group, shutter + robot, + beamstop, + backlight, + scintillator, + xbpm_feedback, + initial_wait_group, + shutter, ) LOGGER.info("setting transmission") yield from bps.abs_set(attenuator, transmission, group=initial_wait_group) - if image_name is None: image_name = f"{time.time_ns()}ATT{transmission * 100}" LOGGER.info(f"using image name {image_name}") @@ -72,7 +77,9 @@ def take_oav_image_with_scintillator_in( LOGGER.info("Taking image...") - yield from take_and_save_oav_image(file_path=image_path, file_name=image_name, oav=oav) + yield from take_and_save_oav_image( + file_path=image_path, file_name=image_name, oav=oav + ) def _prepare_beamline_for_scintillator_images( @@ -151,7 +158,7 @@ def optimise_oav_transmission_binary_search( max_iterations: int = 10, max_pixel: MaxPixel = inject("max_pixel"), attenuator: BinaryFilterAttenuator = inject("attenuator"), - xbpm_feedback: XBPMFeedback = inject("xbpm_feedback") + xbpm_feedback: XBPMFeedback = inject("xbpm_feedback"), ) -> MsgGenerator: """ Plan to find the optimal oav transmission. First the brightest pixel at 100% @@ -166,18 +173,20 @@ def optimise_oav_transmission_binary_search( tolerance: Amount the search can be off by and still find a match. max_iterations: Maximum amount of iterations. """ - brightest_pixel_sat = yield from _get_max_pixel_from_100_transmission(max_pixel, attenuator) + brightest_pixel_sat = yield from _get_max_pixel_from_100_transmission( + max_pixel, attenuator + ) target_pixel_l = brightest_pixel_sat * frac_of_max LOGGER.info(f"~~Target luminosity: {target_pixel_l}~~\n") iterations = 0 - while iterations MsgGenerator: - - zoom_level_to_dict = {"7.5x": [oav.zoom_controller.x_placeholder_zoom_7, oav.zoom_controller.y_placeholder_zoom_7], "1.0x": [oav.zoom_controller.x_placeholder_zoom_1, oav.zoom_controller.y_placeholder_zoom_1], "1.5x": [oav.zoom_controller.x_placeholder_zoom_2, oav.zoom_controller.y_placeholder_zoom_2], "2.0x": [oav.zoom_controller.x_placeholder_zoom_3, oav.zoom_controller.y_placeholder_zoom_3], - "2.5x": [oav.zoom_controller.x_placeholder_zoom_4, oav.zoom_controller.y_placeholder_zoom_4], "3.0x": [oav.zoom_controller.x_placeholder_zoom_5, oav.zoom_controller.y_placeholder_zoom_5], "5.0x": [oav.zoom_controller.x_placeholder_zoom_6, oav.zoom_controller.y_placeholder_zoom_6], "10.0x": [oav.zoom_controller.x_placeholder_zoom_8, oav.zoom_controller.y_placeholder_zoom_8],} - + zoom_level_to_dict = { + "7.5x": [ + oav.zoom_controller.x_placeholder_zoom_7, + oav.zoom_controller.y_placeholder_zoom_7, + ], + "1.0x": [ + oav.zoom_controller.x_placeholder_zoom_1, + oav.zoom_controller.y_placeholder_zoom_1, + ], + "1.5x": [ + oav.zoom_controller.x_placeholder_zoom_2, + oav.zoom_controller.y_placeholder_zoom_2, + ], + "2.0x": [ + oav.zoom_controller.x_placeholder_zoom_3, + oav.zoom_controller.y_placeholder_zoom_3, + ], + "2.5x": [ + oav.zoom_controller.x_placeholder_zoom_4, + oav.zoom_controller.y_placeholder_zoom_4, + ], + "3.0x": [ + oav.zoom_controller.x_placeholder_zoom_5, + oav.zoom_controller.y_placeholder_zoom_5, + ], + "5.0x": [ + oav.zoom_controller.x_placeholder_zoom_6, + oav.zoom_controller.y_placeholder_zoom_6, + ], + "10.0x": [ + oav.zoom_controller.x_placeholder_zoom_8, + oav.zoom_controller.y_placeholder_zoom_8, + ], + } + LOGGER.info("Preparing beamline for images...") yield from _prepare_beamline_for_scintillator_images( - robot, - beamstop, - backlight, - scintillator, - xbpm_feedback, - shutter, - initial_wait_group, - ) + robot, + beamstop, + backlight, + scintillator, + xbpm_feedback, + shutter, + initial_wait_group, + ) for zoom in zoom_levels: LOGGER.info(f"Moving to zoom level {zoom}") @@ -242,8 +289,13 @@ def automated_centring( yield from bps.sleep(1) if zoom == "7.5x" or zoom == "1.0x": LOGGER.info(f"Optimising transmission (zoom level {zoom})") - yield from optimise_oav_transmission_binary_search(100, 0, max_pixel=max_pixel, attenuator=attenuator, xbpm_feedback=xbpm_feedback) - + yield from optimise_oav_transmission_binary_search( + 100, + 0, + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) yield from bps.trigger(centre_ellipse, wait=True) centre_x = yield from bps.rd(centre_ellipse.center_x_val) @@ -257,8 +309,3 @@ def automated_centring( yield from bps.mv(x_signal, centre_x, y_signal, centre_y) LOGGER.info("Done!") - - # now you can add the bit to move it to the centre - - -# A - if I put the injects then I don't think I need to put it here too From d40b3057389f2a84658fdf8c99dff331860f17c4 Mon Sep 17 00:00:00 2001 From: Dominic Oram Date: Thu, 11 Dec 2025 14:13:19 +0000 Subject: [PATCH 14/18] Fixes from review --- .../i04/oav_centering_plans/oav_imaging.py | 108 ++++++------------ 1 file changed, 38 insertions(+), 70 deletions(-) diff --git a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py index 1f0aeb3cec..e5e4b41b17 100644 --- a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py +++ b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py @@ -9,7 +9,7 @@ from dodal.devices.i04.beam_centre import CentreEllipseMethod from dodal.devices.i04.max_pixel import MaxPixel from dodal.devices.mx_phase1.beamstop import Beamstop, BeamstopPositions -from dodal.devices.oav.oav_detector import OAV +from dodal.devices.oav.oav_detector import OAV, ZoomController from dodal.devices.robot import BartRobot, PinMounted from dodal.devices.scintillator import InOut, Scintillator from dodal.devices.xbpm_feedback import XBPMFeedback @@ -150,7 +150,7 @@ def _get_max_pixel_from_100_transmission( return target_brightest_pixel -def optimise_oav_transmission_binary_search( +def optimise_transmission_with_oav( upper_bound: float = 100, # in percent lower_bound: float = 0, # in percent frac_of_max: float = 0.75, @@ -215,17 +215,9 @@ def optimise_oav_transmission_binary_search( raise StopIteration("Max iterations reached") -def automated_centring( - zoom_levels: list[str] = [ - "1.0x", - "1.5x", - "2.0x", - "2.5x", - "3.0x", - "5.0x", - "7.5x", - "10.0x", - ], +def find_beam_centres( + zoom_levels_to_centre: list[str] | None = None, + zoom_levels_to_optimise_transmission: list[str] | None = None, robot: BartRobot = inject("robot"), beamstop: Beamstop = inject("beamstop"), backlight: Backlight = inject("backlight"), @@ -237,40 +229,16 @@ def automated_centring( oav: OAV = inject("oav"), shutter: ZebraShutter = inject("sample_shutter"), ) -> MsgGenerator: - zoom_level_to_dict = { - "7.5x": [ - oav.zoom_controller.x_placeholder_zoom_7, - oav.zoom_controller.y_placeholder_zoom_7, - ], - "1.0x": [ - oav.zoom_controller.x_placeholder_zoom_1, - oav.zoom_controller.y_placeholder_zoom_1, - ], - "1.5x": [ - oav.zoom_controller.x_placeholder_zoom_2, - oav.zoom_controller.y_placeholder_zoom_2, - ], - "2.0x": [ - oav.zoom_controller.x_placeholder_zoom_3, - oav.zoom_controller.y_placeholder_zoom_3, - ], - "2.5x": [ - oav.zoom_controller.x_placeholder_zoom_4, - oav.zoom_controller.y_placeholder_zoom_4, - ], - "3.0x": [ - oav.zoom_controller.x_placeholder_zoom_5, - oav.zoom_controller.y_placeholder_zoom_5, - ], - "5.0x": [ - oav.zoom_controller.x_placeholder_zoom_6, - oav.zoom_controller.y_placeholder_zoom_6, - ], - "10.0x": [ - oav.zoom_controller.x_placeholder_zoom_8, - oav.zoom_controller.y_placeholder_zoom_8, - ], - } + """ + zoom_levels: The levels to do centring at, by default runs at all known zoom levels. + """ + + assert isinstance(oav.zoom_controller, ZoomController), ( + "Zoom controller does not support setting beam centres" + ) # Can probably do better typing? Make OAV generic to zoom controller? + + if zoom_levels_to_optimise_transmission is None: + zoom_levels_to_optimise_transmission = ["1.0x", "7.5x"] LOGGER.info("Preparing beamline for images...") yield from _prepare_beamline_for_scintillator_images( @@ -283,29 +251,29 @@ def automated_centring( initial_wait_group, ) - for zoom in zoom_levels: - LOGGER.info(f"Moving to zoom level {zoom}") - yield from bps.abs_set(oav.zoom_controller, zoom, wait=True) - yield from bps.sleep(1) - if zoom == "7.5x" or zoom == "1.0x": - LOGGER.info(f"Optimising transmission (zoom level {zoom})") - yield from optimise_oav_transmission_binary_search( - 100, - 0, - max_pixel=max_pixel, - attenuator=attenuator, - xbpm_feedback=xbpm_feedback, + for centring_device in oav.zoom_controller.beam_centres.values(): + zoom_name = yield from bps.rd(centring_device.level_name) + if zoom_levels_to_centre is None or zoom_name in zoom_levels_to_centre: + LOGGER.info(f"Moving to zoom level {zoom_name}") + yield from bps.abs_set(oav.zoom_controller, zoom_name, wait=True) + if zoom_name in zoom_levels_to_optimise_transmission: + LOGGER.info(f"Optimising transmission at zoom level {zoom_name}") + yield from optimise_transmission_with_oav( + 100, + 0, + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + + yield from bps.trigger(centre_ellipse, wait=True) + centre_x = yield from bps.rd(centre_ellipse.center_x_val) + centre_y = yield from bps.rd(centre_ellipse.center_y_val) + centre_x = round(centre_x) + centre_y = round(centre_y) + LOGGER.info(f"Writing centre values ({centre_x}, {centre_y}) to OAV PVs") + yield from bps.mv( + centring_device.x_centre, centre_x, centring_device.y_centre, centre_y ) - yield from bps.trigger(centre_ellipse, wait=True) - centre_x = yield from bps.rd(centre_ellipse.center_x_val) - centre_y = yield from bps.rd(centre_ellipse.center_y_val) - LOGGER.info(f"Centre X: {centre_x}, Centre Y: {centre_y}") - centre_x = round(centre_x) - centre_y = round(centre_y) - x_signal = zoom_level_to_dict[zoom][0] - y_signal = zoom_level_to_dict[zoom][1] - LOGGER.info("Writing centre values to OAV PVs") - yield from bps.mv(x_signal, centre_x, y_signal, centre_y) - LOGGER.info("Done!") From 4e5325ad5555ad41069cbaf24089dd232447f87d Mon Sep 17 00:00:00 2001 From: Dominic Oram Date: Mon, 15 Dec 2025 13:32:33 +0000 Subject: [PATCH 15/18] Use separate zoom controller --- pyproject.toml | 2 +- src/mx_bluesky/beamlines/i04/__init__.py | 8 ++++---- .../i04/oav_centering_plans/oav_imaging.py | 13 ++++--------- 3 files changed, 9 insertions(+), 14 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f62b729a85..34f7d5d431 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,7 @@ dependencies = [ "ophyd >= 1.10.5", "ophyd-async >= 0.14.0", "bluesky >= 1.14.6", - "dls-dodal @ git+https://github.com/DiamondLightSource/dodal.git@i03-device-manager", + "dls-dodal @ git+https://github.com/DiamondLightSource/dodal.git@1726_beam_centre_device_post_testing", ] diff --git a/src/mx_bluesky/beamlines/i04/__init__.py b/src/mx_bluesky/beamlines/i04/__init__.py index 254a84d845..4ba33d98e9 100644 --- a/src/mx_bluesky/beamlines/i04/__init__.py +++ b/src/mx_bluesky/beamlines/i04/__init__.py @@ -2,8 +2,8 @@ i04_default_grid_detect_and_xray_centre, ) from mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging import ( - automated_centring, - optimise_oav_transmission_binary_search, + find_beam_centres, + optimise_transmission_with_oav, take_oav_image_with_scintillator_in, ) from mx_bluesky.beamlines.i04.thawing_plan import ( @@ -18,6 +18,6 @@ "i04_default_grid_detect_and_xray_centre", "thaw_and_murko_centre", "take_oav_image_with_scintillator_in", - "optimise_oav_transmission_binary_search", - "automated_centring", + "optimise_transmission_with_oav", + "find_beam_centres", ] diff --git a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py index e5e4b41b17..94954e8ccb 100644 --- a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py +++ b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py @@ -9,7 +9,7 @@ from dodal.devices.i04.beam_centre import CentreEllipseMethod from dodal.devices.i04.max_pixel import MaxPixel from dodal.devices.mx_phase1.beamstop import Beamstop, BeamstopPositions -from dodal.devices.oav.oav_detector import OAV, ZoomController +from dodal.devices.oav.oav_detector import OAV, ZoomControllerWithBeamCentres from dodal.devices.robot import BartRobot, PinMounted from dodal.devices.scintillator import InOut, Scintillator from dodal.devices.xbpm_feedback import XBPMFeedback @@ -226,17 +226,12 @@ def find_beam_centres( max_pixel: MaxPixel = inject("max_pixel"), centre_ellipse: CentreEllipseMethod = inject("beam_centre"), attenuator: BinaryFilterAttenuator = inject("attenuator"), - oav: OAV = inject("oav"), + zoom_controller: ZoomControllerWithBeamCentres = inject("zoom_controller"), shutter: ZebraShutter = inject("sample_shutter"), ) -> MsgGenerator: """ zoom_levels: The levels to do centring at, by default runs at all known zoom levels. """ - - assert isinstance(oav.zoom_controller, ZoomController), ( - "Zoom controller does not support setting beam centres" - ) # Can probably do better typing? Make OAV generic to zoom controller? - if zoom_levels_to_optimise_transmission is None: zoom_levels_to_optimise_transmission = ["1.0x", "7.5x"] @@ -251,11 +246,11 @@ def find_beam_centres( initial_wait_group, ) - for centring_device in oav.zoom_controller.beam_centres.values(): + for centring_device in zoom_controller.beam_centres.values(): zoom_name = yield from bps.rd(centring_device.level_name) if zoom_levels_to_centre is None or zoom_name in zoom_levels_to_centre: LOGGER.info(f"Moving to zoom level {zoom_name}") - yield from bps.abs_set(oav.zoom_controller, zoom_name, wait=True) + yield from bps.abs_set(zoom_controller, zoom_name, wait=True) if zoom_name in zoom_levels_to_optimise_transmission: LOGGER.info(f"Optimising transmission at zoom level {zoom_name}") yield from optimise_transmission_with_oav( From e91646610fae13fadce776a84aa3d766e7a35e97 Mon Sep 17 00:00:00 2001 From: Dominic Oram Date: Mon, 15 Dec 2025 17:43:12 +0000 Subject: [PATCH 16/18] Add tests for optimise transmission --- .../i04/oav_centering_plans/oav_imaging.py | 81 ++-- .../beamlines/i04/test_oav_imaging.py | 400 +++++++++++++----- 2 files changed, 349 insertions(+), 132 deletions(-) diff --git a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py index 94954e8ccb..75ce4a7c3e 100644 --- a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py +++ b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py @@ -58,8 +58,8 @@ def take_oav_image_with_scintillator_in( backlight, scintillator, xbpm_feedback, - initial_wait_group, shutter, + initial_wait_group, ) LOGGER.info("setting transmission") yield from bps.abs_set(attenuator, transmission, group=initial_wait_group) @@ -92,12 +92,8 @@ def _prepare_beamline_for_scintillator_images( group: str, ) -> MsgGenerator: """ - Prepares the beamline for oav image by making sure the pin is NOT mounted and + Prepares the beamline for oav image by making sure the pin is not mounted and the beam is on (feedback check). Finally, the scintillator is moved in. - - Args: - devices: These are the specific ophyd-devices used for the plan, the - defaults are always correct. """ pin_mounted = yield from bps.rd(robot.gonio_pin_sensor) if pin_mounted == PinMounted.PIN_MOUNTED: @@ -140,20 +136,22 @@ def take_and_save_oav_image( raise FileExistsError("OAV image file path already exists") -def _get_max_pixel_from_100_transmission( +def _max_pixel_at_transmission( max_pixel: MaxPixel, attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + transmission: float, ): - yield from bps.mv(attenuator, 1) # 100 % transmission + yield from bps.trigger(xbpm_feedback, wait=True) + yield from bps.mv(attenuator, transmission) yield from bps.trigger(max_pixel, wait=True) - target_brightest_pixel = yield from bps.rd(max_pixel.max_pixel_val) - return target_brightest_pixel + return (yield from bps.rd(max_pixel.max_pixel_val)) def optimise_transmission_with_oav( - upper_bound: float = 100, # in percent - lower_bound: float = 0, # in percent - frac_of_max: float = 0.75, + upper_bound: float = 100, + lower_bound: float = 0, + target_brightness_fraction: float = 0.75, tolerance: int = 5, max_iterations: int = 10, max_pixel: MaxPixel = inject("max_pixel"), @@ -162,53 +160,66 @@ def optimise_transmission_with_oav( ) -> MsgGenerator: """ Plan to find the optimal oav transmission. First the brightest pixel at 100% - transmission is taken. A fraction of this (frac_of_max) is taken as the target - - as in the optimal transmission will have it's max pixel as the set target. - A binary search is used to reach the target. + transmission is taken. A fraction of this (target_brightness_fraction) is taken + as the target - as in the optimal transmission will have it's max pixel as the set + target. A binary search is used to reach this. Args: - upper_bound: Maximum transmission which will be searched. - lower_bound: Minimum transmission which will be searched. - frac_of_max: Fraction of the brightest pixel at 100% transmission which should be - used as the target max pixel brightness. - tolerance: Amount the search can be off by and still find a match. + upper_bound: Maximum transmission which will be searched. In percent. + lower_bound: Minimum transmission which will be searched. In percent. + target_brightness_fraction: Fraction of the brightest pixel at 100% + transmission which should be used as the target max pixel brightness. + tolerance: Amount the brightness can be off by and still find a match. max_iterations: Maximum amount of iterations. """ - brightest_pixel_sat = yield from _get_max_pixel_from_100_transmission( - max_pixel, attenuator + + if upper_bound < lower_bound: + raise ValueError( + f"Upper bound ({upper_bound}) must be higher than lower bound {lower_bound}" + ) + + brightest_pixel_at_full_beam = yield from _max_pixel_at_transmission( + max_pixel, attenuator, xbpm_feedback, 1 + ) + + if brightest_pixel_at_full_beam == 0: + raise ValueError("No beam found at full transmission") + + target_pixel_brightness = brightest_pixel_at_full_beam * target_brightness_fraction + LOGGER.info( + f"Optimising until max pixel in image has a value of {target_pixel_brightness}" ) - target_pixel_l = brightest_pixel_sat * frac_of_max - LOGGER.info(f"~~Target luminosity: {target_pixel_l}~~\n") iterations = 0 while iterations < max_iterations: mid = round((upper_bound + lower_bound) / 2, 2) # limit to 2 dp - LOGGER.info(f"on iteration {iterations}") + LOGGER.info(f"On iteration {iterations}") - yield from bps.mv(attenuator, mid / 100) - yield from bps.trigger(xbpm_feedback, wait=True) - yield from bps.trigger(max_pixel, wait=True) - brightest_pixel = yield from bps.rd(max_pixel.max_pixel_val) + brightest_pixel = yield from _max_pixel_at_transmission( + max_pixel, attenuator, xbpm_feedback, mid / 100 + ) - # brightest_pixel = get_max_pixel_value_from_transmission(transmission=mid) LOGGER.info(f"Upper bound is: {upper_bound}, Lower bound is: {lower_bound}") LOGGER.info( f"Testing transmission {mid}, brightest pixel found {brightest_pixel}" ) - if target_pixel_l - tolerance < brightest_pixel < target_pixel_l + tolerance: + if ( + target_pixel_brightness - tolerance + <= brightest_pixel + <= target_pixel_brightness + tolerance + ): mid = round(mid, 0) LOGGER.info(f"\nOptimal transmission found: {mid}") - yield from bps.trigger(xbpm_feedback) return mid # condition for too low so want to try higher - elif brightest_pixel < target_pixel_l - tolerance: + elif brightest_pixel < target_pixel_brightness - tolerance: LOGGER.info("Result: Too low \n") lower_bound = mid # condition for too high so want to try lower - elif brightest_pixel > target_pixel_l + tolerance: + elif brightest_pixel > target_pixel_brightness + tolerance: LOGGER.info("Result: Too high \n") upper_bound = mid iterations += 1 diff --git a/tests/unit_tests/beamlines/i04/test_oav_imaging.py b/tests/unit_tests/beamlines/i04/test_oav_imaging.py index aad2394d01..7312be8400 100644 --- a/tests/unit_tests/beamlines/i04/test_oav_imaging.py +++ b/tests/unit_tests/beamlines/i04/test_oav_imaging.py @@ -1,5 +1,4 @@ -from collections.abc import AsyncGenerator -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import MagicMock, call, patch import pytest from bluesky.run_engine import RunEngine @@ -17,16 +16,15 @@ ZebraShutterControl, ZebraShutterState, ) -from ophyd_async.core import init_devices, set_mock_value +from ophyd_async.core import completed_status, init_devices, set_mock_value from mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging import ( _prepare_beamline_for_scintillator_images, - optimise_oav_transmission_binary_search, + optimise_transmission_with_oav, take_and_save_oav_image, take_oav_image_with_scintillator_in, ) from mx_bluesky.common.utils.exceptions import BeamlineStateError -from mx_bluesky.common.utils.log import LOGGER async def test_check_exception_raised_if_pin_mounted( @@ -60,11 +58,18 @@ def test_prepare_beamline_for_scint_images( backlight: Backlight, scintillator: Scintillator, xbpm_feedback: XBPMFeedback, + sample_shutter: ZebraShutter, ): test_group = "my_group" messages = sim_run_engine.simulate_plan( _prepare_beamline_for_scintillator_images( - robot, beamstop_phase1, backlight, scintillator, xbpm_feedback, test_group + robot, + beamstop_phase1, + backlight, + scintillator, + xbpm_feedback, + sample_shutter, + test_group, ) ) @@ -257,128 +262,329 @@ async def test_take_and_save_oav_image_in_re(run_engine: RunEngine, oav: OAV, tm oav.snapshot.trigger.assert_called_once() # type: ignore -# add test for transmission optimization -# test that everything is called in the correct order. -# mock out the different transmissions and max vals and make sure you reach what's expected. - - @pytest.fixture() -async def max_pixel() -> AsyncGenerator[MaxPixel]: +async def max_pixel() -> MaxPixel: async with init_devices(mock=True): - max_pixel = MaxPixel("TEST: MAX_PIXEL") - yield max_pixel + max_pixel = MaxPixel("TEST: MAX_PIXEL", "max_pixel") + + max_pixel.trigger = MagicMock(return_value=completed_status()) + return max_pixel -# @patch("mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.brightest_pixel_sat") -# def test_binary_search( -# mock_brightest_pixel: MagicMock, -# max_pixel: MaxPixel, -# attenuator: BinaryFilterAttenuator, -# upper_bound=100, -# lower_bound=0, -# ): -# mock_brightest_pixel.return_value() +def test_optimise_transmission_first_gets_max_pixel_at_100_percent( + sim_run_engine: RunEngineSimulator, + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, +): + max_values = [100, 75] + def return_max_values(_): + return {"readback": {"value": max_values.pop(0)}} -# def test_initial_yield_froms( -# sim_run_engine: RunEngineSimulator, -# max_pixel: MaxPixel, -# attenuator: BinaryFilterAttenuator -# ): -# Messages = sim_run_engine.simulate_plan( -# take_oav_image_with_scintillator_in( + sim_run_engine.add_handler("read", return_max_values, max_pixel.max_pixel_val.name) + messages = sim_run_engine.simulate_plan( + optimise_transmission_with_oav( + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + ) -@patch( - "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging._get_max_pixel_from_100_transmission" + messages = assert_message_and_return_remaining( + messages, + lambda msg: msg.command == "trigger" and msg.obj == xbpm_feedback, + ) + + messages = assert_message_and_return_remaining( + messages, + lambda msg: msg.command == "set" and msg.obj == attenuator and msg.args[0] == 1, + ) + + messages = assert_message_and_return_remaining( + messages, lambda msg: msg.command == "trigger" and msg.obj == max_pixel + ) + + messages = assert_message_and_return_remaining( + messages, + lambda msg: msg.command == "read" and msg.obj == max_pixel.max_pixel_val, + ) + + +@pytest.mark.parametrize("iterations", [10, 6, 4]) +def test_given_max_pixel_never_changes_then_optimise_transmission_raises_stop_iteration( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, + iterations: int, +): + set_mock_value(max_pixel.max_pixel_val, 100) + + with pytest.raises(RuntimeError) as e: + run_engine( + optimise_transmission_with_oav( + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + max_iterations=iterations, + ) + ) + + # The RE hides the StopIteration behind a RuntimeError but will mention it in the message + assert "StopIteration" in e.value.args[0] + assert max_pixel.trigger.call_count == iterations + 1 # type: ignore + + +def given_max_values(max_pixel: MaxPixel, max_values: list): + def _set_max_value(): + set_mock_value(max_pixel.max_pixel_val, max_values.pop(0)) + return completed_status() + + max_pixel.trigger.side_effect = _set_max_value # type: ignore + + +@pytest.mark.parametrize( + "lower_bound, upper_bound, expected_final_transmission", + [[0, 100, 50], [0, 10, 5], [5, 25, 15]], ) -def test_optimise_oav_transmission_binary_search( - mock_brightest_pixel: MagicMock, +def test_given_max_pixel_immediately_reaches_target_then_optimise_transmission_returns_half_bounds( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, run_engine: RunEngine, + lower_bound: int, + upper_bound: int, + expected_final_transmission: int, +): + given_max_values(max_pixel, [100, 75]) + + final_transmission = run_engine( + optimise_transmission_with_oav( + lower_bound=lower_bound, + upper_bound=upper_bound, + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + ).plan_result # type: ignore + + assert final_transmission == expected_final_transmission + + assert attenuator.set.call_args_list == [ # type: ignore + call(1), + call(expected_final_transmission / 100), + ] + + +@pytest.mark.parametrize( + "target_fraction", + [0.75, 0.26, 0.39], +) +def test_optimise_transmission_reaches_different_target_fractions( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, + target_fraction: float, +): + given_max_values(max_pixel, [100, 100 * target_fraction]) + + final_transmission = run_engine( + optimise_transmission_with_oav( + target_brightness_fraction=target_fraction, + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + ).plan_result # type: ignore + + assert final_transmission == 50 + + assert attenuator.set.call_args_list == [call(1), call(0.5)] # type: ignore + + +def test_max_pixel_stays_too_large_then_optimise_transmission_keeps_reducing( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, max_pixel: MaxPixel, + run_engine: RunEngine, +): + given_max_values(max_pixel, [100, 100, 100, 100, 100, 75]) + + final_transmission = run_engine( + optimise_transmission_with_oav( + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + ).plan_result # type: ignore + + assert final_transmission == 3.0 + + assert attenuator.set.call_args_list == [ # type: ignore + call(1), + call(0.5), + call(pytest.approx(0.25)), + call(pytest.approx(0.125)), + call(pytest.approx(0.0625)), + call(pytest.approx(0.0312)), + ] + + +def test_max_pixel_stays_too_small_then_optimise_transmission_keeps_increasing( attenuator: BinaryFilterAttenuator, - done_status, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, ): - # Simulated transmission-to-pixel mapping - # expecting transmission 18.75 to be correct + given_max_values(max_pixel, [100, 20, 20, 20, 20, 75]) - # put this in a mark.parameterize so that you can try different dicts - mock_brightest_pixel.return_value = [255] - # transmission_map = {100: 255, 50: 180, 25: 160, 18.75: 130, 12.5: 118, 0: 0} - transmission_map = {50: 220, 25: 200, 18.75: 150, 12.5: 127} - # transmission_list = list(transmission_map.keys()) + final_transmission = run_engine( + optimise_transmission_with_oav( + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + ).plan_result # type: ignore - max_pixel_list = list(transmission_map.values()) - max_pixel.trigger = MagicMock(return_value=done_status) - attenuator.set = MagicMock() - side_effect_input = [{"readback": i} for i in max_pixel_list] - max_pixel.locate = AsyncMock(side_effect=side_effect_input) + assert final_transmission == 97.0 - run_engine(optimise_oav_transmission_binary_search(upper_bound=100, lower_bound=0)) - assert max_pixel.trigger.call_count == 4 + assert attenuator.set.call_args_list == [ # type: ignore + call(1), + call(0.5), + call(pytest.approx(0.75)), + call(pytest.approx(0.875)), + call(pytest.approx(0.9375)), + call(pytest.approx(0.9688)), + ] - # now defining a mock function - # def mock_get_max_pixel_value_from_transmission(transmission): - # pixel_val = transmission_map.get(transmission) - # if pixel_val is None: - # raise KeyError(f"Transmission {transmission} is not in the mock dictionary") +@pytest.mark.parametrize( + "tolerance, expected_final_transmission, expected_calls", + [ + (10, 50.0, [call(1), call(0.5)]), + (3, 75.0, [call(1), call(0.5), call(0.75)]), + ], +) +def test_different_tolerances_change_when_we_accept( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, + tolerance: int, + expected_final_transmission: float, + expected_calls: list, +): + given_max_values(max_pixel, [100, 68, 75]) - # return pixel_val + final_transmission = run_engine( + optimise_transmission_with_oav( + tolerance=tolerance, + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + ).plan_result # type: ignore - # AsyncMock(side_effect=[50,75,etc...]) # use one for the transmissions one for the pixel values. + assert final_transmission == expected_final_transmission + assert attenuator.set.call_args_list == expected_calls # type: ignore - # with patch( - # "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.get_max_pixel_value_from_transmission", - # side_effect=mock_get_max_pixel_value_from_transmission, - # ): - # result = optimise_oav_transmission_binary_search(255 / 2, 100, 0, 5, 10) - # assert result == 18.75 +def test_brightness_alternates_above_then_below_target_bounds_shrink_both_sides( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, +): + given_max_values(max_pixel, [100, 90, 60, 85, 65, 75]) -def tranmission_to_max_pixel(transmission): - max_pixel = transmission + 30 - return max_pixel - # if target is 100 then we expect the optimal transmission to be 100 - 30 = 70 + final_transmission = run_engine( + optimise_transmission_with_oav( + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + ).plan_result # type: ignore + + assert final_transmission == 34.0 + # Note the 2 dp rounding on set values: + assert attenuator.set.call_args_list == [ # type: ignore + call(1), + call(0.5), + call(0.25), + call(pytest.approx(0.375)), + call(pytest.approx(0.3125)), + call(pytest.approx(0.3438)), + ] -def optimise_oav_transmission_binary_search_without_yields( - upper_bound: float, # in percent - lower_bound: float, # in percent - frac_of_max: float = 0.5, - tolerance: int = 1, - max_iterations: int = 5, + +@pytest.mark.parametrize( + "edge_value", + [70, 80], +) +def test_equal_to_target_plus_or_minus_tolerance_matches_target( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, + edge_value: int, ): - target_pixel_l = 255 * frac_of_max + given_max_values(max_pixel, [100, edge_value]) - while max_iterations > tolerance: - mid = round((upper_bound + lower_bound) / 2, 2) # limit to 2 dp - max_iterations -= 1 + plan = optimise_transmission_with_oav( + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + plan_result = run_engine(plan).plan_result # type: ignore + + assert plan_result == 50 + assert attenuator.set.call_args_list == [call(1), call(0.5)] # type:ignore - brightest_pixel = tranmission_to_max_pixel(mid) - # brightest_pixel = get_max_pixel_value_from_transmission(transmission=mid) - LOGGER.info(f"Upper bound is: {upper_bound}, Lower bound is: {lower_bound}") - LOGGER.info( - f"Testing transmission {mid}, brightest pixel found {brightest_pixel}" + +def test_optimise_transmission_raises_value_error_when_upper_bound_less_than_lower_bound( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, +): + with pytest.raises(ValueError) as excinfo: + run_engine( + optimise_transmission_with_oav( + lower_bound=60, + upper_bound=40, + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) ) + assert "Upper bound (40) must be higher than lower bound 60" in str(excinfo.value) - if target_pixel_l - tolerance < brightest_pixel < target_pixel_l + tolerance: - mid = round(mid, 0) - LOGGER.info(f"\nOptimal transmission found - {mid}") - return mid + # Ensure nothing was moved/triggered since the + assert attenuator.set.call_count == 0 # type: ignore + assert xbpm_feedback.trigger.call_count == 0 # type: ignore - # condition for too low so want to try higher - elif brightest_pixel < target_pixel_l - tolerance: - LOGGER.info("Result: Too low \n") - lower_bound = mid - # condition for too high so want to try lower - elif brightest_pixel > target_pixel_l + tolerance: - LOGGER.info("Result: Too high \n") - upper_bound = mid - return "Max iterations reached" +def test_optimise_transmission_raises_value_error_when_full_beam_brightness_is_zero( + attenuator: BinaryFilterAttenuator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + run_engine: RunEngine, +): + given_max_values(max_pixel, [0]) + + with pytest.raises(ValueError) as excinfo: + run_engine( + optimise_transmission_with_oav( + max_pixel=max_pixel, + attenuator=attenuator, + xbpm_feedback=xbpm_feedback, + ) + ) + assert "No beam" in str(excinfo.value) -def test_binary_search_logic(): - search = optimise_oav_transmission_binary_search(100, 0) - assert search == 70 + assert attenuator.set.call_count == 1 # type:ignore From cc958af13bb1770e441850914c0e009545c8b444 Mon Sep 17 00:00:00 2001 From: Dominic Oram Date: Tue, 16 Dec 2025 11:26:37 +0000 Subject: [PATCH 17/18] Add more tests --- pyproject.toml | 2 +- .../i04/oav_centering_plans/oav_imaging.py | 20 +- tests/conftest.py | 8 +- tests/system_tests/conftest.py | 4 - .../beamlines/i04/test_oav_imaging.py | 313 +++++++++++++++++- .../unit_tests/beamlines/i04/test_thawing.py | 11 +- .../test_grid_detection_plan.py | 4 - 7 files changed, 332 insertions(+), 30 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 34f7d5d431..03ad7739d3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,7 +48,7 @@ dependencies = [ "ophyd >= 1.10.5", "ophyd-async >= 0.14.0", "bluesky >= 1.14.6", - "dls-dodal @ git+https://github.com/DiamondLightSource/dodal.git@1726_beam_centre_device_post_testing", + "dls-dodal @ git+https://github.com/DiamondLightSource/dodal.git@a7ccf9519416092be9a1b4eb9968124ea653c3bc", ] diff --git a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py index 75ce4a7c3e..03f64b934e 100644 --- a/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py +++ b/src/mx_bluesky/beamlines/i04/oav_centering_plans/oav_imaging.py @@ -226,6 +226,21 @@ def optimise_transmission_with_oav( raise StopIteration("Max iterations reached") +def _get_all_zoom_levels( + zoom_controller: ZoomControllerWithBeamCentres, +) -> MsgGenerator[list[str]]: + zoom_levels = [] + level_signals = [ + centring_device.level_name + for centring_device in zoom_controller.beam_centres.values() + ] + for signal in level_signals: + level_name = yield from bps.rd(signal) + if level_name: + zoom_levels.append(level_name) + return zoom_levels + + def find_beam_centres( zoom_levels_to_centre: list[str] | None = None, zoom_levels_to_optimise_transmission: list[str] | None = None, @@ -246,6 +261,9 @@ def find_beam_centres( if zoom_levels_to_optimise_transmission is None: zoom_levels_to_optimise_transmission = ["1.0x", "7.5x"] + if zoom_levels_to_centre is None: + zoom_levels_to_centre = yield from _get_all_zoom_levels(zoom_controller) + LOGGER.info("Preparing beamline for images...") yield from _prepare_beamline_for_scintillator_images( robot, @@ -259,7 +277,7 @@ def find_beam_centres( for centring_device in zoom_controller.beam_centres.values(): zoom_name = yield from bps.rd(centring_device.level_name) - if zoom_levels_to_centre is None or zoom_name in zoom_levels_to_centre: + if zoom_name in zoom_levels_to_centre: LOGGER.info(f"Moving to zoom level {zoom_name}") yield from bps.abs_set(zoom_controller, zoom_name, wait=True) if zoom_name in zoom_levels_to_optimise_transmission: diff --git a/tests/conftest.py b/tests/conftest.py index 66eea68538..6649aed1dc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,7 +10,7 @@ from pathlib import Path from types import ModuleType from typing import Any -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import MagicMock, patch import numpy import pydantic @@ -510,13 +510,7 @@ def oav(test_config_files): ) oav = i03.oav.build(mock=True, connect_immediately=True, params=parameters) - zoom_levels_list = ["1.0x", "3.0x", "5.0x", "7.5x", "10.0x", "15.0x"] - oav.zoom_controller._get_allowed_zoom_levels = AsyncMock( - return_value=zoom_levels_list - ) - # Equivalent to previously set values for microns and beam centre set_mock_value(oav.zoom_controller.level, "5.0x") - set_mock_value(oav.grid_snapshot.x_size, 1024) set_mock_value(oav.grid_snapshot.y_size, 768) diff --git a/tests/system_tests/conftest.py b/tests/system_tests/conftest.py index 75d97ff567..12210ec4cd 100644 --- a/tests/system_tests/conftest.py +++ b/tests/system_tests/conftest.py @@ -201,10 +201,6 @@ async def trigger_with_test_image(self): ): mock_get.return_value.__aenter__.return_value = empty_response set_mock_value(oav.zoom_controller.level, "1.0x") - zoom_levels_list = ["1.0x", "3.0x", "5.0x", "7.5x", "10.0x", "15.0x"] - oav.zoom_controller._get_allowed_zoom_levels = AsyncMock( - return_value=zoom_levels_list - ) yield oav diff --git a/tests/unit_tests/beamlines/i04/test_oav_imaging.py b/tests/unit_tests/beamlines/i04/test_oav_imaging.py index 7312be8400..b0665c0513 100644 --- a/tests/unit_tests/beamlines/i04/test_oav_imaging.py +++ b/tests/unit_tests/beamlines/i04/test_oav_imaging.py @@ -1,13 +1,15 @@ -from unittest.mock import MagicMock, call, patch +from unittest.mock import AsyncMock, MagicMock, call, patch +import bluesky.plan_stubs as bps import pytest from bluesky.run_engine import RunEngine from bluesky.simulators import RunEngineSimulator, assert_message_and_return_remaining from dodal.devices.attenuator.attenuator import BinaryFilterAttenuator from dodal.devices.backlight import Backlight +from dodal.devices.i04.beam_centre import CentreEllipseMethod from dodal.devices.i04.max_pixel import MaxPixel from dodal.devices.mx_phase1.beamstop import Beamstop, BeamstopPositions -from dodal.devices.oav.oav_detector import OAV +from dodal.devices.oav.oav_detector import OAV, ZoomControllerWithBeamCentres from dodal.devices.robot import BartRobot, PinMounted from dodal.devices.scintillator import InOut, Scintillator from dodal.devices.xbpm_feedback import XBPMFeedback @@ -16,10 +18,16 @@ ZebraShutterControl, ZebraShutterState, ) -from ophyd_async.core import completed_status, init_devices, set_mock_value +from ophyd_async.core import ( + completed_status, + get_mock_put, + init_devices, + set_mock_value, +) from mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging import ( _prepare_beamline_for_scintillator_images, + find_beam_centres, optimise_transmission_with_oav, take_and_save_oav_image, take_oav_image_with_scintillator_in, @@ -588,3 +596,302 @@ def test_optimise_transmission_raises_value_error_when_full_beam_brightness_is_z assert "No beam" in str(excinfo.value) assert attenuator.set.call_count == 1 # type:ignore + + +@pytest.fixture() +async def centre_ellipse() -> CentreEllipseMethod: + async with init_devices(mock=True): + centre_ellipse = CentreEllipseMethod("", "centre_ellipse") + + centre_ellipse.trigger = MagicMock(return_value=completed_status()) + return centre_ellipse + + +def initialise_zoom_centres( + zoom_controller: ZoomControllerWithBeamCentres, init_values: dict +): + for i, (level, beam_centre) in enumerate(init_values.items()): + centre_device = zoom_controller.beam_centres[i] + set_mock_value(centre_device.level_name, level) + set_mock_value(centre_device.x_centre, beam_centre[0]) + set_mock_value(centre_device.y_centre, beam_centre[1]) + + +@pytest.fixture() +async def zoom_controller_with_centres() -> ZoomControllerWithBeamCentres: + async with init_devices(mock=True): + zoom_controller_with_centres = ZoomControllerWithBeamCentres( + "", "zoom_controller_with_centres" + ) + zoom_controller_with_centres.DELAY_BETWEEN_MOTORS_AND_IMAGE_UPDATING_S = 0.001 # type:ignore + + level_names = ["1.0x", "2.0x", "3.0x", "7.5x"] + initialise_zoom_centres( + zoom_controller_with_centres, dict.fromkeys(level_names, (0, 0)) + ) + + return zoom_controller_with_centres + + +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging._prepare_beamline_for_scintillator_images" +) +def test_find_beam_centres_starts_by_prepping_scintillator( + mock_prepare_scintillator: AsyncMock, + robot: BartRobot, + beamstop_phase1: Beamstop, + backlight: Backlight, + scintillator: Scintillator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + centre_ellipse: CentreEllipseMethod, + attenuator: BinaryFilterAttenuator, + zoom_controller_with_centres: ZoomControllerWithBeamCentres, + sample_shutter: ZebraShutter, + run_engine: RunEngine, +): + zoom_controller_with_centres.beam_centres = {} # type:ignore + run_engine( + find_beam_centres( + robot=robot, + beamstop=beamstop_phase1, + backlight=backlight, + scintillator=scintillator, + xbpm_feedback=xbpm_feedback, + max_pixel=max_pixel, + centre_ellipse=centre_ellipse, + attenuator=attenuator, + zoom_controller=zoom_controller_with_centres, + shutter=sample_shutter, + ) + ) + mock_prepare_scintillator.assert_called_once() + centre_ellipse.trigger.assert_not_called() # type:ignore + + +def mock_centre_ellipse_with_given_centres( + centre_ellipse: CentreEllipseMethod, given_centres: list[tuple[int, int]] +): + def centre_ellipse_trigger_side_effect(*args): + next_centre = given_centres.pop(0) + set_mock_value(centre_ellipse.center_x_val, next_centre[0]) + set_mock_value(centre_ellipse.center_y_val, next_centre[1]) + return completed_status() + + centre_ellipse.trigger.side_effect = centre_ellipse_trigger_side_effect # type:ignore + + +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging._prepare_beamline_for_scintillator_images" +) +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.optimise_transmission_with_oav" +) +async def test_find_beam_centres_iterates_and_sets_centres( + mock_optimise: AsyncMock, + mock_prepare_scintillator: AsyncMock, + robot: BartRobot, + beamstop_phase1: Beamstop, + backlight: Backlight, + scintillator: Scintillator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + centre_ellipse: CentreEllipseMethod, + attenuator: BinaryFilterAttenuator, + zoom_controller_with_centres: ZoomControllerWithBeamCentres, + sample_shutter: ZebraShutter, + run_engine: RunEngine, +): + level_names = ["1.0x", "2.0x", "3.0x", "7.5x"] + new_centres = [(100, 100), (200, 200), (300, 300), (400, 400)] + expected_centres = new_centres.copy() + + mock_centre_ellipse_with_given_centres(centre_ellipse, new_centres) + + run_engine( + find_beam_centres( + robot=robot, + beamstop=beamstop_phase1, + backlight=backlight, + scintillator=scintillator, + xbpm_feedback=xbpm_feedback, + max_pixel=max_pixel, + centre_ellipse=centre_ellipse, + attenuator=attenuator, + zoom_controller=zoom_controller_with_centres, + shutter=sample_shutter, + ) + ) + + assert get_mock_put(zoom_controller_with_centres.level).call_count == 4 + assert get_mock_put(zoom_controller_with_centres.level).call_args_list == [ + call(level, wait=True) for level in level_names + ] + + for i, centre in zoom_controller_with_centres.beam_centres.items(): + level_name = await centre.level_name.get_value() + if level_name: + assert level_name == level_names[i] + assert (await centre.x_centre.get_value()) == expected_centres[i][0] + assert (await centre.y_centre.get_value()) == expected_centres[i][1] + + +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging._prepare_beamline_for_scintillator_images" +) +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.optimise_transmission_with_oav" +) +async def test_if_only_some_levels_given_then_find_beam_centres_iterates_and_sets_those_centres( + mock_optimise: AsyncMock, + mock_prepare_scintillator: AsyncMock, + robot: BartRobot, + beamstop_phase1: Beamstop, + backlight: Backlight, + scintillator: Scintillator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + centre_ellipse: CentreEllipseMethod, + attenuator: BinaryFilterAttenuator, + zoom_controller_with_centres: ZoomControllerWithBeamCentres, + sample_shutter: ZebraShutter, + run_engine: RunEngine, +): + new_centres = [(100, 100), (200, 200), (300, 300), (400, 400)] + + mock_centre_ellipse_with_given_centres(centre_ellipse, new_centres) + + run_engine( + find_beam_centres( + zoom_levels_to_centre=["1.0x", "7.5x"], + robot=robot, + beamstop=beamstop_phase1, + backlight=backlight, + scintillator=scintillator, + xbpm_feedback=xbpm_feedback, + max_pixel=max_pixel, + centre_ellipse=centre_ellipse, + attenuator=attenuator, + zoom_controller=zoom_controller_with_centres, + shutter=sample_shutter, + ) + ) + + assert get_mock_put(zoom_controller_with_centres.level).call_count == 2 + assert get_mock_put(zoom_controller_with_centres.level).call_args_list == [ + call(level, wait=True) for level in ["1.0x", "7.5x"] + ] + + centres = list(zoom_controller_with_centres.beam_centres.values()) + + assert (await centres[0].level_name.get_value()) == "1.0x" + assert (await centres[0].x_centre.get_value()) == 100 + assert (await centres[0].y_centre.get_value()) == 100 + + for centre in [centres[1], centres[2]]: + assert (await centre.x_centre.get_value()) == 0 + assert (await centre.y_centre.get_value()) == 0 + + assert (await centres[3].level_name.get_value()) == "7.5x" + assert (await centres[3].x_centre.get_value()) == 200 + assert (await centres[3].y_centre.get_value()) == 200 + + +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging._prepare_beamline_for_scintillator_images" +) +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.optimise_transmission_with_oav" +) +async def test_find_beam_centres_optimises_on_default_levels_only( + mock_optimise: MagicMock, + mock_prepare_scintillator: AsyncMock, + robot: BartRobot, + beamstop_phase1: Beamstop, + backlight: Backlight, + scintillator: Scintillator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + centre_ellipse: CentreEllipseMethod, + attenuator: BinaryFilterAttenuator, + zoom_controller_with_centres: ZoomControllerWithBeamCentres, + sample_shutter: ZebraShutter, + run_engine: RunEngine, +): + levels_where_optimised = [] + + def append_current_zoom(*args, **kwargs): + current_zoom = yield from bps.rd(zoom_controller_with_centres.level) + levels_where_optimised.append(current_zoom) + yield from bps.null() + + mock_optimise.side_effect = append_current_zoom + + run_engine( + find_beam_centres( + robot=robot, + beamstop=beamstop_phase1, + backlight=backlight, + scintillator=scintillator, + xbpm_feedback=xbpm_feedback, + max_pixel=max_pixel, + centre_ellipse=centre_ellipse, + attenuator=attenuator, + zoom_controller=zoom_controller_with_centres, + shutter=sample_shutter, + ) + ) + + assert mock_optimise.call_count == 2 + assert levels_where_optimised == ["1.0x", "7.5x"] + + +@pytest.mark.asyncio +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging._prepare_beamline_for_scintillator_images" +) +@patch( + "mx_bluesky.beamlines.i04.oav_centering_plans.oav_imaging.optimise_transmission_with_oav" +) +async def test_find_beam_centres_respects_custom_optimise_list( + mock_optimise: MagicMock, + mock_prepare_scintillator: AsyncMock, + robot: BartRobot, + beamstop_phase1: Beamstop, + backlight: Backlight, + scintillator: Scintillator, + xbpm_feedback: XBPMFeedback, + max_pixel: MaxPixel, + centre_ellipse: CentreEllipseMethod, + attenuator: BinaryFilterAttenuator, + zoom_controller_with_centres: ZoomControllerWithBeamCentres, + sample_shutter: ZebraShutter, + run_engine: RunEngine, +): + levels_where_optimised = [] + + def append_current_zoom(*args, **kwargs): + current_zoom = yield from bps.rd(zoom_controller_with_centres.level) + levels_where_optimised.append(current_zoom) + yield from bps.null() + + mock_optimise.side_effect = append_current_zoom + + run_engine( + find_beam_centres( + zoom_levels_to_optimise_transmission=["2.0x", "3.0x"], + robot=robot, + beamstop=beamstop_phase1, + backlight=backlight, + scintillator=scintillator, + xbpm_feedback=xbpm_feedback, + max_pixel=max_pixel, + centre_ellipse=centre_ellipse, + attenuator=attenuator, + zoom_controller=zoom_controller_with_centres, + shutter=sample_shutter, + ) + ) + + assert mock_optimise.call_count == 2 + assert levels_where_optimised == ["2.0x", "3.0x"] diff --git a/tests/unit_tests/beamlines/i04/test_thawing.py b/tests/unit_tests/beamlines/i04/test_thawing.py index 9b1fb7eb14..409d9fac51 100644 --- a/tests/unit_tests/beamlines/i04/test_thawing.py +++ b/tests/unit_tests/beamlines/i04/test_thawing.py @@ -1,6 +1,6 @@ import json from functools import partial -from unittest.mock import ANY, AsyncMock, MagicMock, call, patch +from unittest.mock import ANY, MagicMock, call, patch import pytest from bluesky.run_engine import RunEngine @@ -46,10 +46,6 @@ async def oav_full_screen() -> OAV: oav = OAVBeamCentrePV( "", config=oav_config, name="oav_full_screen", mjpeg_prefix="XTAL" ) - zoom_levels_list = ["1.0x", "2.0x", "5.0x"] - oav.zoom_controller._get_allowed_zoom_levels = AsyncMock( - return_value=zoom_levels_list - ) set_mock_value(oav.zoom_controller.level, "1.0x") set_mock_value(oav.grid_snapshot.x_size, 1024) set_mock_value(oav.grid_snapshot.y_size, 768) @@ -61,11 +57,6 @@ async def oav_roi() -> OAV: oav_config = OAVConfig(ZOOM_LEVELS_XML) async with init_devices(mock=True, connect=True): oav = OAVBeamCentrePV("", config=oav_config, name="oav") - zoom_levels_list = ["1.0x", "2.0x", "5.0x"] - oav.zoom_controller._get_allowed_zoom_levels = AsyncMock( - return_value=zoom_levels_list - ) - set_mock_value(oav.zoom_controller.level, "5.0x") set_mock_value(oav.grid_snapshot.x_size, 512) set_mock_value(oav.grid_snapshot.y_size, 384) diff --git a/tests/unit_tests/common/experiment_plans/test_grid_detection_plan.py b/tests/unit_tests/common/experiment_plans/test_grid_detection_plan.py index 1892e270f1..88ee27981c 100644 --- a/tests/unit_tests/common/experiment_plans/test_grid_detection_plan.py +++ b/tests/unit_tests/common/experiment_plans/test_grid_detection_plan.py @@ -60,10 +60,6 @@ def fake_devices( test_config_files["zoom_params_file"], test_config_files["display_config"] ) oav = i03.oav.build(connect_immediately=True, mock=True, params=params) - zoom_levels_list = ["1.0x", "3.0x", "5.0x", "7.5x", "10.0x", "15.0x"] - oav.zoom_controller._get_allowed_zoom_levels = AsyncMock( - return_value=zoom_levels_list - ) set_mock_value(oav.zoom_controller.level, "5.0x") set_mock_value(oav.grid_snapshot.x_size, 1024) set_mock_value(oav.grid_snapshot.y_size, 768) From cddb95469c6afc2c007769e78bf865e369e38496 Mon Sep 17 00:00:00 2001 From: Dominic Oram Date: Wed, 17 Dec 2025 14:32:42 +0000 Subject: [PATCH 18/18] Remove unneeded patch --- tests/unit_tests/beamlines/i04/test_oav_imaging.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit_tests/beamlines/i04/test_oav_imaging.py b/tests/unit_tests/beamlines/i04/test_oav_imaging.py index b0665c0513..9a54289112 100644 --- a/tests/unit_tests/beamlines/i04/test_oav_imaging.py +++ b/tests/unit_tests/beamlines/i04/test_oav_imaging.py @@ -623,7 +623,6 @@ async def zoom_controller_with_centres() -> ZoomControllerWithBeamCentres: zoom_controller_with_centres = ZoomControllerWithBeamCentres( "", "zoom_controller_with_centres" ) - zoom_controller_with_centres.DELAY_BETWEEN_MOTORS_AND_IMAGE_UPDATING_S = 0.001 # type:ignore level_names = ["1.0x", "2.0x", "3.0x", "7.5x"] initialise_zoom_centres(