Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 117 additions & 52 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import re
import subprocess
import time
from contextlib import contextmanager
from datetime import datetime
from typing import ClassVar, Literal, Mapping, Optional, TypedDict

Expand Down Expand Up @@ -55,6 +56,29 @@ def set_chromedriver_pid(self, pid: int) -> None:
self.last_chromedriver_pid = pid


def cleanup_old_processes(debug_logger: Optional["DebugLogger"] = None) -> None:
"""Find and terminate any lingering chromedriver processes from previous runs.

This is a safeguard against resource leaks from crashed/orphaned sessions.
"""
print("Running pre-emptive cleanup of old chromedriver processes...")
if debug_logger:
debug_logger.log("cleanup_old_processes: START")
try:
# Use pkill to find and kill processes by name. -f checks the full command line.
# The [c]haracter class is a trick to prevent pkill from finding its own process.
command = "pkill -f '[c]hromedriver'"
result = subprocess.run(command, shell=True, check=False)
if debug_logger:
rc = getattr(result, "returncode", "NA")
debug_logger.log(f"cleanup_old_processes: END (rc={rc})")
print("Cleanup complete.")
except Exception as e:
if debug_logger:
debug_logger.log(f"cleanup_old_processes: ERROR {e}")
print(f"Notice: Pre-emptive cleanup failed. This is non-critical. Error: {e}")


def log_running_chromedriver_processes(debug_logger: DebugLogger) -> None:
"""Logs any active chromedriver processes using `ps`.

Expand All @@ -78,6 +102,56 @@ def log_running_chromedriver_processes(debug_logger: DebugLogger) -> None:
debug_logger.log(f"Error while checking chromedriver processes: {e}")


@contextmanager
def managed_webdriver_session(chrome_options: Options, debug_logger: DebugLogger):
"""A self-cleaning context manager for the Selenium WebDriver.

Sets up the driver and guarantees the chromedriver service process is
terminated on exit, aggressively if needed to avoid orphans.
"""
service: ChromeService = ChromeService()
driver: Optional[WebDriver] = None
print("Setting up WebDriver for gateway tests...")
try:
driver = webdriver.Chrome(service=service, options=chrome_options)
# Track and log the chromedriver PID if available
try:
if service and service.process and service.process.pid:
debug_logger.set_chromedriver_pid(service.process.pid)
debug_logger.log(
f"WebDriver service started with PID: {service.process.pid}"
)
except Exception as e: # pragma: no cover - best effort logging
debug_logger.log(f"Unable to retrieve WebDriver PID: {e}")
yield driver
finally:
print("Shutting down WebDriver session...")
if driver:
debug_logger.log("WebDriver quit: START")
try:
driver.quit() # Graceful shutdown of browser and driver
finally:
debug_logger.log("WebDriver quit: END")
# Aggressive kill of chromedriver service process
if service and getattr(service, "process", None):
pid = getattr(service.process, "pid", None)
if pid:
debug_logger.log(
f"Forcefully terminating chromedriver service (PID: {pid})..."
)
try:
service.process.kill() # Force kill to prevent orphans
service.process.wait(timeout=5) # Confirm termination
debug_logger.log("Service terminated successfully.")
except Exception as e: # pragma: no cover - defensive
debug_logger.log(
"Notice: Could not kill service process; it may have already exited. "
f"Error: {e}"
)
# Verify process state after teardown
log_running_chromedriver_processes(debug_logger)


# --- Typing Models ---
class GatewayPingResults(TypedDict, total=False):
"""Structured results from gateway ping parsing."""
Expand Down Expand Up @@ -454,9 +528,6 @@ def run_local_ping_task(target: str) -> LocalPingResults:
return {}


# In main.py


def run_local_speed_test_task() -> Optional[SpeedResults]:
"""
Runs a local speed test with a retry mechanism, returning numerical
Expand Down Expand Up @@ -570,13 +641,27 @@ def run_wifi_diagnostics_task() -> WifiDiagnostics:
def find_value(key: str, text: str) -> str:
"""Helper to find values in the wdutil output using regex."""
match = re.search(rf"^\s*{key}\s*:\s*(.*)$", text, re.MULTILINE)
return match.group(1).strip() if match else "N/A"
if match:
# Strip trailing unit to normalize values like '864.0 Mbps' -> '864.0'
return match.group(1).strip().replace(" Mbps", "")
return "N/A"

results["wifi_rssi"] = find_value("RSSI", output)
results["wifi_noise"] = find_value("Noise", output)
results["wifi_tx_rate"] = find_value("TxRate", output)
results["wifi_channel"] = find_value("Channel", output)

