Skip to content
31 changes: 30 additions & 1 deletion src/dodal/beamlines/i19_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
AttenuatorMotorSquad,
)
from dodal.devices.i19.access_controlled.blueapi_device import HutchState
from dodal.devices.i19.access_controlled.piezo_control import (
AccessControlledPiezoActuator,
FocusingMirrorType,
)
from dodal.devices.i19.access_controlled.shutter import (
AccessControlledShutter,
HutchState,
)
from dodal.devices.i19.beamstop import BeamStop
from dodal.devices.oav.oav_detector import OAVBeamCentreFile
Expand Down Expand Up @@ -101,3 +104,29 @@ def zebra() -> Zebra:
mapping=I19_1_ZEBRA_MAPPING,
prefix=f"{PREFIX.beamline_prefix}-EA-ZEBRA-02:",
)


@device_factory()
def hfm_piezo() -> AccessControlledPiezoActuator:
"""Get the i19-1 access controlled hfm piezo device, instantiate it if it hasn't already been.
If this is called when already instantiated, it will return the existing object.
"""
return AccessControlledPiezoActuator(
prefix=f"{PREFIX.beamline_prefix}-OP-HFM-01:",
mirror_type=FocusingMirrorType.HFM,
hutch=HutchState.EH1,
instrument_session=I19_1_COMMISSIONING_INSTR_SESSION,
)


@device_factory()
def vfm_piezo() -> AccessControlledPiezoActuator:
"""Get the i19-1 access controlled vfm piezo device, instantiate it if it hasn't already been.
If this is called when already instantiated, it will return the existing object.
"""
return AccessControlledPiezoActuator(
prefix=f"{PREFIX.beamline_prefix}-OP-VFM-01:",
mirror_type=FocusingMirrorType.VFM,
hutch=HutchState.EH1,
instrument_session=I19_1_COMMISSIONING_INSTR_SESSION,
)
30 changes: 30 additions & 0 deletions src/dodal/beamlines/i19_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
AttenuatorMotorSquad,
)
from dodal.devices.i19.access_controlled.blueapi_device import HutchState
from dodal.devices.i19.access_controlled.piezo_control import (
AccessControlledPiezoActuator,
FocusingMirrorType,
)
from dodal.devices.i19.access_controlled.shutter import AccessControlledShutter
from dodal.devices.i19.backlight import BacklightPosition
from dodal.devices.i19.beamstop import BeamStop
Expand Down Expand Up @@ -138,3 +142,29 @@ def zebra() -> Zebra:
mapping=I19_2_ZEBRA_MAPPING,
prefix=f"{PREFIX.beamline_prefix}-EA-ZEBRA-03:",
)


@device_factory()
def hfm_piezo() -> AccessControlledPiezoActuator:
"""Get the i19-2 access controlled hfm piezo device, instantiate it if it hasn't already been.
If this is called when already instantiated, it will return the existing object.
"""
return AccessControlledPiezoActuator(
prefix=f"{PREFIX.beamline_prefix}-OP-HFM-01:",
mirror_type=FocusingMirrorType.HFM,
hutch=HutchState.EH2,
instrument_session=I19_2_COMMISSIONING_INSTR_SESSION,
)


