diff --git a/hw_device_mgr/cia_301/command.py b/hw_device_mgr/cia_301/command.py
index 8dfddaa1..3c702e08 100644
--- a/hw_device_mgr/cia_301/command.py
+++ b/hw_device_mgr/cia_301/command.py
@@ -25,7 +25,9 @@ def scan_bus(self, bus=0):
"""Scan bus, returning list of addresses and IDs for each device."""
@abc.abstractmethod
- def upload(self, address=None, index=None, subindex=0, datatype=None):
+ def upload(
+ self, address=None, index=None, subindex=0, datatype=None, **kwargs
+ ):
"""Upload a value from a device SDO."""
@abc.abstractmethod
@@ -36,6 +38,7 @@ def download(
subindex=0,
value=None,
datatype=None,
+ **kwargs,
):
"""Download a value to a device SDO."""
diff --git a/hw_device_mgr/cia_301/config.py b/hw_device_mgr/cia_301/config.py
index 7033b030..a18e73b4 100644
--- a/hw_device_mgr/cia_301/config.py
+++ b/hw_device_mgr/cia_301/config.py
@@ -1,6 +1,7 @@
from .data_types import CiA301DataType
-from .command import CiA301Command, CiA301SimCommand
+from .command import CiA301Command, CiA301SimCommand, CiA301CommandException
from .sdo import CiA301SDO
+from functools import cached_property
class CiA301Config:
@@ -98,17 +99,33 @@ def sdo_ix(cls, ix):
ix = (dtc.uint16(ix[0]), dtc.uint8(ix[1]))
return ix
+ @cached_property
+ def sdos(self):
+ assert self.model_id in self._model_sdos
+ return self._model_sdos[self.model_id].values()
+
def sdo(self, ix):
if isinstance(ix, self.sdo_class):
return ix
ix = self.sdo_ix(ix)
return self._model_sdos[self.model_id][ix]
+ def dump_param_values(self):
+ res = dict()
+ for sdo in self.sdos:
+ try:
+ res[sdo] = self.upload(sdo, stderr_to_devnull=True)
+ except CiA301CommandException as e:
+ # Objects may not exist, like variable length PDO mappings
+ self.logger.debug(f"Upload {sdo} failed: {e}")
+ pass
+ return res
+
#
# Param read/write
#
- def upload(self, sdo):
+ def upload(self, sdo, **kwargs):
# Get SDO object
sdo = self.sdo(sdo)
res_raw = self.command().upload(
@@ -116,10 +133,11 @@ def upload(self, sdo):
index=sdo.index,
subindex=sdo.subindex,
datatype=sdo.data_type,
+ **kwargs,
)
return sdo.data_type(res_raw)
- def download(self, sdo, val, dry_run=False):
+ def download(self, sdo, val, dry_run=False, **kwargs):
# Get SDO object
sdo = self.sdo(sdo)
# Check before setting value to avoid unnecessary NVRAM writes
@@ -128,6 +146,7 @@ def download(self, sdo, val, dry_run=False):
index=sdo.index,
subindex=sdo.subindex,
datatype=sdo.data_type,
+ **kwargs,
)
if sdo.data_type(res_raw) == val:
return # SDO value already correct
@@ -140,6 +159,7 @@ def download(self, sdo, val, dry_run=False):
subindex=sdo.subindex,
value=val,
datatype=sdo.data_type,
+ **kwargs,
)
#
@@ -187,24 +207,21 @@ def set_device_config(cls, config):
cls._device_config.clear()
cls._device_config.extend(config)
- def munge_config(self, config_raw):
+ @classmethod
+ def munge_config(cls, config_raw, position):
+ config_cooked = config_raw.copy()
+ # Convert model ID ints
+ model_id = (config_raw["vendor_id"], config_raw["product_code"])
+ model_id = cls.format_model_id(model_id)
+ config_cooked["vendor_id"], config_cooked["product_code"] = model_id
# Flatten out param_values key
- pv = dict()
+ config_cooked["param_values"] = dict()
for ix, val in config_raw.get("param_values", dict()).items():
- ix = self.sdo_class.parse_idx_str(ix)
+ ix = cls.sdo_class.parse_idx_str(ix)
if isinstance(val, list):
- pos_ix = config_raw["positions"].index(self.position)
+ pos_ix = config_raw["positions"].index(position)
val = val[pos_ix]
- pv[ix] = val
- dtc = self.data_type_class
- config_raw["vendor_id"] = dtc.uint32(config_raw["vendor_id"])
- config_raw["product_code"] = dtc.uint32(config_raw["product_code"])
- config_cooked = dict(
- vendor_id=config_raw["vendor_id"],
- product_code=config_raw["product_code"],
- param_values=pv,
- sync_manager=config_raw.get("sync_manager", dict()),
- )
+ config_cooked["param_values"][ix] = val
# Return pruned config dict
return config_cooked
@@ -225,7 +242,7 @@ def config(self):
else:
raise KeyError(f"No config for device at {self.address}")
# Prune & cache config
- self._config = self.munge_config(conf)
+ self._config = self.munge_config(conf, self.position)
# Return cached config
return self._config
diff --git a/hw_device_mgr/cia_301/tests/base_test_class.py b/hw_device_mgr/cia_301/tests/base_test_class.py
index fec64cdc..936ba1ef 100644
--- a/hw_device_mgr/cia_301/tests/base_test_class.py
+++ b/hw_device_mgr/cia_301/tests/base_test_class.py
@@ -20,10 +20,12 @@ class BaseCiA301TestClass(BaseTestClass):
#
# The device configuration, as in a real system
- device_config_yaml = "cia_301/tests/device_config.yaml"
+ device_config_package = "hw_device_mgr.cia_301.tests"
+ device_config_yaml = "device_config.yaml"
# Device model SDOs; for test fixture
- device_sdos_yaml = "cia_301/tests/bogus_devices/sim_sdo_data.yaml"
+ device_sdos_package = "hw_device_mgr.cia_301.tests"
+ device_sdos_yaml = "sim_sdo_data.yaml"
# Classes under test in this module
data_type_class = CiA301DataType
@@ -42,18 +44,27 @@ class BaseCiA301TestClass(BaseTestClass):
@classmethod
def init_sim(cls, **kwargs):
+ """Create sim device objects with configured SDOs."""
if cls.pass_init_sim_device_sdos:
# Init sim SDO data
- path, sdo_data = cls.load_yaml(cls.device_sdos_yaml, True)
- print(f" Raw sdo_data from {path}")
+ sdo_data = cls.load_sdo_data()
+ print(f" Raw sdo_data from {cls.sdo_data_resource()}")
kwargs["sdo_data"] = cls.munge_sdo_data(sdo_data)
# Init sim device data
super().init_sim(**kwargs)
@classmethod
def munge_device_config(cls, device_config):
- # Make device_config.yaml reusable by monkey-patching device vendor_id
- # and product_code keys based on test_category key
+ """
+ Munge raw device config.
+
+ Return a copy of `device_config` with minor processing.
+
+ Optionally, to make the YAML file reusable, each configuration's
+ `vendor_id` and `product_code` keys may be replaced with a `category`
+ key matching a parent of classes listed; this fixture will re-add those
+ keys.
+ """
new_device_config = list()
for conf in device_config:
device_cls = cls.test_category_class(conf["test_category"])
@@ -84,30 +95,34 @@ def config_cls(self, device_cls, device_config):
def command_cls(self, device_cls):
yield self.command_class
+ @classmethod
+ def load_device_config(cls):
+ """
+ Load device configuration from package resource.
+
+ The `importlib.resources` resource is named by
+ `device_config_package` and `device_config_yaml` attrs.
+ """
+ rsrc = (cls.device_config_package, cls.device_config_yaml)
+ dev_conf = cls.load_yaml_resource(*rsrc)
+ assert dev_conf, f"Empty device config in package resource {rsrc}"
+ print(f" Raw device_config from {rsrc}")
+ return dev_conf
+
@pytest.fixture
def device_config(self):
"""
Device configuration data fixture.
- Load device configuration from file named in
- `device_config_yaml` attr.
+ Load device configuration with `load_device_config()` and munge with
+ `mung_device_config()`.
Device configuration in the same format as non-test
configuration, described in `Config` classes.
-
- The absolute path is stored in the test object
- `device_config_path` attribute.
-
- Optionally, to make the YAML file reusable, each
- configuration's `vendor_id` and `product_code` keys may be
- replaced with a `category` key matching a parent of classes
- listed; this fixture will re-add those keys.
"""
- path, dev_conf = self.load_yaml(self.device_config_yaml, True)
- print(f" Raw device_config from {path}")
- dev_conf = self.munge_device_config(dev_conf)
+ conf_raw = self.load_device_config()
+ dev_conf = self.munge_device_config(conf_raw)
self.device_config = dev_conf
- self.device_config_path = path
yield dev_conf
@pytest.fixture
@@ -217,6 +232,17 @@ def munge_sim_device_data(cls, sim_device_data):
dev["test_address"] = (dev["bus"], dev["position"])
return sim_device_data
+ @classmethod
+ def sdo_data_resource(cls):
+ return (cls.device_sdos_package, cls.device_sdos_yaml)
+
+ @classmethod
+ def load_sdo_data(cls):
+ rsrc = cls.sdo_data_resource()
+ sdo_data = cls.load_yaml_resource(*rsrc)
+ assert sdo_data, f"Empty SDO data in package resource {rsrc}"
+ return sdo_data
+
def pytest_generate_tests(self, metafunc):
# Dynamic parametrization from sim_device_data_yaml:
# - _sim_device_data: iterate through `sim_device_data` list
@@ -224,10 +250,8 @@ def pytest_generate_tests(self, metafunc):
# - _sdo_data: iterate through `sdo_data` values
# - bus: iterate through `sim_device_data` unique `bus` values
# *Note all three cases are mutually exclusive
- path, dev_data = self.load_yaml(self.sim_device_data_yaml, True)
- dev_data = self.munge_sim_device_data(dev_data)
- path, sdo_data = self.load_yaml(self.device_sdos_yaml, True)
- sdo_data = self.munge_sdo_data(sdo_data, conv_sdos=True)
+ dev_data = self.munge_sim_device_data(self.load_sim_device_data())
+ sdo_data = self.munge_sdo_data(self.load_sdo_data(), conv_sdos=True)
names = list()
vals, ids = (list(), list())
if "_sim_device_data" in metafunc.fixturenames:
diff --git a/hw_device_mgr/cia_301/tests/bogus_devices/sim_sdo_data.yaml b/hw_device_mgr/cia_301/tests/sim_sdo_data.yaml
similarity index 100%
rename from hw_device_mgr/cia_301/tests/bogus_devices/sim_sdo_data.yaml
rename to hw_device_mgr/cia_301/tests/sim_sdo_data.yaml
diff --git a/hw_device_mgr/cia_301/tests/test_device.py b/hw_device_mgr/cia_301/tests/test_device.py
index 09c6c692..3bcd583d 100644
--- a/hw_device_mgr/cia_301/tests/test_device.py
+++ b/hw_device_mgr/cia_301/tests/test_device.py
@@ -10,8 +10,8 @@ class TestCiA301Device(BaseCiA301TestClass, _TestDevice):
*_TestDevice.expected_mro,
]
- # Test CiA NMT init: online & operational status
- read_update_write_yaml = "cia_301/tests/read_update_write.cases.yaml"
+ # CiA NMT init online & operational status test cases
+ read_update_write_package = "hw_device_mgr.cia_301.tests"
@pytest.fixture
def obj(self, device_cls, sim_device_data):
diff --git a/hw_device_mgr/cia_402/device.py b/hw_device_mgr/cia_402/device.py
index bbdd2c6f..8ccf35f4 100644
--- a/hw_device_mgr/cia_402/device.py
+++ b/hw_device_mgr/cia_402/device.py
@@ -173,10 +173,10 @@ def get_feedback(self):
goal_reasons.append(f"state flag {flag_name} != {not flag_val}")
if not goal_reached:
- fb_out.update(
- goal_reached=False, goal_reason="; ".join(goal_reasons)
- )
- self.logger.debug(f"Device {self.address}: Goal not reached:")
+ goal_reason = "; ".join(goal_reasons)
+ fb_out.update(goal_reached=False, goal_reason=goal_reason)
+ if fb_out.changed("goal_reason"):
+ self.logger.debug(f"{self}: Goal not reached: {goal_reason}")
return fb_out
state_bits = {
@@ -492,13 +492,13 @@ def set_sim_feedback(self):
# Log changes
if self.sim_feedback.changed("control_mode_fb"):
cm = self.sim_feedback.get("control_mode_fb")
- self.logger.info(f"{self} next control_mode_fb: 0x{cm:04X}")
+ self.logger.info(f"{self} sim control_mode_fb: 0x{cm:04X}")
if self.sim_feedback.changed("status_word"):
sw = self.sim_feedback.get("status_word")
flags = ",".join(k for k, v in sw_flags.items() if v)
flags = f" flags: {flags}" if flags else ""
self.logger.info(
- f"{self} sim next status_word: 0x{sw:04X} {state} {flags}"
+ f"{self} sim status_word: 0x{sw:04X} {state} {flags}"
)
return sfb
diff --git a/hw_device_mgr/cia_402/tests/base_test_class.py b/hw_device_mgr/cia_402/tests/base_test_class.py
index e547c5df..d3b049e7 100644
--- a/hw_device_mgr/cia_402/tests/base_test_class.py
+++ b/hw_device_mgr/cia_402/tests/base_test_class.py
@@ -10,7 +10,8 @@
class BaseCiA402TestClass(BaseCiA301TestClass):
# test_read_update_write_402() configuration
- read_update_write_402_yaml = "cia_402/tests/read_update_write.cases.yaml"
+ read_update_write_402_package = "hw_device_mgr.cia_402.tests"
+ read_update_write_402_yaml = "read_update_write.cases.yaml"
# Classes under test in this module
device_class = BogusCiA402Device
diff --git a/hw_device_mgr/cia_402/tests/test_device.py b/hw_device_mgr/cia_402/tests/test_device.py
index 698627cb..d69d5cb2 100644
--- a/hw_device_mgr/cia_402/tests/test_device.py
+++ b/hw_device_mgr/cia_402/tests/test_device.py
@@ -29,11 +29,12 @@ def read_update_write_conv_test_data(self):
if key in intf_data:
intf_data[key] = uint16(intf_data[key])
- def test_read_update_write(self, obj, fpath):
+ def test_read_update_write(self, obj):
if hasattr(obj, "MODE_CSP"):
# CiA 402 device
+ self.read_update_write_package = self.read_update_write_402_package
self.read_update_write_yaml = self.read_update_write_402_yaml
self.is_402_device = True
else:
self.is_402_device = False
- super().test_read_update_write(obj, fpath)
+ super().test_read_update_write(obj)
diff --git a/hw_device_mgr/config_io.py b/hw_device_mgr/config_io.py
new file mode 100644
index 00000000..03c20b13
--- /dev/null
+++ b/hw_device_mgr/config_io.py
@@ -0,0 +1,39 @@
+import ruamel.yaml
+from pathlib import Path
+from importlib.resources import open_binary
+
+
+class ConfigIO:
+ @classmethod
+ def open_path(cls, path, *args, **kwargs):
+ """Return open file object for `path`."""
+ path_obj = Path(path)
+ return path_obj.open(*args, **kwargs)
+
+ @classmethod
+ def open_resource(cls, package, resource):
+ """Return open file object for importlib package resource."""
+ return open_binary(package, resource)
+
+ @classmethod
+ def load_yaml_path(cls, path):
+ """Read and return `data` from YAML formatted file `path`."""
+ yaml = ruamel.yaml.YAML()
+ with cls.open_path(path) as f:
+ data = yaml.load(f)
+ return data
+
+ @classmethod
+ def dump_yaml_path(cls, path, data):
+ """Dump `data` in YAML format to `path`."""
+ yaml = ruamel.yaml.YAML()
+ with cls.open_path(path, "w") as f:
+ yaml.dump(data, f)
+
+ @classmethod
+ def load_yaml_resource(cls, package, resource):
+ """Load YAML from importlib package resource."""
+ yaml = ruamel.yaml.YAML()
+ with cls.open_resource(package, resource) as f:
+ data = yaml.load(f)
+ return data
diff --git a/hw_device_mgr/device.py b/hw_device_mgr/device.py
index e1a2a98f..2f9ed336 100644
--- a/hw_device_mgr/device.py
+++ b/hw_device_mgr/device.py
@@ -1,8 +1,9 @@
import abc
-from importlib.resources import path as imp_path
from .logging import Logging
from .interface import Interface
from .data_types import DataType
+from functools import cached_property
+import re
class Device(abc.ABC):
@@ -36,7 +37,7 @@ def __init__(self, address=None):
self.address = address
self.init_interfaces()
- def init(self, index=None):
+ def init(self):
"""
Initialize device.
@@ -44,7 +45,7 @@ def init(self, index=None):
outside the constructor. Implementations should always call
`super().init()`.
"""
- self.index = index
+ pass
@classmethod
def merge_dict_attrs(cls, attr):
@@ -62,6 +63,21 @@ def merge_dict_attrs(cls, attr):
res.update(c_attr)
return res
+ slug_separator = "."
+
+ @cached_property
+ def addr_slug(self):
+ """
+ Return a slug generated from the device address.
+
+ The slug is computed by separating numeric components of the
+ device `address` string with the `slug_separator` character,
+ default `.`, e.g. `(0,5)` -> `0.5`. This is intended to be
+ useful for inclusion into identifiers.
+ """
+ addr_prefix = re.sub(r"[^0-9]+", self.slug_separator, str(self.address))
+ return addr_prefix.strip(self.slug_separator)
+
def init_interfaces(self):
intfs = self._interfaces = dict()
dt_name2cls = self.data_type_class.by_shared_name
@@ -95,6 +111,7 @@ def interface_changed(self, what, key, return_vals=False):
def read(self):
"""Read `feedback_in` from hardware interface."""
+ self._interfaces["feedback_in"].set()
def get_feedback(self):
"""Process `feedback_in` and return `feedback_out` interface."""
@@ -114,14 +131,6 @@ def write(self):
def log_status(self):
pass
- @classmethod
- def pkg_path(cls, path):
- """Return `pathlib.Path` object for this package's directory."""
- # Find cls's module & package
- pkg = ".".join(cls.__module__.split(".")[:-1])
- with imp_path(pkg, path) as p:
- return p
-
def __str__(self):
return f"<{self.name}@{self.address}>"
@@ -338,7 +347,12 @@ def sim_device_data_address(cls, sim_device_data):
@classmethod
def init_sim(cls, *, sim_device_data):
- """Massage device test data for usability."""
+ """
+ Create sim device objects for tests.
+
+ Construct sim device objects with device class, address, etc.
+ from `sim_device_data`.
+ """
cls_sim_data = cls._sim_device_data[cls.category] = dict()
for dev in sim_device_data:
@@ -369,7 +383,7 @@ def read(self):
"""Read `feedback_in` from hardware interface."""
super().read()
sfb = self._interfaces["sim_feedback"].get()
- self._interfaces["feedback_in"].set(**sfb)
+ self._interfaces["feedback_in"].update(**sfb)
def set_sim_feedback(self):
"""Simulate feedback from command and feedback."""
diff --git a/hw_device_mgr/devices/bogus.py b/hw_device_mgr/devices/bogus.py
index fe25f20b..e88738a4 100644
--- a/hw_device_mgr/devices/bogus.py
+++ b/hw_device_mgr/devices/bogus.py
@@ -7,6 +7,7 @@ class BogusDevice(EtherCATSimDevice, CiA402SimDevice):
category = "bogus_servo"
vendor_id = 0xB090C0
+ xml_description_package = "hw_device_mgr.devices.device_xml"
xml_description_fname = "BogusServo.xml"
diff --git a/hw_device_mgr/devices/device_xml/SV660_EOE_1Axis_V9.12.xml b/hw_device_mgr/devices/device_xml/SV660_EOE_1Axis_V9.12.xml
index 631a7da5..648b9f3f 100644
--- a/hw_device_mgr/devices/device_xml/SV660_EOE_1Axis_V9.12.xml
+++ b/hw_device_mgr/devices/device_xml/SV660_EOE_1Axis_V9.12.xml
@@ -2093,6 +2093,28 @@
rw
+
+
+ 6
+ Stop mode at S-ON OFF
+ UINT
+ 16
+ 96
+
+ rw
+
+
+
+
+ 7
+ Stop mode at No. 2 fault
+ INT
+ 16
+ 112
+
+ rw
+
+
8
Stop mode at limit switch signal
@@ -2541,6 +2563,17 @@
rw
+
+
+ 24
+ EtherCAT forced DO output logic in non-OP status
+ UINT
+ 16
+ 112
+
+ rw
+
+
DT2005
@@ -2585,6 +2618,28 @@
rw
+
+
+ 8
+ Numerator of electronic gear ratio
+ UDINT
+ 32
+ 128
+
+ rw
+
+
+
+
+ 10
+ Denominator of electronic gear ratio
+ UDINT
+ 32
+ 160
+
+ rw
+
+
20
Speed feedforward control selection
@@ -4841,6 +4896,17 @@
rw
+
+
+ 3
+ Offline inertia autotuning selection
+ UINT
+ 16
+ 48
+
+ rw
+
+
5
Encoder ROM reading/writing
@@ -6923,6 +6989,24 @@
0
+
+
+ Stop mode at S-ON OFF
+
+ -3
+ 1
+ 0
+
+
+
+
+ Stop mode at No. 2 fault
+
+ -5
+ 3
+ 2
+
+
Stop mode at limit switch signal
@@ -7295,6 +7379,15 @@
0
+
+
+ EtherCAT forced DO output logic in non-OP status
+
+ 0
+ 7
+ 1
+
+
ro
@@ -7337,6 +7430,24 @@
0
+
+
+ Numerator of electronic gear ratio
+
+ 0
+ 4294967295
+ 1
+
+
+
+
+ Denominator of electronic gear ratio
+
+ 0
+ 4294967295
+ 1
+
+
Speed feedforward control selection
@@ -9059,6 +9170,15 @@
0
+
+
+ Offline inertia autotuning selection
+
+ 0
+ 1
+ 0
+
+
Encoder ROM reading/writing
diff --git a/hw_device_mgr/devices/elmo_gold.py b/hw_device_mgr/devices/elmo_gold.py
index c70a4bc8..98097442 100644
--- a/hw_device_mgr/devices/elmo_gold.py
+++ b/hw_device_mgr/devices/elmo_gold.py
@@ -6,6 +6,7 @@ class ElmoGold(EtherCATDevice, CiA402Device):
"""Base class for Elmo Gold EtherCAT Family Devices."""
vendor_id = 0x0000009A
+ xml_description_package = "hw_device_mgr.devices.device_xml"
# FIXME The original ESI has models that differ only by revision,
# but the ESI parser doesn't support this yet
# xml_description_fname = "Elmo_ECAT_00010420_V11.xml"
diff --git a/hw_device_mgr/devices/inovance_is620n.py b/hw_device_mgr/devices/inovance_is620n.py
index 70191481..6e310e05 100644
--- a/hw_device_mgr/devices/inovance_is620n.py
+++ b/hw_device_mgr/devices/inovance_is620n.py
@@ -7,6 +7,7 @@ class InovanceIS620N(EtherCATDevice, CiA402Device):
vendor_id = 0x00100000
product_code = 0x000C0108
+ xml_description_package = "hw_device_mgr.devices.device_xml"
xml_description_fname = "IS620N_v2.6.7.xml"
def set_params_volatile(self, nv=False):
diff --git a/hw_device_mgr/devices/inovance_sv660.py b/hw_device_mgr/devices/inovance_sv660.py
index 08b06ff5..95089a74 100644
--- a/hw_device_mgr/devices/inovance_sv660.py
+++ b/hw_device_mgr/devices/inovance_sv660.py
@@ -7,6 +7,7 @@ class InovanceSV660(EtherCATDevice, CiA402Device):
vendor_id = 0x00100000
product_code = 0x000C010D
+ xml_description_package = "hw_device_mgr.devices.device_xml"
xml_description_fname = "SV660_EOE_1Axis_V9.12.xml"
def set_params_volatile(self, nv=False):
diff --git a/hw_device_mgr/devices/itegva_e7x.py b/hw_device_mgr/devices/itegva_e7x.py
index d7e5bb2a..f93823d7 100644
--- a/hw_device_mgr/devices/itegva_e7x.py
+++ b/hw_device_mgr/devices/itegva_e7x.py
@@ -5,6 +5,7 @@ class ITegvaE7xDevice(EtherCATDevice):
"""Base class for iTegva E7x series IO modules."""
vendor_id = 0x00000A09
+ xml_description_package = "hw_device_mgr.devices.device_xml"
xml_description_fname = "iTegva_E7x_Series.xml"
diff --git a/hw_device_mgr/devices/tests/base_test_class.py b/hw_device_mgr/devices/tests/base_test_class.py
index eeba5ddc..75223d8b 100644
--- a/hw_device_mgr/devices/tests/base_test_class.py
+++ b/hw_device_mgr/devices/tests/base_test_class.py
@@ -21,6 +21,6 @@ class BaseDevicesTestClass(BaseLCECTestClass):
EVEXCREForTest,
)
- device_config_yaml = "devices/tests/device_config.yaml"
- sim_device_data_yaml = "devices/tests/sim_devices.yaml"
- device_sdos_yaml = "devices/tests/sim_sdo_data.yaml"
+ device_config_package = "hw_device_mgr.devices.tests"
+ sim_device_data_package = "hw_device_mgr.devices.tests"
+ device_sdos_package = "hw_device_mgr.devices.tests"
diff --git a/hw_device_mgr/devices/tests/device_xml b/hw_device_mgr/devices/tests/device_xml
deleted file mode 120000
index c58533e3..00000000
--- a/hw_device_mgr/devices/tests/device_xml
+++ /dev/null
@@ -1 +0,0 @@
-../device_xml
\ No newline at end of file
diff --git a/hw_device_mgr/errors/device.py b/hw_device_mgr/errors/device.py
index 0dcfc544..69918069 100644
--- a/hw_device_mgr/errors/device.py
+++ b/hw_device_mgr/errors/device.py
@@ -1,9 +1,9 @@
from ..device import Device, SimDevice
from ..data_types import DataType
-import ruamel.yaml
+from ..config_io import ConfigIO
-class ErrorDevice(Device):
+class ErrorDevice(Device, ConfigIO):
"""
Abstract class representing a device error code handling.
@@ -14,57 +14,63 @@ class ErrorDevice(Device):
strings to feedback.
"""
- device_error_dir = "device_err"
+ device_error_package = None
+ device_error_yaml = None
- feedback_data_types = dict(error_code="uint32")
- feedback_defaults = dict(
+ feedback_in_data_types = dict(error_code="uint32")
+ feedback_in_defaults = dict(error_code=0)
+
+ feedback_out_defaults = dict(
error_code=0, description="No error", advice="No error"
)
- no_error = feedback_defaults
+
+ no_error = feedback_out_defaults
data_type_class = DataType
_error_descriptions = dict()
- @classmethod
- def error_descriptions_yaml(cls):
- return cls.pkg_path(cls.device_error_dir) / f"{cls.name}.yaml"
-
@classmethod
def error_descriptions(cls):
"""
Return dictionary of error code data.
- Data is read from YAML file `{device_error_dir}/{name}.yaml` and
- cached.
+ Data is read from YAML resource from package
+ `device_error_package`, name `device_error_yaml`.
"""
if cls.name not in cls._error_descriptions:
errs = cls._error_descriptions[cls.name] = dict()
- path = cls.error_descriptions_yaml()
- if path.exists():
- yaml = ruamel.yaml.YAML()
- with path.open() as f:
- err_yaml = yaml.load(f)
+ if cls.device_error_yaml:
+ err_yaml = cls.load_yaml_resource(
+ cls.device_error_package, cls.device_error_yaml
+ )
for err_code_str, err_data in err_yaml.items():
errs[int(err_code_str, 0)] = err_data
return cls._error_descriptions[cls.name]
- def set_feedback(self, error_code=0, **kwargs):
- super().set_feedback(**kwargs)
+ def get_feedback(self):
+ fb_out = super().get_feedback()
+ error_code = self.feedback_in.get("error_code")
if not error_code:
- self.feedback.update(**self.no_error)
- return
+ self.feedback_out.update(**self.no_error)
+ return fb_out
error_info = self.error_descriptions().get(error_code, None)
if error_info is None:
- self.feedback.update(
+ fb_out.update(
description=f"Unknown error code {error_code}",
advice="Please consult with hardware vendor",
error_code=error_code,
)
- return
-
- self.feedback.update(error_code=error_code, **error_info)
+ return fb_out
+ else:
+ fb_out.update(error_code=error_code, **error_info)
+ if fb_out.changed("error_code"):
+ msg = "error code {}: {}".format(
+ error_code, fb_out.get("description")
+ )
+ self.logger.error(msg)
+ return fb_out
class ErrorSimDevice(ErrorDevice, SimDevice):
diff --git a/hw_device_mgr/errors/tests/bogus_devices/device.py b/hw_device_mgr/errors/tests/bogus_devices/device.py
index 59432342..1380e477 100644
--- a/hw_device_mgr/errors/tests/bogus_devices/device.py
+++ b/hw_device_mgr/errors/tests/bogus_devices/device.py
@@ -3,6 +3,7 @@
class BogusErrorDevice(ErrorSimDevice):
category = "bogus_error_devices"
+ device_error_package = "hw_device_mgr.errors.tests.bogus_devices.device_err"
@classmethod
def scan_devices(cls, **kwargs):
@@ -13,15 +14,18 @@ class BogusErrorV1Servo(BogusErrorDevice):
name = "bogus_v1_error_servo"
test_category = "bogus_v1_servo"
model_id = 0xB0905041
+ device_error_yaml = "bogus_v1_v2_error_servo.yaml"
class BogusErrorV2Servo(BogusErrorDevice):
name = "bogus_v2_error_servo"
test_category = "bogus_v2_servo"
model_id = 0xB0905042
+ device_error_yaml = "bogus_v1_v2_error_servo.yaml"
class BogusErrorV1IO(BogusErrorDevice):
name = "bogus_v1_error_io"
test_category = "bogus_v1_io"
model_id = 0xB0901041
+ device_error_yaml = "bogus_v1_error_io.yaml"
diff --git a/hw_device_mgr/errors/tests/bogus_devices/device_err/__init__.py b/hw_device_mgr/errors/tests/bogus_devices/device_err/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/hw_device_mgr/errors/tests/bogus_devices/device_err/bogus_v1_error_servo.yaml b/hw_device_mgr/errors/tests/bogus_devices/device_err/bogus_v1_v2_error_servo.yaml
similarity index 100%
rename from hw_device_mgr/errors/tests/bogus_devices/device_err/bogus_v1_error_servo.yaml
rename to hw_device_mgr/errors/tests/bogus_devices/device_err/bogus_v1_v2_error_servo.yaml
diff --git a/hw_device_mgr/errors/tests/bogus_devices/device_err/bogus_v2_error_servo.yaml b/hw_device_mgr/errors/tests/bogus_devices/device_err/bogus_v2_error_servo.yaml
deleted file mode 120000
index e9045116..00000000
--- a/hw_device_mgr/errors/tests/bogus_devices/device_err/bogus_v2_error_servo.yaml
+++ /dev/null
@@ -1 +0,0 @@
-bogus_v1_error_servo.yaml
\ No newline at end of file
diff --git a/hw_device_mgr/errors/tests/test_device.py b/hw_device_mgr/errors/tests/test_device.py
index 98e106da..d710e7ce 100644
--- a/hw_device_mgr/errors/tests/test_device.py
+++ b/hw_device_mgr/errors/tests/test_device.py
@@ -8,6 +8,7 @@ class TestErrorDevice(ErrorBaseTestClass, _TestDevice):
"ErrorSimDevice",
"ErrorDevice",
*_TestDevice.expected_mro,
+ "ConfigIO",
]
@pytest.fixture
@@ -19,7 +20,7 @@ def obj(self):
def test_error_descriptions(self):
for cls in self.device_model_classes:
print("cls:", cls)
- print("yaml:", cls.error_descriptions_yaml())
+ print("yaml:", cls.device_error_package, cls.device_error_yaml)
errs = cls.error_descriptions()
assert isinstance(errs, dict)
assert len(errs) > 0
diff --git a/hw_device_mgr/ethercat/config.py b/hw_device_mgr/ethercat/config.py
index df5d1898..9edf1b13 100644
--- a/hw_device_mgr/ethercat/config.py
+++ b/hw_device_mgr/ethercat/config.py
@@ -27,10 +27,15 @@ class EtherCATConfig(CiA301Config):
#
@classmethod
- def get_device_sdos_from_esi(cls, esi_path):
+ def get_device_sdos_from_esi(cls, package, fname):
"""Read in device configuration from ESI file at `esi_path`."""
esi_reader = cls.esi_reader_class()
- return esi_reader.add_device_descriptions(esi_path)
+ if package:
+ return esi_reader.add_device_descriptions_from_resource(
+ package, fname
+ )
+ else:
+ return esi_reader.add_device_descriptions_from_path(fname)
class EtherCATSimConfig(EtherCATConfig, CiA301SimConfig):
diff --git a/hw_device_mgr/ethercat/device.py b/hw_device_mgr/ethercat/device.py
index c9f26fe0..9734e043 100644
--- a/hw_device_mgr/ethercat/device.py
+++ b/hw_device_mgr/ethercat/device.py
@@ -18,7 +18,8 @@ class EtherCATDevice(CiA301Device, abc.ABC):
# Resource names for locating device description XML and error files
device_xml_dir = "device_xml"
- # Filename of XML description
+ # Package and filename of XML description resource
+ xml_description_package = None
xml_description_fname = None
# Swappable utility classes
@@ -45,28 +46,14 @@ def set_params_volatile(self, nv=False):
Concrete subclasses may optionally implement this
"""
- @classmethod
- def xml_description_path(cls):
- """
- Return path to device ESI file.
-
- Path is under the module directory,
- `{device_xml_dir}/{xml_description_fname}`.
- """
- path = cls.pkg_path(cls.device_xml_dir) / cls.xml_description_fname
- return path.resolve()
-
@classmethod
def read_device_sdos_from_esi(cls):
sdo_data = dict()
- dev_esi_paths = set()
for dev in cls.get_model():
- esi_path = dev.xml_description_path()
- if esi_path in dev_esi_paths:
- assert dev.device_model_id() in sdo_data
- continue
- dev_esi_paths.add(esi_path)
- dev_sdo_data = dev.config_class.get_device_sdos_from_esi(esi_path)
+ conf = dev.config_class
+ dev_sdo_data = conf.get_device_sdos_from_esi(
+ dev.xml_description_package, dev.xml_description_fname
+ )
sdo_data.update(dev_sdo_data)
return sdo_data
diff --git a/hw_device_mgr/ethercat/tests/base_test_class.py b/hw_device_mgr/ethercat/tests/base_test_class.py
index 2e18ff84..4b53faf9 100644
--- a/hw_device_mgr/ethercat/tests/base_test_class.py
+++ b/hw_device_mgr/ethercat/tests/base_test_class.py
@@ -8,7 +8,6 @@
BogusEtherCATServo,
BogusOtherCATServo,
BogusEtherCATIO,
- RelocatableESIDevice,
)
import re
import pytest
@@ -45,38 +44,38 @@ def model_id_clone_map(self):
@pytest.fixture
def device_xml(self, tmp_path):
- if not issubclass(self.device_class, RelocatableESIDevice):
- # Don't rewrite ESI files
- yield
- else:
- # Subclasses will have different product_code, so customize ESI file
- self.device_class.set_device_xml_dir(tmp_path)
- finished_paths = set()
- re_str = "|".join(rf"{pc[1]:08X}" for pc in self.sdo_model_id_clone)
- re_str = r"#x(" + re_str + r")"
- pat = re.compile(re_str)
- # Map of orig ESI file product code to new ESI file product code
- cm = {
- k[1]: v.device_model_id()[1]
- for k, v in self.model_id_clone_map.items()
- }
- for id_orig, cls in self.model_id_clone_map.items():
- esi_orig = cls.orig_xml_description_path()
- esi_new = cls.xml_description_path()
- if esi_orig in finished_paths:
- continue
- finished_paths.add(esi_orig)
- esi_new.parent.mkdir(exist_ok=True)
- with open(esi_orig) as f_orig:
- with open(esi_new, "w") as f_new:
- for line in f_orig:
- line = pat.sub(
- lambda m: f"#x{cm[int(m.group(1), 16)]}", line
- )
- f_new.write(line)
- print(f"Wrote ESI file to {esi_new}")
- print(f" Original in {esi_orig}")
- yield
+ # Subclasses will have different product_code, so customize ESI file
+ finished = set()
+ re_str = "|".join(rf"{pc[1]:08X}" for pc in self.sdo_model_id_clone)
+ re_str = r"#x(" + re_str + r")"
+ pat = re.compile(re_str)
+ # Map of orig ESI file product code to new ESI file product code
+ cm = {
+ k[1]: f"{v.device_model_id()[1]:08X}"
+ for k, v in self.model_id_clone_map.items()
+ }
+ for id_orig, cls in self.model_id_clone_map.items():
+ if not hasattr(cls, "alt_xml_description"):
+ print(f"Using original ESI file for device {cls.name}")
+ continue
+ esi_orig = (cls.xml_description_package, cls.xml_description_fname)
+ print(f"Model {cls.name} ESI resource: {esi_orig}")
+ esi_new = tmp_path / cls.xml_description_fname
+ cls.alt_xml_description = esi_new
+ if esi_new in finished:
+ print(f" Already written to {esi_new}")
+ continue # Only process each ESI file once
+ finished.add(esi_new)
+ print(f" Writing to {esi_new}")
+ with self.open_resource(*esi_orig) as f_orig:
+ with self.open_path(esi_new, "w") as f_new:
+ for line in f_orig:
+ line = line.decode()
+ line = pat.sub(
+ lambda m: f"#x{cm[int(m.group(1), 16)]}", line
+ )
+ f_new.write(line)
+ yield
@pytest.fixture
def extra_fixtures(self, device_xml):
diff --git a/hw_device_mgr/ethercat/tests/bogus_devices/device.py b/hw_device_mgr/ethercat/tests/bogus_devices/device.py
index 3ba32537..c8bfbfa7 100644
--- a/hw_device_mgr/ethercat/tests/bogus_devices/device.py
+++ b/hw_device_mgr/ethercat/tests/bogus_devices/device.py
@@ -5,6 +5,7 @@
class BogusEtherCATDevice(RelocatableESIDevice):
category = "bogus_ethercat_devices"
vendor_id = 0xB090C0
+ xml_description_package = "hw_device_mgr.devices.device_xml"
class BogusEtherCATServo(BogusEtherCATDevice, CiA402SimDevice):
diff --git a/hw_device_mgr/ethercat/tests/bogus_devices/device_xml/BogusIO.xml b/hw_device_mgr/ethercat/tests/bogus_devices/device_xml/BogusIO.xml
deleted file mode 120000
index f20a1a22..00000000
--- a/hw_device_mgr/ethercat/tests/bogus_devices/device_xml/BogusIO.xml
+++ /dev/null
@@ -1 +0,0 @@
-../../../../devices/device_xml/BogusIO.xml
\ No newline at end of file
diff --git a/hw_device_mgr/ethercat/tests/bogus_devices/device_xml/BogusServo.xml b/hw_device_mgr/ethercat/tests/bogus_devices/device_xml/BogusServo.xml
deleted file mode 120000
index 3c31ec11..00000000
--- a/hw_device_mgr/ethercat/tests/bogus_devices/device_xml/BogusServo.xml
+++ /dev/null
@@ -1 +0,0 @@
-../../../../devices/device_xml/BogusServo.xml
\ No newline at end of file
diff --git a/hw_device_mgr/ethercat/tests/relocatable_esi_device.py b/hw_device_mgr/ethercat/tests/relocatable_esi_device.py
index 3f2ca2ab..942a8ced 100644
--- a/hw_device_mgr/ethercat/tests/relocatable_esi_device.py
+++ b/hw_device_mgr/ethercat/tests/relocatable_esi_device.py
@@ -4,18 +4,15 @@
class RelocatableESIDevice(EtherCATSimDevice):
"""A class whose ESI description file can be moved (for tests)."""
- @classmethod
- def set_device_xml_dir(cls, path):
- # Tests generate customized ESI file in temp directory; provide a hook
- # to point fixtures to it
- cls.xml_base_dir = path
-
- @classmethod
- def xml_description_path(cls):
- if not hasattr(cls, "xml_base_dir"):
- return super().xml_description_path()
- return cls.xml_base_dir / cls.device_xml_dir / cls.xml_description_fname
+ alt_xml_description = None
@classmethod
- def orig_xml_description_path(cls):
- return super().xml_description_path()
+ def read_device_sdos_from_esi(cls):
+ sdo_data = dict()
+ for dev in cls.get_model():
+ conf = dev.config_class
+ dev_sdo_data = conf.get_device_sdos_from_esi(
+ None, dev.alt_xml_description
+ )
+ sdo_data.update(dev_sdo_data)
+ return sdo_data
diff --git a/hw_device_mgr/ethercat/tests/test_device.py b/hw_device_mgr/ethercat/tests/test_device.py
index 479b950a..3692bfee 100644
--- a/hw_device_mgr/ethercat/tests/test_device.py
+++ b/hw_device_mgr/ethercat/tests/test_device.py
@@ -13,6 +13,8 @@ class TestEtherCATDevice(BaseEtherCATTestClass, _TestCiA402Device):
def test_xml_description_path(self):
for cls in self.device_model_classes:
- esi_path = cls.xml_description_path()
- print(esi_path)
- assert esi_path.exists()
+ assert cls.xml_description_fname
+ if cls.xml_description_package is None:
+ assert "/" in cls.xml_description_fname
+ else:
+ assert "/" not in cls.xml_description_fname
diff --git a/hw_device_mgr/ethercat/xml_reader.py b/hw_device_mgr/ethercat/xml_reader.py
index 051734d4..2d8e7789 100644
--- a/hw_device_mgr/ethercat/xml_reader.py
+++ b/hw_device_mgr/ethercat/xml_reader.py
@@ -1,16 +1,19 @@
from .sdo import EtherCATSDO
+from ..config_io import ConfigIO
+from ..logging import Logging
from lxml import etree
from pprint import pprint
__all__ = ("EtherCATXMLReader",)
-class EtherCATXMLReader:
+class EtherCATXMLReader(ConfigIO):
"""Parse EtherCAT Slave Information "ESI" XML files."""
sdo_class = EtherCATSDO
_device_registry = dict()
- _fpath_registry = dict()
+
+ logger = Logging.getLogger(__name__)
@classmethod
def str_to_int(cls, s):
@@ -167,9 +170,7 @@ def vendor_xml(self):
#
vendors = self.tree.xpath("/EtherCATInfo/Vendor")
if len(vendors) != 1:
- raise RuntimeError(
- f"{len(vendors)} sections in {self.fpath}"
- )
+ raise RuntimeError(f"{len(vendors)} sections in XML")
return vendors[0]
@property
@@ -412,7 +413,7 @@ def read_objects(self, device):
ecat_type = self.data_type_class.by_name(type_name)
except KeyError as e:
print(self.data_type_class._name_re_registry)
- raise KeyError(f"Reading {self.fpath}: {str(e)}")
+ raise KeyError(f"Reading XML: {str(e)}")
self.safe_set(osub, "DataType", ecat_type)
# Flatten out Flags, Info
@@ -457,14 +458,6 @@ def read_objects(self, device):
# print(f"Unused: {list(self._unused.keys())}")
return sdos
- @property
- def tree(self):
- if hasattr(self, "_tree"):
- return self._tree
- with self.fpath.open() as f:
- self._tree = etree.parse(f)
- return self._tree
-
sdo_translations = dict(
# Translate SDO data from read_objects() to SDOs.add_sdo() args
Index="index",
@@ -492,12 +485,9 @@ def add_sdo(self, sdos, data):
subidx = sdo["subindex"] = dtc.uint8(sdo.pop("subindex") or 0)
sdos[idx, subidx] = sdo
- def add_device_descriptions(self, fpath):
- """Parse ESI file and cache device information."""
- if fpath in self._fpath_registry:
- return self._fpath_registry[fpath]
-
- self.fpath = fpath
+ def add_device_descriptions(self, stream):
+ """Parse ESI file stream and cache device information."""
+ self.tree = etree.parse(stream)
model_sdos = dict()
for dxml in self.devices_xml:
sdos = dict()
@@ -510,7 +500,28 @@ def add_device_descriptions(self, fpath):
sdo_data = self.read_objects(dxml)
for sd in sdo_data:
self.add_sdo(sdos, sd)
- self._fpath_registry[fpath] = model_sdos
+ return model_sdos
+
+ @classmethod
+ def open_device_description_resource(cls, package, resource):
+ return cls.open_resource(package, resource)
+
+ @classmethod
+ def open_device_description_path(cls, path, *args, **kwargs):
+ return cls.open_path(path, *args, **kwargs)
+
+ def add_device_descriptions_from_resource(self, package, resource):
+ """Parse ESI from package resource."""
+ self.logger.info(f"Reading ESI from ({package}, {resource})")
+ with self.open_device_description_resource(package, resource) as f:
+ model_sdos = self.add_device_descriptions(f)
+ return model_sdos
+
+ def add_device_descriptions_from_path(self, fpath):
+ """Parse ESI from XML file path."""
+ self.logger.info(f"Reading ESI from {fpath}")
+ with self.open_device_description_path(fpath) as f:
+ model_sdos = self.add_device_descriptions(f)
return model_sdos
@classmethod
diff --git a/hw_device_mgr/hal/data_types.py b/hw_device_mgr/hal/data_types.py
index cb56dd49..a89cd4b5 100644
--- a/hw_device_mgr/hal/data_types.py
+++ b/hw_device_mgr/hal/data_types.py
@@ -11,15 +11,22 @@ class HALDataType(DataType, HALMixin):
int8=dict(hal_type=HALMixin.HAL_S32),
int16=dict(hal_type=HALMixin.HAL_S32),
int32=dict(hal_type=HALMixin.HAL_S32),
- int64=dict(hal_type=HALMixin.HAL_S64),
uint8=dict(hal_type=HALMixin.HAL_U32),
uint16=dict(hal_type=HALMixin.HAL_U32),
uint32=dict(hal_type=HALMixin.HAL_U32),
- uint64=dict(hal_type=HALMixin.HAL_U64),
float=dict(hal_type=HALMixin.HAL_FLOAT),
double=dict(hal_type=HALMixin.HAL_FLOAT),
# No HAL_STR type
)
+ have_64 = hasattr(HALMixin, "HAL_S64")
+ if have_64:
+ # Machinekit HAL has 64-bit int types, but not LCNC
+ subtype_data.update(
+ dict(
+ int64=dict(hal_type=HALMixin.HAL_S64),
+ uint64=dict(hal_type=HALMixin.HAL_U64),
+ )
+ )
@classmethod
def hal_type_str(cls):
diff --git a/hw_device_mgr/hal/device.py b/hw_device_mgr/hal/device.py
index f2be2dc7..6de2a154 100644
--- a/hw_device_mgr/hal/device.py
+++ b/hw_device_mgr/hal/device.py
@@ -1,6 +1,7 @@
from ..device import Device, SimDevice
from .base import HALMixin
from .data_types import HALDataType
+from functools import cached_property
class HALPinDevice(Device, HALMixin):
@@ -14,26 +15,31 @@ class HALPinDevice(Device, HALMixin):
command_out=(HALMixin.HAL_OUT, ""),
)
- # Prepend this to HAL pin names
- dev_pin_prefix = "d"
-
- @property
+ @cached_property
def compname(self):
return self.comp.getprefix()
def pin_name(self, interface, pname):
return self.pin_prefix + self.pin_interfaces[interface][1] + pname
- def init(self, *, comp, **kwargs):
+ @cached_property
+ def pin_prefix(self):
+ """
+ HAL pin prefix for this device.
+
+ Pin prefix is computed by separating numeric components of the
+ device `address` string with `.` and adding a final `.`, e.g.
+ `(0,5)` -> `0.5.`.
+ """
+ return f"{self.addr_slug}{self.slug_separator}"
+
+ def init(self, /, comp, **kwargs):
super().init(**kwargs)
self.comp = comp
# Get specs for all pins in all interfaces; shared pin names must match,
# except for direction, which becomes HAL_IO if different
all_specs = dict()
- self.pin_prefix = (
- "" if self.index is None else f"{self.dev_pin_prefix}{self.index}_"
- )
for iface, params in self.pin_interfaces.items():
for base_pname, new_spec in self.iface_pin_specs(iface).items():
if base_pname not in all_specs:
diff --git a/hw_device_mgr/hal/tests/test_data_types.py b/hw_device_mgr/hal/tests/test_data_types.py
index b4f2b23e..5307b3a8 100644
--- a/hw_device_mgr/hal/tests/test_data_types.py
+++ b/hw_device_mgr/hal/tests/test_data_types.py
@@ -20,6 +20,11 @@ class TestHALDataType(BaseHALTestClass, _TestDataType):
def test_hal_type_str(self):
for shared_name, exp_str in self.sname_to_typestr.items():
+ if shared_name not in self.data_type_class.subtype_data:
+ # LinuxCNC HAL doesn't have 64-bit int types
+ assert shared_name.endswith("64")
+ print(f"Skipping 64-bit int type {shared_name}")
+ continue
cls = self.data_type_class.by_shared_name(shared_name)
exp_int = self.data_type_class.hal_enum(exp_str[4:])
cls_str, cls_int = (cls.hal_type_str(), cls.hal_type)
diff --git a/hw_device_mgr/lcec/command.py b/hw_device_mgr/lcec/command.py
index 050efbe0..9c9bd146 100644
--- a/hw_device_mgr/lcec/command.py
+++ b/hw_device_mgr/lcec/command.py
@@ -17,7 +17,9 @@ class LCECCommand(EtherCATCommand):
def _parse_output(cls, resp, kwargs):
return resp
- def _ethercat(self, *args, log_lev="debug", dry_run=False):
+ def _ethercat(
+ self, *args, log_lev="debug", dry_run=False, stderr_to_devnull=False
+ ):
"""
Run IgH EtherCAT Master `ethercat` utility.
@@ -30,8 +32,9 @@ def _ethercat(self, *args, log_lev="debug", dry_run=False):
return
getattr(self.logger, log_lev)(" ".join(cmd_args))
+ stderr = subprocess.DEVNULL if stderr_to_devnull else None
try:
- resp = subprocess.check_output(cmd_args)
+ resp = subprocess.check_output(cmd_args, stderr=stderr)
except subprocess.CalledProcessError as e:
raise EtherCATCommandException(str(e))
@@ -40,10 +43,12 @@ def _ethercat(self, *args, log_lev="debug", dry_run=False):
_device_location_re = re.compile(r"=== Master ([0-9]), Slave ([0-9]+) ")
- def scan_bus(self, bus=None):
+ def scan_bus(self, bus=None, **kwargs):
bus = self.default_bus if bus is None else bus
devices = list()
- output = self._ethercat("slaves", f"--master={bus}", "--verbose")
+ output = self._ethercat(
+ "slaves", f"--master={bus}", "--verbose", **kwargs
+ )
for line in output:
line = line.strip()
if line.startswith("==="):
@@ -84,7 +89,9 @@ def master_nic(self, bus=None):
else:
return None
- def upload(self, address=None, index=None, subindex=0, datatype=None):
+ def upload(
+ self, address=None, index=None, subindex=0, datatype=None, **kwargs
+ ):
index = self.data_type_class.uint16(index)
subindex = self.data_type_class.uint16(subindex)
output = self._ethercat(
@@ -94,11 +101,14 @@ def upload(self, address=None, index=None, subindex=0, datatype=None):
f"0x{index:04X}",
f"0x{subindex:02X}",
f"--type={datatype.igh_type}",
+ **kwargs,
)
- # FIXME Handle non-int types
- val_hex, val = output[0].split(" ", 1)
- val = int(val, 10)
- return val
+ if datatype.shared_name == "str":
+ return output[0]
+ else:
+ val_hex, val = output[0].split(" ", 1)
+ val = int(val, 10)
+ return val
def download(
self,
@@ -107,18 +117,19 @@ def download(
subindex=0,
value=None,
datatype=None,
- dry_run=False,
+ **kwargs,
):
self._ethercat(
"download",
f"--master={address[0]}",
f"--position={address[1]}",
+ f"--type={datatype.igh_type}",
+ "--",
f"0x{index:04X}",
f"0x{subindex:02X}",
str(value),
- f"--type={datatype.igh_type}",
log_lev="info",
- dry_run=dry_run,
+ **kwargs,
)
diff --git a/hw_device_mgr/lcec/data_types.py b/hw_device_mgr/lcec/data_types.py
index 48027e83..cf7de9a9 100644
--- a/hw_device_mgr/lcec/data_types.py
+++ b/hw_device_mgr/lcec/data_types.py
@@ -11,12 +11,18 @@ class LCECDataType(EtherCATDataType, HALDataType):
int8=dict(igh_type="int8"),
int16=dict(igh_type="int16"),
int32=dict(igh_type="int32"),
- int64=dict(igh_type="int64"),
uint8=dict(igh_type="uint8"),
uint16=dict(igh_type="uint16"),
uint32=dict(igh_type="uint32"),
- uint64=dict(igh_type="uint64"),
float=dict(igh_type="float"),
double=dict(igh_type="double"),
- # Strings not usable by `ethercat` tool
+ str=dict(igh_type="string"),
)
+ if HALDataType.have_64:
+ # Machinekit HAL has 64-bit int types, but not LCNC
+ subtype_data.update(
+ dict(
+ int64=dict(igh_type="int64"),
+ uint64=dict(igh_type="uint64"),
+ )
+ )
diff --git a/hw_device_mgr/lcec/tests/base_test_class.py b/hw_device_mgr/lcec/tests/base_test_class.py
index 0f343ab9..f81d61ef 100644
--- a/hw_device_mgr/lcec/tests/base_test_class.py
+++ b/hw_device_mgr/lcec/tests/base_test_class.py
@@ -45,8 +45,9 @@ def mock_ethercat_command(self):
commands. Patches `subprocess.check_output()`.
"""
- def emulate_ethercat_command(args):
+ def emulate_ethercat_command(args, **kwargs):
print(f'mocking command: {" ".join(args)}')
+ print(f" subprocess.check_output kwargs: {repr(kwargs)}")
# Parse out args, kwargs
assert args.pop(0) == "ethercat"
cmd = args.pop(0)
diff --git a/hw_device_mgr/lcec/tests/bogus_devices/device.py b/hw_device_mgr/lcec/tests/bogus_devices/device.py
index 7f547978..a1c8967d 100644
--- a/hw_device_mgr/lcec/tests/bogus_devices/device.py
+++ b/hw_device_mgr/lcec/tests/bogus_devices/device.py
@@ -6,6 +6,7 @@
class BogusLCECDevice(LCECSimDevice, RelocatableESIDevice):
category = "bogus_lcec_devices"
vendor_id = 0xB090C0
+ xml_description_package = "hw_device_mgr.devices.device_xml"
class BogusLCECV1Servo(BogusLCECDevice, CiA402SimDevice):
diff --git a/hw_device_mgr/lcec/tests/bogus_devices/device_xml b/hw_device_mgr/lcec/tests/bogus_devices/device_xml
deleted file mode 120000
index 7de15941..00000000
--- a/hw_device_mgr/lcec/tests/bogus_devices/device_xml
+++ /dev/null
@@ -1 +0,0 @@
-../../../ethercat/tests/bogus_devices/device_xml
\ No newline at end of file
diff --git a/hw_device_mgr/lcec/tests/test_data_types.py b/hw_device_mgr/lcec/tests/test_data_types.py
index a0b9677a..b05e6357 100644
--- a/hw_device_mgr/lcec/tests/test_data_types.py
+++ b/hw_device_mgr/lcec/tests/test_data_types.py
@@ -25,6 +25,11 @@ class TestLCECDataType(
def test_igh_type_attr(self):
for shared_name in self.defined_shared_types:
+ if shared_name not in self.data_type_class.subtype_data:
+ # LinuxCNC HAL doesn't have 64-bit int types
+ assert shared_name.endswith("64")
+ print(f"Skipping 64-bit int type {shared_name}")
+ continue
cls = self.data_type_class.by_shared_name(shared_name)
print("cls:", cls)
assert hasattr(cls, "igh_type")
diff --git a/hw_device_mgr/lcec/tests/test_device.py b/hw_device_mgr/lcec/tests/test_device.py
index 51cb78ca..0d613301 100644
--- a/hw_device_mgr/lcec/tests/test_device.py
+++ b/hw_device_mgr/lcec/tests/test_device.py
@@ -19,7 +19,7 @@ class TestLCECDevice(BaseLCECTestClass, _TestEtherCATDevice, _TestHALDevice):
]
@pytest.fixture
- def obj(self, device_cls, sim_device_data, sdo_data, mock_halcomp):
+ def obj(self, sim_device_data, mock_halcomp):
self.obj = self.device_model_cls(
address=sim_device_data["test_address"]
)
diff --git a/hw_device_mgr/mgr/mgr.py b/hw_device_mgr/mgr/mgr.py
index 5d30c4ad..93e5c498 100644
--- a/hw_device_mgr/mgr/mgr.py
+++ b/hw_device_mgr/mgr/mgr.py
@@ -15,6 +15,7 @@ class HWDeviceMgr(FysomGlobalMixin, Device):
data_type_class = CiA402Device.data_type_class
device_base_class = CiA402Device
device_classes = None
+ slug_separator = ""
@classmethod
def device_model_id(cls):
@@ -89,7 +90,7 @@ def scan_devices(cls, **kwargs):
def init_device_instances(self, **kwargs):
for i, dev in enumerate(self.devices):
- dev.init(index=i, **kwargs)
+ dev.init(**kwargs)
self.logger.info(f"Adding device #{i}: {dev}")
####################################################
@@ -402,7 +403,7 @@ def fsm_finalize_command(self, e):
####################################################
# Execution
- def run(self):
+ def run_loop(self):
"""Program main loop."""
update_period = 1.0 / self.mgr_config.get("update_rate", 10.0)
while not self.shutdown:
@@ -424,6 +425,21 @@ def run(self):
continue
time.sleep(update_period)
+ def run(self):
+ """Program main."""
+ try:
+ self.run_loop()
+ except KeyboardInterrupt:
+ self.logger.info("Exiting at keyboard interrupt")
+ return 0
+ except Exception:
+ self.logger.error("Exiting at unrecoverable exception:")
+ for line in traceback.format_exc().splitlines():
+ self.logger.error(line)
+ return 1
+ self.logger.info("Exiting")
+ return 0
+
def read_update_write(self):
"""
Read hardware, update controller, write hardware.
diff --git a/hw_device_mgr/mgr/tests/base_test_class.py b/hw_device_mgr/mgr/tests/base_test_class.py
index d8f82b5a..248c7e5e 100644
--- a/hw_device_mgr/mgr/tests/base_test_class.py
+++ b/hw_device_mgr/mgr/tests/base_test_class.py
@@ -21,14 +21,16 @@ class BaseMgrTestClass(BaseDevicesTestClass):
# major reason for the separate test base classes: to provide relevant
# fixtures without dragging in irrelevant tests.)
- # test_read_update_write() configuration
- read_update_write_yaml = "mgr/tests/read_update_write.cases.yaml"
+ # test_read_update_write() configuration:
+ # CiA NMT init online & operational status
+ read_update_write_package = "hw_device_mgr.mgr.tests"
# Manager configuration
- mgr_config_yaml = "mgr/tests/bogus_devices/mgr_config.yaml"
+ mgr_config_package = "hw_device_mgr.mgr.tests.bogus_devices"
+ mgr_config_yaml = "mgr_config.yaml"
# Device model SDOs; for test fixture
- device_sdos_yaml = "devices/tests/sim_sdo_data.yaml"
+ device_sdos_package = "hw_device_mgr.devices.tests"
# Manager class
device_class = HWDeviceMgrTest
@@ -47,8 +49,10 @@ class BaseMgrTestClass(BaseDevicesTestClass):
@pytest.fixture
def mgr_config(self):
- self.mgr_config = self.load_yaml(self.mgr_config_yaml)
- return self.mgr_config
+ rsrc = self.mgr_config_package, self.mgr_config_yaml
+ mgr_config = self.load_yaml_resource(*rsrc)
+ assert mgr_config, f"Empty YAML package resource {rsrc}"
+ return mgr_config
@pytest.fixture
def device_cls(self, device_config, extra_fixtures):
diff --git a/hw_device_mgr/mgr/tests/bogus_devices/device_xml b/hw_device_mgr/mgr/tests/bogus_devices/device_xml
deleted file mode 120000
index 19f66975..00000000
--- a/hw_device_mgr/mgr/tests/bogus_devices/device_xml
+++ /dev/null
@@ -1 +0,0 @@
-../../../devices/device_xml
\ No newline at end of file
diff --git a/hw_device_mgr/mgr/tests/test_mgr.py b/hw_device_mgr/mgr/tests/test_mgr.py
index ac0a5a5f..45739888 100644
--- a/hw_device_mgr/mgr/tests/test_mgr.py
+++ b/hw_device_mgr/mgr/tests/test_mgr.py
@@ -14,9 +14,6 @@ class TestHWDeviceMgr(BaseMgrTestClass, _TestDevice):
*_TestDevice.expected_mro,
]
- # Test CiA NMT init: online & operational status
- read_update_write_yaml = "mgr/tests/read_update_write.cases.yaml"
-
@pytest.fixture
def obj(self, device_cls, mgr_config, device_config, all_device_data):
self.obj = device_cls()
diff --git a/hw_device_mgr/mgr_hal/tests/bogus_devices/device_xml b/hw_device_mgr/mgr_hal/tests/bogus_devices/device_xml
deleted file mode 120000
index 19f66975..00000000
--- a/hw_device_mgr/mgr_hal/tests/bogus_devices/device_xml
+++ /dev/null
@@ -1 +0,0 @@
-../../../devices/device_xml
\ No newline at end of file
diff --git a/hw_device_mgr/mgr_ros/mgr.py b/hw_device_mgr/mgr_ros/mgr.py
index 2f720967..6afbb47f 100644
--- a/hw_device_mgr/mgr_ros/mgr.py
+++ b/hw_device_mgr/mgr_ros/mgr.py
@@ -1,11 +1,10 @@
from ..mgr.mgr import HWDeviceMgr, SimHWDeviceMgr
+from ..config_io import ConfigIO
import rclpy
-import yaml
-import os
import traceback
-class ROSHWDeviceMgr(HWDeviceMgr):
+class ROSHWDeviceMgr(HWDeviceMgr, ConfigIO):
def get_param(self, name, default=None):
if self.ros_node.has_parameter(name):
param = self.ros_node.get_parameter(name)
@@ -44,23 +43,15 @@ def init_devices(self, **kwargs):
device_config_path = self.get_param("device_config_path")
assert device_config_path, "No 'device_config_path' param defined"
self.logger.info(f"Reading device config from '{device_config_path}'")
- assert os.path.exists(device_config_path)
- with open(device_config_path, "r") as f:
- device_config = yaml.safe_load(f)
- assert device_config
+ device_config = self.load_yaml_path(device_config_path)
+ assert device_config, f"Empty YAML file '{device_config_path}'"
super().init_devices(device_config=device_config, **kwargs)
def init_sim_from_rosparams(self, **kwargs):
sim_device_data_path = self.get_param("sim_device_data_path")
assert sim_device_data_path, "No 'sim_device_data_path' param defined"
- assert os.path.exists(
- sim_device_data_path
- ), f"Device data path doesn't exist: '{sim_device_data_path}'"
- self.logger.info(
- f"Reading sim device config from {sim_device_data_path}"
- )
- with open(sim_device_data_path, "r") as f:
- sim_device_data = yaml.safe_load(f)
+ sim_device_data = self.load_yaml_path(sim_device_data_path)
+ assert sim_device_data, f"Empty YAML file '{sim_device_data_path}'"
self.init_sim(sim_device_data=sim_device_data, **kwargs)
def read_update_write(self):
diff --git a/hw_device_mgr/mgr_ros/tests/base_test_class.py b/hw_device_mgr/mgr_ros/tests/base_test_class.py
index f2929c46..e6f1740b 100644
--- a/hw_device_mgr/mgr_ros/tests/base_test_class.py
+++ b/hw_device_mgr/mgr_ros/tests/base_test_class.py
@@ -1,5 +1,4 @@
from ...mgr.tests.base_test_class import BaseMgrTestClass
-import yaml
import pytest
try:
@@ -58,8 +57,7 @@ def sim_device_data_path(self, tmp_path, mock_rclpy):
for d in sim_device_data:
d["product_code"] = int(d["product_code"])
d["vendor_id"] = int(d["vendor_id"])
- with open(tmpfile, "w") as f:
- f.write(yaml.safe_dump(sim_device_data))
+ self.dump_yaml_path(tmpfile, sim_device_data)
self.rosparams["sim_device_data_path"] = tmpfile
yield tmpfile
@@ -73,8 +71,7 @@ def device_config_path(self, tmp_path, device_config, mock_rclpy):
dc["product_code"] = int(dc["product_code"])
dc["vendor_id"] = int(dc["vendor_id"])
tmpfile = tmp_path / "device_config.yaml"
- with open(tmpfile, "w") as f:
- f.write(yaml.safe_dump(device_config))
+ self.dump_yaml_path(tmpfile, device_config)
self.rosparams["device_config_path"] = tmpfile
print(f"Cleaned device config written to {tmpfile}")
yield tmpfile
diff --git a/hw_device_mgr/mgr_ros/tests/bogus_devices/device_xml b/hw_device_mgr/mgr_ros/tests/bogus_devices/device_xml
deleted file mode 120000
index 19f66975..00000000
--- a/hw_device_mgr/mgr_ros/tests/bogus_devices/device_xml
+++ /dev/null
@@ -1 +0,0 @@
-../../../devices/device_xml
\ No newline at end of file
diff --git a/hw_device_mgr/mgr_ros/tests/test_mgr.py b/hw_device_mgr/mgr_ros/tests/test_mgr.py
index 79e11d28..9fe1bea4 100644
--- a/hw_device_mgr/mgr_ros/tests/test_mgr.py
+++ b/hw_device_mgr/mgr_ros/tests/test_mgr.py
@@ -11,6 +11,7 @@ class TestROSHWDeviceMgr(BaseROSMgrTestClass, _TestHWDeviceMgr):
"ROSSimHWDeviceMgr",
"ROSHWDeviceMgr",
*_TestHWDeviceMgr.expected_mro[1:],
+ "ConfigIO",
]
rclpy_patches = [
"hw_device_mgr.mgr_ros.mgr.rclpy",
diff --git a/hw_device_mgr/mgr_ros_hal/device_xml b/hw_device_mgr/mgr_ros_hal/device_xml
deleted file mode 120000
index 08f7d8e8..00000000
--- a/hw_device_mgr/mgr_ros_hal/device_xml
+++ /dev/null
@@ -1 +0,0 @@
-../devices/device_xml
\ No newline at end of file
diff --git a/hw_device_mgr/mgr_ros_hal/tests/bogus_devices/device_xml b/hw_device_mgr/mgr_ros_hal/tests/bogus_devices/device_xml
deleted file mode 120000
index 19f66975..00000000
--- a/hw_device_mgr/mgr_ros_hal/tests/bogus_devices/device_xml
+++ /dev/null
@@ -1 +0,0 @@
-../../../devices/device_xml
\ No newline at end of file
diff --git a/hw_device_mgr/mgr_ros_hal/tests/test_mgr.py b/hw_device_mgr/mgr_ros_hal/tests/test_mgr.py
index 87f01d8c..70d6cdaf 100644
--- a/hw_device_mgr/mgr_ros_hal/tests/test_mgr.py
+++ b/hw_device_mgr/mgr_ros_hal/tests/test_mgr.py
@@ -12,5 +12,7 @@ class TestROSHWDeviceMgr(
"ROSHALSimHWDeviceMgr",
"ROSHALHWDeviceMgr",
*_TestROSHWDeviceMgr.expected_mro[1:3], # ROS{Sim...}HWDeviceMgr
- *_TestHALHWDeviceMgr.expected_mro[1:], # HALSimHWDeviceMgr...HALMixin
+ *_TestHALHWDeviceMgr.expected_mro[1:-1], # HALSimHWDeviceMgr...ABC
+ "ConfigIO",
+ _TestHALHWDeviceMgr.expected_mro[-1], # HALMixin
]
diff --git a/hw_device_mgr/tests/base_test_class.py b/hw_device_mgr/tests/base_test_class.py
index 5d0c7ebf..95718fbe 100644
--- a/hw_device_mgr/tests/base_test_class.py
+++ b/hw_device_mgr/tests/base_test_class.py
@@ -1,7 +1,5 @@
import pytest
-from pathlib import Path
-import os
-import yaml
+from ..config_io import ConfigIO
from .bogus_devices.data_types import BogusDataType
from .bogus_devices.device import (
BogusDevice,
@@ -11,11 +9,12 @@
)
-class BaseTestClass:
+class BaseTestClass(ConfigIO):
"""Base test class providing fixtures for use with `bogus_devices`."""
# Device scan data; for test fixture
- sim_device_data_yaml = "tests/sim_devices.yaml"
+ sim_device_data_package = "hw_device_mgr.tests"
+ sim_device_data_yaml = "sim_devices.yaml"
# Data types
# Classes under test in this module
@@ -26,20 +25,15 @@ class BaseTestClass:
# Sim mode by default
sim = True
- @classmethod
- def load_yaml(cls, fname, return_path=False):
- p = Path(__file__).parent.parent.joinpath(fname)
- with p.open() as f:
- data = yaml.safe_load(f)
- return (p, data) if return_path else data
-
@classmethod
def test_category_class(cls, test_category):
for dmc in cls.device_model_classes:
assert dmc.name
if dmc.test_category == test_category:
return dmc
- raise ValueError(f"No device in test category class '{test_category}'")
+ raise ValueError(
+ f"{cls}: No device in test category class '{test_category}'"
+ )
@classmethod
def munge_sim_device_data(cls, sim_device_data):
@@ -69,13 +63,17 @@ def init_sim(cls, **kwargs):
cls.device_class.clear_devices()
cls.device_class.init_sim(**kwargs)
+ @classmethod
+ def load_sim_device_data(cls):
+ rsrc = cls.sim_device_data_package, cls.sim_device_data_yaml
+ dev_data = cls.load_yaml_resource(*rsrc)
+ assert dev_data, f"Empty device data in package resource {rsrc}"
+ return dev_data
+
@classmethod
def init_sim_device_data(cls):
- # Set up sim devices: munge YAML data & pass to sim device class
- cls.sim_device_data_path, dev_data = cls.load_yaml(
- cls.sim_device_data_yaml, True
- )
- print(f" Raw sim_device_data from {cls.sim_device_data_path}")
+ # Set up sim devices: munge data & pass to sim device class
+ dev_data = cls.load_sim_device_data()
return cls.munge_sim_device_data(dev_data)
@pytest.fixture
@@ -100,24 +98,10 @@ def pytest_generate_tests(self, metafunc):
if "sim_device_data" not in metafunc.fixturenames:
return
- path, sim_device_data = self.load_yaml(self.sim_device_data_yaml, True)
- sim_device_data = self.munge_sim_device_data(sim_device_data)
+ data_raw = self.load_sim_device_data()
+ sim_device_data = self.munge_sim_device_data(data_raw)
vals, ids = (list(), list())
for dev in sim_device_data:
ids.append(f"{dev['test_name']}@{dev['test_address']}")
vals.append(dev)
metafunc.parametrize("sim_device_data", vals, ids=ids, scope="class")
-
- @pytest.fixture
- def fpath(self):
- """Fixture that returns test directory."""
- # This line resolves black & pep257 conflicts. :P
-
- def func(base_name=None):
- cwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
- if base_name is None:
- return cwd
- else:
- return os.path.join(cwd, base_name)
-
- return func
diff --git a/hw_device_mgr/tests/test_device.py b/hw_device_mgr/tests/test_device.py
index a2236bd6..9b48cea9 100644
--- a/hw_device_mgr/tests/test_device.py
+++ b/hw_device_mgr/tests/test_device.py
@@ -3,7 +3,6 @@
from ..device import Device
import subprocess
from pprint import pformat
-import ruamel.yaml
class TestDevice(BaseTestClass):
@@ -122,7 +121,7 @@ def obj(self, device_cls, sim_device_data):
yield self.obj
def test_init(self, obj):
- assert hasattr(obj, "index")
+ pass # Base class init() method does nothing
def test_set_sim_feedback(self, obj):
res = obj.set_sim_feedback()
@@ -136,8 +135,9 @@ def test_set_sim_feedback(self, obj):
# - Check expected feedback & command, in & out
# Configuration
- # - Path to .yaml test cases (relative to `tests/` directory)
- read_update_write_yaml = None
+ # - YAML test cases package resource
+ read_update_write_package = None # Skip tests if None
+ read_update_write_yaml = "read_update_write.cases.yaml"
# - Translate feedback/command test input params from values
# human-readable in .yaml to values matching actual params
read_update_write_translate_feedback_in = dict()
@@ -338,21 +338,19 @@ def read_update_write_loop(self, test_case):
self.set_command_and_check()
self.write_and_check()
- def test_read_update_write(self, obj, fpath):
- test_cases_yaml = getattr(self, "read_update_write_yaml", None)
- if test_cases_yaml is None:
+ def test_read_update_write(self, obj):
+ if self.read_update_write_package is None:
return # No test cases defined for this class
- with open(fpath(test_cases_yaml)) as f:
- yaml = ruamel.yaml.YAML()
- test_cases = yaml.load(f)
- print(f"Read test cases from {fpath(test_cases_yaml)}")
-
+ rsrc = (self.read_update_write_package, self.read_update_write_yaml)
+ test_cases = self.load_yaml_resource(*rsrc)
+ assert test_cases, f"Empty YAML from package resource {rsrc}"
+ print(f"Read test cases from package resource {rsrc}")
for test_case in test_cases:
self.read_update_write_loop(test_case)
def test_dot(self, tmp_path):
# Test class diagram
- gv_file = tmp_path / ".." / f"{self.device_class.category}.gv"
+ gv_file = tmp_path / f"{self.device_class.category}.gv"
assert not gv_file.exists()
with gv_file.open("w") as f:
f.write(self.device_class.dot())