# Try a list of possible keys for transmit rate to make it more universal
tx_rate_keys = ["Tx Rate", "TxRate", "Last Tx Rate", "Max PHY Rate"]
for key in tx_rate_keys:
tx_rate = find_value(key, output)
if tx_rate != "N/A":
# Found it, so we can stop looking
results["wifi_tx_rate"] = tx_rate
break
else:
# If the loop finishes without finding any key, default to N/A
results["wifi_tx_rate"] = "N/A"

except Exception as e:
print(f"Warning: Could not parse wdutil output. Error: {e}")

Expand Down Expand Up @@ -628,6 +713,8 @@ def perform_checks() -> None:
master_results: dict[str, str | float | int | None] = {}
debug_log = DebugLogger(start_time=time.time())
debug_log.log("perform_checks: START")
# Pre-emptive scavenger to ensure no orphaned chromedriver processes linger
cleanup_old_processes(debug_log)
print(
f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] "
f"Starting checks (Run #{run_counter})..."
Expand Down Expand Up @@ -676,56 +763,34 @@ def perform_checks() -> None:
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")

driver: Optional[WebDriver] = None
# Use Selenium Manager to handle the driver
service: ChromeService = ChromeService()
try:
debug_log.log("Selenium setup: START")
print("Setting up WebDriver for gateway tests...")
driver = webdriver.Chrome(
service=service,
options=chrome_options,
)
# Track and log the chromedriver PID if available
try:
if service and service.process and service.process.pid:
debug_log.set_chromedriver_pid(service.process.pid)
debug_log.log(f"WebDriver service started with PID: {service.process.pid}")
except Exception as e:
debug_log.log(f"Unable to retrieve WebDriver PID: {e}")
debug_log.log("Selenium setup: END")
driver.get(config.GATEWAY_URL)
time.sleep(2)
debug_log.log("run_ping_test_task: START")
gateway_ping_results = run_ping_test_task(driver)
debug_log.log("run_ping_test_task: END")
if gateway_ping_results:
master_results.update(gateway_ping_results)
if should_run_gateway_speed_test:
debug_log.log("run_speed_test_task: START")
gateway_speed_results = run_speed_test_task(driver, DEVICE_ACCESS_CODE)
debug_log.log("run_speed_test_task: END")
if gateway_speed_results:
master_results.update(gateway_speed_results)
with managed_webdriver_session(chrome_options, debug_log) as driver:
debug_log.log("Selenium setup: END")
driver.get(config.GATEWAY_URL)
time.sleep(2)
debug_log.log("run_ping_test_task: START")
gateway_ping_results = run_ping_test_task(driver)
debug_log.log("run_ping_test_task: END")
if gateway_ping_results:
master_results.update(gateway_ping_results)
if should_run_gateway_speed_test:
debug_log.log("run_speed_test_task: START")
gateway_speed_results = run_speed_test_task(driver, DEVICE_ACCESS_CODE)
debug_log.log("run_speed_test_task: END")
if gateway_speed_results:
master_results.update(gateway_speed_results)
except Exception as e:
print(f"An error occurred during the gateway automation process: {e}")
if driver:
error_time = datetime.now().strftime("%Y%m%d_%H%M%S")
screenshot_file = f"error_screenshot_{error_time}.png"
driver.save_screenshot(screenshot_file)
print(f"Saved screenshot to {screenshot_file} for debugging.")
finally:
if driver:
debug_log.log("WebDriver quit: START")
print("Closing WebDriver...")
driver.quit()
debug_log.log("WebDriver quit: END")
if service:
debug_log.log("WebDriver service.stop(): START")
service.stop()
debug_log.log("WebDriver service.stop(): END")
# Verify process state immediately after stopping service
log_running_chromedriver_processes(debug_log)
print(f"A critical error occurred in the WebDriver session: {e}")
# Best-effort screenshot if driver existed during context
try:
if 'driver' in locals() and driver:
error_time = datetime.now().strftime("%Y%m%d_%H%M%S")
screenshot_file = f"error_screenshot_{error_time}.png"
driver.save_screenshot(screenshot_file)
print(f"Saved screenshot to {screenshot_file} for debugging.")
except Exception:
pass

debug_log.log("perform_checks: END")
log_results(master_results)
Expand Down
25 changes: 25 additions & 0 deletions tests/test_cleanup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import os
import sys
from unittest.mock import patch

# Ensure the main module can be imported
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

import main


