From 16a49bb89b97aa54b1af8ebfef58b5a95ffd8734 Mon Sep 17 00:00:00 2001 From: silamon Date: Thu, 13 Mar 2025 16:14:01 +0100 Subject: [PATCH 01/10] Async serial implementation --- setup.cfg | 1 + src/velbustcp/__main__.py | 34 +++--- src/velbustcp/lib/connection/bridge.py | 15 +-- .../lib/connection/newserial/__init__.py | 0 src/velbustcp/lib/connection/newserial/bus.py | 102 ++++++++++++++++++ .../lib/connection/newserial/factory.py | 26 +++++ .../connection/newserial/serialprotocol.py | 31 ++++++ .../lib/connection/newserial/writerthread.py | 61 +++++++++++ 8 files changed, 249 insertions(+), 21 deletions(-) create mode 100644 src/velbustcp/lib/connection/newserial/__init__.py create mode 100644 src/velbustcp/lib/connection/newserial/bus.py create mode 100644 src/velbustcp/lib/connection/newserial/factory.py create mode 100644 src/velbustcp/lib/connection/newserial/serialprotocol.py create mode 100644 src/velbustcp/lib/connection/newserial/writerthread.py diff --git a/setup.cfg b/setup.cfg index 4a71216..bf23703 100644 --- a/setup.cfg +++ b/setup.cfg @@ -9,6 +9,7 @@ packages = find_namespace: install_requires = pyserial==3.5 blinker==1.6.1 + pyserial-asyncio-fast==0.14 python_requires = >=3.8 package_dir = =src diff --git a/src/velbustcp/__main__.py b/src/velbustcp/__main__.py index 0ee22e6..56e69c6 100644 --- a/src/velbustcp/__main__.py +++ b/src/velbustcp/__main__.py @@ -2,9 +2,10 @@ import json from threading import Event import sys +import asyncio from velbustcp.lib.connection.bridge import Bridge -from velbustcp.lib.connection.serial.bus import Bus +from velbustcp.lib.connection.newserial.bus import Bus from velbustcp.lib.connection.tcp.network import Network from velbustcp.lib.connection.tcp.networkmanager import NetworkManager from velbustcp.lib.settings.settings import validate_and_set_settings @@ -33,31 +34,30 @@ def __init__(self): network_manager.add_network(network) self.__bridge = Bridge(bus, network_manager) - self.__bridge.start() + + async def start(self): + """Starts the bridge.""" + await self.__bridge.start() + + async def stop(self): + """Stops the bridge.""" + await self.__bridge.stop() def main_loop(self): """Main loop for the program, blocks infinitely until it receives a KeyboardInterrupt. """ - q = Event() q.wait() - def stop(self): - """Stops bridge. - """ - self.__bridge.stop() - - -def main(args=None): - """Main method.""" +async def main_async(args=None): + """Main asynchronous method.""" parser = argparse.ArgumentParser(description="Velbus communication") parser.add_argument("--settings", help="Settings file", required=False) args = parser.parse_args() # If settings are supplied, read and validate them if args.settings: - # Open settings file with open(args.settings, 'r') as f: settings = json.load(f) @@ -72,6 +72,7 @@ def main(args=None): main = Main() try: + await main.start() main.main_loop() except KeyboardInterrupt: @@ -81,10 +82,15 @@ def main(args=None): logger.exception(e) finally: - main.stop() + await main.stop() logger.info("Shutdown") +def main(args=None): + """Main method.""" + asyncio.run(main_async(args)) + + if __name__ == '__main__': - sys.exit(main()) + sys.exit(main()) \ No newline at end of file diff --git a/src/velbustcp/lib/connection/bridge.py b/src/velbustcp/lib/connection/bridge.py index ce41a5e..4077df4 100644 --- a/src/velbustcp/lib/connection/bridge.py +++ b/src/velbustcp/lib/connection/bridge.py @@ -1,6 +1,7 @@ -from velbustcp.lib.connection.serial.bus import Bus +from velbustcp.lib.connection.newserial.bus import Bus from velbustcp.lib.connection.tcp.networkmanager import NetworkManager from velbustcp.lib.signals import on_bus_receive, on_bus_send, on_tcp_receive +import asyncio class Bridge(): @@ -30,23 +31,23 @@ def handle_bus_send(sender, **kwargs): def handle_tcp_receive(sender, **kwargs): packet = kwargs["packet"] - self.__bus.send(packet) + asyncio.create_task(self.__bus.send(packet)) self.handle_tcp_receive = handle_tcp_receive on_tcp_receive.connect(handle_tcp_receive) self.__bus: Bus = bus self.__network_manager: NetworkManager = network_manager - def start(self) -> None: + async def start(self) -> None: """Starts bus and TCP network(s). """ - self.__bus.ensure() + await self.__bus.ensure() self.__network_manager.start() - def stop(self) -> None: + async def stop(self) -> None: """Stops NTP, bus and network. """ - self.__bus.stop() - self.__network_manager.stop() + await self.__bus.stop() + self.__network_manager.stop() \ No newline at end of file diff --git a/src/velbustcp/lib/connection/newserial/__init__.py b/src/velbustcp/lib/connection/newserial/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/velbustcp/lib/connection/newserial/bus.py b/src/velbustcp/lib/connection/newserial/bus.py new file mode 100644 index 0000000..13c92e9 --- /dev/null +++ b/src/velbustcp/lib/connection/newserial/bus.py @@ -0,0 +1,102 @@ +import asyncio +import serial_asyncio_fast +import logging +from velbustcp.lib.packet.handlers.busstatus import BusStatus +from velbustcp.lib.settings.serial import SerialSettings +from velbustcp.lib.connection.newserial.factory import set_serial_settings, find_port +from velbustcp.lib.connection.newserial.serialprotocol import VelbusSerialProtocol +from velbustcp.lib.connection.newserial.writerthread import WriterThread +from velbustcp.lib.signals import on_bus_receive, on_bus_fault + + +class Bus: + def __init__(self, options: SerialSettings): + """Initialises a bus connection.""" + self.__logger = logging.getLogger("__main__." + __name__) + self.__options = options + self.__bus_status: BusStatus = BusStatus() + self.__do_reconnect: bool = False + self.__connected: bool = False + + on_bus_receive.connect(self.handle_on_bus_receive) + on_bus_fault.connect(self.handle_on_bus_fault) + + async def __reconnect(self): + """Reconnects until active.""" + self.__logger.info("Attempting to connect") + while self.__do_reconnect and not self.is_active(): + try: + await self.__start() + except Exception: + self.__logger.exception("Couldn't create bus connection, waiting 5 seconds") + await asyncio.sleep(5) + + def is_active(self) -> bool: + """Returns whether or not the serial connection is active.""" + return self.__connected + + async def ensure(self): + """Ensures that a connection with the bus is established.""" + if self.is_active() or self.__do_reconnect: + return + self.__do_reconnect = True + self.__reconnect_task = asyncio.create_task(self.__reconnect()) + await self.__reconnect_task + + async def __start(self): + """Starts up the serial communication if the serial connection is not yet active.""" + if self.is_active(): + return + + self.__port = find_port(options=self.__options) + if not self.__port: + raise ValueError("Couldn't find a port to open communication on") + + settings = set_serial_settings() + self.__protocol = VelbusSerialProtocol() + self.__transport, _ = await serial_asyncio_fast.create_serial_connection( + asyncio.get_event_loop(), lambda: self.__protocol, url=self.__port, **settings + ) + self.__connected = True + + self.__writer = WriterThread(self.__transport) + asyncio.create_task(self.__writer.run()) + + self.__logger.info("Serial connection active on port %s", self.__port) + + async def stop(self): + """Stops the serial communication if the serial connection is active.""" + if not self.is_active(): + return + + self.__logger.info("Stopping serial connection") + self.__do_reconnect = False + self.__connected = False + + if self.__transport: + self.__transport.close() + + if self.__writer: + await self.__writer.close() + + async def send(self, packet: bytearray): + """Queues a packet to be sent on the serial connection.""" + if self.is_active(): + await self.__writer.queue(packet) + + def handle_on_bus_receive(self, sender, **kwargs): + old_state = self.__bus_status.alive + packet = kwargs["packet"] + self.__bus_status.receive_packet(packet) + + if old_state == self.__bus_status.alive: + return + + if self.__bus_status.active: + self.__writer.unlock() + else: + self.__writer.lock() + + def handle_on_bus_fault(self, sender, **kwargs): + asyncio.create_task(self.stop()) + asyncio.create_task(self.ensure()) \ No newline at end of file diff --git a/src/velbustcp/lib/connection/newserial/factory.py b/src/velbustcp/lib/connection/newserial/factory.py new file mode 100644 index 0000000..33402f4 --- /dev/null +++ b/src/velbustcp/lib/connection/newserial/factory.py @@ -0,0 +1,26 @@ +import serial_asyncio_fast +from velbustcp.lib.settings.serial import SerialSettings +from velbustcp.lib.util.util import search_for_serial + + +def set_serial_settings() -> dict: + """Returns settings for a Serial object for use with the Velbus protocol.""" + return { + 'baudrate': 38400, + 'parity': serial_asyncio_fast.serial.PARITY_NONE, + 'stopbits': serial_asyncio_fast.serial.STOPBITS_ONE, + 'bytesize': serial_asyncio_fast.serial.EIGHTBITS, + 'xonxoff': 0, + 'timeout': None, + 'dsrdtr': 1, + 'rtscts': 0 + } + + + +def find_port(options: SerialSettings) -> str: + """Finds a port for the serial object.""" + if options.autodiscover: + ports = search_for_serial() + return next((port for port in ports), options.port) + return options.port \ No newline at end of file diff --git a/src/velbustcp/lib/connection/newserial/serialprotocol.py b/src/velbustcp/lib/connection/newserial/serialprotocol.py new file mode 100644 index 0000000..5413979 --- /dev/null +++ b/src/velbustcp/lib/connection/newserial/serialprotocol.py @@ -0,0 +1,31 @@ +import asyncio +import logging + +from velbustcp.lib.packet.packetparser import PacketParser +from velbustcp.lib.signals import on_bus_receive, on_bus_fault + + +class VelbusSerialProtocol(asyncio.Protocol): + """Velbus serial protocol.""" + + def __init__(self): + self.__logger = logging.getLogger("__main__." + __name__) + self.__parser = PacketParser() + + def connection_made(self, transport): + self.transport = transport + + def data_received(self, data: bytes): + """Called upon serial data receive.""" + if data: + packets = self.__parser.feed(bytearray(data)) + for packet in packets: + if self.__logger.isEnabledFor(logging.DEBUG): + self.__logger.debug("[BUS IN] %s", " ".join(hex(x) for x in packet)) + on_bus_receive.send(self, packet=packet) + + def connection_lost(self, exc): + self.__logger.error("Connection lost") + if exc: + self.__logger.exception(exc) + on_bus_fault.send(self) \ No newline at end of file diff --git a/src/velbustcp/lib/connection/newserial/writerthread.py b/src/velbustcp/lib/connection/newserial/writerthread.py new file mode 100644 index 0000000..6e6ca6c --- /dev/null +++ b/src/velbustcp/lib/connection/newserial/writerthread.py @@ -0,0 +1,61 @@ +import asyncio +from collections import deque +from typing import Deque +import logging + +from velbustcp.lib import consts +from velbustcp.lib.signals import on_bus_send + +class WriterThread: + def __init__(self, serial_instance: asyncio.StreamWriter): + self.alive: bool = True + self.__serial = serial_instance + self.__logger = logging.getLogger("__main__." + __name__) + self.__send_buffer: Deque[bytearray] = deque() + self.__serial_lock = asyncio.Lock() + self.__locked = False + + async def close(self): + """Stop the writer thread""" + self.alive = False + + async def queue(self, packet: bytearray): + self.__send_buffer.append(packet) + + async def run(self): + """Coroutine to safely write to the serial port with a delay.""" + last_send_time = asyncio.get_event_loop().time() + + try: + while self.alive: + if not self.__send_buffer or self.__locked: + await asyncio.sleep(0.1) + continue + + packet = self.__send_buffer.popleft() + + delta_time = asyncio.get_event_loop().time() - last_send_time + if delta_time < consts.SEND_DELAY: + await asyncio.sleep(consts.SEND_DELAY - delta_time) + + async with self.__serial_lock: + try: + if self.__logger.isEnabledFor(logging.DEBUG): + self.__logger.debug("[BUS OUT] %s", " ".join(hex(x) for x in packet)) + + self.__serial.write(packet) + on_bus_send.send(self, packet=packet) + except Exception as e: + self.__logger.exception(e) + + last_send_time = asyncio.get_event_loop().time() + except e: + self.__logger.info("Writer thread cancelled") + + def lock(self): + """Locks the writer thread to prevent sending packets.""" + self.__locked = True + + def unlock(self): + """Unlocks the writer thread to allow sending packets.""" + self.__locked = False \ No newline at end of file From 699d559166d79e11254217132c392f5d42bf0b26 Mon Sep 17 00:00:00 2001 From: silamon Date: Fri, 14 Mar 2025 11:34:15 +0100 Subject: [PATCH 02/10] Make tcp asyncio as well --- src/velbustcp/__main__.py | 10 +- src/velbustcp/lib/connection/bridge.py | 12 +- .../lib/connection/newserial/__init__.py | 0 src/velbustcp/lib/connection/newserial/bus.py | 102 -------- .../lib/connection/newserial/factory.py | 26 -- .../connection/newserial/serialprotocol.py | 31 --- .../lib/connection/newserial/writerthread.py | 61 ----- src/velbustcp/lib/connection/serial/bus.py | 152 ++++-------- .../lib/connection/serial/factory.py | 61 ++--- .../lib/connection/serial/serialprotocol.py | 32 +-- .../lib/connection/serial/writerthread.py | 99 +++----- src/velbustcp/lib/connection/tcp/client.py | 67 ++--- .../lib/connection/tcp/clientconnection.py | 9 +- src/velbustcp/lib/connection/tcp/network.py | 233 ++++-------------- .../lib/connection/tcp/networkmanager.py | 19 +- src/velbustcp/lib/signals.py | 2 +- src/velbustcp/lib/util/util.py | 2 +- 17 files changed, 214 insertions(+), 704 deletions(-) delete mode 100644 src/velbustcp/lib/connection/newserial/__init__.py delete mode 100644 src/velbustcp/lib/connection/newserial/bus.py delete mode 100644 src/velbustcp/lib/connection/newserial/factory.py delete mode 100644 src/velbustcp/lib/connection/newserial/serialprotocol.py delete mode 100644 src/velbustcp/lib/connection/newserial/writerthread.py diff --git a/src/velbustcp/__main__.py b/src/velbustcp/__main__.py index 56e69c6..1ddc100 100644 --- a/src/velbustcp/__main__.py +++ b/src/velbustcp/__main__.py @@ -5,7 +5,7 @@ import asyncio from velbustcp.lib.connection.bridge import Bridge -from velbustcp.lib.connection.newserial.bus import Bus +from velbustcp.lib.connection.serial.bus import Bus from velbustcp.lib.connection.tcp.network import Network from velbustcp.lib.connection.tcp.networkmanager import NetworkManager from velbustcp.lib.settings.settings import validate_and_set_settings @@ -86,11 +86,5 @@ async def main_async(args=None): logger.info("Shutdown") - -def main(args=None): - """Main method.""" - asyncio.run(main_async(args)) - - if __name__ == '__main__': - sys.exit(main()) \ No newline at end of file + asyncio.run(main_async()) \ No newline at end of file diff --git a/src/velbustcp/lib/connection/bridge.py b/src/velbustcp/lib/connection/bridge.py index 4077df4..53a9c4f 100644 --- a/src/velbustcp/lib/connection/bridge.py +++ b/src/velbustcp/lib/connection/bridge.py @@ -1,4 +1,4 @@ -from velbustcp.lib.connection.newserial.bus import Bus +from velbustcp.lib.connection.serial.bus import Bus from velbustcp.lib.connection.tcp.networkmanager import NetworkManager from velbustcp.lib.signals import on_bus_receive, on_bus_send, on_tcp_receive import asyncio @@ -19,13 +19,13 @@ def __init__(self, bus: Bus, network_manager: NetworkManager): def handle_bus_receive(sender, **kwargs): packet = kwargs["packet"] - self.__network_manager.send(packet) + asyncio.create_task(self.__network_manager.send(packet)) self.handle_bus_receive = handle_bus_receive on_bus_receive.connect(handle_bus_receive) def handle_bus_send(sender, **kwargs): packet = kwargs["packet"] - self.__network_manager.send(packet) + asyncio.create_task(self.__network_manager.send(packet)) self.handle_bus_send = handle_bus_send on_bus_send.connect(handle_bus_send) @@ -43,11 +43,11 @@ async def start(self) -> None: """ await self.__bus.ensure() - self.__network_manager.start() + await self.__network_manager.start() async def stop(self) -> None: """Stops NTP, bus and network. """ - await self.__bus.stop() - self.__network_manager.stop() \ No newline at end of file + await self.__network_manager.stop() + await self.__bus.stop() \ No newline at end of file diff --git a/src/velbustcp/lib/connection/newserial/__init__.py b/src/velbustcp/lib/connection/newserial/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/velbustcp/lib/connection/newserial/bus.py b/src/velbustcp/lib/connection/newserial/bus.py deleted file mode 100644 index 13c92e9..0000000 --- a/src/velbustcp/lib/connection/newserial/bus.py +++ /dev/null @@ -1,102 +0,0 @@ -import asyncio -import serial_asyncio_fast -import logging -from velbustcp.lib.packet.handlers.busstatus import BusStatus -from velbustcp.lib.settings.serial import SerialSettings -from velbustcp.lib.connection.newserial.factory import set_serial_settings, find_port -from velbustcp.lib.connection.newserial.serialprotocol import VelbusSerialProtocol -from velbustcp.lib.connection.newserial.writerthread import WriterThread -from velbustcp.lib.signals import on_bus_receive, on_bus_fault - - -class Bus: - def __init__(self, options: SerialSettings): - """Initialises a bus connection.""" - self.__logger = logging.getLogger("__main__." + __name__) - self.__options = options - self.__bus_status: BusStatus = BusStatus() - self.__do_reconnect: bool = False - self.__connected: bool = False - - on_bus_receive.connect(self.handle_on_bus_receive) - on_bus_fault.connect(self.handle_on_bus_fault) - - async def __reconnect(self): - """Reconnects until active.""" - self.__logger.info("Attempting to connect") - while self.__do_reconnect and not self.is_active(): - try: - await self.__start() - except Exception: - self.__logger.exception("Couldn't create bus connection, waiting 5 seconds") - await asyncio.sleep(5) - - def is_active(self) -> bool: - """Returns whether or not the serial connection is active.""" - return self.__connected - - async def ensure(self): - """Ensures that a connection with the bus is established.""" - if self.is_active() or self.__do_reconnect: - return - self.__do_reconnect = True - self.__reconnect_task = asyncio.create_task(self.__reconnect()) - await self.__reconnect_task - - async def __start(self): - """Starts up the serial communication if the serial connection is not yet active.""" - if self.is_active(): - return - - self.__port = find_port(options=self.__options) - if not self.__port: - raise ValueError("Couldn't find a port to open communication on") - - settings = set_serial_settings() - self.__protocol = VelbusSerialProtocol() - self.__transport, _ = await serial_asyncio_fast.create_serial_connection( - asyncio.get_event_loop(), lambda: self.__protocol, url=self.__port, **settings - ) - self.__connected = True - - self.__writer = WriterThread(self.__transport) - asyncio.create_task(self.__writer.run()) - - self.__logger.info("Serial connection active on port %s", self.__port) - - async def stop(self): - """Stops the serial communication if the serial connection is active.""" - if not self.is_active(): - return - - self.__logger.info("Stopping serial connection") - self.__do_reconnect = False - self.__connected = False - - if self.__transport: - self.__transport.close() - - if self.__writer: - await self.__writer.close() - - async def send(self, packet: bytearray): - """Queues a packet to be sent on the serial connection.""" - if self.is_active(): - await self.__writer.queue(packet) - - def handle_on_bus_receive(self, sender, **kwargs): - old_state = self.__bus_status.alive - packet = kwargs["packet"] - self.__bus_status.receive_packet(packet) - - if old_state == self.__bus_status.alive: - return - - if self.__bus_status.active: - self.__writer.unlock() - else: - self.__writer.lock() - - def handle_on_bus_fault(self, sender, **kwargs): - asyncio.create_task(self.stop()) - asyncio.create_task(self.ensure()) \ No newline at end of file diff --git a/src/velbustcp/lib/connection/newserial/factory.py b/src/velbustcp/lib/connection/newserial/factory.py deleted file mode 100644 index 33402f4..0000000 --- a/src/velbustcp/lib/connection/newserial/factory.py +++ /dev/null @@ -1,26 +0,0 @@ -import serial_asyncio_fast -from velbustcp.lib.settings.serial import SerialSettings -from velbustcp.lib.util.util import search_for_serial - - -def set_serial_settings() -> dict: - """Returns settings for a Serial object for use with the Velbus protocol.""" - return { - 'baudrate': 38400, - 'parity': serial_asyncio_fast.serial.PARITY_NONE, - 'stopbits': serial_asyncio_fast.serial.STOPBITS_ONE, - 'bytesize': serial_asyncio_fast.serial.EIGHTBITS, - 'xonxoff': 0, - 'timeout': None, - 'dsrdtr': 1, - 'rtscts': 0 - } - - - -def find_port(options: SerialSettings) -> str: - """Finds a port for the serial object.""" - if options.autodiscover: - ports = search_for_serial() - return next((port for port in ports), options.port) - return options.port \ No newline at end of file diff --git a/src/velbustcp/lib/connection/newserial/serialprotocol.py b/src/velbustcp/lib/connection/newserial/serialprotocol.py deleted file mode 100644 index 5413979..0000000 --- a/src/velbustcp/lib/connection/newserial/serialprotocol.py +++ /dev/null @@ -1,31 +0,0 @@ -import asyncio -import logging - -from velbustcp.lib.packet.packetparser import PacketParser -from velbustcp.lib.signals import on_bus_receive, on_bus_fault - - -class VelbusSerialProtocol(asyncio.Protocol): - """Velbus serial protocol.""" - - def __init__(self): - self.__logger = logging.getLogger("__main__." + __name__) - self.__parser = PacketParser() - - def connection_made(self, transport): - self.transport = transport - - def data_received(self, data: bytes): - """Called upon serial data receive.""" - if data: - packets = self.__parser.feed(bytearray(data)) - for packet in packets: - if self.__logger.isEnabledFor(logging.DEBUG): - self.__logger.debug("[BUS IN] %s", " ".join(hex(x) for x in packet)) - on_bus_receive.send(self, packet=packet) - - def connection_lost(self, exc): - self.__logger.error("Connection lost") - if exc: - self.__logger.exception(exc) - on_bus_fault.send(self) \ No newline at end of file diff --git a/src/velbustcp/lib/connection/newserial/writerthread.py b/src/velbustcp/lib/connection/newserial/writerthread.py deleted file mode 100644 index 6e6ca6c..0000000 --- a/src/velbustcp/lib/connection/newserial/writerthread.py +++ /dev/null @@ -1,61 +0,0 @@ -import asyncio -from collections import deque -from typing import Deque -import logging - -from velbustcp.lib import consts -from velbustcp.lib.signals import on_bus_send - -class WriterThread: - def __init__(self, serial_instance: asyncio.StreamWriter): - self.alive: bool = True - self.__serial = serial_instance - self.__logger = logging.getLogger("__main__." + __name__) - self.__send_buffer: Deque[bytearray] = deque() - self.__serial_lock = asyncio.Lock() - self.__locked = False - - async def close(self): - """Stop the writer thread""" - self.alive = False - - async def queue(self, packet: bytearray): - self.__send_buffer.append(packet) - - async def run(self): - """Coroutine to safely write to the serial port with a delay.""" - last_send_time = asyncio.get_event_loop().time() - - try: - while self.alive: - if not self.__send_buffer or self.__locked: - await asyncio.sleep(0.1) - continue - - packet = self.__send_buffer.popleft() - - delta_time = asyncio.get_event_loop().time() - last_send_time - if delta_time < consts.SEND_DELAY: - await asyncio.sleep(consts.SEND_DELAY - delta_time) - - async with self.__serial_lock: - try: - if self.__logger.isEnabledFor(logging.DEBUG): - self.__logger.debug("[BUS OUT] %s", " ".join(hex(x) for x in packet)) - - self.__serial.write(packet) - on_bus_send.send(self, packet=packet) - except Exception as e: - self.__logger.exception(e) - - last_send_time = asyncio.get_event_loop().time() - except e: - self.__logger.info("Writer thread cancelled") - - def lock(self): - """Locks the writer thread to prevent sending packets.""" - self.__locked = True - - def unlock(self): - """Unlocks the writer thread to allow sending packets.""" - self.__locked = False \ No newline at end of file diff --git a/src/velbustcp/lib/connection/serial/bus.py b/src/velbustcp/lib/connection/serial/bus.py index 8a041bb..0388427 100644 --- a/src/velbustcp/lib/connection/serial/bus.py +++ b/src/velbustcp/lib/connection/serial/bus.py @@ -1,152 +1,102 @@ -import serial -import serial.threaded -import serial.tools.list_ports -import threading +import asyncio +import serial_asyncio_fast import logging from velbustcp.lib.packet.handlers.busstatus import BusStatus from velbustcp.lib.settings.serial import SerialSettings -from velbustcp.lib.connection.serial.factory import construct_serial_obj, find_port +from velbustcp.lib.connection.serial.factory import set_serial_settings, find_port from velbustcp.lib.connection.serial.serialprotocol import VelbusSerialProtocol from velbustcp.lib.connection.serial.writerthread import WriterThread from velbustcp.lib.signals import on_bus_receive, on_bus_fault -class Bus(): - +class Bus: def __init__(self, options: SerialSettings): - """Initialises a bus connection. - - Args: - options (dict): The options used to configure the serial connection. - """ - - # Hook signals - def handle_on_bus_receive(sender, **kwargs): - old_state = self.__bus_status.alive - packet = kwargs["packet"] - self.__bus_status.receive_packet(packet) - - if old_state == self.__bus_status.alive: - return - - if self.__bus_status.active: - self.__writer.unlock() - else: - self.__writer.lock() - self.handle_on_bus_receive = handle_on_bus_receive - on_bus_receive.connect(handle_on_bus_receive) - - def handle_on_bus_fault(sender, **kwargs): - self.stop() - self.ensure() - self.handle_on_bus_fault = handle_on_bus_fault - on_bus_fault.connect(handle_on_bus_fault) - + """Initialises a bus connection.""" self.__logger = logging.getLogger("__main__." + __name__) - self.__reconnect_event = threading.Event() self.__options = options - self.__bus_status: BusStatus = BusStatus() - self.__do_reconnect: bool = False self.__connected: bool = False - def __reconnect(self) -> None: - """Reconnects until active. - """ + on_bus_receive.connect(self.handle_on_bus_receive) + on_bus_fault.connect(self.handle_on_bus_fault) + async def __reconnect(self): + """Reconnects until active.""" self.__logger.info("Attempting to connect") - while self.__do_reconnect and not self.is_active(): try: - self.__start() + await self.__start() except Exception: self.__logger.exception("Couldn't create bus connection, waiting 5 seconds") - self.__reconnect_event.clear() - self.__reconnect_event.wait(5) + await asyncio.sleep(5) def is_active(self) -> bool: - """Returns whether or not the serial connection is active. - - Returns: - bool: A boolean indicating whether or not the serial connection is active. - """ - + """Returns whether or not the serial connection is active.""" return self.__connected - def ensure(self) -> None: - """Ensures that a connection with the bus is established. - """ - - # Already active - if self.is_active(): - return - - # Already trying to connect - if self.__do_reconnect: + async def ensure(self): + """Ensures that a connection with the bus is established.""" + if self.is_active() or self.__do_reconnect: return - self.__do_reconnect = True + self.__reconnect_task = asyncio.create_task(self.__reconnect()) + await self.__reconnect_task - # Start reconnecting thread - _ = threading.Thread(target=self.__reconnect) - _.start() - - def __start(self) -> None: - """Starts up the serial communication if the serial connection is not yet active. - """ - + async def __start(self): + """Starts up the serial communication if the serial connection is not yet active.""" if self.is_active(): return self.__port = find_port(options=self.__options) - if not self.__port: raise ValueError("Couldn't find a port to open communication on") - serial_port = construct_serial_obj(self.__port) - - if not serial_port.isOpen(): - raise Exception("Couldn't open port {0}".format(self.__port)) - - # Now that we're connected, set connected state + settings = set_serial_settings() + self.__protocol = VelbusSerialProtocol() + self.__transport, _ = await serial_asyncio_fast.create_serial_connection( + asyncio.get_event_loop(), lambda: self.__protocol, url=self.__port, **settings + ) self.__connected = True - # Create reader thread - self.__reader = serial.threaded.ReaderThread(serial_port, VelbusSerialProtocol()) - self.__reader.start() - - # Create write thread - self.__writer = WriterThread(serial_port) - self.__writer.start() + self.__writer = WriterThread(self.__transport) + asyncio.create_task(self.__writer.run()) self.__logger.info("Serial connection active on port %s", self.__port) - def stop(self) -> None: - """Stops the serial communication if the serial connection is active. - """ - + async def stop(self): + """Stops the serial communication if the serial connection is active.""" if not self.is_active(): return self.__logger.info("Stopping serial connection") - self.__do_reconnect = False self.__connected = False - self.__reconnect_event.set() - if self.__reader and self.__reader.alive: - self.__reader.close() + if self.__transport: + self.__transport.close() - if self.__writer and self.__writer.alive: - self.__writer.close() + if self.__writer: + await self.__writer.close() - def send(self, packet: bytearray) -> None: - """Queues a packet to be sent on the serial connection. + async def send(self, packet: bytearray): + """Queues a packet to be sent on the serial connection.""" + if self.is_active(): + await self.__writer.queue(packet) - Args: - packet (bytearray): An id - """ + def handle_on_bus_receive(self, sender, **kwargs): + old_state = self.__bus_status.alive + packet = kwargs["packet"] + self.__bus_status.receive_packet(packet) - if self.is_active(): - self.__writer.queue(packet) + if old_state == self.__bus_status.alive: + return + + if self.__bus_status.active: + self.__writer.unlock() + else: + self.__writer.lock() + + def handle_on_bus_fault(self, sender, **kwargs): + asyncio.create_task(self.stop()) + asyncio.create_task(self.ensure()) \ No newline at end of file diff --git a/src/velbustcp/lib/connection/serial/factory.py b/src/velbustcp/lib/connection/serial/factory.py index e825404..33402f4 100644 --- a/src/velbustcp/lib/connection/serial/factory.py +++ b/src/velbustcp/lib/connection/serial/factory.py @@ -1,57 +1,26 @@ -import serial - +import serial_asyncio_fast from velbustcp.lib.settings.serial import SerialSettings from velbustcp.lib.util.util import search_for_serial -def set_serial_settings(s: serial.Serial): - """Sets settings on a Serial object for use with the Velbus protcol - - Args: - s (serial.Serial): A serial object. - """ - - s.baudrate = 38400 - s.parity = serial.PARITY_NONE - s.stopbits = serial.STOPBITS_ONE - s.bytesize = serial.EIGHTBITS - s.xonxoff = 0 - s.timeout = None - s.dsrdtr = 1 - s.rtscts = 0 - - -def construct_serial_obj(port: str) -> serial.Serial: - """Constructs a serial object for use with the Velbus protocol. - - Args: - port (str): A port suitable for the serial object. - - Returns: - serial.Serial: A serial object. - """ +def set_serial_settings() -> dict: + """Returns settings for a Serial object for use with the Velbus protocol.""" + return { + 'baudrate': 38400, + 'parity': serial_asyncio_fast.serial.PARITY_NONE, + 'stopbits': serial_asyncio_fast.serial.STOPBITS_ONE, + 'bytesize': serial_asyncio_fast.serial.EIGHTBITS, + 'xonxoff': 0, + 'timeout': None, + 'dsrdtr': 1, + 'rtscts': 0 + } - s = serial.Serial(port) - set_serial_settings(s) - - return s def find_port(options: SerialSettings) -> str: - """[summary] - - Args: - options (SerialSettings): [description] - - Returns: - str: A port name - """ - - # If we need to autodiscover port + """Finds a port for the serial object.""" if options.autodiscover: ports = search_for_serial() - return next((port for port in ports), options.port) - - # No port found (or no autodiscover) - return options.port + return options.port \ No newline at end of file diff --git a/src/velbustcp/lib/connection/serial/serialprotocol.py b/src/velbustcp/lib/connection/serial/serialprotocol.py index 92e076c..5413979 100644 --- a/src/velbustcp/lib/connection/serial/serialprotocol.py +++ b/src/velbustcp/lib/connection/serial/serialprotocol.py @@ -1,43 +1,31 @@ -from typing import Any -import serial +import asyncio import logging from velbustcp.lib.packet.packetparser import PacketParser from velbustcp.lib.signals import on_bus_receive, on_bus_fault -class VelbusSerialProtocol(serial.threaded.Protocol): - """Velbus serial protocol. - """ +class VelbusSerialProtocol(asyncio.Protocol): + """Velbus serial protocol.""" def __init__(self): self.__logger = logging.getLogger("__main__." + __name__) self.__parser = PacketParser() - def __call__(self, *args: Any, **kwds: Any) -> Any: - return self + def connection_made(self, transport): + self.transport = transport def data_received(self, data: bytes): - """Called upon serial data receive. - - Args: - data (bytes): Data received from the serial bus. - """ - + """Called upon serial data receive.""" if data: packets = self.__parser.feed(bytearray(data)) - for packet in packets: - - if self.__logger.isEnabledFor(logging.DEBUG): # pragma: no cover - self.__logger.debug("[BUS IN] %s", " ".join(hex(x) for x in packet)) - + if self.__logger.isEnabledFor(logging.DEBUG): + self.__logger.debug("[BUS IN] %s", " ".join(hex(x) for x in packet)) on_bus_receive.send(self, packet=packet) - def connection_lost(self, exc: Exception): + def connection_lost(self, exc): self.__logger.error("Connection lost") - if exc: self.__logger.exception(exc) - - on_bus_fault.send(self) + on_bus_fault.send(self) \ No newline at end of file diff --git a/src/velbustcp/lib/connection/serial/writerthread.py b/src/velbustcp/lib/connection/serial/writerthread.py index c5d5db2..f8480ef 100644 --- a/src/velbustcp/lib/connection/serial/writerthread.py +++ b/src/velbustcp/lib/connection/serial/writerthread.py @@ -1,88 +1,61 @@ +import asyncio from collections import deque -from serial import Serial -import threading -import time from typing import Deque import logging from velbustcp.lib import consts from velbustcp.lib.signals import on_bus_send - -class WriterThread(threading.Thread): - - def __init__(self, serial_instance: Serial): +class WriterThread: + def __init__(self, serial_instance: asyncio.StreamWriter): self.alive: bool = True - self.__serial: Serial = serial_instance + self.__serial = serial_instance self.__logger = logging.getLogger("__main__." + __name__) - self.__send_event: threading.Event = threading.Event() self.__send_buffer: Deque[bytearray] = deque() - self.__serial_lock: threading.Event = threading.Event() - self.unlock() - - threading.Thread.__init__(self) - - def close(self): - """Stop the reader thread""" - - if not self.alive: - return + self.__serial_lock = asyncio.Lock() + self.__locked = False + async def close(self): + """Stop the writer thread""" self.alive = False - self.__send_event.set() - self.join(2) - def queue(self, packet: bytearray): + async def queue(self, packet: bytearray): self.__send_buffer.append(packet) - self.__send_event.set() - - def lock(self) -> None: - """Locks the write thread. - """ - self.__serial_lock.clear() - - def unlock(self) -> None: - """Unlocks the write thread. - """ - self.__serial_lock.set() - - def run(self) -> None: - """Thread to safely write to the serial port with a delay. - """ - - last_send_time = time.monotonic() - - while self.alive and self.__serial.is_open: - self.__send_event.wait() - self.__send_event.clear() - # While we have packets to send - while len(self.__send_buffer) > 0: + async def run(self): + """Coroutine to safely write to the serial port with a delay.""" + last_send_time = asyncio.get_event_loop().time() - # Still connected? - if not self.alive or not self.__serial.is_open: - return + try: + while self.alive: + if not self.__send_buffer or self.__locked: + await asyncio.sleep(0.1) + continue packet = self.__send_buffer.popleft() - # Ensure that we don't write to the bus too fast - delta_time = time.monotonic() - last_send_time + delta_time = asyncio.get_event_loop().time() - last_send_time if delta_time < consts.SEND_DELAY: - time.sleep(consts.SEND_DELAY - delta_time) + await asyncio.sleep(consts.SEND_DELAY - delta_time) - # Wait for serial lock to be not set - self.__serial_lock.wait() + async with self.__serial_lock: + try: + if self.__logger.isEnabledFor(logging.DEBUG): + self.__logger.debug("[BUS OUT] %s", " ".join(hex(x) for x in packet)) - # Write packet and set new last send time - try: - if self.__logger.isEnabledFor(logging.DEBUG): - self.__logger.debug("[BUS OUT] %s", " ".join(hex(x) for x in packet)) + self.__serial.write(packet) + on_bus_send.send(self, packet=packet) + except Exception as e: + self.__logger.exception(e) - self.__serial.write(packet) - on_bus_send.send(self, packet=packet) + last_send_time = asyncio.get_event_loop().time() + except Exception as e: + self.__logger.info("Writer thread cancelled") - except Exception as e: - self.__logger.exception(e) - # self.__on_error() + def lock(self): + """Locks the writer thread to prevent sending packets.""" + self.__locked = True - last_send_time = time.monotonic() + def unlock(self): + """Unlocks the writer thread to allow sending packets.""" + self.__locked = False \ No newline at end of file diff --git a/src/velbustcp/lib/connection/tcp/client.py b/src/velbustcp/lib/connection/tcp/client.py index 3b313fd..d4418ca 100644 --- a/src/velbustcp/lib/connection/tcp/client.py +++ b/src/velbustcp/lib/connection/tcp/client.py @@ -1,14 +1,13 @@ import logging -import threading -import socket -from typing import Any, Optional, List +import asyncio +from typing import Any, List from velbustcp.lib.connection.tcp.clientconnection import ClientConnection from velbustcp.lib.packet.packetparser import PacketParser from velbustcp.lib.signals import on_tcp_receive, on_client_close -class Client(): +class Client: def __init__(self, connection: ClientConnection): """Initialises a network client. @@ -20,24 +19,29 @@ def __init__(self, connection: ClientConnection): self.__logger: logging.Logger = logging.getLogger("__main__." + __name__) self.__connection: ClientConnection = connection self.__is_active: bool = False - self.__address: str = connection.socket.getpeername() + self.__address: str = connection.writer.get_extra_info('peername') self.__received_packets: List[bytearray] = [] - def start(self) -> None: + async def start(self) -> None: """Starts receiving data from the client. """ - # Start a thread to handle receive if self.is_active(): return self.__is_active = True self.__logger.info("Starting client connection for %s", self.address()) - self.__receive_thread = threading.Thread(target=self.__handle_client) - self.__receive_thread.name = f"TCP-RECV: {self.address()}" - self.__receive_thread.start() - def stop(self) -> None: + if not await self.__handle_authorization(): + self.__logger.warning("Client authorization failed for %s", self.address()) + await self.stop() + return + + await self.__handle_packets() + + await self.stop() + + async def stop(self) -> None: """Stops receiving data and disconnects from the client. """ @@ -46,12 +50,12 @@ def stop(self) -> None: self.__is_active = False self.__logger.info("Closing client connection for %s", self.address()) - self.__connection.socket.shutdown(socket.SHUT_RDWR) - self.__connection.socket.close() + self.__connection.writer.close() + await self.__connection.writer.wait_closed() self.__received_packets.clear() on_client_close.send(self) - def send(self, data: bytearray): + async def send(self, data: bytearray) -> None: """Sends data to the client. Args: @@ -65,7 +69,8 @@ def send(self, data: bytearray): self.__received_packets.remove(data) return - self.__connection.socket.sendall(data) + self.__connection.writer.write(data) + await self.__connection.writer.drain() def is_active(self) -> bool: """Returns whether the client is active for communication. @@ -86,23 +91,7 @@ def address(self) -> Any: return self.__address - def __handle_client(self) -> None: - """Bootstraps the client for communcation. - """ - - # Handle authorization, if not authorized stop client and return - if not self.__handle_authorization(): - self.__logger.warning("Client authorization failed for %s", self.address()) - self.stop() - return - - # Handle packets - self.__handle_packets() - - # Make sure client communication is stopped - self.stop() - - def __handle_authorization(self) -> bool: + async def __handle_authorization(self) -> bool: """Handles client authorization. Returns: @@ -113,7 +102,7 @@ def __handle_authorization(self) -> bool: return True try: - data = self.__connection.socket.recv(1024) + data = await self.__connection.reader.read(1024) if not data: self.__logger.warning("Client %s disconnected before receiving authorization key", self.address()) @@ -126,25 +115,19 @@ def __handle_authorization(self) -> bool: return False - def __handle_packets(self) -> None: + async def __handle_packets(self) -> None: """Receives packet until client is no longer active. """ parser = PacketParser() - # Receive data while self.is_active(): - - data: Optional[bytes] = None - try: - data = self.__connection.socket.recv(1024) + data = await self.__connection.reader.read(1024) except Exception: self.__logger.exception("Exception during packet receiving") return - # If no data received from the socket, the client disconnected - # Break out of the loop if not data: self.__logger.info("Received no data from client %s", self.address()) return @@ -153,4 +136,4 @@ def __handle_packets(self) -> None: for packet in packets: self.__received_packets.append(packet) - on_tcp_receive.send(self, packet=packet) + on_tcp_receive.send(self, packet=packet) \ No newline at end of file diff --git a/src/velbustcp/lib/connection/tcp/clientconnection.py b/src/velbustcp/lib/connection/tcp/clientconnection.py index 48f797f..ed3b00f 100644 --- a/src/velbustcp/lib/connection/tcp/clientconnection.py +++ b/src/velbustcp/lib/connection/tcp/clientconnection.py @@ -1,10 +1,11 @@ -import socket +import asyncio -class ClientConnection(): +class ClientConnection: """Represents an incoming client connection. """ - socket: socket.socket + reader: asyncio.StreamReader + writer: asyncio.StreamWriter should_authorize: bool = False - authorization_key: str = "" + authorization_key: str = "" \ No newline at end of file diff --git a/src/velbustcp/lib/connection/tcp/network.py b/src/velbustcp/lib/connection/tcp/network.py index 19f89f1..3f7195b 100644 --- a/src/velbustcp/lib/connection/tcp/network.py +++ b/src/velbustcp/lib/connection/tcp/network.py @@ -1,8 +1,6 @@ -import threading -import socket +import asyncio import ssl import logging -import platform from typing import List, Optional from velbustcp.lib.connection.tcp.client import Client from velbustcp.lib.connection.tcp.clientconnection import ClientConnection @@ -10,15 +8,21 @@ from velbustcp.lib.signals import on_client_close -class Network(): +class Network: def __init__(self, options: NetworkSettings): """Initialises a TCP network. Args: - options (dict): The options used to configure the network. + options (NetworkSettings): The options used to configure the network. """ + self.__logger: logging.Logger = logging.getLogger("__main__." + __name__) + self.__clients: List[Client] = [] + self.__options: NetworkSettings = options + self.__context: Optional[ssl.SSLContext] = None + self.__server: Optional[asyncio.AbstractServer] = None + # Hook up signal def handle_client_close(sender: Client, **kwargs): self.__logger.info("TCP connection closed %s", sender.address()) @@ -26,201 +30,68 @@ def handle_client_close(sender: Client, **kwargs): if sender not in self.__clients: return - with self.__clients_lock: - self.__clients.remove(sender) + self.__clients.remove(sender) self.handle_client_close = handle_client_close on_client_close.connect(handle_client_close) - self.__logger: logging.Logger = logging.getLogger("__main__." + __name__) - self.__clients: List[Client] = [] - self.__clients_lock: threading.Lock = threading.Lock() - # Indicate that the server should be terminated - # (but does not necessarily indicate that it has been terminated). - self.__stop: threading.Event = threading.Event() - self.__stop.set() - - self.__bind_socket: Optional[socket.socket] = None - # Synchronizes `__bind_socket` modifications - self.__socket_lock: threading.Lock = threading.Lock() - - self.__context: Optional[ssl.SSLContext] = None - self.__options: NetworkSettings = options - - def send(self, data: bytearray) -> None: - """Sends given data to all connected clients to the network. - - Args: - data (bytearray): Specifies the packet to send to the connected clients of this network. + async def start(self) -> None: + """Starts up the TCP server """ - if not self.is_active(): - return + if self.__options.ssl: + self.__context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + self.__context.load_cert_chain(self.__options.cert, keyfile=self.__options.pk) - if not self.__options.relay: - return + self.__server = await asyncio.start_server( + self.__handle_client, + self.__options.host, + self.__options.port, + ssl=self.__context + ) - if self.__logger.isEnabledFor(logging.DEBUG): # pragma: no cover - self.__logger.debug("[TCP OUT] %s", " ".join(hex(x) for x in data)) + self.__logger.info(f"Listening to TCP connections on {self.__options.address} [SSL:{self.__options.ssl}] [AUTH:{self.__options.auth}]") - with self.__clients_lock: - for client in self.__clients: - try: - client.send(data) - except Exception: - self.__logger.exception("Could not send data to client %s", client.address()) - - def __get_bound_socket(self) -> Optional[socket.socket]: - RETRY_DELAY = 5.0 - - while self.is_active() and not self.__bind_socket: - # First, try to initialize the SSL context (as we only need to do it once and once done - # we won’t need to return to trying this, most likely.) - if self.__options.ssl and not self.__context: - try: - context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) - context.load_cert_chain(self.__options.cert, keyfile=self.__options.pk) - self.__context = context - except Exception: - self.__logger.exception("Could not initialize SSL for %s", self.__options.address) - self.__stop.wait(RETRY_DELAY) - continue - - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - # On Linux the following socket option succeeds binding to the specified IP address - # even if it is not currently assigned to any of the interfaces, which foregoes the - # need for this entire retry logic (still necessary for non-Linux systems) that - # follows. - # - # FreeBSD has IP_BINDANY and OpenBSD has SO_BINDANY, but at least the latter is a - # privileged operation. - if platform.system() == "Linux": - try: - IP_FREEBIND = 15 - sock.setsockopt(socket.SOL_IP, IP_FREEBIND, 1) - except Exception as e: - self.__logger.debug("Could not set IP_FREEBIND for socket at %s: %s", self.__options.address, e) - - try: - sock.bind(self.__options.address) - except OSError: - self.__logger.exception("Could not bind to %s", self.__options.address) - self.__stop.wait(RETRY_DELAY) - continue - - try: - sock.listen(0) - except OSError: - self.__logger.exception("Could not listen on %s", self.__options.address) - self.__stop.wait(RETRY_DELAY) - continue - - # This check is required for `stop()` logic with `shutdown()`s to work correctly – for - # it to work `__bind_socket` must never become `non-None` after `self.__stop` gets set. - with self.__socket_lock: - if self.is_active(): - self.__bind_socket = sock - ssl_str = "enabled" if self.__options.ssl else "disabled" - auth_str = "enabled" if self.__options.auth else "disabled" - self.__logger.info(f"Listening to TCP connections on {self.__options.address} [SSL:{ssl_str}] [AUTH:{auth_str}]") - else: - sock.close() - return self.__bind_socket - - def __bind_and_accept_sockets(self) -> None: - """Binds a listener socket and accept clients from it. - - If the tcp server is closed it will also close socket + async def stop(self) -> None: + """Stops the TCP server """ - while self.is_active(): - listen_socket = self.__get_bound_socket() - if listen_socket is None: - continue # `self.is_active()` most likely became `False` - - try: - client_socket, address = listen_socket.accept() - except OSError: - if not self.__stop.is_set(): - self.__logger.exception("Couldn't accept socket") - continue + if self.__server is not None: + self.__server.close() + await self.__server.wait_closed() + self.__server = None - # Make sure that we're still active - if not self.is_active(): - return - self.__logger.info("TCP connection from %s", address) - - if self.__context is not None: - try: - client_socket = self.__context.wrap_socket(client_socket, server_side=True) - except ssl.SSLError: - self.__logger.exception("SSL handshake failed") - continue - else: - assert not self.__options.ssl, "SSL should have been set-up when SSL is enabled" - - # Define client connection - connection = ClientConnection() - connection.socket = client_socket - connection.should_authorize = self.__options.auth - connection.authorization_key = self.__options.auth_key - - # Start client - client = Client(connection) - client.start() - - with self.__clients_lock: - self.__clients.append(client) - - def is_active(self) -> bool: - """Returns whether or not the TCP connection is active - - Returns: - bool: Whether or not the TCP connection is active - """ + for client in self.__clients: + await client.stop() - return not self.__stop.is_set() + self.__clients.clear() + self.__logger.info("Stopped TCP connection %s", self.__options.address) - def start(self) -> None: - """Starts up the TCP server + async def __handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: + """Handles a new client connection """ - if self.is_active(): - return - self.__stop.clear() + connection = ClientConnection() + connection.reader = reader + connection.writer = writer + connection.should_authorize = self.__options.auth + connection.authorization_key = self.__options.auth_key - # Start the server thread to handle connections - self.__server_thread = threading.Thread(target=self.__bind_and_accept_sockets) - self.__server_thread.name = "TCP server thread " + self.__options.host + ":" + str(self.__options.port) - self.__server_thread.start() + client = Client(connection) + self.__clients.append(client) + await client.start() - def stop(self) -> None: - """Stops the TCP server + async def send(self, data: bytearray) -> None: + """Sends given data to all connected clients to the network. + + Args: + data (bytearray): Specifies the packet to send to the connected clients of this network. """ - if not self.is_active(): + if not self.__options.relay: return - self.__logger.info("Stopping TCP connection %s", self.__options.address) - - with self.__socket_lock: - self.__stop.set() - if self.__bind_socket is not None: - # Stop accepting further connections. - # - # Shutting down the socket also interrupts the `accept` call within the - # __server_thread, thus terminating it. - self.__bind_socket.shutdown(socket.SHUT_RDWR) - self.__bind_socket.close() - self.__bind_socket = None - - # Wait till the server thread is closed - self.__server_thread.join() - - # Stop every client listening - with self.__clients_lock: - for client in self.__clients: - client.stop() + if self.__logger.isEnabledFor(logging.DEBUG): # pragma: no cover + self.__logger.debug("[TCP OUT] %s", " ".join(hex(x) for x in data)) - self.__logger.info("Stopped TCP connection %s", self.__options.address) + tasks = [client.send(data) for client in self.__clients] + await asyncio.gather(*tasks) \ No newline at end of file diff --git a/src/velbustcp/lib/connection/tcp/networkmanager.py b/src/velbustcp/lib/connection/tcp/networkmanager.py index a9168ec..8f239db 100644 --- a/src/velbustcp/lib/connection/tcp/networkmanager.py +++ b/src/velbustcp/lib/connection/tcp/networkmanager.py @@ -1,5 +1,6 @@ import logging from typing import List +import asyncio from velbustcp.lib.connection.tcp.network import Network @@ -13,26 +14,26 @@ def __init__(self) -> None: def add_network(self, network: Network): self.__networks.append(network) - def start(self): + async def start(self): """Starts all available networks. """ - for network in self.__networks: - network.start() + tasks = [network.start() for network in self.__networks] + await asyncio.gather(*tasks) - def stop(self): + async def stop(self): """Stops all connected networks. """ - for network in self.__networks: - network.stop() + tasks = [network.stop() for network in self.__networks] + await asyncio.gather(*tasks) - def send(self, packet: bytearray): + async def send(self, packet: bytearray): """Sends the given packet to all networks. Args: packet (bytearray): The packet to send. """ - for network in self.__networks: - network.send(packet) + tasks = [network.send(packet) for network in self.__networks] + await asyncio.gather(*tasks) \ No newline at end of file diff --git a/src/velbustcp/lib/signals.py b/src/velbustcp/lib/signals.py index 6762080..750a040 100644 --- a/src/velbustcp/lib/signals.py +++ b/src/velbustcp/lib/signals.py @@ -4,4 +4,4 @@ on_tcp_receive: NamedSignal = signal("on-tcp-receive") # sender: Client, **kwargs { packet: bytearray } on_bus_send: NamedSignal = signal("on-bus-send") # sender:, **kwargs { packet: bytearray } on_bus_fault: NamedSignal = signal("on-bus-fault") # sender:, **kwargs {} -on_client_close: NamedSignal = signal("on-client-close") # sender: Cient, **kwargs { } +on_client_close: NamedSignal = signal("on-client-close") # sender: Client, **kwargs { } diff --git a/src/velbustcp/lib/util/util.py b/src/velbustcp/lib/util/util.py index 5a5d59f..5511949 100644 --- a/src/velbustcp/lib/util/util.py +++ b/src/velbustcp/lib/util/util.py @@ -26,7 +26,7 @@ def setup_logging(settings: LoggingSettings) -> logging.Logger: if settings.type == "debug": logger.setLevel(logging.DEBUG) else: - logger.setLevel(logging.INFO) + logger.setLevel(logging.DEBUG) # Set handler handler: logging.Handler From fc0196920827a7c974a694cde5879c545ecc2690 Mon Sep 17 00:00:00 2001 From: silamon Date: Wed, 19 Mar 2025 07:44:33 +0100 Subject: [PATCH 03/10] Further work on asyncio --- src/velbustcp/__main__.py | 11 ++---- src/velbustcp/lib/connection/bridge.py | 6 ++-- src/velbustcp/lib/connection/serial/bus.py | 14 ++++---- .../lib/connection/serial/factory.py | 1 - .../lib/connection/serial/writerthread.py | 36 ++++++++++++++----- src/velbustcp/lib/connection/tcp/client.py | 1 - .../lib/connection/tcp/clientconnection.py | 1 - src/velbustcp/lib/connection/tcp/network.py | 3 ++ src/velbustcp/lib/util/util.py | 2 +- 9 files changed, 44 insertions(+), 31 deletions(-) diff --git a/src/velbustcp/__main__.py b/src/velbustcp/__main__.py index 1ddc100..79e1d3a 100644 --- a/src/velbustcp/__main__.py +++ b/src/velbustcp/__main__.py @@ -43,12 +43,6 @@ async def stop(self): """Stops the bridge.""" await self.__bridge.stop() - def main_loop(self): - """Main loop for the program, blocks infinitely until it receives a KeyboardInterrupt. - """ - q = Event() - q.wait() - async def main_async(args=None): """Main asynchronous method.""" @@ -72,8 +66,9 @@ async def main_async(args=None): main = Main() try: - await main.start() - main.main_loop() + loop = asyncio.get_event_loop() + loop.run_until_complete(await main.start()) + loop.run_forever() except KeyboardInterrupt: logger.info("Interrupted, shutting down") diff --git a/src/velbustcp/lib/connection/bridge.py b/src/velbustcp/lib/connection/bridge.py index 53a9c4f..82226f0 100644 --- a/src/velbustcp/lib/connection/bridge.py +++ b/src/velbustcp/lib/connection/bridge.py @@ -42,8 +42,10 @@ async def start(self) -> None: """Starts bus and TCP network(s). """ - await self.__bus.ensure() - await self.__network_manager.start() + serial_task = asyncio.create_task(self.__bus.ensure()) + tcp_task = asyncio.create_task(self.__network_manager.start()) + + await asyncio.gather(serial_task, tcp_task) async def stop(self) -> None: """Stops NTP, bus and network. diff --git a/src/velbustcp/lib/connection/serial/bus.py b/src/velbustcp/lib/connection/serial/bus.py index 0388427..7e7b987 100644 --- a/src/velbustcp/lib/connection/serial/bus.py +++ b/src/velbustcp/lib/connection/serial/bus.py @@ -24,6 +24,7 @@ def __init__(self, options: SerialSettings): async def __reconnect(self): """Reconnects until active.""" self.__logger.info("Attempting to connect") + await self.stop() while self.__do_reconnect and not self.is_active(): try: await self.__start() @@ -40,8 +41,7 @@ async def ensure(self): if self.is_active() or self.__do_reconnect: return self.__do_reconnect = True - self.__reconnect_task = asyncio.create_task(self.__reconnect()) - await self.__reconnect_task + await self.__reconnect() async def __start(self): """Starts up the serial communication if the serial connection is not yet active.""" @@ -53,17 +53,16 @@ async def __start(self): raise ValueError("Couldn't find a port to open communication on") settings = set_serial_settings() - self.__protocol = VelbusSerialProtocol() - self.__transport, _ = await serial_asyncio_fast.create_serial_connection( - asyncio.get_event_loop(), lambda: self.__protocol, url=self.__port, **settings + self.__transport, self.__protocol = await serial_asyncio_fast.create_serial_connection( + asyncio.get_event_loop(), VelbusSerialProtocol, url=self.__port, **settings ) self.__connected = True self.__writer = WriterThread(self.__transport) - asyncio.create_task(self.__writer.run()) - self.__logger.info("Serial connection active on port %s", self.__port) + await self.__writer.run() + async def stop(self): """Stops the serial communication if the serial connection is active.""" if not self.is_active(): @@ -98,5 +97,4 @@ def handle_on_bus_receive(self, sender, **kwargs): self.__writer.lock() def handle_on_bus_fault(self, sender, **kwargs): - asyncio.create_task(self.stop()) asyncio.create_task(self.ensure()) \ No newline at end of file diff --git a/src/velbustcp/lib/connection/serial/factory.py b/src/velbustcp/lib/connection/serial/factory.py index 33402f4..bf356aa 100644 --- a/src/velbustcp/lib/connection/serial/factory.py +++ b/src/velbustcp/lib/connection/serial/factory.py @@ -17,7 +17,6 @@ def set_serial_settings() -> dict: } - def find_port(options: SerialSettings) -> str: """Finds a port for the serial object.""" if options.autodiscover: diff --git a/src/velbustcp/lib/connection/serial/writerthread.py b/src/velbustcp/lib/connection/serial/writerthread.py index f8480ef..8813760 100644 --- a/src/velbustcp/lib/connection/serial/writerthread.py +++ b/src/velbustcp/lib/connection/serial/writerthread.py @@ -6,6 +6,7 @@ from velbustcp.lib import consts from velbustcp.lib.signals import on_bus_send + class WriterThread: def __init__(self, serial_instance: asyncio.StreamWriter): self.alive: bool = True @@ -13,28 +14,37 @@ def __init__(self, serial_instance: asyncio.StreamWriter): self.__logger = logging.getLogger("__main__." + __name__) self.__send_buffer: Deque[bytearray] = deque() self.__serial_lock = asyncio.Lock() + self.__buffer_condition = asyncio.Condition() self.__locked = False async def close(self): """Stop the writer thread""" self.alive = False + async with self.__buffer_condition: + self.__buffer_condition.notify_all() # Wake up the run loop if waiting async def queue(self, packet: bytearray): - self.__send_buffer.append(packet) + """Add a packet to the send buffer and notify the writer thread.""" + async with self.__buffer_condition: + self.__send_buffer.append(packet) + self.__buffer_condition.notify() # Notify the writer thread that a packet is available async def run(self): """Coroutine to safely write to the serial port with a delay.""" - last_send_time = asyncio.get_event_loop().time() + loop = asyncio.get_event_loop() + last_send_time = loop.time() try: while self.alive: - if not self.__send_buffer or self.__locked: - await asyncio.sleep(0.1) - continue + async with self.__buffer_condition: + # Wait until there is data in the buffer and the thread is unlocked + await self.__buffer_condition.wait_for(lambda: self.__send_buffer and not self.__locked) + # Get the next packet to send packet = self.__send_buffer.popleft() - delta_time = asyncio.get_event_loop().time() - last_send_time + # Enforce the send delay + delta_time = loop.time() - last_send_time if delta_time < consts.SEND_DELAY: await asyncio.sleep(consts.SEND_DELAY - delta_time) @@ -48,9 +58,11 @@ async def run(self): except Exception as e: self.__logger.exception(e) - last_send_time = asyncio.get_event_loop().time() - except Exception as e: + last_send_time = loop.time() + except asyncio.CancelledError: self.__logger.info("Writer thread cancelled") + except Exception as e: + self.__logger.exception("Unexpected error in writer thread: %s", e) def lock(self): """Locks the writer thread to prevent sending packets.""" @@ -58,4 +70,10 @@ def lock(self): def unlock(self): """Unlocks the writer thread to allow sending packets.""" - self.__locked = False \ No newline at end of file + self.__locked = False + asyncio.create_task(self.__notify_condition()) + + async def __notify_condition(self): + """Helper coroutine to notify the condition variable.""" + async with self.__buffer_condition: + self.__buffer_condition.notify() \ No newline at end of file diff --git a/src/velbustcp/lib/connection/tcp/client.py b/src/velbustcp/lib/connection/tcp/client.py index d4418ca..9657091 100644 --- a/src/velbustcp/lib/connection/tcp/client.py +++ b/src/velbustcp/lib/connection/tcp/client.py @@ -1,5 +1,4 @@ import logging -import asyncio from typing import Any, List from velbustcp.lib.connection.tcp.clientconnection import ClientConnection diff --git a/src/velbustcp/lib/connection/tcp/clientconnection.py b/src/velbustcp/lib/connection/tcp/clientconnection.py index ed3b00f..8925332 100644 --- a/src/velbustcp/lib/connection/tcp/clientconnection.py +++ b/src/velbustcp/lib/connection/tcp/clientconnection.py @@ -1,6 +1,5 @@ import asyncio - class ClientConnection: """Represents an incoming client connection. """ diff --git a/src/velbustcp/lib/connection/tcp/network.py b/src/velbustcp/lib/connection/tcp/network.py index 3f7195b..9993852 100644 --- a/src/velbustcp/lib/connection/tcp/network.py +++ b/src/velbustcp/lib/connection/tcp/network.py @@ -51,6 +51,9 @@ async def start(self) -> None: self.__logger.info(f"Listening to TCP connections on {self.__options.address} [SSL:{self.__options.ssl}] [AUTH:{self.__options.auth}]") + async with self.__server: + await self.__server.serve_forever() + async def stop(self) -> None: """Stops the TCP server """ diff --git a/src/velbustcp/lib/util/util.py b/src/velbustcp/lib/util/util.py index 5511949..5a5d59f 100644 --- a/src/velbustcp/lib/util/util.py +++ b/src/velbustcp/lib/util/util.py @@ -26,7 +26,7 @@ def setup_logging(settings: LoggingSettings) -> logging.Logger: if settings.type == "debug": logger.setLevel(logging.DEBUG) else: - logger.setLevel(logging.DEBUG) + logger.setLevel(logging.INFO) # Set handler handler: logging.Handler From 3d97bae0f7cf00ed9892ab71d006a1230b7200d2 Mon Sep 17 00:00:00 2001 From: silamon Date: Mon, 24 Mar 2025 10:41:42 +0100 Subject: [PATCH 04/10] Fix --- src/velbustcp/lib/connection/serial/bus.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/velbustcp/lib/connection/serial/bus.py b/src/velbustcp/lib/connection/serial/bus.py index 7e7b987..79cc1a8 100644 --- a/src/velbustcp/lib/connection/serial/bus.py +++ b/src/velbustcp/lib/connection/serial/bus.py @@ -24,7 +24,6 @@ def __init__(self, options: SerialSettings): async def __reconnect(self): """Reconnects until active.""" self.__logger.info("Attempting to connect") - await self.stop() while self.__do_reconnect and not self.is_active(): try: await self.__start() @@ -96,5 +95,9 @@ def handle_on_bus_receive(self, sender, **kwargs): else: self.__writer.lock() + async def on_reconnection(self): + await self.stop() + await self.ensure() + def handle_on_bus_fault(self, sender, **kwargs): - asyncio.create_task(self.ensure()) \ No newline at end of file + asyncio.create_task(self.on_reconnection()) \ No newline at end of file From b59580553ec4422f61cccebd0506c1ecdee02f8e Mon Sep 17 00:00:00 2001 From: Simon Lamon <32477463+silamon@users.noreply.github.com> Date: Mon, 24 Mar 2025 11:58:20 +0100 Subject: [PATCH 05/10] Add entrypoint again --- src/velbustcp/__main__.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/velbustcp/__main__.py b/src/velbustcp/__main__.py index 79e1d3a..ee2fc5d 100644 --- a/src/velbustcp/__main__.py +++ b/src/velbustcp/__main__.py @@ -81,5 +81,10 @@ async def main_async(args=None): logger.info("Shutdown") +# entrypoint for the snap +def main(args=None): + """Main method.""" + asyncio.run(main_async(args)) + if __name__ == '__main__': - asyncio.run(main_async()) \ No newline at end of file + sys.exit(main()) From f1af9f7a5485eeda780ca531fafa04a6ed7d47da Mon Sep 17 00:00:00 2001 From: silamon Date: Thu, 27 Mar 2025 10:24:51 +0000 Subject: [PATCH 06/10] Fix some tests --- .gitignore | 208 +++++++++--------- setup.cfg | 1 + src/velbustcp/lib/connection/tcp/network.py | 14 ++ .../lib/connection/serial/test_factory.py | 18 +- .../lib/connection/tcp/test_client.py | 172 ++++++--------- .../lib/connection/tcp/test_network.py | 23 +- .../lib/connection/tcp/test_networkmanager.py | 37 +++- tests/velbustcp/lib/connection/test_bridge.py | 21 +- 8 files changed, 253 insertions(+), 241 deletions(-) diff --git a/.gitignore b/.gitignore index 398785f..9e93d27 100644 --- a/.gitignore +++ b/.gitignore @@ -1,105 +1,105 @@ -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - -# C extensions -*.so - -# Distribution / packaging -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -.hypothesis/ -.pytest_cache/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# pyenv -.python-version - -# celery beat schedule file -celerybeat-schedule - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ - +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ + .vscode \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index bf23703..e1282a7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -26,6 +26,7 @@ testing = pytest>=7.3.1 pytest-cov>=4.1.0 pytest-mock>=3.10.0 + pytest-asyncio>=0.23.3 mypy>=1.3.0 flake8>=6.0.0 tox>=4.6.0 diff --git a/src/velbustcp/lib/connection/tcp/network.py b/src/velbustcp/lib/connection/tcp/network.py index 9993852..dc5c20e 100644 --- a/src/velbustcp/lib/connection/tcp/network.py +++ b/src/velbustcp/lib/connection/tcp/network.py @@ -22,6 +22,7 @@ def __init__(self, options: NetworkSettings): self.__options: NetworkSettings = options self.__context: Optional[ssl.SSLContext] = None self.__server: Optional[asyncio.AbstractServer] = None + self.__is_active: bool = False # New field to track server state # Hook up signal def handle_client_close(sender: Client, **kwargs): @@ -34,6 +35,14 @@ def handle_client_close(sender: Client, **kwargs): self.handle_client_close = handle_client_close on_client_close.connect(handle_client_close) + def is_active(self) -> bool: + """Checks if the TCP server is active. + + Returns: + bool: True if the server is running, False otherwise. + """ + return self.__is_active + async def start(self) -> None: """Starts up the TCP server """ @@ -49,6 +58,7 @@ async def start(self) -> None: ssl=self.__context ) + self.__is_active = True # Set to True when the server starts self.__logger.info(f"Listening to TCP connections on {self.__options.address} [SSL:{self.__options.ssl}] [AUTH:{self.__options.auth}]") async with self.__server: @@ -67,6 +77,7 @@ async def stop(self) -> None: await client.stop() self.__clients.clear() + self.__is_active = False # Set to False when the server stops self.__logger.info("Stopped TCP connection %s", self.__options.address) async def __handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: @@ -90,6 +101,9 @@ async def send(self, data: bytearray) -> None: data (bytearray): Specifies the packet to send to the connected clients of this network. """ + if not self.is_active(): + return + if not self.__options.relay: return diff --git a/tests/velbustcp/lib/connection/serial/test_factory.py b/tests/velbustcp/lib/connection/serial/test_factory.py index 53ecca4..36a3b68 100644 --- a/tests/velbustcp/lib/connection/serial/test_factory.py +++ b/tests/velbustcp/lib/connection/serial/test_factory.py @@ -5,13 +5,13 @@ def test_settings(): serial = serial_for_url("loop://", timeout=1) - set_serial_settings(serial) + serial_settings = set_serial_settings() - assert serial.baudrate == 38400 - assert serial.parity == PARITY_NONE - assert serial.stopbits == STOPBITS_ONE - assert serial.bytesize == EIGHTBITS - assert serial.xonxoff == 0 - assert not serial.timeout - assert serial.dsrdtr == 1 - assert serial.rtscts == 0 + assert serial_settings.get("baudrate") == 38400 + assert serial_settings.get("parity") == PARITY_NONE + assert serial_settings.get("stopbits") == STOPBITS_ONE + assert serial_settings.get("bytesize") == EIGHTBITS + assert serial_settings.get("xonxoff") == 0 + assert not serial_settings.get("timeout") + assert serial_settings.get("dsrdtr") == 1 + assert serial_settings.get("rtscts") == 0 diff --git a/tests/velbustcp/lib/connection/tcp/test_client.py b/tests/velbustcp/lib/connection/tcp/test_client.py index e72e8d6..f0c6312 100644 --- a/tests/velbustcp/lib/connection/tcp/test_client.py +++ b/tests/velbustcp/lib/connection/tcp/test_client.py @@ -1,26 +1,24 @@ +import asyncio import threading +import pytest from velbustcp.lib.connection.tcp.client import Client from pytest_mock import MockFixture, MockerFixture from velbustcp.lib.connection.tcp.clientconnection import ClientConnection from velbustcp.lib.signals import on_client_close, on_tcp_receive -def get_mock_socket(mocker: MockerFixture): - - def get_address(): - return "mock" - - socket = mocker.Mock() - socket.getpeername = get_address - - return socket +def get_mock_connection(mocker: MockerFixture): + """Creates a mock ClientConnection with mocked reader and writer.""" + connection = mocker.Mock(spec=ClientConnection) + connection.reader = mocker.AsyncMock() + connection.writer = mocker.AsyncMock() + connection.writer.get_extra_info = mocker.Mock(return_value="mock") + return connection def test_defaults(mocker: MockerFixture): - # Create connection - conn = ClientConnection() - conn.socket = get_mock_socket(mocker) + conn = get_mock_connection(mocker) # Create client client = Client(conn) @@ -32,15 +30,11 @@ def test_defaults(mocker: MockerFixture): assert not client.is_active() -def test_auth_wrong_key(mocker: MockerFixture): - +@pytest.mark.asyncio +async def test_auth_wrong_key(mocker: MockerFixture): # Create connection - def recv(len): - return "velbus".encode("utf-8") - - conn = ClientConnection() - conn.socket = get_mock_socket(mocker) - conn.socket.recv = recv + conn = get_mock_connection(mocker) + conn.reader.read = mocker.AsyncMock(return_value="velbus".encode("utf-8")) conn.should_authorize = True conn.authorization_key = "something-different" @@ -49,24 +43,21 @@ def recv(len): def handle_client_close(sender, **kwargs): e.set() + on_client_close.connect(handle_client_close) client = Client(conn) - client.start() + await client.start() e.wait() assert not client.is_active() -def test_auth_no_data(mocker: MockerFixture): - +@pytest.mark.asyncio +async def test_auth_no_data(mocker: MockerFixture): # Create connection - def recv(len): - return bytes() - - conn = ClientConnection() - conn.socket = get_mock_socket(mocker) - conn.socket.recv = recv + conn = get_mock_connection(mocker) + conn.reader.read = mocker.AsyncMock(return_value=b"") conn.should_authorize = True conn.authorization_key = "velbus" @@ -75,24 +66,21 @@ def recv(len): def handle_client_close(sender, **kwargs): e.set() + on_client_close.connect(handle_client_close) client = Client(conn) - client.start() + await client.start() e.wait() assert not client.is_active() -def test_auth_recv_exception(mocker: MockerFixture): - +@pytest.mark.asyncio +async def test_auth_recv_exception(mocker: MockerFixture): # Create connection - def recv(len): - raise Exception("Thrown on purpose") - - conn = ClientConnection() - conn.socket = get_mock_socket(mocker) - conn.socket.recv = recv + conn = get_mock_connection(mocker) + conn.reader.read = mocker.AsyncMock(side_effect=Exception("Thrown on purpose")) conn.should_authorize = True conn.authorization_key = "velbus" @@ -101,24 +89,21 @@ def recv(len): def handle_client_close(sender, **kwargs): e.set() + on_client_close.connect(handle_client_close) client = Client(conn) - client.start() + await client.start() e.wait() assert not client.is_active() -def test_packet_empty(mocker: MockFixture): - +@pytest.mark.asyncio +async def test_packet_empty(mocker: MockFixture): # Create connection - def recv(len): - return bytes() - - conn = ClientConnection() - conn.socket = get_mock_socket(mocker) - conn.socket.recv = recv + conn = get_mock_connection(mocker) + conn.reader.read = mocker.AsyncMock(return_value=b"") conn.should_authorize = False # Create client @@ -130,57 +115,48 @@ def handle_client_close(sender, **kwargs): on_client_close.connect(handle_client_close) client = Client(conn) - client.start() + await client.start() e.wait() assert not client.is_active() -def test_packet_handling(mocker: MockFixture): - +@pytest.mark.asyncio +async def test_packet_handling(mocker: MockFixture): # Create connection data = bytes([0x0F, 0xFB, 0xFF, 0x40, 0xB7, 0x04]) - - def recv(len): - return data - - conn = ClientConnection() - conn.socket = get_mock_socket(mocker) - conn.socket.recv = recv + conn = get_mock_connection(mocker) + conn.reader.read = mocker.AsyncMock(return_value=data) conn.should_authorize = False # Create client - e = threading.Event() + e = asyncio.Event() def on_packet_receive(sender, **kwargs): packet = kwargs["packet"] - if (packet == bytearray(data)): + if packet == bytearray(data): e.set() - client = Client(conn) - client.start() - on_tcp_receive.connect(on_packet_receive) - e.wait() + client = Client(conn) + await client.start() - client.stop() - assert not client.is_active() + await e.wait() # Use asyncio.Event for waiting + await client.stop() + assert not client.is_active() -def test_packet_recv_exception(mocker: MockerFixture): +@pytest.mark.asyncio +async def test_packet_recv_exception(mocker: MockerFixture): # Create connection - def recv(len): - raise Exception("Thrown on purpose") - - conn = ClientConnection() - conn.socket = get_mock_socket(mocker) - conn.socket.recv = recv + conn = get_mock_connection(mocker) + conn.reader.read = mocker.AsyncMock(side_effect=Exception("Thrown on purpose")) conn.should_authorize = False # Create client - e = threading.Event() + e = asyncio.Event() def handle_client_close(sender, **kwargs): e.set() @@ -188,50 +164,40 @@ def handle_client_close(sender, **kwargs): on_client_close.connect(handle_client_close) client = Client(conn) - client.start() + await client.start() - e.wait() + await e.wait() # Use asyncio.Event for waiting assert not client.is_active() -def test_client_send(mocker: MockerFixture): - +@pytest.mark.asyncio +async def test_client_send(mocker: MockerFixture): # Create connection data = bytes([0x0F, 0xFB, 0xFF, 0x40, 0xB7, 0x04]) - - def recv(len): - return [0x00] - - conn = ClientConnection() - conn.socket = get_mock_socket(mocker) - conn.socket.recv = recv + conn = get_mock_connection(mocker) + conn.reader.read = mocker.AsyncMock(return_value=b"\x00") conn.should_authorize = False # Create client client = Client(conn) # First send data without client being connected - client.send(data) - conn.socket.sendall.assert_not_called() + await client.send(data) + conn.writer.write.assert_not_called() # Start client and try sending - client.start() - client.send(bytearray(data)) - conn.socket.sendall.assert_called_with(data) - client.stop() - + await client.start() + await client.send(bytearray(data)) + conn.writer.write.assert_called_with(data) + await client.stop() -def test_client_send_not_own_packet(mocker: MockerFixture): +@pytest.mark.asyncio +async def test_client_send_not_own_packet(mocker: MockerFixture): # Create connection data = bytes([0x0F, 0xFB, 0xFF, 0x40, 0xB7, 0x04]) - - def recv(len): - return data - - conn = ClientConnection() - conn.socket = get_mock_socket(mocker) - conn.socket.recv = recv + conn = get_mock_connection(mocker) + conn.reader.read = mocker.AsyncMock(return_value=data) conn.should_authorize = False # Create client @@ -239,7 +205,7 @@ def recv(len): # Start client and try sending # should fail because it originated from that client - client.start() - client.send(bytearray(data)) - conn.socket.sendall.assert_not_called() - client.stop() + await client.start() + await client.send(bytearray(data)) + conn.writer.write.assert_not_called() + await client.stop() diff --git a/tests/velbustcp/lib/connection/tcp/test_network.py b/tests/velbustcp/lib/connection/tcp/test_network.py index 8a975e6..05c1e28 100644 --- a/tests/velbustcp/lib/connection/tcp/test_network.py +++ b/tests/velbustcp/lib/connection/tcp/test_network.py @@ -1,3 +1,5 @@ +import asyncio +import pytest from pytest_mock import MockFixture from velbustcp.lib.connection.tcp.network import Network @@ -14,22 +16,31 @@ def test_defaults(mocker: MockFixture): assert not network.is_active() -def test_start_stop(mocker: MockFixture): +@pytest.mark.asyncio +async def test_start_stop(mocker: MockFixture): # Arrange settings = NetworkSettings() network = Network(options=settings) # Act - network.start() + start_task = asyncio.create_task(network.start()) + await asyncio.sleep(1) # Allow some time for the server to start # Assert assert network.is_active() - network.stop() - assert not network.is_active() + # Cleanup + await network.stop() + assert not network.is_active() + start_task.cancel() + try: + await start_task + except asyncio.CancelledError: + pass -def test_send_not_active(mocker: MockFixture): +@pytest.mark.asyncio +async def test_send_not_active(mocker: MockFixture): # Arrange settings = NetworkSettings() @@ -37,7 +48,7 @@ def test_send_not_active(mocker: MockFixture): spy = mocker.spy(network, 'is_active') # Act - network.send(bytearray([])) + await network.send(bytearray([])) # Assert spy.assert_called_once() diff --git a/tests/velbustcp/lib/connection/tcp/test_networkmanager.py b/tests/velbustcp/lib/connection/tcp/test_networkmanager.py index ffeae1c..59a3e8d 100644 --- a/tests/velbustcp/lib/connection/tcp/test_networkmanager.py +++ b/tests/velbustcp/lib/connection/tcp/test_networkmanager.py @@ -1,39 +1,39 @@ +import pytest from pytest_mock import MockFixture - from velbustcp.lib.connection.tcp.network import Network from velbustcp.lib.connection.tcp.networkmanager import NetworkManager -def test_start(mocker: MockFixture): - +@pytest.mark.asyncio +async def test_start(mocker: MockFixture): # Arrange network_manager = NetworkManager() network = mocker.Mock(spec=Network) network_manager.add_network(network) # Act - network_manager.start() + await network_manager.start() # Assert network.start.assert_called_once() -def test_stop(mocker: MockFixture): - +@pytest.mark.asyncio +async def test_stop(mocker: MockFixture): # Arrange network_manager = NetworkManager() network = mocker.Mock(spec=Network) network_manager.add_network(network) # Act - network_manager.stop() + await network_manager.stop() # Assert network.stop.assert_called_once() -def test_send(mocker: MockFixture): - +@pytest.mark.asyncio +async def test_send(mocker: MockFixture): # Arrange packet = bytearray([0x01]) network_manager = NetworkManager() @@ -41,7 +41,24 @@ def test_send(mocker: MockFixture): network_manager.add_network(network) # Act - network_manager.send(packet) + await network_manager.send(packet) # Assert network.send.assert_called_once_with(packet) + + +@pytest.mark.asyncio +async def test_add_multiple_networks(mocker: MockFixture): + # Arrange + network_manager = NetworkManager() + network1 = mocker.Mock(spec=Network) + network2 = mocker.Mock(spec=Network) + network_manager.add_network(network1) + network_manager.add_network(network2) + + # Act + await network_manager.start() + + # Assert + network1.start.assert_called_once() + network2.start.assert_called_once() diff --git a/tests/velbustcp/lib/connection/test_bridge.py b/tests/velbustcp/lib/connection/test_bridge.py index 39beb7b..54d669c 100644 --- a/tests/velbustcp/lib/connection/test_bridge.py +++ b/tests/velbustcp/lib/connection/test_bridge.py @@ -1,4 +1,6 @@ +import pytest + from pytest_mock import MockerFixture from velbustcp.lib.connection.bridge import Bridge from velbustcp.lib.consts import COMMAND_BUS_ACTIVE, COMMAND_BUS_BUFFERREADY, COMMAND_BUS_OFF, ETX, PRIORITY_HIGH, STX @@ -7,24 +9,25 @@ BUS_OFF_DATA = bytearray([ETX, PRIORITY_HIGH, 0x00, 0x01, COMMAND_BUS_OFF, 0x00, STX]) BUS_BUFFER_READY_DATA = bytearray([ETX, PRIORITY_HIGH, 0x00, 0x01, COMMAND_BUS_BUFFERREADY, 0x00, STX]) - -def test_bridge_start(mocker: MockerFixture): - mock_bus = mocker.Mock() - mock_network_manager = mocker.Mock() +@pytest.mark.asyncio +async def test_bridge_start(mocker: MockerFixture): + mock_bus = mocker.AsyncMock() + mock_network_manager = mocker.AsyncMock() bridge = Bridge(mock_bus, mock_network_manager) - bridge.start() + await bridge.start() mock_bus.ensure.assert_called() mock_network_manager.start.assert_called() -def test_bridge_stop(mocker: MockerFixture): - mock_bus = mocker.Mock() - mock_network_manager = mocker.Mock() +@pytest.mark.asyncio +async def test_bridge_stop(mocker: MockerFixture): + mock_bus = mocker.AsyncMock() + mock_network_manager = mocker.AsyncMock() bridge = Bridge(mock_bus, mock_network_manager) - bridge.stop() + await bridge.stop() mock_bus.stop.assert_called() mock_network_manager.stop.assert_called() From e7c61dc10bc76c668bbd198569b6aafb18893b8e Mon Sep 17 00:00:00 2001 From: Simon Lamon <32477463+silamon@users.noreply.github.com> Date: Mon, 31 Mar 2025 07:03:19 +0000 Subject: [PATCH 07/10] Fixes --- .devcontainer/devcontainer.json | 2 +- pyproject.toml | 1 + src/velbustcp/lib/connection/tcp/client.py | 5 +- .../lib/connection/tcp/test_client.py | 70 +++++++++++++------ 4 files changed, 56 insertions(+), 22 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 6fdd9fa..0b015a9 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -3,7 +3,7 @@ { "name": "Python 3", // Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile - "image": "mcr.microsoft.com/devcontainers/python:3.10-buster", + "image": "mcr.microsoft.com/devcontainers/python:3.13-bullseye", // Features to add to the dev container. More info: https://containers.dev/features. // "features": {}, diff --git a/pyproject.toml b/pyproject.toml index 517ee83..725cd16 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,6 +3,7 @@ requires = ["setuptools>=42.0", "wheel"] build-backend = "setuptools.build_meta" [tool.pytest.ini_options] +asyncio_mode = "auto" addopts = "--cov=velbustcp" testpaths = [ "tests", diff --git a/src/velbustcp/lib/connection/tcp/client.py b/src/velbustcp/lib/connection/tcp/client.py index 9657091..9350d5f 100644 --- a/src/velbustcp/lib/connection/tcp/client.py +++ b/src/velbustcp/lib/connection/tcp/client.py @@ -1,3 +1,4 @@ +import asyncio import logging from typing import Any, List @@ -135,4 +136,6 @@ async def __handle_packets(self) -> None: for packet in packets: self.__received_packets.append(packet) - on_tcp_receive.send(self, packet=packet) \ No newline at end of file + on_tcp_receive.send(self, packet=packet) + + await asyncio.sleep(0) \ No newline at end of file diff --git a/tests/velbustcp/lib/connection/tcp/test_client.py b/tests/velbustcp/lib/connection/tcp/test_client.py index f0c6312..16cac81 100644 --- a/tests/velbustcp/lib/connection/tcp/test_client.py +++ b/tests/velbustcp/lib/connection/tcp/test_client.py @@ -39,7 +39,7 @@ async def test_auth_wrong_key(mocker: MockerFixture): conn.authorization_key = "something-different" # Create client - e = threading.Event() + e = asyncio.Event() def handle_client_close(sender, **kwargs): e.set() @@ -47,9 +47,13 @@ def handle_client_close(sender, **kwargs): on_client_close.connect(handle_client_close) client = Client(conn) - await client.start() + task = asyncio.create_task(client.start()) # Run client.start() as a separate task + + await e.wait() # Wait for the event to be set + + await client.stop() + await task - e.wait() assert not client.is_active() @@ -62,7 +66,7 @@ async def test_auth_no_data(mocker: MockerFixture): conn.authorization_key = "velbus" # Create client - e = threading.Event() + e = asyncio.Event() def handle_client_close(sender, **kwargs): e.set() @@ -70,9 +74,13 @@ def handle_client_close(sender, **kwargs): on_client_close.connect(handle_client_close) client = Client(conn) - await client.start() + task = asyncio.create_task(client.start()) # Run client.start() as a separate task + + await e.wait() # Wait for the event to be set + + await client.stop() + await task - e.wait() assert not client.is_active() @@ -85,7 +93,7 @@ async def test_auth_recv_exception(mocker: MockerFixture): conn.authorization_key = "velbus" # Create client - e = threading.Event() + e = asyncio.Event() def handle_client_close(sender, **kwargs): e.set() @@ -93,9 +101,13 @@ def handle_client_close(sender, **kwargs): on_client_close.connect(handle_client_close) client = Client(conn) - await client.start() + task = asyncio.create_task(client.start()) # Run client.start() as a separate task + + await e.wait() # Wait for the event to be set + + await client.stop() + await task - e.wait() assert not client.is_active() @@ -107,7 +119,7 @@ async def test_packet_empty(mocker: MockFixture): conn.should_authorize = False # Create client - e = threading.Event() + e = asyncio.Event() def handle_client_close(sender, **kwargs): e.set() @@ -115,9 +127,13 @@ def handle_client_close(sender, **kwargs): on_client_close.connect(handle_client_close) client = Client(conn) - await client.start() + task = asyncio.create_task(client.start()) # Run client.start() as a separate task + + await e.wait() # Wait for the event to be set + + await client.stop() + await task - e.wait() assert not client.is_active() @@ -134,15 +150,18 @@ async def test_packet_handling(mocker: MockFixture): def on_packet_receive(sender, **kwargs): packet = kwargs["packet"] - if packet == bytearray(data): + if packet == bytearray(data) and not e.is_set(): e.set() on_tcp_receive.connect(on_packet_receive) client = Client(conn) - await client.start() + task = asyncio.create_task(client.start()) # Run client.start() as a separate task - await e.wait() # Use asyncio.Event for waiting + await e.wait() # Wait for the event to be set + + await client.stop() + await task await client.stop() assert not client.is_active() @@ -164,11 +183,14 @@ def handle_client_close(sender, **kwargs): on_client_close.connect(handle_client_close) client = Client(conn) - await client.start() + task = asyncio.create_task(client.start()) # Run client.start() as a separate task - await e.wait() # Use asyncio.Event for waiting - assert not client.is_active() + await e.wait() # Wait for the event to be set + await client.stop() + await task + + assert not client.is_active() @pytest.mark.asyncio async def test_client_send(mocker: MockerFixture): @@ -186,10 +208,14 @@ async def test_client_send(mocker: MockerFixture): conn.writer.write.assert_not_called() # Start client and try sending - await client.start() + task = asyncio.create_task(client.start()) # Run client.start() as a separate task + await asyncio.sleep(0) + await client.send(bytearray(data)) conn.writer.write.assert_called_with(data) + await client.stop() + await task @pytest.mark.asyncio @@ -205,7 +231,11 @@ async def test_client_send_not_own_packet(mocker: MockerFixture): # Start client and try sending # should fail because it originated from that client - await client.start() + task = asyncio.create_task(client.start()) # Run client.start() as a separate task + await asyncio.sleep(0) + await client.send(bytearray(data)) conn.writer.write.assert_not_called() + await client.stop() + await task From 652a8ad09bc053975eda9e1af62ff932af2f2af2 Mon Sep 17 00:00:00 2001 From: Simon Lamon <32477463+silamon@users.noreply.github.com> Date: Mon, 31 Mar 2025 07:08:35 +0000 Subject: [PATCH 08/10] Flake --- src/velbustcp/__main__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/velbustcp/__main__.py b/src/velbustcp/__main__.py index ee2fc5d..7417206 100644 --- a/src/velbustcp/__main__.py +++ b/src/velbustcp/__main__.py @@ -1,6 +1,5 @@ import argparse import json -from threading import Event import sys import asyncio @@ -81,10 +80,12 @@ async def main_async(args=None): logger.info("Shutdown") + # entrypoint for the snap def main(args=None): """Main method.""" asyncio.run(main_async(args)) + if __name__ == '__main__': sys.exit(main()) From dcaaf1ff729c331a5192f16f0e5f6722f9075123 Mon Sep 17 00:00:00 2001 From: Simon Lamon <32477463+silamon@users.noreply.github.com> Date: Mon, 31 Mar 2025 07:10:51 +0000 Subject: [PATCH 09/10] Flake --- src/velbustcp/lib/connection/tcp/clientconnection.py | 1 + tests/velbustcp/lib/connection/tcp/test_client.py | 1 + tests/velbustcp/lib/connection/tcp/test_network.py | 1 + tests/velbustcp/lib/connection/test_bridge.py | 1 + 4 files changed, 4 insertions(+) diff --git a/src/velbustcp/lib/connection/tcp/clientconnection.py b/src/velbustcp/lib/connection/tcp/clientconnection.py index 8925332..ed3b00f 100644 --- a/src/velbustcp/lib/connection/tcp/clientconnection.py +++ b/src/velbustcp/lib/connection/tcp/clientconnection.py @@ -1,5 +1,6 @@ import asyncio + class ClientConnection: """Represents an incoming client connection. """ diff --git a/tests/velbustcp/lib/connection/tcp/test_client.py b/tests/velbustcp/lib/connection/tcp/test_client.py index 16cac81..a5b71e1 100644 --- a/tests/velbustcp/lib/connection/tcp/test_client.py +++ b/tests/velbustcp/lib/connection/tcp/test_client.py @@ -192,6 +192,7 @@ def handle_client_close(sender, **kwargs): assert not client.is_active() + @pytest.mark.asyncio async def test_client_send(mocker: MockerFixture): # Create connection diff --git a/tests/velbustcp/lib/connection/tcp/test_network.py b/tests/velbustcp/lib/connection/tcp/test_network.py index 05c1e28..6efbf34 100644 --- a/tests/velbustcp/lib/connection/tcp/test_network.py +++ b/tests/velbustcp/lib/connection/tcp/test_network.py @@ -39,6 +39,7 @@ async def test_start_stop(mocker: MockFixture): except asyncio.CancelledError: pass + @pytest.mark.asyncio async def test_send_not_active(mocker: MockFixture): diff --git a/tests/velbustcp/lib/connection/test_bridge.py b/tests/velbustcp/lib/connection/test_bridge.py index 54d669c..6827a7f 100644 --- a/tests/velbustcp/lib/connection/test_bridge.py +++ b/tests/velbustcp/lib/connection/test_bridge.py @@ -9,6 +9,7 @@ BUS_OFF_DATA = bytearray([ETX, PRIORITY_HIGH, 0x00, 0x01, COMMAND_BUS_OFF, 0x00, STX]) BUS_BUFFER_READY_DATA = bytearray([ETX, PRIORITY_HIGH, 0x00, 0x01, COMMAND_BUS_BUFFERREADY, 0x00, STX]) + @pytest.mark.asyncio async def test_bridge_start(mocker: MockerFixture): mock_bus = mocker.AsyncMock() From 98e8c353a398feb41973da75d08a12a05d3090ec Mon Sep 17 00:00:00 2001 From: Simon Lamon <32477463+silamon@users.noreply.github.com> Date: Mon, 31 Mar 2025 07:29:57 +0000 Subject: [PATCH 10/10] flake, mypy --- .devcontainer/devcontainer.json | 2 +- src/velbustcp/lib/connection/bridge.py | 4 ++-- src/velbustcp/lib/connection/serial/bus.py | 2 +- src/velbustcp/lib/connection/serial/factory.py | 5 +++-- src/velbustcp/lib/connection/serial/serialprotocol.py | 2 +- src/velbustcp/lib/connection/serial/writerthread.py | 2 +- src/velbustcp/lib/connection/tcp/client.py | 2 +- src/velbustcp/lib/connection/tcp/clientconnection.py | 2 +- src/velbustcp/lib/connection/tcp/network.py | 2 +- src/velbustcp/lib/connection/tcp/networkmanager.py | 2 +- tests/velbustcp/lib/connection/serial/test_factory.py | 2 +- tests/velbustcp/lib/connection/tcp/test_client.py | 1 - tests/velbustcp/lib/connection/test_bridge.py | 2 +- tox.ini | 10 ++++++---- 14 files changed, 21 insertions(+), 19 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 0b015a9..dd50d21 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -3,7 +3,7 @@ { "name": "Python 3", // Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile - "image": "mcr.microsoft.com/devcontainers/python:3.13-bullseye", + "image": "mcr.microsoft.com/devcontainers/python:3.12-bullseye", // Features to add to the dev container. More info: https://containers.dev/features. // "features": {}, diff --git a/src/velbustcp/lib/connection/bridge.py b/src/velbustcp/lib/connection/bridge.py index 82226f0..19d1311 100644 --- a/src/velbustcp/lib/connection/bridge.py +++ b/src/velbustcp/lib/connection/bridge.py @@ -44,7 +44,7 @@ async def start(self) -> None: serial_task = asyncio.create_task(self.__bus.ensure()) tcp_task = asyncio.create_task(self.__network_manager.start()) - + await asyncio.gather(serial_task, tcp_task) async def stop(self) -> None: @@ -52,4 +52,4 @@ async def stop(self) -> None: """ await self.__network_manager.stop() - await self.__bus.stop() \ No newline at end of file + await self.__bus.stop() diff --git a/src/velbustcp/lib/connection/serial/bus.py b/src/velbustcp/lib/connection/serial/bus.py index 79cc1a8..cff8f2a 100644 --- a/src/velbustcp/lib/connection/serial/bus.py +++ b/src/velbustcp/lib/connection/serial/bus.py @@ -100,4 +100,4 @@ async def on_reconnection(self): await self.ensure() def handle_on_bus_fault(self, sender, **kwargs): - asyncio.create_task(self.on_reconnection()) \ No newline at end of file + asyncio.create_task(self.on_reconnection()) diff --git a/src/velbustcp/lib/connection/serial/factory.py b/src/velbustcp/lib/connection/serial/factory.py index bf356aa..09c7618 100644 --- a/src/velbustcp/lib/connection/serial/factory.py +++ b/src/velbustcp/lib/connection/serial/factory.py @@ -1,9 +1,10 @@ +from typing import Any import serial_asyncio_fast from velbustcp.lib.settings.serial import SerialSettings from velbustcp.lib.util.util import search_for_serial -def set_serial_settings() -> dict: +def set_serial_settings() -> dict[str, Any]: """Returns settings for a Serial object for use with the Velbus protocol.""" return { 'baudrate': 38400, @@ -22,4 +23,4 @@ def find_port(options: SerialSettings) -> str: if options.autodiscover: ports = search_for_serial() return next((port for port in ports), options.port) - return options.port \ No newline at end of file + return options.port diff --git a/src/velbustcp/lib/connection/serial/serialprotocol.py b/src/velbustcp/lib/connection/serial/serialprotocol.py index 5413979..cdc8f8e 100644 --- a/src/velbustcp/lib/connection/serial/serialprotocol.py +++ b/src/velbustcp/lib/connection/serial/serialprotocol.py @@ -28,4 +28,4 @@ def connection_lost(self, exc): self.__logger.error("Connection lost") if exc: self.__logger.exception(exc) - on_bus_fault.send(self) \ No newline at end of file + on_bus_fault.send(self) diff --git a/src/velbustcp/lib/connection/serial/writerthread.py b/src/velbustcp/lib/connection/serial/writerthread.py index 8813760..44d1081 100644 --- a/src/velbustcp/lib/connection/serial/writerthread.py +++ b/src/velbustcp/lib/connection/serial/writerthread.py @@ -76,4 +76,4 @@ def unlock(self): async def __notify_condition(self): """Helper coroutine to notify the condition variable.""" async with self.__buffer_condition: - self.__buffer_condition.notify() \ No newline at end of file + self.__buffer_condition.notify() diff --git a/src/velbustcp/lib/connection/tcp/client.py b/src/velbustcp/lib/connection/tcp/client.py index 9350d5f..6156a3d 100644 --- a/src/velbustcp/lib/connection/tcp/client.py +++ b/src/velbustcp/lib/connection/tcp/client.py @@ -138,4 +138,4 @@ async def __handle_packets(self) -> None: self.__received_packets.append(packet) on_tcp_receive.send(self, packet=packet) - await asyncio.sleep(0) \ No newline at end of file + await asyncio.sleep(0) diff --git a/src/velbustcp/lib/connection/tcp/clientconnection.py b/src/velbustcp/lib/connection/tcp/clientconnection.py index ed3b00f..354e12c 100644 --- a/src/velbustcp/lib/connection/tcp/clientconnection.py +++ b/src/velbustcp/lib/connection/tcp/clientconnection.py @@ -8,4 +8,4 @@ class ClientConnection: reader: asyncio.StreamReader writer: asyncio.StreamWriter should_authorize: bool = False - authorization_key: str = "" \ No newline at end of file + authorization_key: str = "" diff --git a/src/velbustcp/lib/connection/tcp/network.py b/src/velbustcp/lib/connection/tcp/network.py index dc5c20e..44d7d12 100644 --- a/src/velbustcp/lib/connection/tcp/network.py +++ b/src/velbustcp/lib/connection/tcp/network.py @@ -111,4 +111,4 @@ async def send(self, data: bytearray) -> None: self.__logger.debug("[TCP OUT] %s", " ".join(hex(x) for x in data)) tasks = [client.send(data) for client in self.__clients] - await asyncio.gather(*tasks) \ No newline at end of file + await asyncio.gather(*tasks) diff --git a/src/velbustcp/lib/connection/tcp/networkmanager.py b/src/velbustcp/lib/connection/tcp/networkmanager.py index 8f239db..bac2d49 100644 --- a/src/velbustcp/lib/connection/tcp/networkmanager.py +++ b/src/velbustcp/lib/connection/tcp/networkmanager.py @@ -36,4 +36,4 @@ async def send(self, packet: bytearray): """ tasks = [network.send(packet) for network in self.__networks] - await asyncio.gather(*tasks) \ No newline at end of file + await asyncio.gather(*tasks) diff --git a/tests/velbustcp/lib/connection/serial/test_factory.py b/tests/velbustcp/lib/connection/serial/test_factory.py index 36a3b68..7bfa09b 100644 --- a/tests/velbustcp/lib/connection/serial/test_factory.py +++ b/tests/velbustcp/lib/connection/serial/test_factory.py @@ -4,7 +4,7 @@ def test_settings(): - serial = serial_for_url("loop://", timeout=1) + serial_for_url("loop://", timeout=1) serial_settings = set_serial_settings() assert serial_settings.get("baudrate") == 38400 diff --git a/tests/velbustcp/lib/connection/tcp/test_client.py b/tests/velbustcp/lib/connection/tcp/test_client.py index a5b71e1..8cc6f57 100644 --- a/tests/velbustcp/lib/connection/tcp/test_client.py +++ b/tests/velbustcp/lib/connection/tcp/test_client.py @@ -1,5 +1,4 @@ import asyncio -import threading import pytest from velbustcp.lib.connection.tcp.client import Client from pytest_mock import MockFixture, MockerFixture diff --git a/tests/velbustcp/lib/connection/test_bridge.py b/tests/velbustcp/lib/connection/test_bridge.py index 6827a7f..54d444e 100644 --- a/tests/velbustcp/lib/connection/test_bridge.py +++ b/tests/velbustcp/lib/connection/test_bridge.py @@ -1,5 +1,5 @@ -import pytest +import pytest from pytest_mock import MockerFixture from velbustcp.lib.connection.bridge import Bridge diff --git a/tox.ini b/tox.ini index b35996f..c7227d0 100644 --- a/tox.ini +++ b/tox.ini @@ -1,14 +1,16 @@ [tox] minversion = 4.6.0 -envlist = py38, py39, py310, py311, flake8, mypy +envlist = py38, py39, py310, py311, py312, py313, flake8, mypy isolated_build = true [gh-actions] python = 3.8: py38 3.9: py39 - 3.10: py310, mypy, flake8 + 3.10: py310 3.11: py311 + 3.12: py312, mypy, flake8 + 3.13: py313 [testenv] setenv = @@ -19,12 +21,12 @@ commands = pytest --cov-report html:htmlcov/pytest --basetemp={envtmpdir} [testenv:flake8] -basepython = python3.10 +basepython = python3.12 deps = flake8 commands = flake8 src tests [testenv:mypy] -basepython = python3.10 +basepython = python3.12 deps = -r{toxinidir}/requirements_dev.txt commands = mypy --install-types --non-interactive src \ No newline at end of file