Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ dependencies = [
radio = ["pyserial>=3.5"]
hardware = [
"python-periphery>=2.4.1",
"gpiod>=2.4.0",
"spidev>=3.5",
"pyserial>=3.5",
]
Expand Down
203 changes: 183 additions & 20 deletions src/pymc_core/hardware/gpio_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import sys
import threading
import time
import os
from typing import Callable, Dict, Optional

try:
Expand All @@ -21,6 +22,15 @@
GPIO = None
EdgeEvent = None

# Optional libgpiod support
try:
import gpiod

GPIOD_AVAILABLE = True
except Exception:
GPIOD_AVAILABLE = False
gpiod = None

class GPIOImportError(ImportError):
"""Raised when GPIO functionality is used without python-periphery"""

Expand All @@ -44,19 +54,106 @@ def __init__(self):
class GPIOPinManager:
"""Manages GPIO pins abstraction using Linux GPIO character device interface"""

def __init__(self, gpio_chip: str = "/dev/gpiochip0"):
def __init__(self, gpio_chip: str = "/dev/gpiochip0", backend: str = "auto"):
"""
Initialize GPIO Pin Manager

Args:
gpio_chip: Path to GPIO chip device (default: /dev/gpiochip0)
Set to "auto" to auto-detect first available chip
backend: 'periphery', 'gpiod', or 'auto' to select backend

Raises:
GPIOImportError: If python-periphery is not available
"""
if not PERIPHERY_AVAILABLE:
raise GPIOImportError()
# Determine backend to use
self._backend = backend
if backend == "auto":
if PERIPHERY_AVAILABLE:
self._backend = "periphery"
elif GPIOD_AVAILABLE:
class GpiodGPIO:
def __init__(self, chip_path, lineoffset, direction, bias=None, edge=None):
# chip_path is like '/dev/gpiochip0' — use it directly
try:
self._chip = gpiod.Chip(chip_path)
except Exception as e:
raise FileNotFoundError(f"gpiod Chip '{chip_path}' not found: {e}") from e

self._line = self._chip.get_line(lineoffset)
self.direction = direction
self._consumer = "pymc_core"

requested = False
try:
LineRequest = getattr(gpiod, "LineRequest", None)
if LineRequest is not None:
req = LineRequest()
if hasattr(req, "consumer"):
req.consumer = self._consumer
if hasattr(req, "request_type"):
req.request_type = (
getattr(gpiod, "LINE_REQ_DIR_OUT", None)
if direction == "out"
else getattr(gpiod, "LINE_REQ_DIR_IN", None)
)
try:
self._line.request(req)
requested = True
except Exception:
pass
except Exception:
pass

if not requested:
try:
req_type = (
getattr(gpiod, "LINE_REQ_DIR_OUT", None)
if direction == "out"
else getattr(gpiod, "LINE_REQ_DIR_IN", None)
)
if req_type is None:
req_type = 1 if direction == "out" else 0
try:
self._line.request(consumer=self._consumer, type=req_type)
requested = True
except Exception:
# try request with older API
try:
self._line.request(req_type)
requested = True
except Exception:
requested = False
except Exception:
requested = False

if not requested:
raise RuntimeError(
"Unsupported gpiod Python library on this system. Please install a compatible python-libgpiod (v2.4) or adjust the wrapper."
)

def write(self, value: bool):
self._line.set_value(1 if value else 0)

def read(self) -> bool:
return bool(self._line.get_value())

def close(self):
try:
self._line.release()
except Exception:
pass

def poll(self, timeout):
return False

def read_event(self):
return None

# make the module-level GPIO name point to the wrapper so rest of code can instantiate
globals()["GPIO"] = GpiodGPIO

# If periphery is used, ensure it was already imported; otherwise above raised

self._gpio_chip = self._resolve_gpio_chip(gpio_chip)
self._pins: Dict[int, GPIO] = {}
Expand Down Expand Up @@ -125,8 +222,10 @@ def setup_output_pin(self, pin_number: int, initial_value: bool = False) -> bool
print("━" * 60)
sys.exit(1)
else:
logger.error(f"Failed to setup output pin {pin_number}: {e}")
print(f"\nFATAL: Cannot setup GPIO pin {pin_number}")
logger.error(
f"Failed to setup output pin {pin_number} on {self._gpio_chip} (backend={self._backend}): {e}"
)
print(f"\nFATAL: Cannot setup GPIO pin {pin_number} on {self._gpio_chip} (backend={self._backend})")
print("━" * 60)
print(f"Error: {e}")
print("\nThe system cannot function without GPIO access.")
Expand Down Expand Up @@ -161,9 +260,17 @@ def setup_input_pin(

# Open GPIO pin as input with edge detection if callback provided
if callback:
gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias, edge="rising")
self._input_callbacks[pin_number] = callback
self._start_edge_detection(pin_number)
# For gpiod backend, libgpiod does not provide the same edge API here
# so we open a plain input and use a polling thread to detect edges.
if self._backend == "gpiod":
gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias)
self._input_callbacks[pin_number] = callback
# start polling-based detection
self._start_polling_detection(pin_number)
else:
gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias, edge="rising")
self._input_callbacks[pin_number] = callback
self._start_edge_detection(pin_number)
else:
# No callback, just simple input
gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias)
Expand Down Expand Up @@ -196,8 +303,10 @@ def setup_input_pin(
print("━" * 60)
sys.exit(1)
else:
logger.error(f"Failed to setup input pin {pin_number}: {e}")
print(f"\nFATAL: Cannot setup GPIO pin {pin_number}")
logger.error(
f"Failed to setup input pin {pin_number} on {self._gpio_chip} (backend={self._backend}): {e}"
)
print(f"\nFATAL: Cannot setup GPIO pin {pin_number} on {self._gpio_chip} (backend={self._backend})")
print("━" * 60)
print(f"Error: {e}")
print("\nThe system cannot function without GPIO access.")
Expand Down Expand Up @@ -233,14 +342,19 @@ def setup_interrupt_pin(
# Determine bias setting
bias = "pull_up" if pull_up else "default"

# Open GPIO pin as input with edge detection on rising edge
gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias, edge="rising")
self._pins[pin_number] = gpio

# Setup callback with async edge monitoring
if callback:
self._input_callbacks[pin_number] = callback
self._start_edge_detection(pin_number)
# Open GPIO pin as input with edge detection on rising edge or polling for gpiod
if self._backend == "gpiod":
gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias)
self._pins[pin_number] = gpio
if callback:
self._input_callbacks[pin_number] = callback
self._start_polling_detection(pin_number)
else:
gpio = GPIO(self._gpio_chip, pin_number, "in", bias=bias, edge="rising")
self._pins[pin_number] = gpio
if callback:
self._input_callbacks[pin_number] = callback
self._start_edge_detection(pin_number)

