Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,4 @@ lockfiles/
.env
bob/
bob_custom/
virt/
home.yaml
saved.yaml
4 changes: 4 additions & 0 deletions sim/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Ignore every file in this folder
*
# Except this one
!.gitignore
46 changes: 22 additions & 24 deletions src/fastcs_standa_mirror/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from fastcs_standa_mirror.mirror_controller import MirrorController
from fastcs_standa_mirror.utils import (
load_devices,
load_or_create_home_pos,
load_or_create_saved_pos,
)

from . import __version__
Expand All @@ -32,35 +32,33 @@ def main() -> None:
action="version",
version=__version__,
)
parser.add_argument(
"--sim",
action="store_true",
dest="use_sim",
help="Use simulated device",
)
parser.parse_args()

parsed_args = parser.parse_args()
use_sim = parsed_args.use_sim
# Load environment variables
load_dotenv()

# Validate device URIs and PV_PREFIX only if not using simulation
if not use_sim:
load_dotenv()
# Get device uris
device_pitch_uri = os.getenv("DEVICE_PITCH_URI")
device_yaw_uri = os.getenv("DEVICE_YAW_URI")

pv_prefix = os.getenv("PV_PREFIX")
print(pv_prefix)
if pv_prefix is None:
raise ValueError("PV_PREFIX environment variable must be set")
if device_pitch_uri is None or device_yaw_uri is None:
raise ValueError("DEVICE_PITCH_URI and DEVICE_YAW_URI must be set")

device_pitch_uri = os.getenv("DEVICE_PITCH_URI")
device_yaw_uri = os.getenv("DEVICE_YAW_URI")
if device_pitch_uri is None or device_yaw_uri is None:
raise ValueError("DEVICE_PITCH_URI and DEVICE_YAW_URI must be set")
# Detect if we're using a sim
use_sim = device_pitch_uri.startswith("SIM") or device_yaw_uri.startswith("SIM")

# Load pv prefix
pv_prefix = os.getenv("PV_PREFIX")

if pv_prefix is None:
raise ValueError("PV_PREFIX environment variable must be set")

if use_sim:
logging.info(f"Using simulated devices with PV_PREFIX -> {pv_prefix}")
else:
pv_prefix = "MIRROR-SIM-001"
logging.info(f"Simulated device PV_PREFIX -> {pv_prefix}")
print(pv_prefix)

home_positions = load_or_create_home_pos()
saved_positions = load_or_create_saved_pos()
uris = load_devices(use_sim=use_sim)

# epics setup
Expand All @@ -73,7 +71,7 @@ def main() -> None:
)

# run fastcs instance
controller = MirrorController(uris["pitch"], uris["yaw"], home_positions)
controller = MirrorController(uris["pitch"], uris["yaw"], saved_positions)
fastcs = FastCS(controller, [epics_ca])

fastcs.run()
Expand Down
42 changes: 21 additions & 21 deletions src/fastcs_standa_mirror/mirror_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
MirrorAttributeIORef,
)
from fastcs_standa_mirror.motor_controller import MotorController
from fastcs_standa_mirror.utils import save_home_pos
from fastcs_standa_mirror.utils import save_pos


class MirrorController(Controller):
Expand All @@ -19,7 +19,7 @@ class MirrorController(Controller):
speed = AttrRW(Float(), io_ref=MirrorAttributeIORef("speed"), group="Global")
jog_step = AttrRW(Float(), io_ref=MirrorAttributeIORef("jog_step"), group="Global")

def __init__(self, pitch_uri: str, yaw_uri: str, home_positions: dict):
def __init__(self, pitch_uri: str, yaw_uri: str, saved_positions: dict):
super().__init__(ios=[MirrorAttributeIO(self)])

pitch = MotorController("pitch", pitch_uri)
Expand All @@ -31,44 +31,44 @@ def __init__(self, pitch_uri: str, yaw_uri: str, home_positions: dict):
self.add_sub_controller("pitch", pitch)
self.add_sub_controller("yaw", yaw)

self.pitch.set_home_position(home_positions.get("pitch", 0))
self.yaw.set_home_position(home_positions.get("yaw", 0))
self.pitch.set_saved_position(saved_positions.get("pitch", 0))
self.yaw.set_saved_position(saved_positions.get("yaw", 0))

self.jog_step_size: int = 100

async def initialise(self):
await self.calibrate()
return await super().initialise()

async def calibrate(self) -> None:
logging.info("Calibrating motor zero positions")
await self.pitch.calibrate()
await self.yaw.calibrate()
@command()
async def home(self) -> None:
logging.info("Homing motors")
await self.pitch.home()
await self.yaw.home()

@command()
async def stop_moving(self) -> None:
"""Stop all motors"""
await self.pitch.stop_moving()
await self.yaw.stop_moving()

@command(group="Home")
async def rehome(self) -> None:
"""Return to home"""
logging.info("Returning to home position")
await self.pitch.move_home()
await self.yaw.move_home()
@command(group="Saved")
async def return_to_saved(self) -> None:
"""Return to saved position"""
logging.info("Returning to saved position")
await self.pitch.move_to_saved()
await self.yaw.move_to_saved()

@command(group="Home")
@command(group="Saved")
async def save(self) -> None:
"""Save home location"""
"""Save location"""
pitch = await self.pitch.get_current_position()
yaw = await self.yaw.get_current_position()

logging.info(f"Saving home position - (pitch: {pitch} - yaw: {yaw})")
self.pitch.set_home_position(pitch)
self.yaw.set_home_position(yaw)
logging.info(f"Saving position - (pitch: {pitch} - yaw: {yaw})")
self.pitch.set_saved_position(pitch)
self.yaw.set_saved_position(yaw)

