diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml new file mode 100644 index 0000000..81c844a --- /dev/null +++ b/.github/workflows/build-validation.yml @@ -0,0 +1,29 @@ +name: Build validation + +on: push + +jobs: + build: + name: Build distribution + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + with: + persist-credentials: false + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.x" + - name: Install pypa/build/pylint + run: >- + python3 -m + pip install + build + pyserial + pylint + --user + - name: Build a binary wheel and a source tarball + run: python3 -m build + - name: Run linter + run: pylint ./src/science_mode_4 diff --git a/.pylintrc b/.pylintrc index 1b8ae2f..a716a25 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,2 +1,12 @@ [MAIN] -max-line-length=140 \ No newline at end of file +max-line-length=140 +max-attributes=11 + +[DESIGN] +max-statements=100 + +[STRING] +check-quote-consistency=yes + +[MESSAGES CONTROL] +disable=too-few-public-methods \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index 266e7b4..b54df0a 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -9,7 +9,7 @@ "type": "debugpy", "request": "launch", // "program": "__main__.py", - "module": "examples.dyscom.example_dyscom_write_csv", + "module": "examples.mid_level.example_mid_level", "justMyCode": false, // "args": ["COM3"], "console": "integratedTerminal" diff --git a/HINTS.md b/HINTS.md index 50752d9..e50b1f8 100644 --- a/HINTS.md +++ b/HINTS.md @@ -1,5 +1,62 @@ # Introduction -This page describes implementation hints. +This page describes implementation details. + +# General information + +## Basic information +- Starting point is using a _Device_ object matching attached hardware: _DeviceP24_ or _DeviceI24_ + - If you have multiple devices, create multiple _Device_ instances + - With _capabilities_ it is possible to query for available layer +- To create a _Device_ object a _Connection_ object is required, use _SerialConnection_ to connect to a serial port + - _Connection_ must be opened and closed +- Call _device.initialize()_ to get a defined state of the device (it stops any active stimulation/measurement) +- _Device_ object has layers to access commands + - _Layer_ object has functions to send commands to the device and process acknowledges + - To access layer, use helper functions _get\_layer\_xxx_ + - _DeviceP24_ has layer general, low level and mid level + - _DeviceI24_ has layer general and dyscom + - Do not mix usage of layers because device has a single internal state, e.g. calling low level _init()_ and afterwards mid level _update()_ will not work + +## Device communication +- Most functions communicating with the device are async functions, because they wait for a matching acknowledge and return values from acknowledge + - If no matching acknowledge or no acknowledge arrives in time, an exception is raised +- Additionally functions with naming schema _send_xxx_ are normal functions not waiting for acknowledge + - The acknowledge needs to handled manually by using _PacketBuffer_ object from device + - _PacketBuffer_ reads data from connection and separates packets from data stream + +## General layer +- Contains functions to get common information like device serial or firmware version + +## Mid level layer +- Contains functions for mid level stimulation +- This mode is good to let the device stimulate a predefined pattern until _stop()_ is send +- Usage + - Call _init()_ to set device in mid level mode + - Call _update()_ with stimulation pattern + - Call _get_current_data()_ every 1.5s to keep stimulation ongoing + - Call _stop()_ to end stimulation and leave mid level mode + +## Low level layer +- Contains functions for low level stimulation +- This mode is good to react to a external trigger to change stimulation pattern +- Without _send_channel_config()_ the device will not stimulate +- Usage + - Call _init()_ to set device in low level mode + - Update configuration with _send_channel_config()_ when external trigger occurs + - As soon as _send_channel_config()_ arrives at device, it stimulates according stimulation pattern + - It stops stimulation when stimulation pattern is over + - Call _stop()_ to leave low level mode + +## Dyscom layer +- Contains functions for dyscom level +- This mode is used by I24 to measure EMG or BI +- Usage + - Call _power_module()_ to power on measurement module + - Call _init()_ with parameter for measurement + - Call _start()_ to start measurement + - Device sends now _DlSendLiveData_ packets with measurement data + - Call _stop()_ to end measurement + - Call _power_module()_ to power off measurement module # Deviation from Instruction for Use @@ -20,15 +77,15 @@ This page describes implementation hints. - 0 -> Battery (was Unused) ### DL_get_ack for type battery -- energy state, 1 byte, is a flag, bit 1: cable connected, bit 2: device is loading -- percentage, 1 byte, [0, 100] in percent -- temperature, 1 byte, [-128, 127] in degrees -- current, 4 bytes, [-327675, 327675] in milli ampere -- voltage, 4 bytes, [0, 65535] in milli volt +- Energy state, 1 byte, is a flag, bit 1: cable connected, bit 2: device is loading +- Percentage, 1 byte, [0, 100] in percent +- Temperature, 1 byte, [-128, 127] in degrees +- Current, 4 bytes, [-327675, 327675] in milliampere +- Voltage, 4 bytes, [0, 65535] in millivolt ### DL_send_file_ack -- block number, 4 byte, block number of DL_send_file +- Block number, 4 byte, block number of DL_send_file ### DL_send_live_data -- parameters are big endian? +- Parameters are big endian? - SignalType for each sample is always 0 \ No newline at end of file diff --git a/README.md b/README.md index aa80da9..5b92f42 100644 --- a/README.md +++ b/README.md @@ -21,18 +21,22 @@ Python 3.11 or higher - PySerial - https://pypi.org/project/pyserial/ - `pip install pyserial` -- PyUSB +- PyUSB - currently not used - https://pypi.org/project/pyusb/ - `pip install pyusb` - On Windows - Download libusb from https://libusb.info/ - - Copy libusb-XX.dll into venv root folder (besides python.exe) - - Under Windows it there are driver issues + - Copy libusb-XX.dll into environment root folder (besides python.exe) + - Under Windows there are driver issues + - Code is currently commented out and not usable ## Build library -- Only necessary, if you made changes to the library +- Only necessary, if you made changes to the library or install a version from a branch - Install dependencies - `python -m pip install --upgrade build` +- Optional run linter + - `pip install pylint` + - `pylint .\src\science_mode_4\` - Build project - `python -m build` - Install local library @@ -44,17 +48,20 @@ Python 3.11 or higher - Located in folder `examples` - Run examples with `python -m examples..` - Example: `python -m examples.dyscom.example_dyscom_fastplotlib` +- Examples have own dependencies, see [Dependencies for examples](#dependencies-for-examples) - General layer - `example_general.py` - Demonstrates how to use general layer to initialize device and get serial number and firmware version - Mid level layer + - `example_mid_level_simple` + - Demonstrates how to use mid level layer, where a stimulation pattern is send to the stimulator and the device automatically executes the pattern by itself for 15s - `example_mid_level.py` - - Demonstrates how to use mid level layer, where a stimulation pattern is send to the stimulator and the device automatically executes the pattern by itself until stopped + - Demonstrates how to use mid level layer, where a stimulation pattern is send to the stimulator and the device automatically executes the pattern by itself until user ends stimulation by keyboard - Low level layer - `example_low_level.py` - Demonstrates how to use low level layer, where host has to trigger stimulation manually, in this case by pressing a key - `example_low_level_plot.py` - - Demonstrates how to use low level layer to measure current and plot it in a graph using PyPlot + - Demonstrates how to use low level layer to stimulate, measure current and plot it in a graph using PyPlot - Dyscom layer - `example_dyscom_get` - Demonstrate how to use different get commands from dyscom layer @@ -66,10 +73,11 @@ Python 3.11 or higher - Demonstrate how to use dyscom layer to measure BI and EMG and writing measurement data to a .csv-file ## Dependencies for examples - -- Keyboard - - https://pypi.org/project/keyboard/ - - `pip install keyboard` +- Install all dependencies + - `pip install -r examples/requirements.txt` +- Py-Getch + - https://pypi.org/project/py-getch/ + - `pip install py-getch` - NumPy - https://pypi.org/project/numpy/ - `pip install numpy` diff --git a/__main__.py b/__main__.py index 1daed4a..1407cc3 100644 --- a/__main__.py +++ b/__main__.py @@ -1,14 +1,13 @@ """Test program how to use library without installing the library, DO NOT USE THIS FILE, USE EXAMPLES INSTEAD""" -from imaplib import Commands import sys import asyncio import matplotlib.pyplot as plt import numpy as np -from src.science_mode_4 import LayerDyscom, LayerLowLevel, LayerMidLevel,\ +from src.science_mode_4 import LayerDyscom, LayerLowLevel,\ Commands, Connector, Channel, ChannelPoint,\ SerialPortConnection,\ DeviceI24,\ @@ -84,10 +83,8 @@ def update_ylim(data: list[float]): new_min = data[0] new_max = data[0] for x in data: - if x < new_min: - new_min = x - if x > new_max: - new_max = x + new_min = min(new_min, x) + new_max = max(new_max, x) offset = (new_max - new_min) * 0.1 plt.ylim(new_min - offset, new_max + offset) @@ -140,71 +137,6 @@ def update_ylim(data: list[float]): await dyscom.power_module(DyscomPowerModuleType.MEASUREMENT, DyscomPowerModulePowerType.SWITCH_OFF) print(used_signals) - # c1p1: ChannelPoint = ChannelPoint(200, 20) - # c1p2: ChannelPoint = ChannelPoint(100, 0) - # c1p3: ChannelPoint = ChannelPoint(200, -20) - # cc1 = MidLevelChannelConfiguration(True, 3, 20, [c1p1, c1p2, c1p3]) - - # c2p1: ChannelPoint = ChannelPoint(100, 100) - # c2p2: ChannelPoint = ChannelPoint(100, 0) - # c2p3: ChannelPoint = ChannelPoint(100, -100) - # cc2 = MidLevelChannelConfiguration(True, 3, 10, [c2p1, c2p2, c2p3]) - - # mid_level = device.get_layer_mid_level() - # await mid_level.init(False) - # await mid_level.update([cc1, cc2]) - # for _ in range(100): - # update = await mid_level.get_current_data() - # print(update) - - # await asyncio.sleep(1) - - # await mid_level.stop() - - # # get low level layer to call low level commands - # low_level_layer = device.get_layer_low_level() - - # # call init low level - # await low_level_layer.init(LowLevelMode.STIM_CURRENT, LowLevelHighVoltageSource.STANDARD) - - # # now we can start stimulation - # counter = 0 - # ms: list[float] = [] - # sample_time = 0 - # while counter < 10: - # # get new data from connection - # # both append_bytes_to_buffer and get_packet_from_buffer should be called regulary - # new_buffer_data = device.connection.read() - # if len(new_buffer_data) > 0: - # low_level_layer.packet_buffer.append_bytes_to_buffer(new_buffer_data) - # # we added new data to buffer, so there may be new valid acknowledges - # packet_ack = low_level_layer.packet_buffer.get_packet_from_buffer() - # # do something with packet ack - # # here we print that an acknowledge arrived - # # print(f"I {packet_ack}") - # if packet_ack.command == Commands.LowLevelChannelConfigAck: - # ll_config_ack: PacketLowLevelChannelConfigAck = packet_ack - # ms.extend(ll_config_ack.measurement_samples) - # sample_time = ll_config_ack.sampling_time_in_microseconds - # print(f"sample time {ll_config_ack.sampling_time_in_microseconds}") - # print(ms) - - # # if counter % 10 == 0: - # # send_channel_config(low_level_layer, Connector.GREEN) - # # elif counter % 10 == 5: - # # send_channel_config(low_level_layer, Connector.YELLOW) - - # if counter % 10 == 0: - # low_level_layer.send_channel_config(True, Channel.RED, Connector.GREEN, - # [ChannelPoint(2000, 40), ChannelPoint(1000, 0), - # ChannelPoint(1000, -20)]) - # await asyncio.sleep(0.01) - # counter += 1 - - # # wait until all acknowledges are received - # await asyncio.sleep(0.5) - # # call stop low level - # await low_level_layer.stop() connection.close() diff --git a/examples/__init__.py b/examples/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/low_level/example_low_level.py b/examples/low_level/example_low_level.py index 3070970..b1aa297 100644 --- a/examples/low_level/example_low_level.py +++ b/examples/low_level/example_low_level.py @@ -67,16 +67,12 @@ def input_callback(input_value: str) -> bool: # now we can start stimulation while keyboard_input_thread.is_alive(): - # get new data from connection - # both append_bytes_to_buffer and get_packet_from_buffer should be called regulary - new_buffer_data = device.connection.read() - if len(new_buffer_data) > 0: - low_level_layer.packet_buffer.append_bytes_to_buffer(new_buffer_data) - # we added new data to buffer, so there may be new valid acknowledges - packet_ack = low_level_layer.packet_buffer.get_packet_from_buffer() + # get new packets from connection + ack = low_level_layer.packet_buffer.get_packet_from_buffer() + if ack: # do something with packet ack # here we print that an acknowledge arrived - print(packet_ack) + print(ack) await asyncio.sleep(0.1) diff --git a/examples/low_level/example_low_level_plot.py b/examples/low_level/example_low_level_plot.py index 6292c67..ea7e042 100644 --- a/examples/low_level/example_low_level_plot.py +++ b/examples/low_level/example_low_level_plot.py @@ -48,14 +48,10 @@ async def main() -> int: measurement_sample_time = 0 measurement_samples: list[float] = [] # get new data from connection - new_buffer_data = device.connection.read() - if len(new_buffer_data) > 0: - low_level_layer.packet_buffer.append_bytes_to_buffer(new_buffer_data) - # we added new data to buffer, so there may be new valid acknowledges - packet_ack = low_level_layer.packet_buffer.get_packet_from_buffer() - # do something with packet ack - if packet_ack.command == Commands.LowLevelChannelConfigAck: - ll_config_ack: PacketLowLevelChannelConfigAck = packet_ack + ack = low_level_layer.packet_buffer.get_packet_from_buffer() + if ack: + if ack.command == Commands.LowLevelChannelConfigAck: + ll_config_ack: PacketLowLevelChannelConfigAck = ack measurement_sample_time = ll_config_ack.sampling_time_in_microseconds measurement_samples.extend(ll_config_ack.measurement_samples) diff --git a/examples/mid_level/example_mid_level.py b/examples/mid_level/example_mid_level.py index 6c80113..5bac1e2 100644 --- a/examples/mid_level/example_mid_level.py +++ b/examples/mid_level/example_mid_level.py @@ -21,9 +21,8 @@ def input_callback(input_value: str) -> bool: if input_value == "q": # end keyboard input thread return True - else: - print("Invalid command") + print("Invalid command") return False print("Usage: stimulation is running, press q to quit") diff --git a/examples/mid_level/example_mid_level_simple.py b/examples/mid_level/example_mid_level_simple.py new file mode 100644 index 0000000..fa7caa7 --- /dev/null +++ b/examples/mid_level/example_mid_level_simple.py @@ -0,0 +1,65 @@ +"""Provides an example how to use mid level layer""" + +import sys +import asyncio + +from science_mode_4 import DeviceP24 +from science_mode_4 import MidLevelChannelConfiguration +from science_mode_4 import ChannelPoint +from science_mode_4 import SerialPortConnection +from examples.utils.example_utils import ExampleUtils + + +async def main() -> int: + """Main function""" + + # get comport from command line argument + com_port = ExampleUtils.get_comport_from_commandline_argument() + # create serial port connection + connection = SerialPortConnection(com_port) + # open connection, now we can read and write data + connection.open() + + # create science mode device + device = DeviceP24(connection) + # call initialize to get basic information (serial, versions) and stop any active stimulation/measurement + # to have a defined state + await device.initialize() + + # simple stimulation pattern + c1p1: ChannelPoint = ChannelPoint(200, 20) + c1p2: ChannelPoint = ChannelPoint(100, 0) + c1p3: ChannelPoint = ChannelPoint(200, -20) + cc1 = MidLevelChannelConfiguration(True, 3, 20, [c1p1, c1p2, c1p3]) + + c2p1: ChannelPoint = ChannelPoint(100, 10) + c2p2: ChannelPoint = ChannelPoint(100, 0) + c2p3: ChannelPoint = ChannelPoint(100, -10) + cc2 = MidLevelChannelConfiguration(True, 3, 10, [c2p1, c2p2, c2p3]) + + # get mid level layer to call mid level commands + mid_level = device.get_layer_mid_level() + # call init mid level, we want to stop on all stimulation errors + await mid_level.init(True) + # set stimulation pattern, P24 device will now stimulate according this pattern + await mid_level.update([cc1, cc2]) + + # stimulate for 15s + for _ in range(15): + # we have to call get_current_data() every 1.5s to keep stimulation ongoing + update = await mid_level.get_current_data() # pylint:disable=unused-variable + # print(update) + + await asyncio.sleep(1) + + # call stop mid level + await mid_level.stop() + + # close serial port connection + connection.close() + return 0 + + +if __name__ == "__main__": + res = asyncio.run(main()) + sys.exit(res) diff --git a/examples/requirements.txt b/examples/requirements.txt new file mode 100644 index 0000000..7451fcf Binary files /dev/null and b/examples/requirements.txt differ diff --git a/examples/utils/example_utils.py b/examples/utils/example_utils.py index 48195cb..9d2c771 100644 --- a/examples/utils/example_utils.py +++ b/examples/utils/example_utils.py @@ -3,7 +3,7 @@ import sys import threading from typing import Callable -import keyboard +from getch import getch from science_mode_4 import SerialPortConnection @@ -19,11 +19,16 @@ def __init__(self, input_cbk: Callable[[str], bool]): def run(self): while True: - event = keyboard.read_event(suppress=True) - if event.event_type == keyboard.KEY_DOWN: - if self._input_cbk(event.name): - # callback returned True, so end thread - break + # getch() returns a bytes object + key_raw = getch() + key = bytes.decode(key_raw) + # handle ctrl+c + if key == "\x03": + raise KeyboardInterrupt + + if self._input_cbk(key): + # callback returned True, so end thread + break class ExampleUtils(): @@ -41,7 +46,8 @@ def get_comport_from_commandline_argument() -> str: return ports[0].device # nothing found -> exit - print("Serial port command line argument missing (e.g. python -m example_xxx.py COM3)") + print("No science mode device found") + print("Serial port command line argument missing (e.g. python -m examples.. COM3)") sys.exit(1) com_port = sys.argv[1] diff --git a/examples/utils/fastplotlib_utils.py b/examples/utils/fastplotlib_utils.py index 4a5deaa..8c6f3a8 100644 --- a/examples/utils/fastplotlib_utils.py +++ b/examples/utils/fastplotlib_utils.py @@ -94,11 +94,6 @@ def __init__(self, channels: dict[int, tuple[str, str]], max_value_count: int): self._figure.show(maintain_aspect=False) - def append_value(self, channel: int, value: float) -> tuple[float, float]: - """This function is call in context of background thread""" - self._data[channel].append_value(value) - - def _animation(self, figure: fpl.Figure): """This function is call in context of main thread""" for x in self._data.values(): diff --git a/examples/utils/pyplot_utils.py b/examples/utils/pyplot_utils.py index 88db843..dd3d0f4 100644 --- a/examples/utils/pyplot_utils.py +++ b/examples/utils/pyplot_utils.py @@ -90,10 +90,6 @@ def __init__(self, channels: dict[int, tuple[str, str]], max_value_count: int): plt.show(block=False) - def append_value(self, channel: int, value: float) -> tuple[float, float]: - self._data[channel].append_value(value) - - def update(self): plt.pause(0.0001) diff --git a/pyproject.toml b/pyproject.toml index c72dece..37248a0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "science_mode_4" -version = "0.0.8" +version = "0.0.9" authors = [ { name="Marc Hofmann", email="marc-hofmann@gmx.de" }, ] diff --git a/src/science_mode_4/device.py b/src/science_mode_4/device.py index 64df172..0c1033c 100644 --- a/src/science_mode_4/device.py +++ b/src/science_mode_4/device.py @@ -2,6 +2,7 @@ from enum import IntEnum from typing import Type + from .layer import Layer from .protocol.types import StimStatus from .protocol.packet_factory import PacketFactory @@ -12,6 +13,7 @@ from .dyscom.dyscom_layer import LayerDyscom from .dyscom.dyscom_types import DyscomGetOperationModeType from .utils.connection import Connection +from .utils.packet_buffer import PacketBuffer class DeviceCapability(IntEnum): @@ -29,6 +31,7 @@ class Device(): def __init__(self, conn: Connection, capabilities: set[DeviceCapability]): self._connection = conn self._packet_factory = PacketFactory() + self._packet_buffer = PacketBuffer(self._connection, self._packet_factory) self._packet_number_generator = PacketNumberGenerator() self._capabilities = capabilities self._layer: dict[DeviceCapability, Layer] = {} @@ -51,6 +54,12 @@ def packet_factory(self) -> PacketFactory: return self._packet_factory + @property + def packet_buffer(self) -> PacketBuffer: + """Getter for packet buffer""" + return self._packet_buffer + + @property def packet_number_generator(self) -> PacketNumberGenerator: """Getter for packet number generator""" @@ -59,7 +68,7 @@ def packet_number_generator(self) -> PacketNumberGenerator: @property def capabilities(self) -> set[DeviceCapability]: - """Getter for capabilites""" + """Getter for capabilities""" return self._capabilities @@ -112,4 +121,4 @@ def add_layer(self, capability: DeviceCapability, layer: Layer): def _add_layer(self, capability: DeviceCapability, used_capabilities: set[DeviceCapability], layer_class: Type[Layer]): """Helper method that checks if capability is in used_capabilities and if yes add a layer_class instance""" if capability in used_capabilities: - self.add_layer(capability, layer_class(self._connection, self._packet_factory, self._packet_number_generator)) + self.add_layer(capability, layer_class(self._packet_buffer, self._packet_factory, self._packet_number_generator)) diff --git a/src/science_mode_4/dyscom/ads129x/ads129x_config_register_1.py b/src/science_mode_4/dyscom/ads129x/ads129x_config_register_1.py index 41c69d6..c97371c 100644 --- a/src/science_mode_4/dyscom/ads129x/ads129x_config_register_1.py +++ b/src/science_mode_4/dyscom/ads129x/ads129x_config_register_1.py @@ -53,6 +53,6 @@ def set_data(self, data: bytes): def get_data(self) -> bytes: """Convert information to bytes""" - tmp = ((self.power_mode << 7) | (self.read_mode << 6) | + tmp = ((self.power_mode << 7) | (self.read_mode << 6) | (self.clock_connection << 5) | (self.output_data_rate << 0)) return [tmp] diff --git a/src/science_mode_4/dyscom/dyscom_helper.py b/src/science_mode_4/dyscom/dyscom_helper.py index 1452341..0c201ba 100644 --- a/src/science_mode_4/dyscom/dyscom_helper.py +++ b/src/science_mode_4/dyscom/dyscom_helper.py @@ -35,7 +35,7 @@ def bytes_to_datetime(data: bytes) -> datetime.datetime: hour, dst, day, minute, month, second, _, _, year_since_1900 = \ DyscomHelper._unpack_func(data) - # ToDo: check dst + # do we need to consider dst? result = datetime.datetime(1900 + year_since_1900, month, day, hour, minute, second) return result diff --git a/src/science_mode_4/dyscom/dyscom_layer.py b/src/science_mode_4/dyscom/dyscom_layer.py index de00eb4..1695461 100644 --- a/src/science_mode_4/dyscom/dyscom_layer.py +++ b/src/science_mode_4/dyscom/dyscom_layer.py @@ -1,4 +1,4 @@ -"""Provices low level layer""" +"""Provides low level layer""" from science_mode_4.layer import Layer from .dyscom_types import DyscomGetOperationModeType, DyscomPowerModuleType, DyscomPowerModulePowerType, DyscomSysType diff --git a/src/science_mode_4/dyscom/dyscom_send_measurement_meta_info.py b/src/science_mode_4/dyscom/dyscom_send_measurement_meta_info.py index c96e297..6ff445c 100644 --- a/src/science_mode_4/dyscom/dyscom_send_measurement_meta_info.py +++ b/src/science_mode_4/dyscom/dyscom_send_measurement_meta_info.py @@ -26,8 +26,8 @@ def __init__(self, data: bytes): if not data is None: self._init_params.set_data(data[0:361]) self._file_name = DyscomHelper.bytes_to_str(data[361:421], 60) - self._file_size = int.from_bytes(data[421:429], 'big') - self._file_number = int.from_bytes(data[429:431], 'big') + self._file_size = int.from_bytes(data[421:429], "big") + self._file_number = int.from_bytes(data[429:431], "big") self._proband_name = DyscomHelper.bytes_to_str(data[431:468], 37) self._start_time = DyscomHelper.bytes_to_datetime(data[468:479]) self._duration = datetime.timedelta(seconds=int.from_bytes(data[479:483])) diff --git a/src/science_mode_4/general/general_layer.py b/src/science_mode_4/general/general_layer.py index d991170..60430f3 100644 --- a/src/science_mode_4/general/general_layer.py +++ b/src/science_mode_4/general/general_layer.py @@ -1,9 +1,9 @@ -"""Provices general layer""" +"""Provides general layer""" from science_mode_4.protocol.packet_number_generator import PacketNumberGenerator from science_mode_4.protocol.packet_factory import PacketFactory from science_mode_4.layer import Layer -from science_mode_4.utils.connection import Connection +from science_mode_4.utils.packet_buffer import PacketBuffer from .general_reset import PacketGeneralReset, PacketGeneralResetAck from .general_stim_status import GetStimStatusResult, PacketGeneralGetStimStatus, PacketGeneralGetStimStatusAck from .general_device_id import PacketGeneralGetDeviceId, PacketGeneralGetDeviceIdAck @@ -14,8 +14,8 @@ class LayerGeneral(Layer): """Class for general layer""" - def __init__(self, conn: Connection, packet_factory: PacketFactory, packet_number_generator: PacketNumberGenerator): - super().__init__(conn, packet_factory, packet_number_generator) + def __init__(self, packet_buffer: PacketBuffer, packet_factory: PacketFactory, packet_number_generator: PacketNumberGenerator): + super().__init__(packet_buffer, packet_factory, packet_number_generator) self._device_id: str | None = None self._firmware_version: str | None = None self._science_mode_version: str | None = None diff --git a/src/science_mode_4/layer.py b/src/science_mode_4/layer.py index 1aa8f96..da30a4d 100644 --- a/src/science_mode_4/layer.py +++ b/src/science_mode_4/layer.py @@ -5,7 +5,6 @@ from .protocol.packet import Packet, PacketAck from .protocol.packet_factory import PacketFactory from .protocol.packet_number_generator import PacketNumberGenerator -from .utils.connection import Connection from .utils.packet_buffer import PacketBuffer @@ -13,12 +12,10 @@ class Layer(): """Base class for all layers""" - def __init__(self, conn: Connection, packet_factory: PacketFactory, packet_number_generator: PacketNumberGenerator): - self._connection = conn + def __init__(self, packet_buffer: PacketBuffer, packet_factory: PacketFactory, packet_number_generator: PacketNumberGenerator): self._packet_factory = packet_factory self._packet_number_generator = packet_number_generator - - self._packet_buffer = PacketBuffer(conn, packet_factory) + self._packet_buffer = packet_buffer @property @@ -44,4 +41,4 @@ async def _send_packet_and_wait(self, packet: Packet) -> PacketAck: def _check_result_error(self, result_error: ResultAndError, packet_name: str): """Check if result_error contains an error and if yes prints packet_name""" if result_error != ResultAndError.NO_ERROR: - raise ValueError(f"Error {packet_name} {result_error}") + raise ValueError(f"Error {packet_name} {result_error.name}") diff --git a/src/science_mode_4/low_level/low_level_layer.py b/src/science_mode_4/low_level/low_level_layer.py index 6f898de..b68ad6b 100644 --- a/src/science_mode_4/low_level/low_level_layer.py +++ b/src/science_mode_4/low_level/low_level_layer.py @@ -1,10 +1,7 @@ -"""Provices low level layer""" +"""Provides low level layer""" from science_mode_4.protocol.channel_point import ChannelPoint from science_mode_4.protocol.types import Channel, Connector -from science_mode_4.protocol.packet_number_generator import PacketNumberGenerator -from science_mode_4.protocol.packet_factory import PacketFactory -from science_mode_4.utils.connection import Connection from science_mode_4.utils.packet_buffer import PacketBuffer from science_mode_4.layer import Layer from .low_level_channel_config import PacketLowLevelChannelConfig @@ -20,11 +17,6 @@ class LayerLowLevel(Layer): """ - def __init__(self, conn: Connection, packet_factory: PacketFactory, packet_number_generator: PacketNumberGenerator): - super().__init__(conn, packet_factory, packet_number_generator) - self._packet_buffer = PacketBuffer(conn, packet_factory) - - @property def packet_buffer(self) -> PacketBuffer: """Getter for packet buffer""" diff --git a/src/science_mode_4/mid_level/mid_level_layer.py b/src/science_mode_4/mid_level/mid_level_layer.py index a7b3e0d..36d336f 100644 --- a/src/science_mode_4/mid_level/mid_level_layer.py +++ b/src/science_mode_4/mid_level/mid_level_layer.py @@ -1,4 +1,4 @@ -"""Provices mid level layer""" +"""Provides mid level layer""" from science_mode_4.layer import Layer from .mid_level_current_data import PacketMidLevelGetCurrentData, PacketMidLevelGetCurrentDataAck diff --git a/src/science_mode_4/utils/bit_vector.py b/src/science_mode_4/utils/bit_vector.py index 8c0bf46..f149c87 100644 --- a/src/science_mode_4/utils/bit_vector.py +++ b/src/science_mode_4/utils/bit_vector.py @@ -1,4 +1,4 @@ -"""Provices a simple BitVector class""" +"""Provides a simple BitVector class""" class BitVector(): @@ -52,8 +52,7 @@ def __len__(self) -> int: def __iter__(self): - for x in self._data: - yield x + yield from self._data def set_length(self, new_length: int): @@ -68,7 +67,7 @@ def set_length(self, new_length: int): def extend(self, value: "BitVector"): """Extends current data with value""" if isinstance(value, BitVector): - self._data += value._data + self._data += value._data # pylint: disable=protected-access def get_bytes(self) -> bytes: diff --git a/src/science_mode_4/utils/usb_connection.py b/src/science_mode_4/utils/usb_connection.py index abe0748..09b5c33 100644 --- a/src/science_mode_4/utils/usb_connection.py +++ b/src/science_mode_4/utils/usb_connection.py @@ -1,72 +1,72 @@ """Provides a class for a usb connection""" -import usb.core -import usb.util +# import usb.core +# import usb.util from .connection import Connection class UsbConnection(Connection): """USB connection class, - IMPORTANT: work in progress (there driver issues under windows)""" + IMPORTANT: work in progress (there are driver issues under windows)""" - @staticmethod - def list_devices() -> list[usb.core.Device]: - """Returns all USB devices""" - return list(usb.core.find(find_all=True)) + # @staticmethod + # def list_devices() -> list[usb.core.Device]: + # """Returns all USB devices""" + # return list(usb.core.find(find_all=True)) - @staticmethod - def list_science_mode_devices() -> list[usb.core.Device]: - """Returns all potential science mode USB devices""" - devices = UsbConnection.list_devices() - # science mode devices (P24/I24) have an STM32 mcu and these are - # default values for USB CDC devices - filtered_devices = list(filter(lambda x: x.idVendor == 0x0483 and x.idProduct == 0x5740 and - x.bDeviceClass == 0x02, devices)) - return filtered_devices + # @staticmethod + # def list_science_mode_devices() -> list[usb.core.Device]: + # """Returns all potential science mode USB devices""" + # devices = UsbConnection.list_devices() + # # science mode devices (P24/I24) have an STM32 mcu and these are + # # default values for USB CDC devices + # filtered_devices = list(filter(lambda x: x.idVendor == 0x0483 and x.idProduct == 0x5740 and + # x.bDeviceClass == 0x02, devices)) + # return filtered_devices - def __init__(self, device: usb.core.Device): - self._device = device - self._out_endpoint = None - self._in_endpoint = None - self._is_open = False + # def __init__(self, device: usb.core.Device): + # self._device = device + # self._out_endpoint = None + # self._in_endpoint = None + # self._is_open = False - def open(self): - # P24/I24 have only one configuration - self._device.set_configuration() - # get an endpoint instance - cfg = self._device.get_active_configuration() - intf = cfg[(0,0)] + # def open(self): + # # P24/I24 have only one configuration + # self._device.set_configuration() + # # get an endpoint instance + # cfg = self._device.get_active_configuration() + # intf = cfg[(0,0)] - # match the first OUT endpoint - self._out_endpoint = usb.util.find_descriptor( - intf, - custom_match = lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT) - # match the first IN endpoint - self._in_endpoint = usb.util.find_descriptor( - intf, - custom_match = lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN) + # # match the first OUT endpoint + # self._out_endpoint = usb.util.find_descriptor( + # intf, + # custom_match = lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT) + # # match the first IN endpoint + # self._in_endpoint = usb.util.find_descriptor( + # intf, + # custom_match = lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN) - self._is_open = True + # self._is_open = True - def close(self): - self._is_open = False + # def close(self): + # self._is_open = False - def is_open(self) -> bool: - return self._is_open + # def is_open(self) -> bool: + # return self._is_open - def write(self, data: bytes): - self._out_endpoint.write(data) + # def write(self, data: bytes): + # self._out_endpoint.write(data) - def read(self) -> bytes: - return self._in_endpoint.read() + # def read(self) -> bytes: + # return self._in_endpoint.read() - def clear_buffer(self): - pass + # def clear_buffer(self): + # pass