From 5fa070541d68b3924f664ab01d87a5e7e68b2851 Mon Sep 17 00:00:00 2001 From: theshaun Date: Fri, 23 Jan 2026 18:13:34 +1000 Subject: [PATCH 1/3] Add support for setting gpiochip and leveraging the gpiod backend on systems that don't support GPIO CDEV2 --- src/pymc_core/hardware/gpio_manager.py | 216 ++++++++++++++++++++--- src/pymc_core/hardware/sx1262_wrapper.py | 13 +- 2 files changed, 206 insertions(+), 23 deletions(-) diff --git a/src/pymc_core/hardware/gpio_manager.py b/src/pymc_core/hardware/gpio_manager.py index 399a5da..0dc367d 100644 --- a/src/pymc_core/hardware/gpio_manager.py +++ b/src/pymc_core/hardware/gpio_manager.py @@ -9,6 +9,7 @@ import sys import threading import time +import os from typing import Callable, Dict, Optional try: @@ -21,6 +22,15 @@ GPIO = None EdgeEvent = None +# Optional libgpiod support +try: + import gpiod + + GPIOD_AVAILABLE = True +except Exception: + GPIOD_AVAILABLE = False + gpiod = None + class GPIOImportError(ImportError): """Raised when GPIO functionality is used without python-periphery""" @@ -44,19 +54,119 @@ def __init__(self): class GPIOPinManager: """Manages GPIO pins abstraction using Linux GPIO character device interface""" - def __init__(self, gpio_chip: str = "/dev/gpiochip0"): + def __init__(self, gpio_chip: str = "/dev/gpiochip0", backend: str = "auto"): """ Initialize GPIO Pin Manager Args: gpio_chip: Path to GPIO chip device (default: /dev/gpiochip0) Set to "auto" to auto-detect first available chip + backend: 'periphery', 'gpiod', or 'auto' to select backend Raises: GPIOImportError: If python-periphery is not available """ - if not PERIPHERY_AVAILABLE: - raise GPIOImportError() + # Determine backend to use + self._backend = backend + if backend == "auto": + if PERIPHERY_AVAILABLE: + self._backend = "periphery" + elif GPIOD_AVAILABLE: + class GpiodGPIO: + def __init__(self, chip_path, lineoffset, direction, bias=None, edge=None): + # chip_path is like '/dev/gpiochip0' — use it directly + try: + self._chip = gpiod.Chip(chip_path) + except Exception as e: + raise FileNotFoundError(f"gpiod Chip '{chip_path}' not found: {e}") from e + + self._line = self._chip.get_line(lineoffset) + self.direction = direction + self._consumer = "pymc_core" + + # Request line for input or output using libgpiod v2 LineRequest if available + requested = False + # Attempt v2 API: gpiod.LineRequest + try: + LineRequest = getattr(gpiod, "LineRequest", None) + if LineRequest is not None: + req = LineRequest() + # set consumer if attribute exists + if hasattr(req, "consumer"): + req.consumer = self._consumer + # set request type constant if present + if hasattr(req, "request_type"): + # prefer named constants if provided + req.request_type = ( + getattr(gpiod, "LINE_REQ_DIR_OUT", None) + if direction == "out" + else getattr(gpiod, "LINE_REQ_DIR_IN", None) + ) + # issue request + try: + self._line.request(req) + requested = True + except Exception: + # some bindings expect different method signature + pass + except Exception: + pass + + # Fallback: older style line.request(consumer=..., type=...) + if not requested: + try: + req_type = ( + getattr(gpiod, "LINE_REQ_DIR_OUT", None) + if direction == "out" + else getattr(gpiod, "LINE_REQ_DIR_IN", None) + ) + if req_type is None: + # fallback integer defaults + req_type = 1 if direction == "out" else 0 + # try request with kwargs + try: + self._line.request(consumer=self._consumer, type=req_type) + requested = True + except Exception: + # try request with positional or older API + try: + self._line.request(req_type) + requested = True + except Exception: + requested = False + except Exception: + requested = False + + if not requested: + raise RuntimeError( + "Unsupported gpiod Python API on this system. Please install a compatible python-libgpiod (v2.4) or adjust the wrapper." + ) + + def write(self, value: bool): + # set_value exists in both v1 and v2 bindings + self._line.set_value(1 if value else 0) + + def read(self) -> bool: + # get_value exists in both v1 and v2 bindings + return bool(self._line.get_value()) + + def close(self): + try: + self._line.release() + except Exception: + pass + + # Provide no-op poll/read_event for compatibility + def poll(self, timeout): + return False + + def read_event(self): + return None + + # make the module-level GPIO name point to the wrapper so rest of code can instantiate + globals()["GPIO"] = GpiodGPIO + + # If periphery is used, ensure it was already imported; otherwise above raised self._gpio_chip = self._resolve_gpio_chip(gpio_chip) self._pins: Dict[int, GPIO] = {} @@ -125,8 +235,10 @@ def setup_output_pin(self, pin_number: int, initial_value: bool = False) -> bool print("━" * 60) sys.exit(1) else: - logger.error(f"Failed to setup output pin {pin_number}: {e}") - print(f"\nFATAL: Cannot setup GPIO pin {pin_number}") + logger.error( + f"Failed to setup output pin {pin_number} on {self._gpio_chip} (backend={self._backend}): {e}" + ) + print(f"\nFATAL: Cannot setup GPIO pin {pin_number} on {self._gpio_chip} (backend={self._backend})") print("━" * 60) print(f"Error: {e}") print("\nThe system cannot function without GPIO access.") @@ -161,9 +273,17 @@ def setup_input_pin( # Open GPIO pin as input with edge detection if callback provided if callback: - gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias, edge="rising") - self._input_callbacks[pin_number] = callback - self._start_edge_detection(pin_number) + # For gpiod backend, libgpiod does not provide the same edge API here + # so we open a plain input and use a polling thread to detect edges. + if self._backend == "gpiod": + gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias) + self._input_callbacks[pin_number] = callback + # start polling-based detection + self._start_polling_detection(pin_number) + else: + gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias, edge="rising") + self._input_callbacks[pin_number] = callback + self._start_edge_detection(pin_number) else: # No callback, just simple input gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias) @@ -196,8 +316,10 @@ def setup_input_pin( print("━" * 60) sys.exit(1) else: - logger.error(f"Failed to setup input pin {pin_number}: {e}") - print(f"\nFATAL: Cannot setup GPIO pin {pin_number}") + logger.error( + f"Failed to setup input pin {pin_number} on {self._gpio_chip} (backend={self._backend}): {e}" + ) + print(f"\nFATAL: Cannot setup GPIO pin {pin_number} on {self._gpio_chip} (backend={self._backend})") print("━" * 60) print(f"Error: {e}") print("\nThe system cannot function without GPIO access.") @@ -233,14 +355,19 @@ def setup_interrupt_pin( # Determine bias setting bias = "pull_up" if pull_up else "default" - # Open GPIO pin as input with edge detection on rising edge - gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias, edge="rising") - self._pins[pin_number] = gpio - - # Setup callback with async edge monitoring - if callback: - self._input_callbacks[pin_number] = callback - self._start_edge_detection(pin_number) + # Open GPIO pin as input with edge detection on rising edge or polling for gpiod + if self._backend == "gpiod": + gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias) + self._pins[pin_number] = gpio + if callback: + self._input_callbacks[pin_number] = callback + self._start_polling_detection(pin_number) + else: + gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias, edge="rising") + self._pins[pin_number] = gpio + if callback: + self._input_callbacks[pin_number] = callback + self._start_edge_detection(pin_number) logger.debug( f"Interrupt pin {pin_number} configured " @@ -266,8 +393,10 @@ def setup_interrupt_pin( print("━" * 60) sys.exit(1) else: - logger.error(f"Failed to setup interrupt pin {pin_number}: {e}") - print(f"\nFATAL: Cannot setup GPIO pin {pin_number}") + logger.error( + f"Failed to setup interrupt pin {pin_number} on {self._gpio_chip} (backend={self._backend}): {e}" + ) + print(f"\nFATAL: Cannot setup GPIO pin {pin_number} on {self._gpio_chip} (backend={self._backend})") print("━" * 60) print(f"Error: {e}") print("\nThe system cannot function without GPIO access.") @@ -289,6 +418,53 @@ def _start_edge_detection(self, pin_number: int) -> None: self._edge_threads[pin_number] = thread logger.debug(f"Edge detection thread started for pin {pin_number}") + def _start_polling_detection(self, pin_number: int, interval: float = 0.02) -> None: + """Start a polling thread to detect rising edges for GPIO lines (used with gpiod backend).""" + stop_event = threading.Event() + self._edge_stop_events[pin_number] = stop_event + + thread = threading.Thread( + target=self._monitor_polling, + args=(pin_number, stop_event, interval), + daemon=True, + name=f"GPIO-Poll-{pin_number}", + ) + thread.start() + self._edge_threads[pin_number] = thread + logger.debug(f"Polling detection thread started for pin {pin_number} (interval={interval}s)") + + def _monitor_polling(self, pin_number: int, stop_event: threading.Event, interval: float) -> None: + """Poll input state and invoke callback on rising edge (low->high).""" + try: + gpio = self._pins.get(pin_number) + if not gpio: + return + + # initialize last_state + try: + last_state = bool(gpio.read()) + except Exception: + last_state = False + + while not stop_event.is_set() and pin_number in self._pins: + try: + current = bool(self._pins[pin_number].read()) + if current and not last_state: + callback = self._input_callbacks.get(pin_number) + if callback: + try: + callback() + except Exception: + pass + last_state = current + time.sleep(interval) + except Exception: + if not stop_event.is_set(): + time.sleep(0.1) + + except Exception as e: + logger.error(f"Polling detection error for pin {pin_number}: {e}") + def _monitor_edge_events(self, pin_number: int, stop_event: threading.Event) -> None: """Monitor hardware edge events using poll() for interrupts""" try: diff --git a/src/pymc_core/hardware/sx1262_wrapper.py b/src/pymc_core/hardware/sx1262_wrapper.py index 2cce390..6beb485 100644 --- a/src/pymc_core/hardware/sx1262_wrapper.py +++ b/src/pymc_core/hardware/sx1262_wrapper.py @@ -31,6 +31,8 @@ def __init__( bus_id: int = 0, cs_id: int = 0, cs_pin: int = -1, + gpio_chip: int = 0, + use_gpiod_backend: bool = False, reset_pin: int = 18, busy_pin: int = 20, irq_pin: int = 16, @@ -56,7 +58,9 @@ def __init__( Args: bus_id: SPI bus ID (default: 0) cs_id: SPI chip select ID (default: 0) - cs_pin: Manual CS GPIO pin (-1 = use hardware CS, e.g. 21 for Waveshare HAT) + cs_pin: Manual CS GPIO pin (-1 = use hardware CS, e.g. 21 for Waveshare HAT) + gpio_chip: GPIO chip select ID (default: 0) + use_gpiod_backend: Use alternative backend for GPIO support (default: False) reset_pin: GPIO pin for reset (default: 18) busy_pin: GPIO pin for busy signal (default: 20) irq_pin: GPIO pin for interrupt (default: 16) @@ -74,7 +78,7 @@ def __init__( is_waveshare: Use alternate initialization needed for Waveshare HAT use_dio3_tcxo: Enable DIO3 TCXO control (default: False) dio3_tcxo_voltage: TCXO reference voltage in volts (default: 1.8) - use_dio2_rf: Enable DIO2 as RF switch control (default: False) + use_dio2_rf: Enable DIO2 as RF switch control (default: False) """ # Check if there's already an active instance and clean it up if SX1262Radio._active_instance is not None: @@ -88,6 +92,8 @@ def __init__( self.bus_id = bus_id self.cs_id = cs_id self.cs_pin = cs_pin + self.gpio_chip = gpio_chip + self.use_gpiod_backend = use_gpiod_backend self.reset_pin = reset_pin self.busy_pin = busy_pin self.irq_pin_number = irq_pin # Store pin number @@ -119,7 +125,8 @@ def __init__( self._tx_lock = asyncio.Lock() # GPIO management - self._gpio_manager = GPIOPinManager() + backend = "gpiod" if self.use_gpiod_backend else "auto" + self._gpio_manager = GPIOPinManager(backend=backend, gpio_chip=f"/dev/gpiochip{self.gpio_chip}") self._interrupt_setup = False self._txen_pin_setup = False self._txled_pin_setup = False From c0008cee6a166f8a7dfb37a06130eb0ffc14a86d Mon Sep 17 00:00:00 2001 From: theshaun Date: Fri, 23 Jan 2026 18:30:46 +1000 Subject: [PATCH 2/3] indent code correctly and tidy up comments --- src/pymc_core/hardware/gpio_manager.py | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/src/pymc_core/hardware/gpio_manager.py b/src/pymc_core/hardware/gpio_manager.py index 0dc367d..f4a4db9 100644 --- a/src/pymc_core/hardware/gpio_manager.py +++ b/src/pymc_core/hardware/gpio_manager.py @@ -84,35 +84,27 @@ def __init__(self, chip_path, lineoffset, direction, bias=None, edge=None): self.direction = direction self._consumer = "pymc_core" - # Request line for input or output using libgpiod v2 LineRequest if available requested = False - # Attempt v2 API: gpiod.LineRequest try: LineRequest = getattr(gpiod, "LineRequest", None) if LineRequest is not None: req = LineRequest() - # set consumer if attribute exists if hasattr(req, "consumer"): req.consumer = self._consumer - # set request type constant if present if hasattr(req, "request_type"): - # prefer named constants if provided req.request_type = ( getattr(gpiod, "LINE_REQ_DIR_OUT", None) if direction == "out" else getattr(gpiod, "LINE_REQ_DIR_IN", None) ) - # issue request try: self._line.request(req) requested = True except Exception: - # some bindings expect different method signature pass except Exception: pass - # Fallback: older style line.request(consumer=..., type=...) if not requested: try: req_type = ( @@ -121,14 +113,12 @@ def __init__(self, chip_path, lineoffset, direction, bias=None, edge=None): else getattr(gpiod, "LINE_REQ_DIR_IN", None) ) if req_type is None: - # fallback integer defaults req_type = 1 if direction == "out" else 0 - # try request with kwargs try: self._line.request(consumer=self._consumer, type=req_type) requested = True except Exception: - # try request with positional or older API + # try request with older API try: self._line.request(req_type) requested = True @@ -139,15 +129,13 @@ def __init__(self, chip_path, lineoffset, direction, bias=None, edge=None): if not requested: raise RuntimeError( - "Unsupported gpiod Python API on this system. Please install a compatible python-libgpiod (v2.4) or adjust the wrapper." + "Unsupported gpiod Python library on this system. Please install a compatible python-libgpiod (v2.4) or adjust the wrapper." ) def write(self, value: bool): - # set_value exists in both v1 and v2 bindings self._line.set_value(1 if value else 0) def read(self) -> bool: - # get_value exists in both v1 and v2 bindings return bool(self._line.get_value()) def close(self): @@ -156,15 +144,14 @@ def close(self): except Exception: pass - # Provide no-op poll/read_event for compatibility def poll(self, timeout): return False def read_event(self): return None - # make the module-level GPIO name point to the wrapper so rest of code can instantiate - globals()["GPIO"] = GpiodGPIO + # make the module-level GPIO name point to the wrapper so rest of code can instantiate + globals()["GPIO"] = GpiodGPIO # If periphery is used, ensure it was already imported; otherwise above raised From efa8cd5f83009c5abdffc356406517130d5951b4 Mon Sep 17 00:00:00 2001 From: theshaun Date: Fri, 23 Jan 2026 18:49:00 +1000 Subject: [PATCH 3/3] Add gpiod package... --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 6c096ce..607b28d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,7 @@ dependencies = [ radio = ["pyserial>=3.5"] hardware = [ "python-periphery>=2.4.1", + "gpiod>=2.4.0", "spidev>=3.5", "pyserial>=3.5", ]