Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .env.example

This file was deleted.

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,4 @@ lockfiles/
bob/
bob_custom/
saved.yaml
controller.yaml
10 changes: 10 additions & 0 deletions controller_example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
controller:
uris:
pitch: SIM/000000
yaw: SIM/000001
transport:
- ioc:
pv_prefix: PV_PREFIX
gui:
output_path: /epics/opi/output.bob
title: Alignment Mirror
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ dependencies = [
"pytest-asyncio>=1.3.0",
"python-dotenv>=1.2.1",
"softioc==4.7.0",
"stdio-socket>=1.3.1",
"typer>=0.20.0",
] # Add project dependencies here, e.g. ["click", "numpy"]
dynamic = ["version"]
license.file = "LICENSE"
Expand Down
82 changes: 44 additions & 38 deletions src/fastcs_standa_mirror/__main__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
"""Interface for ``python -m fastcs_standa_mirror``."""

import logging
import os
from argparse import ArgumentParser
from functools import cache
from pathlib import Path

from dotenv import load_dotenv
import typer
import yaml
from fastcs.launch import FastCS
from fastcs.transports.epics import EpicsGUIOptions, EpicsIOCOptions
from fastcs.transports.epics.ca.transport import EpicsCATransport

from fastcs_standa_mirror.config import Config
from fastcs_standa_mirror.mirror_controller import MirrorController
from fastcs_standa_mirror.utils import (
load_devices,
Expand All @@ -22,60 +23,65 @@

logging.basicConfig(level=logging.INFO)

app = typer.Typer()

def main() -> None:
"""Argument parser for the CLI."""
parser = ArgumentParser()
parser.add_argument(
"-v",

def version_callback(value: bool):
if value:
typer.echo(__version__)
raise typer.Exit()


@app.callback()
def main(
version: bool | None = typer.Option(
None,
"--version",
action="version",
version=__version__,
)
parser.parse_args()
callback=version_callback,
is_eager=True,
help="Print the version and exit",
),
):
pass

# Load environment variables
load_dotenv()

# Get device uris
device_pitch_uri = os.getenv("DEVICE_PITCH_URI")
device_yaw_uri = os.getenv("DEVICE_YAW_URI")
@cache
def load_config(config_file: Path) -> Config:
config = Config(**yaml.safe_load(config_file.read_text()))

if device_pitch_uri is None or device_yaw_uri is None:
raise ValueError("DEVICE_PITCH_URI and DEVICE_YAW_URI must be set")
config.controller.sim = any(
uri.startswith("SIM")
for uri in [config.controller.uris.pitch, config.controller.uris.yaw]
)

# Detect if we're using a sim
use_sim = device_pitch_uri.startswith("SIM") or device_yaw_uri.startswith("SIM")
return config

# Load pv prefix
pv_prefix = os.getenv("PV_PREFIX")

if pv_prefix is None:
raise ValueError("PV_PREFIX environment variable must be set")
@app.command()
def run(config_file: Path):
config = load_config(config_file)

if use_sim:
logging.info(f"Using simulated devices with PV_PREFIX -> {pv_prefix}")
else:
print(pv_prefix)
if config.controller.sim:
logging.info("Using simulated devices")
logging.info(f"PV PREFIX = {config.transport[0].ioc.pv_prefix}")

saved_positions = load_or_create_saved_pos()
uris = load_devices(use_sim=use_sim)

# epics setup
gui_options = EpicsGUIOptions(
output_path=Path(".") / "bob/Mirror.bob", title="Mirror Controller"
)
uris = load_devices(use_sim=config.controller.sim, uris=config.controller.uris)

epics_ca = EpicsCATransport(
gui=gui_options, epicsca=EpicsIOCOptions(pv_prefix=pv_prefix)
gui=EpicsGUIOptions(
output_path=Path(config.transport[0].gui.output_path),
title=config.transport[0].gui.title,
),
epicsca=EpicsIOCOptions(pv_prefix=config.transport[0].ioc.pv_prefix),
)

# run fastcs instance
controller = MirrorController(uris["pitch"], uris["yaw"], saved_positions)
controller = MirrorController(uris, saved_positions)
fastcs = FastCS(controller, [epics_ca])

fastcs.run()


if __name__ == "__main__":
main()
app()
30 changes: 30 additions & 0 deletions src/fastcs_standa_mirror/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from pydantic import BaseModel


class URIs(BaseModel):
pitch: str
yaw: str


class ControllerConfig(BaseModel):
uris: URIs
sim: bool = False


class IOCConfig(BaseModel):
pv_prefix: str


class GUIConfig(BaseModel):
output_path: str
title: str


class TransportConfig(BaseModel):
ioc: IOCConfig
gui: GUIConfig


class Config(BaseModel):
controller: ControllerConfig
transport: list[TransportConfig]
7 changes: 4 additions & 3 deletions src/fastcs_standa_mirror/mirror_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from fastcs.datatypes import Float
from fastcs.methods import command

from fastcs_standa_mirror.config import URIs
from fastcs_standa_mirror.io.mirror_attribute import (
MirrorAttributeIO,
MirrorAttributeIORef,
Expand All @@ -19,11 +20,11 @@ class MirrorController(Controller):
speed = AttrRW(Float(), io_ref=MirrorAttributeIORef("speed"), group="Global")
jog_step = AttrRW(Float(), io_ref=MirrorAttributeIORef("jog_step"), group="Global")

def __init__(self, pitch_uri: str, yaw_uri: str, saved_positions: dict):
def __init__(self, uris: URIs, saved_positions: dict):
super().__init__(ios=[MirrorAttributeIO(self)])

pitch = MotorController("pitch", pitch_uri)
yaw = MotorController("yaw", yaw_uri)
pitch = MotorController("pitch", uris.pitch)
yaw = MotorController("yaw", uris.yaw)

self.pitch: MotorController
self.yaw: MotorController
Expand Down
34 changes: 14 additions & 20 deletions src/fastcs_standa_mirror/utils.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,31 @@
import logging
import os
from pathlib import Path

import libximc.highlevel as ximc
import yaml

from fastcs_standa_mirror.config import URIs


class DeviceNotFoundError(Exception):
"""Raised when expected device uris are not found"""

pass


def load_devices(use_sim: bool) -> dict:
"""Load device uris for pitch and yaw controllers"""
def load_devices(use_sim: bool, uris: URIs) -> URIs:
"""Load devices for pitch and yaw controllers"""

return create_simulated_devices() if use_sim else load_real_devices()
return create_simulated_devices() if use_sim else load_real_devices(uris)


def load_real_devices() -> dict:
def load_real_devices(uris: URIs) -> URIs:
"""Discover and validate real device uris against config"""

logging.info("Looking for real standa devices")

target_uris = {
"pitch": os.getenv("DEVICE_PITCH_URI"),
"yaw": os.getenv("DEVICE_YAW_URI"),
}

logging.debug("Target uris:")
for v in target_uris.values():
for v in uris.model_dump().values():
logging.debug(v)

devices = ximc.enumerate_devices(ximc.EnumerateFlags.ENUMERATE_ALL_COM)
Expand All @@ -41,32 +37,30 @@ def load_real_devices() -> dict:

missing_devices = []

for name, uri in target_uris.items():
for name, uri in uris.model_dump().items():
if uri in real_uris:
logging.info(f"Found {name} controller")
else:
missing_devices.append(name)

if missing_devices:
raise DeviceNotFoundError(
f"Expected devices not found: {', '.join(missing_devices)}"
)

return target_uris
return uris


def create_simulated_devices() -> dict:
def create_simulated_devices() -> URIs:
"""Create simulated devices and return uris"""
logging.info("Creating simulated standa devices")

sim_dir = Path.cwd() / "sim"

device_uri_base = f"xi-emu:///{sim_dir}/simulated_motor_controller"

return {
"pitch": f"{device_uri_base}_pitch.bin",
"yaw": f"{device_uri_base}_yaw.bin",
}
return URIs(
pitch=f"{device_uri_base}_pitch.bin",
yaw=f"{device_uri_base}_yaw.bin",
)


def load_or_create_saved_pos() -> dict:
Expand Down
Loading