diff --git a/pyproject.toml b/pyproject.toml index 6c096ce..607b28d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,7 @@ dependencies = [ radio = ["pyserial>=3.5"] hardware = [ "python-periphery>=2.4.1", + "gpiod>=2.4.0", "spidev>=3.5", "pyserial>=3.5", ] diff --git a/src/pymc_core/hardware/gpio_manager.py b/src/pymc_core/hardware/gpio_manager.py index 399a5da..f4a4db9 100644 --- a/src/pymc_core/hardware/gpio_manager.py +++ b/src/pymc_core/hardware/gpio_manager.py @@ -9,6 +9,7 @@ import sys import threading import time +import os from typing import Callable, Dict, Optional try: @@ -21,6 +22,15 @@ GPIO = None EdgeEvent = None +# Optional libgpiod support +try: + import gpiod + + GPIOD_AVAILABLE = True +except Exception: + GPIOD_AVAILABLE = False + gpiod = None + class GPIOImportError(ImportError): """Raised when GPIO functionality is used without python-periphery""" @@ -44,19 +54,106 @@ def __init__(self): class GPIOPinManager: """Manages GPIO pins abstraction using Linux GPIO character device interface""" - def __init__(self, gpio_chip: str = "/dev/gpiochip0"): + def __init__(self, gpio_chip: str = "/dev/gpiochip0", backend: str = "auto"): """ Initialize GPIO Pin Manager Args: gpio_chip: Path to GPIO chip device (default: /dev/gpiochip0) Set to "auto" to auto-detect first available chip + backend: 'periphery', 'gpiod', or 'auto' to select backend Raises: GPIOImportError: If python-periphery is not available """ - if not PERIPHERY_AVAILABLE: - raise GPIOImportError() + # Determine backend to use + self._backend = backend + if backend == "auto": + if PERIPHERY_AVAILABLE: + self._backend = "periphery" + elif GPIOD_AVAILABLE: + class GpiodGPIO: + def __init__(self, chip_path, lineoffset, direction, bias=None, edge=None): + # chip_path is like '/dev/gpiochip0' — use it directly + try: + self._chip = gpiod.Chip(chip_path) + except Exception as e: + raise FileNotFoundError(f"gpiod Chip '{chip_path}' not found: {e}") from e + + self._line = self._chip.get_line(lineoffset) + self.direction = direction + self._consumer = "pymc_core" + + requested = False + try: + LineRequest = getattr(gpiod, "LineRequest", None) + if LineRequest is not None: + req = LineRequest() + if hasattr(req, "consumer"): + req.consumer = self._consumer + if hasattr(req, "request_type"): + req.request_type = ( + getattr(gpiod, "LINE_REQ_DIR_OUT", None) + if direction == "out" + else getattr(gpiod, "LINE_REQ_DIR_IN", None) + ) + try: + self._line.request(req) + requested = True + except Exception: + pass + except Exception: + pass + + if not requested: + try: + req_type = ( + getattr(gpiod, "LINE_REQ_DIR_OUT", None) + if direction == "out" + else getattr(gpiod, "LINE_REQ_DIR_IN", None) + ) + if req_type is None: + req_type = 1 if direction == "out" else 0 + try: + self._line.request(consumer=self._consumer, type=req_type) + requested = True + except Exception: + # try request with older API + try: + self._line.request(req_type) + requested = True + except Exception: + requested = False + except Exception: + requested = False + + if not requested: + raise RuntimeError( + "Unsupported gpiod Python library on this system. Please install a compatible python-libgpiod (v2.4) or adjust the wrapper." + ) + + def write(self, value: bool): + self._line.set_value(1 if value else 0) + + def read(self) -> bool: + return bool(self._line.get_value()) + + def close(self): + try: + self._line.release() + except Exception: + pass + + def poll(self, timeout): + return False + + def read_event(self): + return None + + # make the module-level GPIO name point to the wrapper so rest of code can instantiate + globals()["GPIO"] = GpiodGPIO + + # If periphery is used, ensure it was already imported; otherwise above raised self._gpio_chip = self._resolve_gpio_chip(gpio_chip) self._pins: Dict[int, GPIO] = {} @@ -125,8 +222,10 @@ def setup_output_pin(self, pin_number: int, initial_value: bool = False) -> bool print("━" * 60) sys.exit(1) else: - logger.error(f"Failed to setup output pin {pin_number}: {e}") - print(f"\nFATAL: Cannot setup GPIO pin {pin_number}") + logger.error( + f"Failed to setup output pin {pin_number} on {self._gpio_chip} (backend={self._backend}): {e}" + ) + print(f"\nFATAL: Cannot setup GPIO pin {pin_number} on {self._gpio_chip} (backend={self._backend})") print("━" * 60) print(f"Error: {e}") print("\nThe system cannot function without GPIO access.") @@ -161,9 +260,17 @@ def setup_input_pin( # Open GPIO pin as input with edge detection if callback provided if callback: - gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias, edge="rising") - self._input_callbacks[pin_number] = callback - self._start_edge_detection(pin_number) + # For gpiod backend, libgpiod does not provide the same edge API here + # so we open a plain input and use a polling thread to detect edges. + if self._backend == "gpiod": + gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias) + self._input_callbacks[pin_number] = callback + # start polling-based detection + self._start_polling_detection(pin_number) + else: + gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias, edge="rising") + self._input_callbacks[pin_number] = callback + self._start_edge_detection(pin_number) else: # No callback, just simple input gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias) @@ -196,8 +303,10 @@ def setup_input_pin( print("━" * 60) sys.exit(1) else: - logger.error(f"Failed to setup input pin {pin_number}: {e}") - print(f"\nFATAL: Cannot setup GPIO pin {pin_number}") + logger.error( + f"Failed to setup input pin {pin_number} on {self._gpio_chip} (backend={self._backend}): {e}" + ) + print(f"\nFATAL: Cannot setup GPIO pin {pin_number} on {self._gpio_chip} (backend={self._backend})") print("━" * 60) print(f"Error: {e}") print("\nThe system cannot function without GPIO access.") @@ -233,14 +342,19 @@ def setup_interrupt_pin( # Determine bias setting bias = "pull_up" if pull_up else "default" - # Open GPIO pin as input with edge detection on rising edge - gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias, edge="rising") - self._pins[pin_number] = gpio - - # Setup callback with async edge monitoring - if callback: - self._input_callbacks[pin_number] = callback - self._start_edge_detection(pin_number) + # Open GPIO pin as input with edge detection on rising edge or polling for gpiod + if self._backend == "gpiod": + gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias) + self._pins[pin_number] = gpio + if callback: + self._input_callbacks[pin_number] = callback + self._start_polling_detection(pin_number) + else: + gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias, edge="rising") + self._pins[pin_number] = gpio + if callback: + self._input_callbacks[pin_number] = callback + self._start_edge_detection(pin_number) logger.debug( f"Interrupt pin {pin_number} configured " @@ -266,8 +380,10 @@ def setup_interrupt_pin( print("━" * 60) sys.exit(1) else: - logger.error(f"Failed to setup interrupt pin {pin_number}: {e}") - print(f"\nFATAL: Cannot setup GPIO pin {pin_number}") + logger.error( + f"Failed to setup interrupt pin {pin_number} on {self._gpio_chip} (backend={self._backend}): {e}" + ) + print(f"\nFATAL: Cannot setup GPIO pin {pin_number} on {self._gpio_chip} (backend={self._backend})") print("━" * 60) print(f"Error: {e}") print("\nThe system cannot function without GPIO access.") @@ -289,6 +405,53 @@ def _start_edge_detection(self, pin_number: int) -> None: self._edge_threads[pin_number] = thread logger.debug(f"Edge detection thread started for pin {pin_number}") + def _start_polling_detection(self, pin_number: int, interval: float = 0.02) -> None: + """Start a polling thread to detect rising edges for GPIO lines (used with gpiod backend).""" + stop_event = threading.Event() + self._edge_stop_events[pin_number] = stop_event + + thread = threading.Thread( + target=self._monitor_polling, + args=(pin_number, stop_event, interval), + daemon=True, + name=f"GPIO-Poll-{pin_number}", + ) + thread.start() + self._edge_threads[pin_number] = thread + logger.debug(f"Polling detection thread started for pin {pin_number} (interval={interval}s)") + + def _monitor_polling(self, pin_number: int, stop_event: threading.Event, interval: float) -> None: + """Poll input state and invoke callback on rising edge (low->high).""" + try: + gpio = self._pins.get(pin_number) + if not gpio: + return + + # initialize last_state + try: + last_state = bool(gpio.read()) + except Exception: + last_state = False + + while not stop_event.is_set() and pin_number in self._pins: + try: + current = bool(self._pins[pin_number].read()) + if current and not last_state: + callback = self._input_callbacks.get(pin_number) + if callback: + try: + callback() + except Exception: + pass + last_state = current + time.sleep(interval) + except Exception: + if not stop_event.is_set(): + time.sleep(0.1) + + except Exception as e: + logger.error(f"Polling detection error for pin {pin_number}: {e}") + def _monitor_edge_events(self, pin_number: int, stop_event: threading.Event) -> None: """Monitor hardware edge events using poll() for interrupts""" try: diff --git a/src/pymc_core/hardware/sx1262_wrapper.py b/src/pymc_core/hardware/sx1262_wrapper.py index 2cce390..6beb485 100644 --- a/src/pymc_core/hardware/sx1262_wrapper.py +++ b/src/pymc_core/hardware/sx1262_wrapper.py @@ -31,6 +31,8 @@ def __init__( bus_id: int = 0, cs_id: int = 0, cs_pin: int = -1, + gpio_chip: int = 0, + use_gpiod_backend: bool = False, reset_pin: int = 18, busy_pin: int = 20, irq_pin: int = 16, @@ -56,7 +58,9 @@ def __init__( Args: bus_id: SPI bus ID (default: 0) cs_id: SPI chip select ID (default: 0) - cs_pin: Manual CS GPIO pin (-1 = use hardware CS, e.g. 21 for Waveshare HAT) + cs_pin: Manual CS GPIO pin (-1 = use hardware CS, e.g. 21 for Waveshare HAT) + gpio_chip: GPIO chip select ID (default: 0) + use_gpiod_backend: Use alternative backend for GPIO support (default: False) reset_pin: GPIO pin for reset (default: 18) busy_pin: GPIO pin for busy signal (default: 20) irq_pin: GPIO pin for interrupt (default: 16) @@ -74,7 +78,7 @@ def __init__( is_waveshare: Use alternate initialization needed for Waveshare HAT use_dio3_tcxo: Enable DIO3 TCXO control (default: False) dio3_tcxo_voltage: TCXO reference voltage in volts (default: 1.8) - use_dio2_rf: Enable DIO2 as RF switch control (default: False) + use_dio2_rf: Enable DIO2 as RF switch control (default: False) """ # Check if there's already an active instance and clean it up if SX1262Radio._active_instance is not None: @@ -88,6 +92,8 @@ def __init__( self.bus_id = bus_id self.cs_id = cs_id self.cs_pin = cs_pin + self.gpio_chip = gpio_chip + self.use_gpiod_backend = use_gpiod_backend self.reset_pin = reset_pin self.busy_pin = busy_pin self.irq_pin_number = irq_pin # Store pin number @@ -119,7 +125,8 @@ def __init__( self._tx_lock = asyncio.Lock() # GPIO management - self._gpio_manager = GPIOPinManager() + backend = "gpiod" if self.use_gpiod_backend else "auto" + self._gpio_manager = GPIOPinManager(backend=backend, gpio_chip=f"/dev/gpiochip{self.gpio_chip}") self._interrupt_setup = False self._txen_pin_setup = False self._txled_pin_setup = False