def test_cleanup_old_processes_invokes_pkill():
with patch("main.subprocess.run") as mock_run:
main.cleanup_old_processes()
mock_run.assert_called()
args, kwargs = mock_run.call_args
# Ensure pkill is targeted at chromedriver with -f and shell=True
assert "pkill -f '[c]hromedriver'" in args[0]
assert kwargs.get("shell") is True
assert kwargs.get("check") is False


def test_cleanup_old_processes_handles_exception():
with patch("main.subprocess.run", side_effect=RuntimeError("boom")):
# Should not raise
main.cleanup_old_processes()
28 changes: 28 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import sys
from unittest.mock import MagicMock, patch

import pytest

# Ensure the main module can be imported
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

Expand Down Expand Up @@ -170,3 +172,29 @@ def test_wifi_diagnostics_failure(mock_run):
assert results["wifi_rssi"] == "N/A"
assert results["wifi_noise"] == "N/A"
assert results["wifi_bssid"] == "N/A"


@patch("main.subprocess.run")
@pytest.mark.parametrize(
"tx_key",
[
"Tx Rate",
"TxRate",
"Last Tx Rate",
"Max PHY Rate",
],
)
def test_wifi_diagnostics_tx_rate_variants(mock_run, tx_key):
"""Ensure tx rate is parsed correctly across multiple wdutil key variants."""
# Build wdutil output with the variant key
wd_output = f"RSSI: -55\nNoise: -90\n{tx_key}: 866\nChannel: 149,80"
route_output = "gateway: 192.168.1.1"
arp_output = "? (192.168.1.1) at a1:b2:c3:d4:e5:f6 on en0 ifscope [ethernet]"
mock_run.side_effect = [
MagicMock(stdout=wd_output, returncode=0, stderr=""), # wdutil
MagicMock(stdout=route_output, returncode=0, stderr=""), # route
MagicMock(stdout="", returncode=0, stderr=""), # ping
MagicMock(stdout=arp_output, returncode=0, stderr=""), # arp
]
results = run_wifi_diagnostics_task()
assert results["wifi_tx_rate"] == "866"
26 changes: 18 additions & 8 deletions tests/test_main_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def mock_tasks():
patch("main.run_local_speed_test_task", return_value={}) as mock_local_speed,
patch("main.run_ping_test_task", return_value={}) as mock_gateway_ping,
patch("main.run_speed_test_task", return_value={}) as mock_gateway_speed,
patch("main.cleanup_old_processes") as mock_cleanup,
patch("main.log_results") as mock_log,
patch("main.webdriver.Chrome", return_value=MagicMock()) as mock_chrome,
patch("main.get_access_code", return_value="test-code") as mock_access_code,
Expand All @@ -37,6 +38,7 @@ def mock_tasks():
"local_speed": mock_local_speed,
"gateway_ping": mock_gateway_ping,
"gateway_speed": mock_gateway_speed,
"cleanup": mock_cleanup,
"log": mock_log,
"chrome": mock_chrome,
"access_code": mock_access_code,
Expand Down Expand Up @@ -102,17 +104,25 @@ def test_perform_checks_gateway_speed_test_interval(mock_tasks, monkeypatch):
perform_checks() # Run 1
mock_tasks["gateway_speed"].assert_not_called()

mock_tasks["gateway_speed"].reset_mock()

# Should run on second call (run #2)
perform_checks() # Run 2
mock_tasks["gateway_speed"].assert_called_once()
def test_perform_checks_calls_cleanup_first(mock_tasks, monkeypatch):
"""Ensure the pre-emptive cleanup runs at the beginning of perform_checks."""
# Configure toggles so at least one downstream task runs
monkeypatch.setattr(config_module, "RUN_LOCAL_PING_TEST", True)
monkeypatch.setattr(config_module, "RUN_LOCAL_GATEWAY_PING_TEST", False)
monkeypatch.setattr(config_module, "RUN_LOCAL_SPEED_TEST", False)

mock_tasks["gateway_speed"].reset_mock()
events: list[str] = []
mock_tasks["cleanup"].side_effect = lambda *_, **__: events.append("cleanup")
mock_tasks["wifi"].side_effect = lambda: (events.append("wifi"), {})[1]

# Should NOT run on third call (run #3)
perform_checks() # Run 3
mock_tasks["gateway_speed"].assert_not_called()
perform_checks()

# cleanup should be called and recorded before wifi diagnostics
assert mock_tasks["cleanup"].call_count == 1
assert events and events[0] == "cleanup"

# Do not assert interval behavior here; covered by dedicated test


def test_perform_checks_gateway_speed_test_disabled(mock_tasks, monkeypatch):
Expand Down
Loading