@device_factory()
def vfm_piezo() -> AccessControlledPiezoActuator:
"""Get the i19-2 access controlled vfm piezo device, instantiate it if it hasn't already been.
If this is called when already instantiated, it will return the existing object.
"""
return AccessControlledPiezoActuator(
prefix=f"{PREFIX.beamline_prefix}-OP-VFM-01:",
mirror_type=FocusingMirrorType.VFM,
hutch=HutchState.EH2,
instrument_session=I19_2_COMMISSIONING_INSTR_SESSION,
)
17 changes: 17 additions & 0 deletions src/dodal/beamlines/i19_optics.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dodal.common.beamlines.beamline_utils import (
set_beamline as set_utils_beamline,
)
from dodal.devices.focusing_mirror import FocusingMirrorWithPiezo
from dodal.devices.hutch_shutter import HutchShutter
from dodal.devices.i19.access_controlled.hutch_access import (
ACCESS_DEVICE_NAME,
Expand Down Expand Up @@ -34,3 +35,19 @@ def access_control() -> HutchAccessControl:
return HutchAccessControl(
f"{PREFIX.beamline_prefix}-OP-STAT-01:", ACCESS_DEVICE_NAME
)


@device_factory()
def vfm() -> FocusingMirrorWithPiezo:
"""Get the i19 vfm device, instantiate it if it hasn't already been.
If this is called when already instantiated, it will return the existing object.
"""
return FocusingMirrorWithPiezo(f"{PREFIX.beamline_prefix}-OP-VFM-01:")


@device_factory()
def hfm() -> FocusingMirrorWithPiezo:
"""Get the i19 hfm device, instantiate it if it hasn't already been.
If this is called when already instantiated, it will return the existing object.
"""
return FocusingMirrorWithPiezo(f"{PREFIX.beamline_prefix}-OP-HFM-01:")
13 changes: 13 additions & 0 deletions src/dodal/devices/focusing_mirror.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,16 @@ def energy_to_stripe(self, energy_kev) -> MirrorStripeConfiguration:
return {"stripe": MirrorStripe.BARE, "yaw_mrad": 6.2, "lat_mm": 0.0}
else:
return {"stripe": MirrorStripe.RHODIUM, "yaw_mrad": 0.0, "lat_mm": 10.0}


class FocusingMirrorWithPiezo(FocusingMirror):
"""A focusing mirror which also has a piezoelectric actuator.
A voltage can be applied to the piezo to steer the beam by making the material
shrink or expand.
"""

def __init__(self, prefix: str, name: str = "", *args, **kwargs):
with self.add_children_as_readables():
self.piezo = epics_signal_rw(float, f"{prefix}AOFPITCH")
self.piezo_rbv = epics_signal_r(float, f"{prefix}AOFPITCH:RBV")
super().__init__(prefix, name, *args, **kwargs)
72 changes: 72 additions & 0 deletions src/dodal/devices/i19/access_controlled/piezo_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from enum import StrEnum

from ophyd_async.core import AsyncStatus, StandardReadableFormat
from ophyd_async.epics.core import epics_signal_r

from dodal.devices.i19.access_controlled.blueapi_device import (
HutchState,
OpticsBlueAPIDevice,
)
from dodal.devices.i19.access_controlled.hutch_access import ACCESS_DEVICE_NAME


class FocusingMirrorType(StrEnum):
VFM = "vfm"
HFM = "hfm"


PIEZO_CONTROL_PLAN_NAME = {
FocusingMirrorType.VFM: "apply_voltage_to_vfm_piezo",
FocusingMirrorType.HFM: "apply_voltage_to_hfm_piezo",
}


# NOTE This device is only meant to control the piezo. There should be a separate device
# to control the actual focusing mirror motors, as the two operations are often done
# independently.
class AccessControlledPiezoActuator(OpticsBlueAPIDevice):
"""I19-specific device to set a voltage on the focusing mirror piezoelectric
actuator.

This device will send a REST call to the blueapi instance controlling the optics
hutch running on the I19 cluster, which will evaluate the current hutch in use vs
the hutch sending the request and decide if the plan will be run or not.
As the two hutches are located in series, checking the hutch in use is necessary to
avoid accidentally operating the shutter from one hutch while the other has beamtime.

The name of the hutch that wants to operate the shutter, as well as a commissioning
directory to act as a placehlder for the instrument_session,should be passed to the
device upon instantiation.

A mirror type (vfm or hfm) also needs to be set upon instantiation so that the
correct plan can be run and the correct optics device is injected.

For details see the architecture described in
https://diamondlightsource.github.io/i19-bluesky/main/explanations/decisions/0004-optics-blueapi-architecture.html
"""

def __init__(
self,
prefix: str,
mirror_type: FocusingMirrorType,
hutch: HutchState,
instrument_session: str = "",
name: str = "",
):
with self.add_children_as_readables(StandardReadableFormat.HINTED_SIGNAL):
self.readback = epics_signal_r(float, f"{prefix}AOFPITCH:RBV")
self.mirror = mirror_type
super().__init__(hutch=hutch, instrument_session=instrument_session, name=name)

@AsyncStatus.wrap
async def set(self, value: float):
request_params = {
"name": PIEZO_CONTROL_PLAN_NAME[self.mirror],
"params": {
"experiment_hutch": self._invoking_hutch,
"access_device": ACCESS_DEVICE_NAME,
"voltage_demand": value,
},
"instrument_session": self.instrument_session,
}
await super().set(request_params)
169 changes: 169 additions & 0 deletions tests/devices/i19/access_controlled/test_piezo_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import json
from unittest.mock import AsyncMock, patch

import pytest
from ophyd_async.core import SignalR, init_devices, set_mock_value
from ophyd_async.testing import assert_reading, partial_reading

from dodal.devices.i19.access_controlled.blueapi_device import HEADERS, HutchState
from dodal.devices.i19.access_controlled.piezo_control import (
AccessControlledPiezoActuator,
FocusingMirrorType,
)


@pytest.fixture
def eh1_vfm_piezo() -> AccessControlledPiezoActuator:
with init_devices(mock=True):
v_piezo = AccessControlledPiezoActuator(
"",
FocusingMirrorType.VFM,
HutchState.EH1,
"cm12345-1",
name="mock_vfm_piezo",
)
v_piezo.url = "http://test.url"
set_mock_value(v_piezo.readback, 1.0)
return v_piezo


@pytest.fixture
def eh2_hfm_piezo() -> AccessControlledPiezoActuator:
with init_devices(mock=True):
h_piezo = AccessControlledPiezoActuator(
"",
FocusingMirrorType.HFM,
HutchState.EH2,
"cm12345-1",
name="mock_hfm_piezo",
)
h_piezo.url = "http://test.url"
set_mock_value(h_piezo.readback, 1.0)
return h_piezo


@pytest.mark.parametrize(
"hutch_name, mirror_type",
[
(HutchState.EH1, FocusingMirrorType.HFM),
(HutchState.EH2, FocusingMirrorType.VFM),
],
)
def test_device_created_without_errors(
hutch_name: HutchState, mirror_type: FocusingMirrorType
):
test_device = AccessControlledPiezoActuator(
"", mirror_type, hutch_name, "cm12345-1", "fake_piezo"
)
assert isinstance(test_device, AccessControlledPiezoActuator)
assert isinstance(test_device.readback, SignalR)


async def test_vfm_piezo_rbv_value_can_be_read(
eh1_vfm_piezo: AccessControlledPiezoActuator,
):
await assert_reading(
eh1_vfm_piezo, {"mock_vfm_piezo-readback": partial_reading(1.0)}
)


async def test_hfm_piezo_rbv_value_can_be_read(
eh2_hfm_piezo: AccessControlledPiezoActuator,
):
await assert_reading(
eh2_hfm_piezo, {"mock_hfm_piezo-readback": partial_reading(1.0)}
)


async def test_vfm_piezo_makes_the_correct_rest_call(
eh1_vfm_piezo: AccessControlledPiezoActuator,
):
voltage_demand = 3.2
expected_params = {
"name": "apply_voltage_to_vfm_piezo",
"params": {
"experiment_hutch": "EH1",
"access_device": "access_control",
"voltage_demand": voltage_demand,
},
"instrument_session": "cm12345-1",
}
expected_params_json = json.dumps(expected_params)
with (
patch(
"dodal.devices.i19.access_controlled.blueapi_device.ClientSession.post"
) as mock_post,
patch(
"dodal.devices.i19.access_controlled.blueapi_device.ClientSession.put"
) as mock_put,
patch(
"dodal.devices.i19.access_controlled.blueapi_device.ClientSession.get"
) as mock_get,
):
mock_post.return_value.__aenter__.return_value = (mock_response := AsyncMock())
mock_response.ok = True
mock_response.json.return_value = {"task_id": "abc0-3fg8"}
mock_put.return_value.__aenter__.return_value = (
mock_put_response := AsyncMock()
)
mock_put_response.ok = True
mock_get.return_value.__aenter__.return_value = (
mock_get_response := AsyncMock()
)
mock_get_response.json.return_value = {"is_complete": True, "errors": []}

await eh1_vfm_piezo.set(voltage_demand)

mock_post.assert_called_with(
"/tasks", data=expected_params_json, headers=HEADERS
)
mock_put.assert_called_with(
"/worker/task", data='{"task_id": "abc0-3fg8"}', headers=HEADERS
)


async def test_hfm_piezo_makes_the_correct_rest_call(
eh2_hfm_piezo: AccessControlledPiezoActuator,
):
voltage_demand = 1.5
expected_params = {
"name": "apply_voltage_to_hfm_piezo",
"params": {
"experiment_hutch": "EH2",
"access_device": "access_control",
"voltage_demand": voltage_demand,
},
"instrument_session": "cm12345-1",
}
expected_params_json = json.dumps(expected_params)
with (
patch(
"dodal.devices.i19.access_controlled.blueapi_device.ClientSession.post"
) as mock_post,
patch(
"dodal.devices.i19.access_controlled.blueapi_device.ClientSession.put"
) as mock_put,
patch(
"dodal.devices.i19.access_controlled.blueapi_device.ClientSession.get"
) as mock_get,
):
mock_post.return_value.__aenter__.return_value = (mock_response := AsyncMock())
mock_response.ok = True
mock_response.json.return_value = {"task_id": "abc0-3fg8"}
mock_put.return_value.__aenter__.return_value = (
mock_put_response := AsyncMock()
)
mock_put_response.ok = True
mock_get.return_value.__aenter__.return_value = (
mock_get_response := AsyncMock()
)
mock_get_response.json.return_value = {"is_complete": True, "errors": []}

await eh2_hfm_piezo.set(voltage_demand)

mock_post.assert_called_with(
"/tasks", data=expected_params_json, headers=HEADERS
)
mock_put.assert_called_with(
"/worker/task", data='{"task_id": "abc0-3fg8"}', headers=HEADERS
)
Loading