Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 83 additions & 19 deletions hardware.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,112 @@
import logging
import serial # for Arduino serial communication
import time


class HardwareError(Exception):
"""Base class for hardware-related errors."""


class HardwareConnectionError(HardwareError):
"""Raised when hardware connection is unavailable or fails."""


class HardwareWriteError(HardwareError):
"""Raised when a write to hardware fails."""


class HardwareReadError(HardwareError):
"""Raised when a read from hardware fails."""


class HardwareTimeoutError(HardwareError):
"""Raised when a hardware operation times out."""

class Arduino:
def __init__(self, port="COM3", baudrate=9600, timeout=1):
def __init__(
self,
port="COM3",
baudrate=9600,
timeout=1,
read_timeout=1.0,
write_timeout=1.0,
logger=None,
):
self.logger = logger or logging.getLogger(__name__)
self.read_timeout = read_timeout
self.write_timeout = write_timeout
try:
self.conn = serial.Serial(port, baudrate, timeout=timeout)
time.sleep(2) # wait for Arduino to reset
print(f"[Arduino] Connected to {port} at {baudrate} baud.")
except Exception as e:
print("[Arduino] Connection failed:", e)
self.logger.info(
"[Arduino] Connected to %s at %s baud.", port, baudrate
)
except Exception as exc:
self.logger.error("[Arduino] Connection failed: %s", exc)
self.conn = None

def write(self, message):
def write(self, message, timeout=None):
"""Send a message to Arduino."""
if self.conn:
self.conn.write(str(message).encode())
print(f"[Arduino] Sent: {message}")
if not self.conn:
raise HardwareConnectionError("Arduino connection is not available.")

effective_timeout = self.write_timeout if timeout is None else timeout
previous_timeout = self.conn.write_timeout
try:
if effective_timeout is not None:
self.conn.write_timeout = effective_timeout
payload = str(message).encode()
bytes_written = self.conn.write(payload)
return {
"ok": True,
"bytes_written": bytes_written,
"message": message,
}
except Exception as exc:
raise HardwareWriteError(f"Failed to write to Arduino: {exc}") from exc
finally:
self.conn.write_timeout = previous_timeout

def read(self):
def read(self, timeout=None, poll_interval=0.05):
"""Read a line from Arduino."""
if self.conn and self.conn.in_waiting > 0:
data = self.conn.readline().decode().strip()
print(f"[Arduino] Received: {data}")
return data
return None
if not self.conn:
raise HardwareConnectionError("Arduino connection is not available.")

max_wait = self.read_timeout if timeout is None else timeout
deadline = time.monotonic() + max_wait
try:
while time.monotonic() < deadline:
if self.conn.in_waiting > 0:
data = self.conn.readline().decode().strip()
Comment on lines +76 to +80

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Enforce per-call timeout on readline

When in_waiting > 0, the code calls self.conn.readline() without adjusting the serial port’s own read timeout, so readline() can still block until the port-level timeout or a newline is received. In scenarios where timeout/read_timeout is shorter than the serial port’s configured timeout (or when a partial line arrives without a newline), the method can exceed the requested max_wait and won’t raise HardwareTimeoutError on time, defeating the per-call timeout guarantee. Consider temporarily setting self.conn.timeout (or using a non-blocking read) to ensure the loop’s max_wait is the actual upper bound.

Useful? React with 👍 / 👎.

return {"ok": True, "data": data}
time.sleep(poll_interval)
except Exception as exc:
raise HardwareReadError(f"Failed to read from Arduino: {exc}") from exc

raise HardwareTimeoutError(
f"Timed out waiting for Arduino data after {max_wait} seconds."
)

def led_on(self, pin=13):
"""Turn ON LED at given pin (default 13)."""
self.write(f"LED_ON:{pin}")
return self.write(f"LED_ON:{pin}")

def led_off(self, pin=13):
"""Turn OFF LED at given pin (default 13)."""
self.write(f"LED_OFF:{pin}")
return self.write(f"LED_OFF:{pin}")

def motor_start(self, pin=9, speed=255):
"""Start motor at pin with speed (0-255)."""
self.write(f"MOTOR_START:{pin}:{speed}")
return self.write(f"MOTOR_START:{pin}:{speed}")

def motor_stop(self, pin=9):
"""Stop motor at pin."""
self.write(f"MOTOR_STOP:{pin}")
return self.write(f"MOTOR_STOP:{pin}")

def close(self):
"""Close Arduino connection."""
if self.conn:
self.conn.close()
print("[Arduino] Connection closed.")
self.logger.info("[Arduino] Connection closed.")
return {"ok": True, "message": "Connection closed."}
return {"ok": False, "message": "Connection already closed."}