diff --git a/src/dodal/beamlines/i19_1.py b/src/dodal/beamlines/i19_1.py index 02588e05f21..5e44ff969ba 100644 --- a/src/dodal/beamlines/i19_1.py +++ b/src/dodal/beamlines/i19_1.py @@ -8,9 +8,12 @@ AttenuatorMotorSquad, ) from dodal.devices.i19.access_controlled.blueapi_device import HutchState +from dodal.devices.i19.access_controlled.piezo_control import ( + AccessControlledPiezoActuator, + FocusingMirrorType, +) from dodal.devices.i19.access_controlled.shutter import ( AccessControlledShutter, - HutchState, ) from dodal.devices.i19.beamstop import BeamStop from dodal.devices.oav.oav_detector import OAVBeamCentreFile @@ -101,3 +104,29 @@ def zebra() -> Zebra: mapping=I19_1_ZEBRA_MAPPING, prefix=f"{PREFIX.beamline_prefix}-EA-ZEBRA-02:", ) + + +@device_factory() +def hfm_piezo() -> AccessControlledPiezoActuator: + """Get the i19-1 access controlled hfm piezo device, instantiate it if it hasn't already been. + If this is called when already instantiated, it will return the existing object. + """ + return AccessControlledPiezoActuator( + prefix=f"{PREFIX.beamline_prefix}-OP-HFM-01:", + mirror_type=FocusingMirrorType.HFM, + hutch=HutchState.EH1, + instrument_session=I19_1_COMMISSIONING_INSTR_SESSION, + ) + + +@device_factory() +def vfm_piezo() -> AccessControlledPiezoActuator: + """Get the i19-1 access controlled vfm piezo device, instantiate it if it hasn't already been. + If this is called when already instantiated, it will return the existing object. + """ + return AccessControlledPiezoActuator( + prefix=f"{PREFIX.beamline_prefix}-OP-VFM-01:", + mirror_type=FocusingMirrorType.VFM, + hutch=HutchState.EH1, + instrument_session=I19_1_COMMISSIONING_INSTR_SESSION, + ) diff --git a/src/dodal/beamlines/i19_2.py b/src/dodal/beamlines/i19_2.py index 77d118000e9..1ec536e6640 100644 --- a/src/dodal/beamlines/i19_2.py +++ b/src/dodal/beamlines/i19_2.py @@ -16,6 +16,10 @@ AttenuatorMotorSquad, ) from dodal.devices.i19.access_controlled.blueapi_device import HutchState +from dodal.devices.i19.access_controlled.piezo_control import ( + AccessControlledPiezoActuator, + FocusingMirrorType, +) from dodal.devices.i19.access_controlled.shutter import AccessControlledShutter from dodal.devices.i19.backlight import BacklightPosition from dodal.devices.i19.beamstop import BeamStop @@ -138,3 +142,29 @@ def zebra() -> Zebra: mapping=I19_2_ZEBRA_MAPPING, prefix=f"{PREFIX.beamline_prefix}-EA-ZEBRA-03:", ) + + +@device_factory() +def hfm_piezo() -> AccessControlledPiezoActuator: + """Get the i19-2 access controlled hfm piezo device, instantiate it if it hasn't already been. + If this is called when already instantiated, it will return the existing object. + """ + return AccessControlledPiezoActuator( + prefix=f"{PREFIX.beamline_prefix}-OP-HFM-01:", + mirror_type=FocusingMirrorType.HFM, + hutch=HutchState.EH2, + instrument_session=I19_2_COMMISSIONING_INSTR_SESSION, + ) + + +@device_factory() +def vfm_piezo() -> AccessControlledPiezoActuator: + """Get the i19-2 access controlled vfm piezo device, instantiate it if it hasn't already been. + If this is called when already instantiated, it will return the existing object. + """ + return AccessControlledPiezoActuator( + prefix=f"{PREFIX.beamline_prefix}-OP-VFM-01:", + mirror_type=FocusingMirrorType.VFM, + hutch=HutchState.EH2, + instrument_session=I19_2_COMMISSIONING_INSTR_SESSION, + ) diff --git a/src/dodal/beamlines/i19_optics.py b/src/dodal/beamlines/i19_optics.py index fe171575637..6d00fe99dd4 100644 --- a/src/dodal/beamlines/i19_optics.py +++ b/src/dodal/beamlines/i19_optics.py @@ -4,6 +4,7 @@ from dodal.common.beamlines.beamline_utils import ( set_beamline as set_utils_beamline, ) +from dodal.devices.focusing_mirror import FocusingMirrorWithPiezo from dodal.devices.hutch_shutter import HutchShutter from dodal.devices.i19.access_controlled.hutch_access import ( ACCESS_DEVICE_NAME, @@ -34,3 +35,19 @@ def access_control() -> HutchAccessControl: return HutchAccessControl( f"{PREFIX.beamline_prefix}-OP-STAT-01:", ACCESS_DEVICE_NAME ) + + +@device_factory() +def vfm() -> FocusingMirrorWithPiezo: + """Get the i19 vfm device, instantiate it if it hasn't already been. + If this is called when already instantiated, it will return the existing object. + """ + return FocusingMirrorWithPiezo(f"{PREFIX.beamline_prefix}-OP-VFM-01:") + + +@device_factory() +def hfm() -> FocusingMirrorWithPiezo: + """Get the i19 hfm device, instantiate it if it hasn't already been. + If this is called when already instantiated, it will return the existing object. + """ + return FocusingMirrorWithPiezo(f"{PREFIX.beamline_prefix}-OP-HFM-01:") diff --git a/src/dodal/devices/focusing_mirror.py b/src/dodal/devices/focusing_mirror.py index a0af981fd01..440b6f28d6d 100644 --- a/src/dodal/devices/focusing_mirror.py +++ b/src/dodal/devices/focusing_mirror.py @@ -218,3 +218,16 @@ def energy_to_stripe(self, energy_kev) -> MirrorStripeConfiguration: return {"stripe": MirrorStripe.BARE, "yaw_mrad": 6.2, "lat_mm": 0.0} else: return {"stripe": MirrorStripe.RHODIUM, "yaw_mrad": 0.0, "lat_mm": 10.0} + + +class FocusingMirrorWithPiezo(FocusingMirror): + """A focusing mirror which also has a piezoelectric actuator. + A voltage can be applied to the piezo to steer the beam by making the material + shrink or expand. + """ + + def __init__(self, prefix: str, name: str = "", *args, **kwargs): + with self.add_children_as_readables(): + self.piezo = epics_signal_rw(float, f"{prefix}AOFPITCH") + self.piezo_rbv = epics_signal_r(float, f"{prefix}AOFPITCH:RBV") + super().__init__(prefix, name, *args, **kwargs) diff --git a/src/dodal/devices/i19/access_controlled/piezo_control.py b/src/dodal/devices/i19/access_controlled/piezo_control.py new file mode 100644 index 00000000000..e9eb0913831 --- /dev/null +++ b/src/dodal/devices/i19/access_controlled/piezo_control.py @@ -0,0 +1,72 @@ +from enum import StrEnum + +from ophyd_async.core import AsyncStatus, StandardReadableFormat +from ophyd_async.epics.core import epics_signal_r + +from dodal.devices.i19.access_controlled.blueapi_device import ( + HutchState, + OpticsBlueAPIDevice, +) +from dodal.devices.i19.access_controlled.hutch_access import ACCESS_DEVICE_NAME + + +class FocusingMirrorType(StrEnum): + VFM = "vfm" + HFM = "hfm" + + +PIEZO_CONTROL_PLAN_NAME = { + FocusingMirrorType.VFM: "apply_voltage_to_vfm_piezo", + FocusingMirrorType.HFM: "apply_voltage_to_hfm_piezo", +} + + +# NOTE This device is only meant to control the piezo. There should be a separate device +# to control the actual focusing mirror motors, as the two operations are often done +# independently. +class AccessControlledPiezoActuator(OpticsBlueAPIDevice): + """I19-specific device to set a voltage on the focusing mirror piezoelectric + actuator. + + This device will send a REST call to the blueapi instance controlling the optics + hutch running on the I19 cluster, which will evaluate the current hutch in use vs + the hutch sending the request and decide if the plan will be run or not. + As the two hutches are located in series, checking the hutch in use is necessary to + avoid accidentally operating the shutter from one hutch while the other has beamtime. + + The name of the hutch that wants to operate the shutter, as well as a commissioning + directory to act as a placehlder for the instrument_session,should be passed to the + device upon instantiation. + + A mirror type (vfm or hfm) also needs to be set upon instantiation so that the + correct plan can be run and the correct optics device is injected. + + For details see the architecture described in + https://diamondlightsource.github.io/i19-bluesky/main/explanations/decisions/0004-optics-blueapi-architecture.html + """ + + def __init__( + self, + prefix: str, + mirror_type: FocusingMirrorType, + hutch: HutchState, + instrument_session: str = "", + name: str = "", + ): + with self.add_children_as_readables(StandardReadableFormat.HINTED_SIGNAL): + self.readback = epics_signal_r(float, f"{prefix}AOFPITCH:RBV") + self.mirror = mirror_type + super().__init__(hutch=hutch, instrument_session=instrument_session, name=name) + + @AsyncStatus.wrap + async def set(self, value: float): + request_params = { + "name": PIEZO_CONTROL_PLAN_NAME[self.mirror], + "params": { + "experiment_hutch": self._invoking_hutch, + "access_device": ACCESS_DEVICE_NAME, + "voltage_demand": value, + }, + "instrument_session": self.instrument_session, + } + await super().set(request_params) diff --git a/tests/devices/i19/access_controlled/test_piezo_control.py b/tests/devices/i19/access_controlled/test_piezo_control.py new file mode 100644 index 00000000000..980cf438d0b --- /dev/null +++ b/tests/devices/i19/access_controlled/test_piezo_control.py @@ -0,0 +1,169 @@ +import json +from unittest.mock import AsyncMock, patch + +import pytest +from ophyd_async.core import SignalR, init_devices, set_mock_value +from ophyd_async.testing import assert_reading, partial_reading + +from dodal.devices.i19.access_controlled.blueapi_device import HEADERS, HutchState +from dodal.devices.i19.access_controlled.piezo_control import ( + AccessControlledPiezoActuator, + FocusingMirrorType, +) + + +@pytest.fixture +def eh1_vfm_piezo() -> AccessControlledPiezoActuator: + with init_devices(mock=True): + v_piezo = AccessControlledPiezoActuator( + "", + FocusingMirrorType.VFM, + HutchState.EH1, + "cm12345-1", + name="mock_vfm_piezo", + ) + v_piezo.url = "http://test.url" + set_mock_value(v_piezo.readback, 1.0) + return v_piezo + + +@pytest.fixture +def eh2_hfm_piezo() -> AccessControlledPiezoActuator: + with init_devices(mock=True): + h_piezo = AccessControlledPiezoActuator( + "", + FocusingMirrorType.HFM, + HutchState.EH2, + "cm12345-1", + name="mock_hfm_piezo", + ) + h_piezo.url = "http://test.url" + set_mock_value(h_piezo.readback, 1.0) + return h_piezo + + +@pytest.mark.parametrize( + "hutch_name, mirror_type", + [ + (HutchState.EH1, FocusingMirrorType.HFM), + (HutchState.EH2, FocusingMirrorType.VFM), + ], +) +def test_device_created_without_errors( + hutch_name: HutchState, mirror_type: FocusingMirrorType +): + test_device = AccessControlledPiezoActuator( + "", mirror_type, hutch_name, "cm12345-1", "fake_piezo" + ) + assert isinstance(test_device, AccessControlledPiezoActuator) + assert isinstance(test_device.readback, SignalR) + + +async def test_vfm_piezo_rbv_value_can_be_read( + eh1_vfm_piezo: AccessControlledPiezoActuator, +): + await assert_reading( + eh1_vfm_piezo, {"mock_vfm_piezo-readback": partial_reading(1.0)} + ) + + +async def test_hfm_piezo_rbv_value_can_be_read( + eh2_hfm_piezo: AccessControlledPiezoActuator, +): + await assert_reading( + eh2_hfm_piezo, {"mock_hfm_piezo-readback": partial_reading(1.0)} + ) + + +async def test_vfm_piezo_makes_the_correct_rest_call( + eh1_vfm_piezo: AccessControlledPiezoActuator, +): + voltage_demand = 3.2 + expected_params = { + "name": "apply_voltage_to_vfm_piezo", + "params": { + "experiment_hutch": "EH1", + "access_device": "access_control", + "voltage_demand": voltage_demand, + }, + "instrument_session": "cm12345-1", + } + expected_params_json = json.dumps(expected_params) + with ( + patch( + "dodal.devices.i19.access_controlled.blueapi_device.ClientSession.post" + ) as mock_post, + patch( + "dodal.devices.i19.access_controlled.blueapi_device.ClientSession.put" + ) as mock_put, + patch( + "dodal.devices.i19.access_controlled.blueapi_device.ClientSession.get" + ) as mock_get, + ): + mock_post.return_value.__aenter__.return_value = (mock_response := AsyncMock()) + mock_response.ok = True + mock_response.json.return_value = {"task_id": "abc0-3fg8"} + mock_put.return_value.__aenter__.return_value = ( + mock_put_response := AsyncMock() + ) + mock_put_response.ok = True + mock_get.return_value.__aenter__.return_value = ( + mock_get_response := AsyncMock() + ) + mock_get_response.json.return_value = {"is_complete": True, "errors": []} + + await eh1_vfm_piezo.set(voltage_demand) + + mock_post.assert_called_with( + "/tasks", data=expected_params_json, headers=HEADERS + ) + mock_put.assert_called_with( + "/worker/task", data='{"task_id": "abc0-3fg8"}', headers=HEADERS + ) + + +async def test_hfm_piezo_makes_the_correct_rest_call( + eh2_hfm_piezo: AccessControlledPiezoActuator, +): + voltage_demand = 1.5 + expected_params = { + "name": "apply_voltage_to_hfm_piezo", + "params": { + "experiment_hutch": "EH2", + "access_device": "access_control", + "voltage_demand": voltage_demand, + }, + "instrument_session": "cm12345-1", + } + expected_params_json = json.dumps(expected_params) + with ( + patch( + "dodal.devices.i19.access_controlled.blueapi_device.ClientSession.post" + ) as mock_post, + patch( + "dodal.devices.i19.access_controlled.blueapi_device.ClientSession.put" + ) as mock_put, + patch( + "dodal.devices.i19.access_controlled.blueapi_device.ClientSession.get" + ) as mock_get, + ): + mock_post.return_value.__aenter__.return_value = (mock_response := AsyncMock()) + mock_response.ok = True + mock_response.json.return_value = {"task_id": "abc0-3fg8"} + mock_put.return_value.__aenter__.return_value = ( + mock_put_response := AsyncMock() + ) + mock_put_response.ok = True + mock_get.return_value.__aenter__.return_value = ( + mock_get_response := AsyncMock() + ) + mock_get_response.json.return_value = {"is_complete": True, "errors": []} + + await eh2_hfm_piezo.set(voltage_demand) + + mock_post.assert_called_with( + "/tasks", data=expected_params_json, headers=HEADERS + ) + mock_put.assert_called_with( + "/worker/task", data='{"task_id": "abc0-3fg8"}', headers=HEADERS + ) diff --git a/tests/devices/test_focusing_mirror.py b/tests/devices/test_focusing_mirror.py index 441acd97954..e4e008af9fd 100644 --- a/tests/devices/test_focusing_mirror.py +++ b/tests/devices/test_focusing_mirror.py @@ -6,6 +6,7 @@ from bluesky.run_engine import RunEngine from bluesky.utils import FailedStatus from ophyd_async.core import ( + SignalR, callback_on_mock_put, get_mock_put, init_devices, @@ -13,6 +14,7 @@ ) from dodal.devices.focusing_mirror import ( + FocusingMirrorWithPiezo, FocusingMirrorWithStripes, MirrorStripe, MirrorStripeConfiguration, @@ -253,3 +255,13 @@ async def test_given_striped_focussing_mirror_then_energy_to_stripe_returns_expe with init_devices(mock=True): device = FocusingMirrorWithStripes(prefix="-OP-VFM-01:", name="mirror") assert device.energy_to_stripe(energy_kev) == expected_config + + +async def test_focusing_mirror_with_piezo(): + with init_devices(mock=True): + device = FocusingMirrorWithPiezo(prefix="-OP-VFM-01:", name="vfm") + assert isinstance(device.piezo_rbv, SignalR) + + await device.piezo.set(3.795) + + assert await device.piezo.get_value() == 3.795