logger.debug(
f"Interrupt pin {pin_number} configured "
Expand All @@ -266,8 +380,10 @@ def setup_interrupt_pin(
print("━" * 60)
sys.exit(1)
else:
logger.error(f"Failed to setup interrupt pin {pin_number}: {e}")
print(f"\nFATAL: Cannot setup GPIO pin {pin_number}")
logger.error(
f"Failed to setup interrupt pin {pin_number} on {self._gpio_chip} (backend={self._backend}): {e}"
)
print(f"\nFATAL: Cannot setup GPIO pin {pin_number} on {self._gpio_chip} (backend={self._backend})")
print("━" * 60)
print(f"Error: {e}")
print("\nThe system cannot function without GPIO access.")
Expand All @@ -289,6 +405,53 @@ def _start_edge_detection(self, pin_number: int) -> None:
self._edge_threads[pin_number] = thread
logger.debug(f"Edge detection thread started for pin {pin_number}")

def _start_polling_detection(self, pin_number: int, interval: float = 0.02) -> None:
"""Start a polling thread to detect rising edges for GPIO lines (used with gpiod backend)."""
stop_event = threading.Event()
self._edge_stop_events[pin_number] = stop_event

thread = threading.Thread(
target=self._monitor_polling,
args=(pin_number, stop_event, interval),
daemon=True,
name=f"GPIO-Poll-{pin_number}",
)
thread.start()
self._edge_threads[pin_number] = thread
logger.debug(f"Polling detection thread started for pin {pin_number} (interval={interval}s)")

def _monitor_polling(self, pin_number: int, stop_event: threading.Event, interval: float) -> None:
"""Poll input state and invoke callback on rising edge (low->high)."""
try:
gpio = self._pins.get(pin_number)
if not gpio:
return

# initialize last_state
try:
last_state = bool(gpio.read())
except Exception:
last_state = False

while not stop_event.is_set() and pin_number in self._pins:
try:
current = bool(self._pins[pin_number].read())
if current and not last_state:
callback = self._input_callbacks.get(pin_number)
if callback:
try:
callback()
except Exception:
pass
last_state = current
time.sleep(interval)
except Exception:
if not stop_event.is_set():
time.sleep(0.1)

except Exception as e:
logger.error(f"Polling detection error for pin {pin_number}: {e}")

def _monitor_edge_events(self, pin_number: int, stop_event: threading.Event) -> None:
"""Monitor hardware edge events using poll() for interrupts"""
try:
Expand Down
13 changes: 10 additions & 3 deletions src/pymc_core/hardware/sx1262_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def __init__(
bus_id: int = 0,
cs_id: int = 0,
cs_pin: int = -1,
gpio_chip: int = 0,
use_gpiod_backend: bool = False,
reset_pin: int = 18,
busy_pin: int = 20,
irq_pin: int = 16,
Expand All @@ -56,7 +58,9 @@ def __init__(
Args:
bus_id: SPI bus ID (default: 0)
cs_id: SPI chip select ID (default: 0)
cs_pin: Manual CS GPIO pin (-1 = use hardware CS, e.g. 21 for Waveshare HAT)
cs_pin: Manual CS GPIO pin (-1 = use hardware CS, e.g. 21 for Waveshare HAT)
gpio_chip: GPIO chip select ID (default: 0)
use_gpiod_backend: Use alternative backend for GPIO support (default: False)
reset_pin: GPIO pin for reset (default: 18)
busy_pin: GPIO pin for busy signal (default: 20)
irq_pin: GPIO pin for interrupt (default: 16)
Expand All @@ -74,7 +78,7 @@ def __init__(
is_waveshare: Use alternate initialization needed for Waveshare HAT
use_dio3_tcxo: Enable DIO3 TCXO control (default: False)
dio3_tcxo_voltage: TCXO reference voltage in volts (default: 1.8)
use_dio2_rf: Enable DIO2 as RF switch control (default: False)
use_dio2_rf: Enable DIO2 as RF switch control (default: False)
"""
# Check if there's already an active instance and clean it up
if SX1262Radio._active_instance is not None:
Expand All @@ -88,6 +92,8 @@ def __init__(
self.bus_id = bus_id
self.cs_id = cs_id
self.cs_pin = cs_pin
self.gpio_chip = gpio_chip
self.use_gpiod_backend = use_gpiod_backend
self.reset_pin = reset_pin
self.busy_pin = busy_pin
self.irq_pin_number = irq_pin # Store pin number
Expand Down Expand Up @@ -119,7 +125,8 @@ def __init__(
self._tx_lock = asyncio.Lock()

# GPIO management
self._gpio_manager = GPIOPinManager()
backend = "gpiod" if self.use_gpiod_backend else "auto"
self._gpio_manager = GPIOPinManager(backend=backend, gpio_chip=f"/dev/gpiochip{self.gpio_chip}")
self._interrupt_setup = False
self._txen_pin_setup = False
self._txled_pin_setup = False
Expand Down