diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..402b97e --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,31 @@ +name: CI + +on: + push: + branches: [ main, master ] + pull_request: + branches: [ main, master ] + +jobs: + lint-and-typecheck: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install uv + run: pip install uv + + - name: Sync dependencies + run: uv sync + + - name: Ruff (lint) + run: uv run ruff check + + - name: Type check (ty) + run: uv run ty check diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 054a390..28d1faa 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -51,6 +51,7 @@ import requests 3. Ensure all checks pass locally before opening a Pull Request: - `uv run ruff check --fix . && uv run ruff format .` + - (see [pyproject.toml](pyproject.toml) for ruff criteria) - `uv run ty check` - `uv run pytest` diff --git a/config.py b/config.py index 2b2bbb9..c90418d 100644 --- a/config.py +++ b/config.py @@ -9,6 +9,8 @@ RUN_INTERVAL_MINUTES: int = 5 # Set to True to run the browser in headless mode (no visible UI). HEADLESS_MODE: bool = True +# Set to True to enable verbose, high-resolution debug logging for troubleshooting. +ENABLE_DEBUG_LOGGING: bool = True # --- Gateway Configuration --- # The base URL for your AT&T gateway. diff --git a/main.py b/main.py index a4a7a0b..8033bdc 100644 --- a/main.py +++ b/main.py @@ -10,12 +10,13 @@ # Standard library imports import getpass import json +import logging import os import re import subprocess import time from datetime import datetime -from typing import Dict, Optional +from typing import ClassVar, Literal, Mapping, Optional, TypedDict # Third-party imports import schedule @@ -33,16 +34,98 @@ import config +# --- Debug Logger --- +class DebugLogger: + """High-resolution event and timing logger controlled by config.ENABLE_DEBUG_LOGGING.""" + + def __init__(self, start_time: float) -> None: + self.start_time = start_time + self.last_chromedriver_pid: Optional[int] = None + + def log(self, event_message: str) -> None: + if not getattr(config, "ENABLE_DEBUG_LOGGING", False): + return + now = datetime.now() + elapsed = time.time() - self.start_time + # Format timestamp with millisecond precision + ts = now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + print(f"[DEBUG | {ts} | +{elapsed:.3f}s] {event_message}") + + def set_chromedriver_pid(self, pid: int) -> None: + self.last_chromedriver_pid = pid + + +def log_running_chromedriver_processes(debug_logger: DebugLogger) -> None: + """Logs any active chromedriver processes using `ps`. + + Uses a shell pipeline to filter out the grep process itself. + """ + try: + cmd = "ps -ef | grep chromedriver | grep -v grep" + result = subprocess.run( + ["bash", "-lc", cmd], + capture_output=True, + text=True, + check=False, + ) + output = result.stdout.strip() + if output: + for line in output.splitlines(): + debug_logger.log(f"Found active chromedriver process: {line}") + else: + debug_logger.log("No active chromedriver processes found.") + except Exception as e: + debug_logger.log(f"Error while checking chromedriver processes: {e}") + + +# --- Typing Models --- +class GatewayPingResults(TypedDict, total=False): + """Structured results from gateway ping parsing.""" + + gateway_loss_percentage: float + gateway_rtt_avg_ms: float + + +class LocalPingResults(TypedDict, total=False): + """Structured results from local ping parsing.""" + + loss_percentage: float + rtt_avg_ms: float + ping_stddev: float + + +class SpeedResults(TypedDict, total=False): + """Structured results from speed tests (gateway or local).""" + + # From gateway speed test + downstream_speed: float + upstream_speed: float + # From local speed test + local_downstream_speed: float + local_upstream_speed: float + local_speedtest_jitter: float + + +class WifiDiagnostics(TypedDict, total=False): + """Structured Wi-Fi diagnostics values from local system utilities.""" + + wifi_rssi: str + wifi_noise: str + wifi_tx_rate: str + wifi_channel: str + wifi_bssid: str + + # --- ANSI Color Codes --- class Colors: """A class to hold ANSI color codes for terminal output.""" - RED = "\033[91m" - GREEN = "\033[92m" - YELLOW = "\033[93m" - CYAN = "\033[96m" - RESET = "\033[0m" - BOLD = "\033[1m" + RED: ClassVar[str] = "\033[91m" + GREEN: ClassVar[str] = "\033[92m" + YELLOW: ClassVar[str] = "\033[93m" + CYAN: ClassVar[str] = "\033[96m" + RESET: ClassVar[str] = "\033[0m" + BOLD: ClassVar[str] = "\033[1m" # Load environment variables @@ -64,9 +147,9 @@ def get_access_code() -> str: return entered_code -def parse_gateway_ping_results(full_results: str) -> dict[str, float]: +def parse_gateway_ping_results(full_results: str) -> GatewayPingResults: """Parses the full ping output from the GATEWAY, returning numerical values.""" - results: dict[str, float] = {} + results: GatewayPingResults = {} # Changed for strict typing loss_match = re.search(r"(\d+)% packet loss", full_results) if loss_match: results["gateway_loss_percentage"] = float(loss_match.group(1)) @@ -75,25 +158,22 @@ def parse_gateway_ping_results(full_results: str) -> dict[str, float]: if rtt_match: rtt_parts = rtt_match.group(1).split("/") if len(rtt_parts) >= 3: - # Extract the 'avg' value results["gateway_rtt_avg_ms"] = float(rtt_parts[1]) return results -def parse_local_ping_results(ping_output: str) -> dict[str, float]: +def parse_local_ping_results(ping_output: str) -> LocalPingResults: """ Parses local ping output, returning numerical values for key metrics. Focuses on packet loss percentage, average RTT, and standard deviation. """ - results: dict[str, float] = {} + results: LocalPingResults = {} # Changed for strict typing loss_match = re.search(r"(\d+(?:\.\d+)?)% packet loss", ping_output) if loss_match: results["loss_percentage"] = float(loss_match.group(1)) - rtt_match = re.search( - r"min/avg/max/(?:stddev|mdev)\s*=\s*([\d./]+)\s*ms", ping_output - ) + rtt_match = re.search(r"min/avg/max/(?:stddev|mdev)\s*=\s*([\d./]+)\s*ms", ping_output) if rtt_match: parts = rtt_match.group(1).split("/") if len(parts) == 4: @@ -103,7 +183,7 @@ def parse_local_ping_results(ping_output: str) -> dict[str, float]: return results -def log_results(all_data: dict[str, str | float | int | None]) -> None: +def log_results(all_data: Mapping[str, str | float | int | None]) -> None: """ Logs results to a CSV file and prints a color-coded summary to the console based on configured anomaly thresholds. @@ -137,9 +217,7 @@ def log_results(all_data: dict[str, str | float | int | None]) -> None: ] header = "Timestamp," + ",".join(data_points.keys()) + "\n" log_entry = timestamp + "," + ",".join(csv_values) + "\n" - write_header = ( - not os.path.exists(config.LOG_FILE) or os.path.getsize(config.LOG_FILE) == 0 - ) + write_header = not os.path.exists(config.LOG_FILE) or os.path.getsize(config.LOG_FILE) == 0 with open(config.LOG_FILE, "a") as f: if write_header: f.write(header) @@ -150,7 +228,7 @@ def format_value( value: Optional[float], unit: str, threshold: Optional[float], - comparison: str = "greater", + comparison: Literal["greater", "less"] = "greater", default_color: str = "", precision: int = 2, ) -> str: @@ -175,95 +253,89 @@ def format_value( # --- Print to Console --- print("\n--- Gateway Test Results ---") loss_pct = format_value( - data_points['Gateway_LossPercentage'], - '%', - config.PACKET_LOSS_THRESHOLD + data_points["Gateway_LossPercentage"], "%", config.PACKET_LOSS_THRESHOLD ) print(f" Packet Loss: {loss_pct}") - - rtt_avg = format_value( - data_points['Gateway_RTT_avg_ms'], - 'ms', - config.PING_RTT_THRESHOLD - ) + + rtt_avg = format_value(data_points["Gateway_RTT_avg_ms"], "ms", config.PING_RTT_THRESHOLD) print(f" WAN RTT (avg): {rtt_avg}") - + down_speed = format_value( - data_points['Gateway_Downstream_Mbps'], - 'Mbps', - config.GATEWAY_DOWNSTREAM_SPEED_THRESHOLD, - 'less' + data_points["Gateway_Downstream_Mbps"], + "Mbps", + config.GATEWAY_DOWNSTREAM_SPEED_THRESHOLD, + "less", ) print(f" Downstream Speed: {down_speed}") - + up_speed = format_value( - data_points['Gateway_Upstream_Mbps'], - 'Mbps', - config.GATEWAY_UPSTREAM_SPEED_THRESHOLD, - 'less' + data_points["Gateway_Upstream_Mbps"], + "Mbps", + config.GATEWAY_UPSTREAM_SPEED_THRESHOLD, + "less", ) print(f" Upstream Speed: {up_speed}") print("\n--- Local Machine Test Results ---") wan_loss = format_value( - data_points['Local_WAN_LossPercentage'], - '%', - config.PACKET_LOSS_THRESHOLD + data_points["Local_WAN_LossPercentage"], "%", config.PACKET_LOSS_THRESHOLD ) print(f" WAN Packet Loss: {wan_loss}") - - wan_rtt = format_value( - data_points['Local_WAN_RTT_avg_ms'], - 'ms', - config.PING_RTT_THRESHOLD - ) + + wan_rtt = format_value(data_points["Local_WAN_RTT_avg_ms"], "ms", config.PING_RTT_THRESHOLD) print(f" WAN RTT (avg): {wan_rtt}") - - jitter = format_value( - data_points['Local_WAN_Ping_StdDev'], - 'ms', - config.JITTER_THRESHOLD, - precision=3 + + wan_jitter = format_value( + data_points["Local_WAN_Ping_StdDev"], + "ms", + config.JITTER_THRESHOLD, + precision=3, ) - print(f" Ping Jitter (StdDev): {jitter}") - + print(f" WAN Jitter (StdDev): {wan_jitter}") + gw_loss = format_value( - data_points['Local_GW_LossPercentage'], - '%', - config.PACKET_LOSS_THRESHOLD + data_points["Local_GW_LossPercentage"], "%", config.PACKET_LOSS_THRESHOLD ) print(f" Gateway Packet Loss: {gw_loss}") - - # Special color for Local GW RTT + gw_rtt = format_value( - data_points['Local_GW_RTT_avg_ms'], - 'ms', - config.PING_RTT_THRESHOLD, - default_color=Colors.CYAN + data_points["Local_GW_RTT_avg_ms"], + "ms", + config.PING_RTT_THRESHOLD, + default_color=Colors.CYAN, ) print(f" Gateway RTT (avg): {gw_rtt}") - + + gw_jitter = format_value( + data_points["Local_GW_Ping_StdDev"], + "ms", + config.JITTER_THRESHOLD, + precision=3, + default_color=Colors.CYAN, + ) + print(f" Gateway Jitter (StdDev): {gw_jitter}") + local_down = format_value( - data_points['Local_Downstream_Mbps'], - 'Mbps', - config.LOCAL_DOWNSTREAM_SPEED_THRESHOLD, - 'less' + data_points["Local_Downstream_Mbps"], + "Mbps", + config.LOCAL_DOWNSTREAM_SPEED_THRESHOLD, + "less", ) print(f" Downstream Speed: {local_down}") - + local_up = format_value( - data_points['Local_Upstream_Mbps'], - 'Mbps', - config.LOCAL_UPSTREAM_SPEED_THRESHOLD, - 'less' + data_points["Local_Upstream_Mbps"], + "Mbps", + config.LOCAL_UPSTREAM_SPEED_THRESHOLD, + "less", ) print(f" Upstream Speed: {local_up}") - + speed_jitter = format_value( - data_points['Local_Speedtest_Jitter_ms'], - 'ms', - config.JITTER_THRESHOLD, - precision=3 + data_points["Local_Speedtest_Jitter_ms"], + "ms", + config.JITTER_THRESHOLD, + precision=3, ) print(f" Speedtest Jitter: {speed_jitter}") @@ -278,7 +350,7 @@ def format_value( print(f"Results appended to: {full_path}") -def run_ping_test_task(driver: WebDriver) -> Optional[Dict[str, float]]: +def run_ping_test_task(driver: WebDriver) -> Optional[GatewayPingResults]: """Runs the ping test on the gateway's diagnostics page and logs raw output.""" print("Navigating to gateway diagnostics page for ping test...") driver.get(config.DIAG_URL) @@ -286,9 +358,7 @@ def run_ping_test_task(driver: WebDriver) -> Optional[Dict[str, float]]: target_input = WebDriverWait(driver, 20).until( EC.visibility_of_element_located((By.ID, "webaddress")) ) - driver.execute_script( - f"arguments[0].value = '{config.PING_TARGET}';", target_input - ) + driver.execute_script(f"arguments[0].value = '{config.PING_TARGET}';", target_input) ping_button = driver.find_element(By.NAME, "Ping") driver.execute_script("arguments[0].click();", ping_button) print(f"Gateway ping test started for {config.PING_TARGET}.") @@ -315,9 +385,7 @@ def run_ping_test_task(driver: WebDriver) -> Optional[Dict[str, float]]: return None -def run_speed_test_task( - driver: WebDriver, access_code: str -) -> Optional[Dict[str, float]]: +def run_speed_test_task(driver: WebDriver, access_code: str) -> Optional[SpeedResults]: """ Automates the gateway speed test, returning numerical values for speeds. """ @@ -335,15 +403,13 @@ def run_speed_test_task( except TimeoutException: print("Already logged in or no password required for gateway speed test.") - run_button = WebDriverWait(driver, 15).until( - EC.element_to_be_clickable((By.NAME, "run")) - ) + run_button = WebDriverWait(driver, 15).until(EC.element_to_be_clickable((By.NAME, "run"))) run_button.click() print("Gateway speed test initiated. This will take up to 90 seconds...") WebDriverWait(driver, 90).until(EC.element_to_be_clickable((By.NAME, "run"))) print("Gateway speed test complete. Parsing results...") - results: Dict[str, float] = {} + results: SpeedResults = {} table = WebDriverWait(driver, 10).until( EC.visibility_of_element_located((By.CSS_SELECTOR, "table.grid.table100")) ) @@ -368,7 +434,7 @@ def run_speed_test_task( return None -def run_local_ping_task(target: str) -> Dict[str, float]: +def run_local_ping_task(target: str) -> LocalPingResults: """Runs a ping test from the local OS to the specified target.""" print(f"Running local ping test to {target}...") try: @@ -378,25 +444,27 @@ def run_local_ping_task(target: str) -> Dict[str, float]: print(f"Local ping to {target} complete.") return parse_local_ping_results(process.stdout) else: - print( - f"Warning: Local ping test to {target} failed. Stderr: {process.stderr}" - ) + print(f"Warning: Local ping test to {target} failed. Stderr: {process.stderr}") return {} except FileNotFoundError: - print( - "Error: 'ping' command not found. Please ensure it's in your system's PATH." - ) + print("Error: 'ping' command not found. Please ensure it's in your system's PATH.") return {} except Exception as e: print(f"An error occurred during local ping test to {target}: {e}") return {} -def run_local_speed_test_task() -> Optional[Dict[str, float]]: +# In main.py + + +def run_local_speed_test_task() -> Optional[SpeedResults]: """ - Runs a local speed test, returning numerical values for key metrics. + Runs a local speed test with a retry mechanism, returning numerical + values for key metrics. """ print("Running local speed test using the official Ookla CLI...") + max_retries = 3 + retry_delay_seconds = 10 ookla_path = None possible_paths = ["/opt/homebrew/bin/speedtest", "/usr/local/bin/speedtest"] @@ -408,68 +476,80 @@ def run_local_speed_test_task() -> Optional[Dict[str, float]]: if not ookla_path: print("\n---") print("Error: Could not find the Ookla 'speedtest' executable.") - print( - "Please ensure it is installed via Homebrew and located in one of these paths:" - ) + print("Please ensure it is installed via Homebrew and located in one of these paths:") print(f" {', '.join(possible_paths)}") print("Installation command: brew install speedtest") print("---\n") return None - try: - command = [ookla_path, "--accept-license", "--accept-gdpr", "--format=json"] - process = subprocess.run( - command, capture_output=True, text=True, timeout=120, check=True - ) - - json_output = None - for line in process.stdout.splitlines(): - if line.strip().startswith("{"): - json_output = line - break - - if not json_output: - print("Error: Could not find JSON in the speedtest command output.") - print(f"--- Raw STDOUT ---\n{process.stdout}\n--------------------") - if process.stderr: - print(f"--- Raw STDERR ---\n{process.stderr}\n--------------------") - return None - - results = json.loads(json_output) - - download_speed = ( - results.get("download", {}).get("bandwidth", 0) * 8 - ) / 1_000_000 - upload_speed = (results.get("upload", {}).get("bandwidth", 0) * 8) / 1_000_000 - jitter = results.get("ping", {}).get("jitter", 0.0) + for attempt in range(max_retries): + try: + command = [ + ookla_path, + "--accept-license", + "--accept-gdpr", + "--format=json", + ] + process = subprocess.run( + command, capture_output=True, text=True, timeout=120, check=True + ) - print("Local speed test complete.") + json_output = None + for line in process.stdout.splitlines(): + if line.strip().startswith("{"): + json_output = line + break + + if not json_output: + raise json.JSONDecodeError("No JSON found in speedtest output", process.stdout, 0) + + results = json.loads(json_output) + # Check for explicit error messages from the speedtest CLI + if "error" in results: + raise Exception(f"Speedtest CLI returned an error: {results['error']}") + + download_speed = (results.get("download", {}).get("bandwidth", 0) * 8) / 1_000_000 + upload_speed = (results.get("upload", {}).get("bandwidth", 0) * 8) / 1_000_000 + jitter = results.get("ping", {}).get("jitter", 0.0) + + print("Local speed test complete.") + return { + "local_downstream_speed": download_speed, + "local_upstream_speed": upload_speed, + "local_speedtest_jitter": jitter, + } + + except subprocess.CalledProcessError as e: + msg = ( + f"Warning (Attempt {attempt + 1}/{max_retries}): " + f"The 'speedtest' command failed with return code {e.returncode}." + ) + print(msg) + print(f"Stdout: {e.stdout}") + print(f"Stderr: {e.stderr}") + except json.JSONDecodeError as e: + msg = ( + f"Warning (Attempt {attempt + 1}/{max_retries}): " + "Could not parse JSON from speedtest." + ) + print(msg) + # The exception object in this case might contain the raw output + print(f"--- Raw STDOUT ---\n{e.doc}\n--------------------") + except Exception as e: + msg = ( + f"Warning (Attempt {attempt + 1}/{max_retries}): An unexpected error occurred: {e}" + ) + print(msg) - return { - "local_downstream_speed": download_speed, - "local_upstream_speed": upload_speed, - "local_speedtest_jitter": jitter, - } + if attempt < max_retries - 1: + print(f"Waiting {retry_delay_seconds} seconds before retrying...") + time.sleep(retry_delay_seconds) - except subprocess.CalledProcessError as e: - print(f"Error: The 'speedtest' command failed with return code {e.returncode}.") - print(f"Stdout: {e.stdout}") - print(f"Stderr: {e.stderr}") - return None - except subprocess.TimeoutExpired: - print("Error: The speed test command timed out after 120 seconds.") - return None - except json.JSONDecodeError: - print("Error: Could not parse JSON output from the 'speedtest' command.") - if "process" in locals(): - print(f"--- Raw STDOUT ---\n{process.stdout}\n--------------------") - return None - except Exception as e: - print(f"An unexpected error occurred during the local speed test: {e}") - return None + print("Error: Local speed test failed after multiple attempts.") + return None -def run_wifi_diagnostics_task() -> dict[str, str]: +def run_wifi_diagnostics_task() -> WifiDiagnostics: """ Uses a hybrid approach: wdutil for live Wi-Fi stats (signal, etc.) and arp for a reliable BSSID (via the default gateway's MAC address). @@ -478,15 +558,13 @@ def run_wifi_diagnostics_task() -> dict[str, str]: A dictionary of Wi-Fi metrics. """ print("Running local Wi-Fi diagnostics...") - results: dict[str, str] = {} + results: WifiDiagnostics = {} # --- Part 1: Get Signal, Noise, etc. from wdutil --- try: # This command requires the sudoers file to be configured for NOPASSWD. command = ["sudo", "wdutil", "info"] - process = subprocess.run( - command, capture_output=True, text=True, timeout=10, check=True - ) + process = subprocess.run(command, capture_output=True, text=True, timeout=10, check=True) output = process.stdout def find_value(key: str, text: str) -> str: @@ -508,9 +586,7 @@ def find_value(key: str, text: str) -> str: route_process = subprocess.run( route_command, capture_output=True, text=True, timeout=10, check=True ) - gateway_match = re.search( - r"^\s*gateway:\s*(\S+)", route_process.stdout, re.MULTILINE - ) + gateway_match = re.search(r"^\s*gateway:\s*(\S+)", route_process.stdout, re.MULTILINE) if not gateway_match: raise Exception("Could not determine default gateway IP.") @@ -549,28 +625,39 @@ def perform_checks() -> None: """Main automation function to run all configured tests and log results.""" global run_counter, DEVICE_ACCESS_CODE run_counter += 1 - master_results = {} + master_results: dict[str, str | float | int | None] = {} + debug_log = DebugLogger(start_time=time.time()) + debug_log.log("perform_checks: START") print( f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] " f"Starting checks (Run #{run_counter})..." ) + # Check for any lingering chromedriver processes from previous runs + log_running_chromedriver_processes(debug_log) + # --- Run Local Tests (No Browser Required) --- + debug_log.log("run_wifi_diagnostics_task: START") wifi_results = run_wifi_diagnostics_task() + debug_log.log("run_wifi_diagnostics_task: END") if wifi_results: master_results.update(wifi_results) if config.RUN_LOCAL_PING_TEST: + debug_log.log("run_local_ping_task (WAN): START") wan_ping_results = run_local_ping_task(config.PING_TARGET) - master_results.update( - {f"local_wan_{k}": v for k, v in wan_ping_results.items()} - ) + debug_log.log("run_local_ping_task (WAN): END") + master_results.update({f"local_wan_{k}": v for k, v in wan_ping_results.items()}) if config.RUN_LOCAL_GATEWAY_PING_TEST: + debug_log.log("run_local_ping_task (Gateway): START") gateway_ip = config.GATEWAY_URL.split("//")[-1].split("/")[0] gw_ping_results = run_local_ping_task(gateway_ip) + debug_log.log("run_local_ping_task (Gateway): END") master_results.update({f"local_gw_{k}": v for k, v in gw_ping_results.items()}) if config.RUN_LOCAL_SPEED_TEST: + debug_log.log("run_local_speed_test_task: START") local_speed_results = run_local_speed_test_task() + debug_log.log("run_local_speed_test_task: END") if local_speed_results: master_results.update(local_speed_results) @@ -589,21 +676,35 @@ def perform_checks() -> None: chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") - driver = None - service = ChromeService() # Use Selenium Manager to handle the driver + driver: Optional[WebDriver] = None + # Use Selenium Manager to handle the driver + service: ChromeService = ChromeService() try: + debug_log.log("Selenium setup: START") print("Setting up WebDriver for gateway tests...") driver = webdriver.Chrome( - service=service, # Use the predefined service object + service=service, options=chrome_options, ) + # Track and log the chromedriver PID if available + try: + if service and service.process and service.process.pid: + debug_log.set_chromedriver_pid(service.process.pid) + debug_log.log(f"WebDriver service started with PID: {service.process.pid}") + except Exception as e: + debug_log.log(f"Unable to retrieve WebDriver PID: {e}") + debug_log.log("Selenium setup: END") driver.get(config.GATEWAY_URL) time.sleep(2) + debug_log.log("run_ping_test_task: START") gateway_ping_results = run_ping_test_task(driver) + debug_log.log("run_ping_test_task: END") if gateway_ping_results: master_results.update(gateway_ping_results) if should_run_gateway_speed_test: + debug_log.log("run_speed_test_task: START") gateway_speed_results = run_speed_test_task(driver, DEVICE_ACCESS_CODE) + debug_log.log("run_speed_test_task: END") if gateway_speed_results: master_results.update(gateway_speed_results) except Exception as e: @@ -615,30 +716,40 @@ def perform_checks() -> None: print(f"Saved screenshot to {screenshot_file} for debugging.") finally: if driver: + debug_log.log("WebDriver quit: START") print("Closing WebDriver...") driver.quit() - # NEW: Add a more aggressive cleanup to prevent zombie processes - if service and service.process: - print("Ensuring chromedriver service is terminated...") - try: - service.process.kill() - except Exception as e: - print(f"Error while trying to kill service process: {e}") - + debug_log.log("WebDriver quit: END") + if service: + debug_log.log("WebDriver service.stop(): START") + service.stop() + debug_log.log("WebDriver service.stop(): END") + # Verify process state immediately after stopping service + log_running_chromedriver_processes(debug_log) + + debug_log.log("perform_checks: END") log_results(master_results) print("\n" + "=" * 60 + "\n") + # ADD THIS LINE: + if schedule.jobs: + _nr = schedule.next_run() + if _nr is not None: + next_run_time = _nr.strftime("%Y-%m-%d %H:%M:%S") + print(f"Next test is scheduled for: {next_run_time}") + # --- Scheduler --- if __name__ == "__main__": + if getattr(config, "ENABLE_DEBUG_LOGGING", False): + logging.basicConfig() + schedule_logger = logging.getLogger("schedule") + schedule_logger.setLevel(level=logging.DEBUG) perform_checks() schedule.every(config.RUN_INTERVAL_MINUTES).minutes.do(perform_checks) if schedule.jobs: next_run_datetime = schedule.jobs[0].next_run - print( - "Next test is scheduled for: " - f"{next_run_datetime.strftime('%Y-%m-%d %H:%M:%S')}" - ) + print(f"Next test is scheduled for: {next_run_datetime.strftime('%Y-%m-%d %H:%M:%S')}") print("Press Ctrl+C to exit.") while True: schedule.run_pending() diff --git a/tests/test_core.py b/tests/test_core.py index 5364e23..9f53542 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -133,9 +133,7 @@ def test_parse_gateway_ping_garbage_input(): @patch("main.subprocess.run") def test_local_speed_test_parsing(mock_run, mock_exists): """Ensures the local speed test task correctly parses JSON and returns floats.""" - mock_run.return_value = MagicMock( - stdout=SPEEDTEST_JSON_OUTPUT, returncode=0, stderr="" - ) + mock_run.return_value = MagicMock(stdout=SPEEDTEST_JSON_OUTPUT, returncode=0, stderr="") results = run_local_speed_test_task() assert results is not None # Bandwidth is in bytes, so we convert to Mbps (bytes * 8 / 1,000,000) diff --git a/tests/test_logging.py b/tests/test_logging.py index 9617886..1e03a7d 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -1,29 +1,59 @@ import importlib import os import sys +from typing import Mapping, TypeAlias from unittest.mock import mock_open, patch +import pytest + # Ensure the main module can be imported sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) import config from main import Colors, log_results +# Light typing for inputs to log_results +ResultRow: TypeAlias = Mapping[str, str | float | int | None] + # --- Test Data --- -MOCK_DATA = { - "gateway_loss_percentage": 1.5, # Anomaly - "gateway_rtt_avg_ms": 45.0, # Anomaly - "downstream_speed": 40.0, # Anomaly - "upstream_speed": 5.0, # Anomaly +# Scenario 1: All data is present and correct. +MOCK_DATA_COMPLETE: ResultRow = { + "gateway_loss_percentage": 1.5, + "gateway_rtt_avg_ms": 45.0, + "downstream_speed": 40.0, + "upstream_speed": 5.0, "local_wan_loss_percentage": 0.0, "local_wan_rtt_avg_ms": 20.0, "local_wan_ping_stddev": 5.0, "local_gw_loss_percentage": 0.0, "local_gw_rtt_avg_ms": 1.0, + "local_gw_ping_stddev": 0.531, # Gateway jitter is present "local_downstream_speed": 450.0, "local_upstream_speed": 25.0, - "local_speedtest_jitter": 12.0, # Anomaly + "local_speedtest_jitter": 12.0, + "wifi_bssid": "a1:b2:c3:d4:e5:f6", + "wifi_channel": "149,80", + "wifi_rssi": "-55", + "wifi_noise": "-90", + "wifi_tx_rate": "866", +} + +# Scenario 2: Gateway ping failed, so its RTT and jitter are missing (None). +MOCK_DATA_MISSING_GW_JITTER: ResultRow = { + "gateway_loss_percentage": 1.5, + "gateway_rtt_avg_ms": 45.0, + "downstream_speed": 40.0, + "upstream_speed": 5.0, + "local_wan_loss_percentage": 0.0, + "local_wan_rtt_avg_ms": 20.0, + "local_wan_ping_stddev": 5.0, + "local_gw_loss_percentage": 100.0, + "local_gw_rtt_avg_ms": None, # RTT is None + "local_gw_ping_stddev": None, # Gateway jitter is None + "local_downstream_speed": 450.0, + "local_upstream_speed": 25.0, + "local_speedtest_jitter": 12.0, "wifi_bssid": "a1:b2:c3:d4:e5:f6", "wifi_channel": "149,80", "wifi_rssi": "-55", @@ -33,51 +63,43 @@ # --- Tests for log_results function --- + @patch("builtins.open", new_callable=mock_open) @patch("main.os.path.exists", return_value=False) -def test_log_results_csv_creation(mock_exists, mock_open_file): +def test_log_results_csv_creation(mock_exists, mock_open_file) -> None: """Tests that a new CSV file is created with a header.""" - log_results(MOCK_DATA) + log_results(MOCK_DATA_COMPLETE) handle = mock_open_file() - - header = ("Timestamp,Gateway_LossPercentage,Gateway_RTT_avg_ms,Gateway_Downstream_Mbps," - "Gateway_Upstream_Mbps,Local_WAN_LossPercentage,Local_WAN_RTT_avg_ms," - "Local_WAN_Ping_StdDev,Local_GW_LossPercentage,Local_GW_RTT_avg_ms," - "Local_GW_Ping_StdDev,Local_Downstream_Mbps,Local_Upstream_Mbps,Local_Speedtest_Jitter_ms," - "WiFi_BSSID,WiFi_Channel,WiFi_RSSI,WiFi_Noise,WiFi_TxRate_Mbps\n") - - # The mock records all calls, including the context manager __enter__ and __exit__. - # We are interested in the calls to the write method. - - # Extract the written data from the mock calls + header = ( + "Timestamp,Gateway_LossPercentage,Gateway_RTT_avg_ms,Gateway_Downstream_Mbps," + "Gateway_Upstream_Mbps,Local_WAN_LossPercentage,Local_WAN_RTT_avg_ms," + "Local_WAN_Ping_StdDev,Local_GW_LossPercentage,Local_GW_RTT_avg_ms," + "Local_GW_Ping_StdDev,Local_Downstream_Mbps,Local_Upstream_Mbps,Local_Speedtest_Jitter_ms," + "WiFi_BSSID,WiFi_Channel,WiFi_RSSI,WiFi_Noise,WiFi_TxRate_Mbps\n" + ) written_header = handle.mock_calls[1][1][0] written_data_row = handle.mock_calls[2][1][0] - assert header.strip() == written_header.strip() - assert MOCK_DATA['wifi_bssid'] in written_data_row - + assert MOCK_DATA_COMPLETE["wifi_bssid"] in written_data_row assert handle.write.call_count == 2 @patch("builtins.open", new_callable=mock_open) @patch("main.os.path.exists", return_value=True) @patch("main.os.path.getsize", return_value=100) -def test_log_results_csv_append(mock_getsize, mock_exists, mock_open_file): +def test_log_results_csv_append(mock_getsize, mock_exists, mock_open_file) -> None: """Tests that log_results appends to an existing CSV without a header.""" - log_results(MOCK_DATA) + log_results(MOCK_DATA_COMPLETE) handle = mock_open_file() - # Header should not be written, only the data row handle.write.assert_called_once() - # Extract the string that was written written_string = handle.write.call_args[0][0] assert "Timestamp" not in written_string - assert "1.500" in written_string # Check for a formatted value + assert "1.500" in written_string @patch("builtins.print") -def test_log_results_console_output_highlighting(mock_print): +def test_log_results_console_output_highlighting(mock_print) -> None: """Tests that anomaly highlighting works correctly in console output.""" - # Temporarily modify config for this test config.ENABLE_ANOMALY_HIGHLIGHTING = True config.PACKET_LOSS_THRESHOLD = 1.0 config.PING_RTT_THRESHOLD = 40.0 @@ -85,23 +107,52 @@ def test_log_results_console_output_highlighting(mock_print): config.GATEWAY_UPSTREAM_SPEED_THRESHOLD = 10.0 config.JITTER_THRESHOLD = 10.0 - log_results(MOCK_DATA) - - # Convert mock_print calls to a single string for easier searching + log_results(MOCK_DATA_COMPLETE) all_output = " ".join([str(call.args[0]) for call in mock_print.call_args_list]) - # Check for RED color code on anomalous values assert f"{Colors.RED}1.50{Colors.RESET}" in all_output assert f"{Colors.RED}45.00{Colors.RESET}" in all_output assert f"{Colors.RED}40.00{Colors.RESET}" in all_output assert f"{Colors.RED}5.00{Colors.RESET}" in all_output assert f"{Colors.RED}12.000{Colors.RESET}" in all_output - - # Check for a non-anomalous value to ensure it's not colored red - assert f"{Colors.RED}20.00{Colors.RESET}" not in all_output - # Check for the special CYAN color on Local GW RTT assert f"{Colors.CYAN}1.00{Colors.RESET}" in all_output - - # Restore config - # This is important if other tests depend on the original config importlib.reload(config) + + +@pytest.mark.parametrize( + "test_data, expected_jitter_csv, expected_jitter_console", + [ + (MOCK_DATA_COMPLETE, ",0.531,", "0.531"), # Happy path + (MOCK_DATA_MISSING_GW_JITTER, ",N/A,", "N/A"), # Failure case + ], +) +@patch("builtins.open", new_callable=mock_open) +@patch("main.os.path.exists", return_value=False) +@patch("builtins.print") +def test_log_results_handles_gateway_jitter( + mock_print, + mock_exists, + mock_open_file, + test_data: ResultRow, + expected_jitter_csv: str, + expected_jitter_console: str, +) -> None: + """ + Tests that gateway ping jitter is logged correctly, handling both + successful results and missing data ('N/A'). + """ + log_results(test_data) + handle = mock_open_file() + + # 1. Check that the CSV Header is always correct + written_header = handle.mock_calls[1][1][0] + assert "Local_GW_Ping_StdDev" in written_header + + # 2. Check that the CSV Data Row contains the expected value ('0.531' or 'N/A') + written_data_row = handle.mock_calls[2][1][0] + assert expected_jitter_csv in written_data_row + + # 3. Check that the Console Output contains the expected value + all_output = " ".join([str(call.args[0]) for call in mock_print.call_args_list if call.args]) + assert "Gateway Jitter (StdDev):" in all_output + assert expected_jitter_console in all_output diff --git a/tests/test_main_flow.py b/tests/test_main_flow.py index 0e372d7..b437cb1 100644 --- a/tests/test_main_flow.py +++ b/tests/test_main_flow.py @@ -13,20 +13,22 @@ # --- Mocks for all task functions --- + @pytest.fixture def mock_tasks(): """Mocks all the individual task functions and log_results.""" - with patch("main.run_wifi_diagnostics_task", return_value={}) as mock_wifi, \ - patch("main.run_local_ping_task", return_value={}) as mock_local_ping, \ - patch("main.run_local_speed_test_task", return_value={}) as mock_local_speed, \ - patch("main.run_ping_test_task", return_value={}) as mock_gateway_ping, \ - patch("main.run_speed_test_task", return_value={}) as mock_gateway_speed, \ - patch("main.log_results") as mock_log, \ - patch("main.webdriver.Chrome", return_value=MagicMock()) as mock_chrome, \ - patch("main.get_access_code", return_value="test-code") as mock_access_code, \ - patch("main.ChromeService") as mock_service, \ - patch("main.time.sleep"): - + with ( + patch("main.run_wifi_diagnostics_task", return_value={}) as mock_wifi, + patch("main.run_local_ping_task", return_value={}) as mock_local_ping, + patch("main.run_local_speed_test_task", return_value={}) as mock_local_speed, + patch("main.run_ping_test_task", return_value={}) as mock_gateway_ping, + patch("main.run_speed_test_task", return_value={}) as mock_gateway_speed, + patch("main.log_results") as mock_log, + patch("main.webdriver.Chrome", return_value=MagicMock()) as mock_chrome, + patch("main.get_access_code", return_value="test-code") as mock_access_code, + patch("main.ChromeService") as mock_service, + patch("main.time.sleep"), + ): mock_service.return_value.process = MagicMock() yield { @@ -37,11 +39,13 @@ def mock_tasks(): "gateway_speed": mock_gateway_speed, "log": mock_log, "chrome": mock_chrome, - "access_code": mock_access_code + "access_code": mock_access_code, } + # --- Tests for perform_checks orchestration --- + def test_perform_checks_local_ping_toggle(mock_tasks, monkeypatch): """Tests that RUN_LOCAL_PING_TEST config toggles the task call.""" monkeypatch.setattr(config_module, "RUN_LOCAL_GATEWAY_PING_TEST", False) @@ -56,6 +60,7 @@ def test_perform_checks_local_ping_toggle(mock_tasks, monkeypatch): perform_checks() mock_tasks["local_ping"].assert_not_called() + def test_perform_checks_local_gateway_ping_toggle(mock_tasks, monkeypatch): """Tests that RUN_LOCAL_GATEWAY_PING_TEST config toggles the task call.""" monkeypatch.setattr(config_module, "RUN_LOCAL_PING_TEST", False) @@ -71,6 +76,7 @@ def test_perform_checks_local_gateway_ping_toggle(mock_tasks, monkeypatch): perform_checks() mock_tasks["local_ping"].assert_not_called() + def test_perform_checks_local_speed_test_toggle(mock_tasks, monkeypatch): """Tests that RUN_LOCAL_SPEED_TEST config toggles the task call.""" monkeypatch.setattr(config_module, "RUN_LOCAL_SPEED_TEST", True) @@ -83,6 +89,7 @@ def test_perform_checks_local_speed_test_toggle(mock_tasks, monkeypatch): perform_checks() mock_tasks["local_speed"].assert_not_called() + def test_perform_checks_gateway_speed_test_interval(mock_tasks, monkeypatch): """Tests that RUN_GATEWAY_SPEED_TEST_INTERVAL config works correctly.""" # Reset the global run counter for predictable behavior @@ -92,21 +99,22 @@ def test_perform_checks_gateway_speed_test_interval(mock_tasks, monkeypatch): monkeypatch.setattr(config_module, "RUN_GATEWAY_SPEED_TEST_INTERVAL", 2) # Should NOT run on first call (run #1) - perform_checks() # Run 1 + perform_checks() # Run 1 mock_tasks["gateway_speed"].assert_not_called() mock_tasks["gateway_speed"].reset_mock() # Should run on second call (run #2) - perform_checks() # Run 2 + perform_checks() # Run 2 mock_tasks["gateway_speed"].assert_called_once() mock_tasks["gateway_speed"].reset_mock() # Should NOT run on third call (run #3) - perform_checks() # Run 3 + perform_checks() # Run 3 mock_tasks["gateway_speed"].assert_not_called() + def test_perform_checks_gateway_speed_test_disabled(mock_tasks, monkeypatch): """Tests that gateway speed test is disabled when interval is 0.""" main_module.run_counter = 0 diff --git a/tests/test_selenium_tasks.py b/tests/test_selenium_tasks.py index d0a85aa..d89fae2 100644 --- a/tests/test_selenium_tasks.py +++ b/tests/test_selenium_tasks.py @@ -12,6 +12,7 @@ # --- Fixtures --- + @pytest.fixture def mock_driver(): """Provides a mock Selenium WebDriver object.""" @@ -21,8 +22,10 @@ def mock_driver(): driver.find_elements.return_value = [MagicMock(), MagicMock()] return driver + # --- Tests for run_ping_test_task --- + @patch("main.time.sleep") @patch("main.WebDriverWait") def test_ping_task_success(mock_wait, mock_sleep, mock_driver): @@ -53,6 +56,7 @@ def test_ping_task_success(mock_wait, mock_sleep, mock_driver): assert results["gateway_loss_percentage"] == 0.0 assert results["gateway_rtt_avg_ms"] == 15.2 + @patch("main.time.sleep") @patch("main.WebDriverWait") def test_ping_task_timeout_exception(mock_wait, mock_sleep, mock_driver): @@ -61,6 +65,7 @@ def test_ping_task_timeout_exception(mock_wait, mock_sleep, mock_driver): results = run_ping_test_task(mock_driver) assert results is None + @patch("main.time.sleep") def test_ping_task_empty_results(mock_sleep, mock_driver): """Tests that the function returns None if the result text is empty.""" @@ -71,8 +76,10 @@ def test_ping_task_empty_results(mock_sleep, mock_driver): results = run_ping_test_task(mock_driver) assert results is None + # --- Tests for run_speed_test_task --- + @patch("main.time.sleep") @patch("main.WebDriverWait") def test_speed_test_task_success(mock_wait, mock_sleep, mock_driver): @@ -85,9 +92,9 @@ def test_speed_test_task_success(mock_wait, mock_sleep, mock_driver): mock_speed_col_down.text = "123.45" mock_row_down = MagicMock() mock_row_down.find_elements.return_value = [ - MagicMock(), - mock_downstream_col, - mock_speed_col_down + MagicMock(), + mock_downstream_col, + mock_speed_col_down, ] mock_upstream_col = MagicMock() mock_upstream_col.text = "Upstream" @@ -98,13 +105,13 @@ def test_speed_test_task_success(mock_wait, mock_sleep, mock_driver): mock_table = MagicMock() mock_table.find_elements.return_value = [mock_row_down, mock_row_up] - mock_driver.find_element.return_value = MagicMock() # for run button + mock_driver.find_element.return_value = MagicMock() # for run button mock_wait.return_value.until.side_effect = [ - TimeoutException("No password field"), # First wait for password fails - MagicMock(), # Second wait for run button succeeds - MagicMock(), # Third wait for run button to be clickable after test - mock_table, # Fourth wait for results table + TimeoutException("No password field"), # First wait for password fails + MagicMock(), # Second wait for run button succeeds + MagicMock(), # Third wait for run button to be clickable after test + mock_table, # Fourth wait for results table ] results = run_speed_test_task(mock_driver, "test_code") @@ -114,6 +121,7 @@ def test_speed_test_task_success(mock_wait, mock_sleep, mock_driver): assert results["downstream_speed"] == 123.45 assert results["upstream_speed"] == 67.89 + @patch("main.time.sleep") @patch("main.WebDriverWait") def test_speed_test_task_login_required(mock_wait, mock_sleep, mock_driver): @@ -122,14 +130,14 @@ def test_speed_test_task_login_required(mock_wait, mock_sleep, mock_driver): mock_continue_button = MagicMock() mock_run_button = MagicMock() mock_results_table = MagicMock() - mock_results_table.find_elements.return_value = [] # No results + mock_results_table.find_elements.return_value = [] # No results # Simulate the sequence of waits mock_wait.return_value.until.side_effect = [ - mock_password_input, # First wait finds password field - mock_run_button, # Second wait finds run button - mock_run_button, # Third wait for button to be clickable after test - mock_results_table, # Fourth wait for results table + mock_password_input, # First wait finds password field + mock_run_button, # Second wait finds run button + mock_run_button, # Third wait for button to be clickable after test + mock_results_table, # Fourth wait for results table ] mock_driver.find_element.return_value = mock_continue_button @@ -139,6 +147,7 @@ def test_speed_test_task_login_required(mock_wait, mock_sleep, mock_driver): mock_continue_button.click.assert_called_once() mock_run_button.click.assert_called_once() + @patch("main.time.sleep") @patch("main.WebDriverWait") def test_speed_test_task_timeout_exception(mock_wait, mock_sleep, mock_driver):