diff --git a/hw_device_mgr/cia_301/command.py b/hw_device_mgr/cia_301/command.py
index 8dfddaa1..3c702e08 100644
--- a/hw_device_mgr/cia_301/command.py
+++ b/hw_device_mgr/cia_301/command.py
@@ -25,7 +25,9 @@ def scan_bus(self, bus=0):
"""Scan bus, returning list of addresses and IDs for each device."""
@abc.abstractmethod
- def upload(self, address=None, index=None, subindex=0, datatype=None):
+ def upload(
+ self, address=None, index=None, subindex=0, datatype=None, **kwargs
+ ):
"""Upload a value from a device SDO."""
@abc.abstractmethod
@@ -36,6 +38,7 @@ def download(
subindex=0,
value=None,
datatype=None,
+ **kwargs,
):
"""Download a value to a device SDO."""
diff --git a/hw_device_mgr/cia_301/config.py b/hw_device_mgr/cia_301/config.py
index 7033b030..a18e73b4 100644
--- a/hw_device_mgr/cia_301/config.py
+++ b/hw_device_mgr/cia_301/config.py
@@ -1,6 +1,7 @@
from .data_types import CiA301DataType
-from .command import CiA301Command, CiA301SimCommand
+from .command import CiA301Command, CiA301SimCommand, CiA301CommandException
from .sdo import CiA301SDO
+from functools import cached_property
class CiA301Config:
@@ -98,17 +99,33 @@ def sdo_ix(cls, ix):
ix = (dtc.uint16(ix[0]), dtc.uint8(ix[1]))
return ix
+ @cached_property
+ def sdos(self):
+ assert self.model_id in self._model_sdos
+ return self._model_sdos[self.model_id].values()
+
def sdo(self, ix):
if isinstance(ix, self.sdo_class):
return ix
ix = self.sdo_ix(ix)
return self._model_sdos[self.model_id][ix]
+ def dump_param_values(self):
+ res = dict()
+ for sdo in self.sdos:
+ try:
+ res[sdo] = self.upload(sdo, stderr_to_devnull=True)
+ except CiA301CommandException as e:
+ # Objects may not exist, like variable length PDO mappings
+ self.logger.debug(f"Upload {sdo} failed: {e}")
+ pass
+ return res
+
#
# Param read/write
#
- def upload(self, sdo):
+ def upload(self, sdo, **kwargs):
# Get SDO object
sdo = self.sdo(sdo)
res_raw = self.command().upload(
@@ -116,10 +133,11 @@ def upload(self, sdo):
index=sdo.index,
subindex=sdo.subindex,
datatype=sdo.data_type,
+ **kwargs,
)
return sdo.data_type(res_raw)
- def download(self, sdo, val, dry_run=False):
+ def download(self, sdo, val, dry_run=False, **kwargs):
# Get SDO object
sdo = self.sdo(sdo)
# Check before setting value to avoid unnecessary NVRAM writes
@@ -128,6 +146,7 @@ def download(self, sdo, val, dry_run=False):
index=sdo.index,
subindex=sdo.subindex,
datatype=sdo.data_type,
+ **kwargs,
)
if sdo.data_type(res_raw) == val:
return # SDO value already correct
@@ -140,6 +159,7 @@ def download(self, sdo, val, dry_run=False):
subindex=sdo.subindex,
value=val,
datatype=sdo.data_type,
+ **kwargs,
)
#
@@ -187,24 +207,21 @@ def set_device_config(cls, config):
cls._device_config.clear()
cls._device_config.extend(config)
- def munge_config(self, config_raw):
+ @classmethod
+ def munge_config(cls, config_raw, position):
+ config_cooked = config_raw.copy()
+ # Convert model ID ints
+ model_id = (config_raw["vendor_id"], config_raw["product_code"])
+ model_id = cls.format_model_id(model_id)
+ config_cooked["vendor_id"], config_cooked["product_code"] = model_id
# Flatten out param_values key
- pv = dict()
+ config_cooked["param_values"] = dict()
for ix, val in config_raw.get("param_values", dict()).items():
- ix = self.sdo_class.parse_idx_str(ix)
+ ix = cls.sdo_class.parse_idx_str(ix)
if isinstance(val, list):
- pos_ix = config_raw["positions"].index(self.position)
+ pos_ix = config_raw["positions"].index(position)
val = val[pos_ix]
- pv[ix] = val
- dtc = self.data_type_class
- config_raw["vendor_id"] = dtc.uint32(config_raw["vendor_id"])
- config_raw["product_code"] = dtc.uint32(config_raw["product_code"])
- config_cooked = dict(
- vendor_id=config_raw["vendor_id"],
- product_code=config_raw["product_code"],
- param_values=pv,
- sync_manager=config_raw.get("sync_manager", dict()),
- )
+ config_cooked["param_values"][ix] = val
# Return pruned config dict
return config_cooked
@@ -225,7 +242,7 @@ def config(self):
else:
raise KeyError(f"No config for device at {self.address}")
# Prune & cache config
- self._config = self.munge_config(conf)
+ self._config = self.munge_config(conf, self.position)
# Return cached config
return self._config
diff --git a/hw_device_mgr/cia_402/device.py b/hw_device_mgr/cia_402/device.py
index bbdd2c6f..8ccf35f4 100644
--- a/hw_device_mgr/cia_402/device.py
+++ b/hw_device_mgr/cia_402/device.py
@@ -173,10 +173,10 @@ def get_feedback(self):
goal_reasons.append(f"state flag {flag_name} != {not flag_val}")
if not goal_reached:
- fb_out.update(
- goal_reached=False, goal_reason="; ".join(goal_reasons)
- )
- self.logger.debug(f"Device {self.address}: Goal not reached:")
+ goal_reason = "; ".join(goal_reasons)
+ fb_out.update(goal_reached=False, goal_reason=goal_reason)
+ if fb_out.changed("goal_reason"):
+ self.logger.debug(f"{self}: Goal not reached: {goal_reason}")
return fb_out
state_bits = {
@@ -492,13 +492,13 @@ def set_sim_feedback(self):
# Log changes
if self.sim_feedback.changed("control_mode_fb"):
cm = self.sim_feedback.get("control_mode_fb")
- self.logger.info(f"{self} next control_mode_fb: 0x{cm:04X}")
+ self.logger.info(f"{self} sim control_mode_fb: 0x{cm:04X}")
if self.sim_feedback.changed("status_word"):
sw = self.sim_feedback.get("status_word")
flags = ",".join(k for k, v in sw_flags.items() if v)
flags = f" flags: {flags}" if flags else ""
self.logger.info(
- f"{self} sim next status_word: 0x{sw:04X} {state} {flags}"
+ f"{self} sim status_word: 0x{sw:04X} {state} {flags}"
)
return sfb
diff --git a/hw_device_mgr/device.py b/hw_device_mgr/device.py
index e1a2a98f..096d40e0 100644
--- a/hw_device_mgr/device.py
+++ b/hw_device_mgr/device.py
@@ -3,6 +3,8 @@
from .logging import Logging
from .interface import Interface
from .data_types import DataType
+from functools import cached_property
+import re
class Device(abc.ABC):
@@ -36,7 +38,7 @@ def __init__(self, address=None):
self.address = address
self.init_interfaces()
- def init(self, index=None):
+ def init(self):
"""
Initialize device.
@@ -44,7 +46,7 @@ def init(self, index=None):
outside the constructor. Implementations should always call
`super().init()`.
"""
- self.index = index
+ pass
@classmethod
def merge_dict_attrs(cls, attr):
@@ -62,6 +64,21 @@ def merge_dict_attrs(cls, attr):
res.update(c_attr)
return res
+ slug_separator = "."
+
+ @cached_property
+ def addr_slug(self):
+ """
+ Return a slug generated from the device address.
+
+ The slug is computed by separating numeric components of the
+ device `address` string with the `slug_separator` character,
+ default `.`, e.g. `(0,5)` -> `0.5`. This is intended to be
+ useful for inclusion into identifiers.
+ """
+ addr_prefix = re.sub(r"[^0-9]+", self.slug_separator, str(self.address))
+ return addr_prefix.strip(self.slug_separator)
+
def init_interfaces(self):
intfs = self._interfaces = dict()
dt_name2cls = self.data_type_class.by_shared_name
@@ -95,6 +112,7 @@ def interface_changed(self, what, key, return_vals=False):
def read(self):
"""Read `feedback_in` from hardware interface."""
+ self._interfaces["feedback_in"].set()
def get_feedback(self):
"""Process `feedback_in` and return `feedback_out` interface."""
@@ -369,7 +387,7 @@ def read(self):
"""Read `feedback_in` from hardware interface."""
super().read()
sfb = self._interfaces["sim_feedback"].get()
- self._interfaces["feedback_in"].set(**sfb)
+ self._interfaces["feedback_in"].update(**sfb)
def set_sim_feedback(self):
"""Simulate feedback from command and feedback."""
diff --git a/hw_device_mgr/devices/device_xml/SV660_EOE_1Axis_V9.12.xml b/hw_device_mgr/devices/device_xml/SV660_EOE_1Axis_V9.12.xml
index 631a7da5..648b9f3f 100644
--- a/hw_device_mgr/devices/device_xml/SV660_EOE_1Axis_V9.12.xml
+++ b/hw_device_mgr/devices/device_xml/SV660_EOE_1Axis_V9.12.xml
@@ -2093,6 +2093,28 @@
rw
+
+
+ 6
+ Stop mode at S-ON OFF
+ UINT
+ 16
+ 96
+
+ rw
+
+
+
+
+ 7
+ Stop mode at No. 2 fault
+ INT
+ 16
+ 112
+
+ rw
+
+
8
Stop mode at limit switch signal
@@ -2541,6 +2563,17 @@
rw
+
+
+ 24
+ EtherCAT forced DO output logic in non-OP status
+ UINT
+ 16
+ 112
+
+ rw
+
+
DT2005
@@ -2585,6 +2618,28 @@
rw
+
+
+ 8
+ Numerator of electronic gear ratio
+ UDINT
+ 32
+ 128
+
+ rw
+
+
+
+
+ 10
+ Denominator of electronic gear ratio
+ UDINT
+ 32
+ 160
+
+ rw
+
+
20
Speed feedforward control selection
@@ -4841,6 +4896,17 @@
rw
+
+
+ 3
+ Offline inertia autotuning selection
+ UINT
+ 16
+ 48
+
+ rw
+
+
5
Encoder ROM reading/writing
@@ -6923,6 +6989,24 @@
0
+
+
+ Stop mode at S-ON OFF
+
+ -3
+ 1
+ 0
+
+
+
+
+ Stop mode at No. 2 fault
+
+ -5
+ 3
+ 2
+
+
Stop mode at limit switch signal
@@ -7295,6 +7379,15 @@
0
+
+
+ EtherCAT forced DO output logic in non-OP status
+
+ 0
+ 7
+ 1
+
+
ro
@@ -7337,6 +7430,24 @@
0
+
+
+ Numerator of electronic gear ratio
+
+ 0
+ 4294967295
+ 1
+
+
+
+
+ Denominator of electronic gear ratio
+
+ 0
+ 4294967295
+ 1
+
+
Speed feedforward control selection
@@ -9059,6 +9170,15 @@
0
+
+
+ Offline inertia autotuning selection
+
+ 0
+ 1
+ 0
+
+
Encoder ROM reading/writing
diff --git a/hw_device_mgr/errors/device.py b/hw_device_mgr/errors/device.py
index 0dcfc544..2a43b4b0 100644
--- a/hw_device_mgr/errors/device.py
+++ b/hw_device_mgr/errors/device.py
@@ -16,11 +16,14 @@ class ErrorDevice(Device):
device_error_dir = "device_err"
- feedback_data_types = dict(error_code="uint32")
- feedback_defaults = dict(
+ feedback_in_data_types = dict(error_code="uint32")
+ feedback_in_defaults = dict(error_code=0)
+
+ feedback_out_defaults = dict(
error_code=0, description="No error", advice="No error"
)
- no_error = feedback_defaults
+
+ no_error = feedback_out_defaults
data_type_class = DataType
@@ -49,22 +52,29 @@ def error_descriptions(cls):
errs[int(err_code_str, 0)] = err_data
return cls._error_descriptions[cls.name]
- def set_feedback(self, error_code=0, **kwargs):
- super().set_feedback(**kwargs)
+ def get_feedback(self):
+ fb_out = super().get_feedback()
+ error_code = self.feedback_in.get("error_code")
if not error_code:
- self.feedback.update(**self.no_error)
- return
+ self.feedback_out.update(**self.no_error)
+ return fb_out
error_info = self.error_descriptions().get(error_code, None)
if error_info is None:
- self.feedback.update(
+ fb_out.update(
description=f"Unknown error code {error_code}",
advice="Please consult with hardware vendor",
error_code=error_code,
)
- return
-
- self.feedback.update(error_code=error_code, **error_info)
+ return fb_out
+ else:
+ fb_out.update(error_code=error_code, **error_info)
+ if fb_out.changed("error_code"):
+ msg = "error code {}: {}".format(
+ error_code, fb_out.get("description")
+ )
+ self.logger.error(msg)
+ return fb_out
class ErrorSimDevice(ErrorDevice, SimDevice):
diff --git a/hw_device_mgr/hal/data_types.py b/hw_device_mgr/hal/data_types.py
index cb56dd49..a89cd4b5 100644
--- a/hw_device_mgr/hal/data_types.py
+++ b/hw_device_mgr/hal/data_types.py
@@ -11,15 +11,22 @@ class HALDataType(DataType, HALMixin):
int8=dict(hal_type=HALMixin.HAL_S32),
int16=dict(hal_type=HALMixin.HAL_S32),
int32=dict(hal_type=HALMixin.HAL_S32),
- int64=dict(hal_type=HALMixin.HAL_S64),
uint8=dict(hal_type=HALMixin.HAL_U32),
uint16=dict(hal_type=HALMixin.HAL_U32),
uint32=dict(hal_type=HALMixin.HAL_U32),
- uint64=dict(hal_type=HALMixin.HAL_U64),
float=dict(hal_type=HALMixin.HAL_FLOAT),
double=dict(hal_type=HALMixin.HAL_FLOAT),
# No HAL_STR type
)
+ have_64 = hasattr(HALMixin, "HAL_S64")
+ if have_64:
+ # Machinekit HAL has 64-bit int types, but not LCNC
+ subtype_data.update(
+ dict(
+ int64=dict(hal_type=HALMixin.HAL_S64),
+ uint64=dict(hal_type=HALMixin.HAL_U64),
+ )
+ )
@classmethod
def hal_type_str(cls):
diff --git a/hw_device_mgr/hal/device.py b/hw_device_mgr/hal/device.py
index f2be2dc7..6de2a154 100644
--- a/hw_device_mgr/hal/device.py
+++ b/hw_device_mgr/hal/device.py
@@ -1,6 +1,7 @@
from ..device import Device, SimDevice
from .base import HALMixin
from .data_types import HALDataType
+from functools import cached_property
class HALPinDevice(Device, HALMixin):
@@ -14,26 +15,31 @@ class HALPinDevice(Device, HALMixin):
command_out=(HALMixin.HAL_OUT, ""),
)
- # Prepend this to HAL pin names
- dev_pin_prefix = "d"
-
- @property
+ @cached_property
def compname(self):
return self.comp.getprefix()
def pin_name(self, interface, pname):
return self.pin_prefix + self.pin_interfaces[interface][1] + pname
- def init(self, *, comp, **kwargs):
+ @cached_property
+ def pin_prefix(self):
+ """
+ HAL pin prefix for this device.
+
+ Pin prefix is computed by separating numeric components of the
+ device `address` string with `.` and adding a final `.`, e.g.
+ `(0,5)` -> `0.5.`.
+ """
+ return f"{self.addr_slug}{self.slug_separator}"
+
+ def init(self, /, comp, **kwargs):
super().init(**kwargs)
self.comp = comp
# Get specs for all pins in all interfaces; shared pin names must match,
# except for direction, which becomes HAL_IO if different
all_specs = dict()
- self.pin_prefix = (
- "" if self.index is None else f"{self.dev_pin_prefix}{self.index}_"
- )
for iface, params in self.pin_interfaces.items():
for base_pname, new_spec in self.iface_pin_specs(iface).items():
if base_pname not in all_specs:
diff --git a/hw_device_mgr/hal/tests/test_data_types.py b/hw_device_mgr/hal/tests/test_data_types.py
index b4f2b23e..5307b3a8 100644
--- a/hw_device_mgr/hal/tests/test_data_types.py
+++ b/hw_device_mgr/hal/tests/test_data_types.py
@@ -20,6 +20,11 @@ class TestHALDataType(BaseHALTestClass, _TestDataType):
def test_hal_type_str(self):
for shared_name, exp_str in self.sname_to_typestr.items():
+ if shared_name not in self.data_type_class.subtype_data:
+ # LinuxCNC HAL doesn't have 64-bit int types
+ assert shared_name.endswith("64")
+ print(f"Skipping 64-bit int type {shared_name}")
+ continue
cls = self.data_type_class.by_shared_name(shared_name)
exp_int = self.data_type_class.hal_enum(exp_str[4:])
cls_str, cls_int = (cls.hal_type_str(), cls.hal_type)
diff --git a/hw_device_mgr/lcec/command.py b/hw_device_mgr/lcec/command.py
index 050efbe0..9c9bd146 100644
--- a/hw_device_mgr/lcec/command.py
+++ b/hw_device_mgr/lcec/command.py
@@ -17,7 +17,9 @@ class LCECCommand(EtherCATCommand):
def _parse_output(cls, resp, kwargs):
return resp
- def _ethercat(self, *args, log_lev="debug", dry_run=False):
+ def _ethercat(
+ self, *args, log_lev="debug", dry_run=False, stderr_to_devnull=False
+ ):
"""
Run IgH EtherCAT Master `ethercat` utility.
@@ -30,8 +32,9 @@ def _ethercat(self, *args, log_lev="debug", dry_run=False):
return
getattr(self.logger, log_lev)(" ".join(cmd_args))
+ stderr = subprocess.DEVNULL if stderr_to_devnull else None
try:
- resp = subprocess.check_output(cmd_args)
+ resp = subprocess.check_output(cmd_args, stderr=stderr)
except subprocess.CalledProcessError as e:
raise EtherCATCommandException(str(e))
@@ -40,10 +43,12 @@ def _ethercat(self, *args, log_lev="debug", dry_run=False):
_device_location_re = re.compile(r"=== Master ([0-9]), Slave ([0-9]+) ")
- def scan_bus(self, bus=None):
+ def scan_bus(self, bus=None, **kwargs):
bus = self.default_bus if bus is None else bus
devices = list()
- output = self._ethercat("slaves", f"--master={bus}", "--verbose")
+ output = self._ethercat(
+ "slaves", f"--master={bus}", "--verbose", **kwargs
+ )
for line in output:
line = line.strip()
if line.startswith("==="):
@@ -84,7 +89,9 @@ def master_nic(self, bus=None):
else:
return None
- def upload(self, address=None, index=None, subindex=0, datatype=None):
+ def upload(
+ self, address=None, index=None, subindex=0, datatype=None, **kwargs
+ ):
index = self.data_type_class.uint16(index)
subindex = self.data_type_class.uint16(subindex)
output = self._ethercat(
@@ -94,11 +101,14 @@ def upload(self, address=None, index=None, subindex=0, datatype=None):
f"0x{index:04X}",
f"0x{subindex:02X}",
f"--type={datatype.igh_type}",
+ **kwargs,
)
- # FIXME Handle non-int types
- val_hex, val = output[0].split(" ", 1)
- val = int(val, 10)
- return val
+ if datatype.shared_name == "str":
+ return output[0]
+ else:
+ val_hex, val = output[0].split(" ", 1)
+ val = int(val, 10)
+ return val
def download(
self,
@@ -107,18 +117,19 @@ def download(
subindex=0,
value=None,
datatype=None,
- dry_run=False,
+ **kwargs,
):
self._ethercat(
"download",
f"--master={address[0]}",
f"--position={address[1]}",
+ f"--type={datatype.igh_type}",
+ "--",
f"0x{index:04X}",
f"0x{subindex:02X}",
str(value),
- f"--type={datatype.igh_type}",
log_lev="info",
- dry_run=dry_run,
+ **kwargs,
)
diff --git a/hw_device_mgr/lcec/data_types.py b/hw_device_mgr/lcec/data_types.py
index 48027e83..cf7de9a9 100644
--- a/hw_device_mgr/lcec/data_types.py
+++ b/hw_device_mgr/lcec/data_types.py
@@ -11,12 +11,18 @@ class LCECDataType(EtherCATDataType, HALDataType):
int8=dict(igh_type="int8"),
int16=dict(igh_type="int16"),
int32=dict(igh_type="int32"),
- int64=dict(igh_type="int64"),
uint8=dict(igh_type="uint8"),
uint16=dict(igh_type="uint16"),
uint32=dict(igh_type="uint32"),
- uint64=dict(igh_type="uint64"),
float=dict(igh_type="float"),
double=dict(igh_type="double"),
- # Strings not usable by `ethercat` tool
+ str=dict(igh_type="string"),
)
+ if HALDataType.have_64:
+ # Machinekit HAL has 64-bit int types, but not LCNC
+ subtype_data.update(
+ dict(
+ int64=dict(igh_type="int64"),
+ uint64=dict(igh_type="uint64"),
+ )
+ )
diff --git a/hw_device_mgr/lcec/tests/base_test_class.py b/hw_device_mgr/lcec/tests/base_test_class.py
index 0f343ab9..f81d61ef 100644
--- a/hw_device_mgr/lcec/tests/base_test_class.py
+++ b/hw_device_mgr/lcec/tests/base_test_class.py
@@ -45,8 +45,9 @@ def mock_ethercat_command(self):
commands. Patches `subprocess.check_output()`.
"""
- def emulate_ethercat_command(args):
+ def emulate_ethercat_command(args, **kwargs):
print(f'mocking command: {" ".join(args)}')
+ print(f" subprocess.check_output kwargs: {repr(kwargs)}")
# Parse out args, kwargs
assert args.pop(0) == "ethercat"
cmd = args.pop(0)
diff --git a/hw_device_mgr/lcec/tests/test_data_types.py b/hw_device_mgr/lcec/tests/test_data_types.py
index a0b9677a..b05e6357 100644
--- a/hw_device_mgr/lcec/tests/test_data_types.py
+++ b/hw_device_mgr/lcec/tests/test_data_types.py
@@ -25,6 +25,11 @@ class TestLCECDataType(
def test_igh_type_attr(self):
for shared_name in self.defined_shared_types:
+ if shared_name not in self.data_type_class.subtype_data:
+ # LinuxCNC HAL doesn't have 64-bit int types
+ assert shared_name.endswith("64")
+ print(f"Skipping 64-bit int type {shared_name}")
+ continue
cls = self.data_type_class.by_shared_name(shared_name)
print("cls:", cls)
assert hasattr(cls, "igh_type")
diff --git a/hw_device_mgr/lcec/tests/test_device.py b/hw_device_mgr/lcec/tests/test_device.py
index 51cb78ca..0d613301 100644
--- a/hw_device_mgr/lcec/tests/test_device.py
+++ b/hw_device_mgr/lcec/tests/test_device.py
@@ -19,7 +19,7 @@ class TestLCECDevice(BaseLCECTestClass, _TestEtherCATDevice, _TestHALDevice):
]
@pytest.fixture
- def obj(self, device_cls, sim_device_data, sdo_data, mock_halcomp):
+ def obj(self, sim_device_data, mock_halcomp):
self.obj = self.device_model_cls(
address=sim_device_data["test_address"]
)
diff --git a/hw_device_mgr/mgr/mgr.py b/hw_device_mgr/mgr/mgr.py
index 5d30c4ad..93e5c498 100644
--- a/hw_device_mgr/mgr/mgr.py
+++ b/hw_device_mgr/mgr/mgr.py
@@ -15,6 +15,7 @@ class HWDeviceMgr(FysomGlobalMixin, Device):
data_type_class = CiA402Device.data_type_class
device_base_class = CiA402Device
device_classes = None
+ slug_separator = ""
@classmethod
def device_model_id(cls):
@@ -89,7 +90,7 @@ def scan_devices(cls, **kwargs):
def init_device_instances(self, **kwargs):
for i, dev in enumerate(self.devices):
- dev.init(index=i, **kwargs)
+ dev.init(**kwargs)
self.logger.info(f"Adding device #{i}: {dev}")
####################################################
@@ -402,7 +403,7 @@ def fsm_finalize_command(self, e):
####################################################
# Execution
- def run(self):
+ def run_loop(self):
"""Program main loop."""
update_period = 1.0 / self.mgr_config.get("update_rate", 10.0)
while not self.shutdown:
@@ -424,6 +425,21 @@ def run(self):
continue
time.sleep(update_period)
+ def run(self):
+ """Program main."""
+ try:
+ self.run_loop()
+ except KeyboardInterrupt:
+ self.logger.info("Exiting at keyboard interrupt")
+ return 0
+ except Exception:
+ self.logger.error("Exiting at unrecoverable exception:")
+ for line in traceback.format_exc().splitlines():
+ self.logger.error(line)
+ return 1
+ self.logger.info("Exiting")
+ return 0
+
def read_update_write(self):
"""
Read hardware, update controller, write hardware.
diff --git a/hw_device_mgr/tests/base_test_class.py b/hw_device_mgr/tests/base_test_class.py
index 5d0c7ebf..cc0fbca0 100644
--- a/hw_device_mgr/tests/base_test_class.py
+++ b/hw_device_mgr/tests/base_test_class.py
@@ -39,7 +39,9 @@ def test_category_class(cls, test_category):
assert dmc.name
if dmc.test_category == test_category:
return dmc
- raise ValueError(f"No device in test category class '{test_category}'")
+ raise ValueError(
+ f"{cls}: No device in test category class '{test_category}'"
+ )
@classmethod
def munge_sim_device_data(cls, sim_device_data):
diff --git a/hw_device_mgr/tests/test_device.py b/hw_device_mgr/tests/test_device.py
index a2236bd6..ca7afae9 100644
--- a/hw_device_mgr/tests/test_device.py
+++ b/hw_device_mgr/tests/test_device.py
@@ -122,7 +122,7 @@ def obj(self, device_cls, sim_device_data):
yield self.obj
def test_init(self, obj):
- assert hasattr(obj, "index")
+ pass # Base class init() method does nothing
def test_set_sim_feedback(self, obj):
res = obj.set_sim_feedback()
@@ -352,7 +352,7 @@ def test_read_update_write(self, obj, fpath):
def test_dot(self, tmp_path):
# Test class diagram
- gv_file = tmp_path / ".." / f"{self.device_class.category}.gv"
+ gv_file = tmp_path / f"{self.device_class.category}.gv"
assert not gv_file.exists()
with gv_file.open("w") as f:
f.write(self.device_class.dot())