save_home_pos({"pitch": pitch, "yaw": yaw})
save_pos({"pitch": pitch, "yaw": yaw})

@command(group="Jog")
async def up(self) -> None:
Expand Down
28 changes: 14 additions & 14 deletions src/fastcs_standa_mirror/motor_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class MotorController(Controller):
_device_uri: str = ""

current = AttrR(Float(), io_ref=MotorAttributeIORef("current"), group="Position")
home = AttrR(Float(), io_ref=MotorAttributeIORef("home"), group="Position")
saved = AttrR(Float(), io_ref=MotorAttributeIORef("saved"), group="Position")
moving = AttrR(Bool(), io_ref=MotorAttributeIORef("moving"), group="Status")

def __init__(self, name: str, device_uri: str):
Expand All @@ -39,13 +39,13 @@ def __init__(self, name: str, device_uri: str):
)
raise

self.home_position: int = 0
self.saved_position: int = 0

super().__init__(name, ios=[MotorAttributeIO(self)])

async def calibrate(self) -> None:
logging.info(f"Setting {self._name} zero position")
self.motor.command_homezero()
async def home(self) -> None:
logging.info(f"Homing {self._name}")
self.motor.command_home()

@command()
async def stop_moving(self) -> None:
Expand All @@ -61,18 +61,18 @@ async def move_relative(self, distance: int) -> None:
"""Move by relative distance"""
self.motor.command_movr(distance, 0)

async def move_home(self) -> None:
"""Move to home position"""
self.motor.command_move(self.home_position, 0)
async def move_to_saved(self) -> None:
"""Move to saved position"""
self.motor.command_move(self.saved_position, 0)

async def get_current_position(self) -> int:
"""Get current position"""
return self.motor.get_position().Position

async def get_home_position(self) -> int:
"""Get home position"""
return self.home_position
async def get_saved_position(self) -> int:
"""Get saved position"""
return self.saved_position

def set_home_position(self, new_home_position) -> None:
"""Set a new home position"""
self.home_position = new_home_position
def set_saved_position(self, new_saved_position) -> None:
"""Set a new saved position"""
self.saved_position = new_saved_position
24 changes: 12 additions & 12 deletions src/fastcs_standa_mirror/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,26 +59,26 @@ def create_simulated_devices() -> dict:
"""Create simulated devices and return uris"""
logging.info("Creating simulated standa devices")

virt_dir = Path.cwd() / "virt"
sim_dir = Path.cwd() / "sim"

device_uri_base = f"xi-emu:///{virt_dir}/simulated_motor_controller"
device_uri_base = f"xi-emu:///{sim_dir}/simulated_motor_controller"

return {
"pitch": f"{device_uri_base}_pitch.bin",
"yaw": f"{device_uri_base}_yaw.bin",
}


def load_or_create_home_pos() -> dict:
"""Load home positions from yaml file or create if not exists"""
def load_or_create_saved_pos() -> dict:
"""Load saved positions from yaml file or create if not exists"""

if Path("home.yaml").exists():
home_positions = load_yaml("home.yaml")
if Path("saved.yaml").exists():
saved_positions = load_yaml("saved.yaml")
else:
home_positions = {"pitch": 0, "yaw": 0}
save_home_pos(home_positions)
saved_positions = {"pitch": 0, "yaw": 0}
save_pos(saved_positions)

return home_positions
return saved_positions


def load_yaml(filename: str) -> dict:
Expand All @@ -90,8 +90,8 @@ def load_yaml(filename: str) -> dict:
return data


def save_home_pos(data: dict) -> None:
"""save dict data to home.yaml"""
def save_pos(data: dict) -> None:
"""save dict data to saved.yaml"""

with open("home.yaml", "w") as file:
with open("saved.yaml", "w") as file:
yaml.dump(data, file, default_flow_style=False)
18 changes: 9 additions & 9 deletions tests/test_controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from fastcs_standa_mirror.mirror_controller import MirrorController
from fastcs_standa_mirror.utils import (
DeviceNotFoundError,
load_or_create_home_pos,
load_or_create_saved_pos,
load_real_devices,
save_home_pos,
save_pos,
)


Expand Down Expand Up @@ -86,30 +86,30 @@ async def test_jog_commands_use_correct_step_size(mock_axis):

@patch("fastcs_standa_mirror.motor_controller.ximc.Axis")
@pytest.mark.asyncio
async def test_rehome_moves_both_motors_to_home(mock_axis):
async def test_return_moves_both_motors_to_saved(mock_axis):
mock_motor = Mock()
mock_axis.return_value = mock_motor

controller = MirrorController(
"xi-com:\\\\.\\COM3", "xi-com:\\\\.\\COM4", {"pitch": 1000, "yaw": 2000}
)

await controller.rehome()
await controller.return_to_saved()

assert mock_motor.command_move.call_count == 2


def test_home_position_save_and_load(tmp_path):
def test_saved_position_save_and_load(tmp_path):
original_dir = os.getcwd()
os.chdir(tmp_path)

try:
# Save home positions
home_data = {"pitch": 1500, "yaw": 2500}
save_home_pos(home_data)
# Save positions
saved_data = {"pitch": 1500, "yaw": 2500}
save_pos(saved_data)

# Load them back (simulating restart)
loaded_data = load_or_create_home_pos()
loaded_data = load_or_create_saved_pos()

# Verify they match
assert loaded_data["pitch"] == 1500
Expand Down