diff --git a/.vscode/settings.json b/.vscode/settings.json index f7c8fa6..5406887 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -20,5 +20,8 @@ "*_test.py" ], "python.testing.pytestEnabled": false, - "python.testing.unittestEnabled": true + "python.testing.unittestEnabled": true, + "files.associations": { + "*.hss": "yaml" + } } diff --git a/autonomy/src/mission_planner/tests.hss b/autonomy/src/mission_planner/tests.hss new file mode 100644 index 0000000..ad84fde --- /dev/null +++ b/autonomy/src/mission_planner/tests.hss @@ -0,0 +1,36 @@ +# Mission Planner Tests Configuration +# Tests for mission planning and execution +name: "Mission Planner Tests" +description: "Tests for mission planning and execution modules" +strategy: "integration" + +tests: + - name: "Slalom Mission Test" + path: "test_slalom_integration.py" + args: [] + timeout: 300 + infinite_loop: false + description: "Integration test for slalom mission" + strategy: "UnitTestStrategy" + enabled: true + test_type: "python" + + - name: "Gate Mission Test" + path: "test_gate_mission.py" + args: [] + timeout: 300 + infinite_loop: false + description: "Integration test for gate mission" + strategy: "UnitTestStrategy" + enabled: true + test_type: "python" + + - name: "Mission Controller Test" + path: "test_mission_controller.py" + args: [] + timeout: 180 + infinite_loop: false + description: "Unit test for mission controller" + strategy: "UnitTestStrategy" + enabled: true + test_type: "python" diff --git a/autonomy/src/tests.hss b/autonomy/src/tests.hss new file mode 100644 index 0000000..4a13436 --- /dev/null +++ b/autonomy/src/tests.hss @@ -0,0 +1,36 @@ +# Computer Vision Tests Configuration +# Tests for the computer vision module +name: "Computer Vision Tests" +description: "Tests for CV publishers and detection systems" +strategy: "unit" + +tests: + - name: "CV Publishers Unit Test" + path: "test_cv_publishers.py" + args: [] + timeout: 120 + infinite_loop: false + description: "Unit tests for CV publishers module" + strategy: "UnitTestStrategy" + enabled: true + test_type: "python" + + - name: "Detection Core Test" + path: "../computer_vision/test_detection_core.py" + args: [] + timeout: 60 + infinite_loop: false + description: "Unit tests for detection core functionality" + strategy: "UnitTestStrategy" + enabled: true + test_type: "python" + + - name: "CV Integration Test" + path: "test_cv_integration.py" + args: [] + timeout: 180 + infinite_loop: false + description: "Integration tests for computer vision pipeline" + strategy: "UnitTestStrategy" + enabled: true + test_type: "python" diff --git a/devices/DVL/Wayfinder/tests.hss b/devices/DVL/Wayfinder/tests.hss new file mode 100644 index 0000000..5efee9e --- /dev/null +++ b/devices/DVL/Wayfinder/tests.hss @@ -0,0 +1,26 @@ +# DVL Driver Tests Configuration +# Tests for DVL (Doppler Velocity Log) driver functionality +name: "DVL Driver Tests" +description: "Tests for DVL driver and communication" +strategy: "unit" + +tests: + - name: "DVL Driver Basic Test" + path: "driver_test.py" + args: [] + timeout: 60 + infinite_loop: false + description: "Basic functionality test for DVL driver" + strategy: "UnitTestStrategy" + enabled: true + test_type: "python" + + - name: "DVL Communication Test" + path: "communication_test.py" + args: [] + timeout: 120 + infinite_loop: false + description: "Test DVL communication protocols" + strategy: "UnitTestStrategy" + enabled: false # Disabled by default (requires hardware) + test_type: "python" diff --git a/hydrus-cli b/hydrus-cli new file mode 100755 index 0000000..3074f4a --- /dev/null +++ b/hydrus-cli @@ -0,0 +1,15 @@ +#!/bin/bash +# Hydrus CLI Wrapper - executes python -m scripts.cli +# This wrapper ensures proper module execution with correct Python path + +# Get the directory where this wrapper is located (should be project root) +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +# Set HYDRUS_ROOT environment variable to ensure test system works correctly +export HYDRUS_ROOT="$SCRIPT_DIR" + +# Change to the project directory to ensure proper module resolution +cd "$SCRIPT_DIR" + +# Execute the CLI module with proper Python path +exec python3 -m scripts.cli "$@" diff --git a/scripts/cli.py b/scripts/cli.py index 2db199e..e5d849b 100755 --- a/scripts/cli.py +++ b/scripts/cli.py @@ -2,6 +2,7 @@ import typer from .commands.arduino import arduino_command +from .commands.autonomy import autonomy_app from .commands.ros import ros_app from .commands.test import test_app from .commands.tmux import tmux_command @@ -12,6 +13,9 @@ app.add_typer( arduino_command, name="arduino", help="Arduino device management commands" ) +app.add_typer( + autonomy_app, name="autonomy", help="Autonomy system and CV pipeline management" +) app.add_typer(ros_app, name="ros", help="ROS workspace and utilities management") app.add_typer(test_app, name="test", help="Test suite management and execution") app.add_typer(tmux_command, name="tmux", help="Tmux session management commands") diff --git a/scripts/commands/autonomy.py b/scripts/commands/autonomy.py index e69de29..818c701 100644 --- a/scripts/commands/autonomy.py +++ b/scripts/commands/autonomy.py @@ -0,0 +1,486 @@ +#!/usr/bin/env python3 +""" +Autonomy command module for managing CV pipeline and related systems. +""" + +import subprocess +import sys +import time +from pathlib import Path + +try: + import typer + from rich.console import Console + from rich.panel import Panel + from rich.progress import Progress, SpinnerColumn, TextColumn +except ImportError as e: + print(f"Error: Missing required dependency: {e}") + print("Please install with: pip install typer rich") + sys.exit(1) + +console = Console() +autonomy_app = typer.Typer(help="Autonomy system management commands") + +# Project root directory +PROJECT_ROOT = Path(__file__).parent.parent.parent + + +@autonomy_app.command() +def start_camera_publisher( + device: int = typer.Option(0, help="Camera device index (e.g., 0 for /dev/video0)"), + width: int = typer.Option(640, help="Camera image width"), + height: int = typer.Option(480, help="Camera image height"), + fps: int = typer.Option(30, help="Camera frame rate"), + rgb_topic: str = typer.Option( + "/camera/rgb/image_rect_color", help="RGB image topic name" + ), + depth_topic: str = typer.Option( + "/camera/depth/depth_registered", help="Depth image topic name" + ), + camera_info_topic: str = typer.Option( + "/camera/rgb/camera_info", help="Camera info topic name" + ), +): + """Start the webcam publisher that publishes ROS image topics for the CV pipeline.""" + console.print( + Panel.fit( + f"[bold green]Starting Camera Publisher[/bold green]\n" + f"Device: /dev/video{device}\n" + f"Resolution: {width}x{height}\n" + f"FPS: {fps}\n" + f"RGB Topic: {rgb_topic}\n" + f"Depth Topic: {depth_topic}\n" + f"Camera Info Topic: {camera_info_topic}", + title="Camera Publisher Configuration", + ) + ) + + # Path to the camera publisher script + script_path = PROJECT_ROOT / "autonomy" / "scripts" / "cv" / "camera_publisher.py" + + try: + # Run the camera publisher with the specified parameters + cmd = [ + sys.executable, + str(script_path), + "--device", + str(device), + "--width", + str(width), + "--height", + str(height), + "--fps", + str(fps), + "--rgb-topic", + rgb_topic, + "--depth-topic", + depth_topic, + "--camera-info-topic", + camera_info_topic, + ] + + console.print(f"[yellow]Executing: {' '.join(cmd)}[/yellow]") + subprocess.run(cmd, check=True) + + except subprocess.CalledProcessError as e: + console.print(f"[red]Error running camera publisher: {e}[/red]") + raise typer.Exit(1) + except KeyboardInterrupt: + console.print("\n[yellow]Camera publisher stopped by user[/yellow]") + except FileNotFoundError: + console.print(f"[red]Camera publisher script not found at: {script_path}[/red]") + console.print("[yellow]Creating the camera publisher script...[/yellow]") + _create_camera_publisher_script() + console.print( + "[green]Camera publisher script created! Please run the command again.[/green]" + ) + + +@autonomy_app.command() +def start_cv_pipeline( + model: str = typer.Option("yolov8n.pt", help="YOLO model to use"), + rgb_topic: str = typer.Option( + "/camera/rgb/image_rect_color", help="RGB image topic to subscribe to" + ), + depth_topic: str = typer.Option( + "/camera/depth/depth_registered", help="Depth image topic to subscribe to" + ), + camera_info_topic: str = typer.Option( + "/camera/rgb/camera_info", help="Camera info topic to subscribe to" + ), + camera_pose_topic: str = typer.Option( + "/camera/pose", help="Camera pose topic to subscribe to" + ), +): + """Start the complete CV pipeline (cv_publishers) with specified parameters.""" + console.print( + Panel.fit( + f"[bold green]Starting CV Pipeline[/bold green]\n" + f"YOLO Model: {model}\n" + f"RGB Topic: {rgb_topic}\n" + f"Depth Topic: {depth_topic}\n" + f"Camera Info Topic: {camera_info_topic}\n" + f"Camera Pose Topic: {camera_pose_topic}", + title="CV Pipeline Configuration", + ) + ) + + # Path to cv_publishers.py + cv_publishers_path = PROJECT_ROOT / "autonomy" / "src" / "cv_publishers.py" + + if not cv_publishers_path.exists(): + console.print( + f"[red]CV publishers script not found at: {cv_publishers_path}[/red]" + ) + raise typer.Exit(1) + + try: + # Set ROS parameters and run cv_publishers + cmd = [ + sys.executable, + str(cv_publishers_path), + f"_rgb_image_topic:={rgb_topic}", + f"_depth_image_topic:={depth_topic}", + f"_camera_info_topic:={camera_info_topic}", + f"_camera_pose_topic:={camera_pose_topic}", + ] + + console.print(f"[yellow]Executing: {' '.join(cmd)}[/yellow]") + + # Run with progress indicator + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + console=console, + ) as progress: + progress.add_task("Starting CV Pipeline...", total=None) + + # Start the process + process = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True + ) + + # Monitor the process + while process.poll() is None: + time.sleep(0.1) + + if process.returncode != 0: + output = process.stdout.read() if process.stdout else "" + console.print( + f"[red]CV Pipeline failed with return code {process.returncode}[/red]" + ) + if output: + console.print(f"[red]Output: {output}[/red]") + raise typer.Exit(1) + + except subprocess.CalledProcessError as e: + console.print(f"[red]Error running CV pipeline: {e}[/red]") + raise typer.Exit(1) + except KeyboardInterrupt: + console.print("\n[yellow]CV Pipeline stopped by user[/yellow]") + except Exception as e: + console.print(f"[red]Unexpected error: {e}[/red]") + raise typer.Exit(1) + + +@autonomy_app.command() +def start_full_pipeline( + camera_device: int = typer.Option(0, help="Camera device index"), + camera_width: int = typer.Option(640, help="Camera image width"), + camera_height: int = typer.Option(480, help="Camera image height"), + camera_fps: int = typer.Option(30, help="Camera frame rate"), + model: str = typer.Option("yolov8n.pt", help="YOLO model to use"), + delay: int = typer.Option( + 3, help="Delay between starting camera and CV pipeline (seconds)" + ), +): + """Start both the camera publisher and CV pipeline in sequence.""" + console.print( + Panel.fit( + "[bold green]Starting Full CV Pipeline[/bold green]\n" + "This will start:\n" + "1. Camera publisher (webcam โ†’ ROS topics)\n" + "2. CV pipeline (object detection & processing)", + title="Full Pipeline Startup", + ) + ) + + # Define common topic names + rgb_topic = "/camera/rgb/image_rect_color" + depth_topic = "/camera/depth/depth_registered" + camera_info_topic = "/camera/rgb/camera_info" + camera_pose_topic = "/camera/pose" + + try: + # Step 1: Start camera publisher in background + console.print("[yellow]Step 1: Starting camera publisher...[/yellow]") + camera_script = ( + PROJECT_ROOT / "autonomy" / "scripts" / "cv" / "camera_publisher.py" + ) + + if not camera_script.exists(): + console.print("[yellow]Creating camera publisher script...[/yellow]") + _create_camera_publisher_script() + + camera_cmd = [ + sys.executable, + str(camera_script), + "--device", + str(camera_device), + "--width", + str(camera_width), + "--height", + str(camera_height), + "--fps", + str(camera_fps), + "--rgb-topic", + rgb_topic, + "--depth-topic", + depth_topic, + "--camera-info-topic", + camera_info_topic, + ] + + camera_process = subprocess.Popen(camera_cmd) + console.print("[green]Camera publisher started![/green]") + + # Step 2: Wait for camera to initialize + console.print( + f"[yellow]Waiting {delay} seconds for camera to initialize...[/yellow]" + ) + time.sleep(delay) + + # Step 3: Start CV pipeline + console.print("[yellow]Step 2: Starting CV pipeline...[/yellow]") + cv_cmd = [ + sys.executable, + str(PROJECT_ROOT / "autonomy" / "src" / "cv_publishers.py"), + f"_rgb_image_topic:={rgb_topic}", + f"_depth_image_topic:={depth_topic}", + f"_camera_info_topic:={camera_info_topic}", + f"_camera_pose_topic:={camera_pose_topic}", + ] + + cv_process = subprocess.Popen(cv_cmd) + console.print("[green]CV pipeline started![/green]") + + console.print( + Panel.fit( + "[bold green]Full Pipeline Running![/bold green]\n" + "Press Ctrl+C to stop both processes", + title="Pipeline Status", + ) + ) + + # Wait for user interruption + try: + camera_process.wait() + except KeyboardInterrupt: + console.print("\n[yellow]Stopping pipeline...[/yellow]") + camera_process.terminate() + cv_process.terminate() + + # Wait for processes to terminate + camera_process.wait(timeout=5) + cv_process.wait(timeout=5) + + console.print("[green]Pipeline stopped successfully![/green]") + + except Exception as e: + console.print(f"[red]Error in full pipeline: {e}[/red]") + raise typer.Exit(1) + + +def _create_camera_publisher_script(): + """Create the camera publisher script if it doesn't exist.""" + script_dir = PROJECT_ROOT / "autonomy" / "scripts" / "cv" + script_dir.mkdir(parents=True, exist_ok=True) + + script_path = script_dir / "camera_publisher.py" + + script_content = '''#!/usr/bin/env python3 +""" +Camera Publisher - Publishes webcam images to ROS topics +This script captures images from a webcam and publishes them as ROS Image messages. +""" + +import argparse +import cv2 +import numpy as np +import rospy +from sensor_msgs.msg import Image, CameraInfo +from std_msgs.msg import Header + + +class CameraPublisher: + def __init__(self, device=0, width=640, height=480, fps=30, + rgb_topic="/camera/rgb/image_rect_color", + depth_topic="/camera/depth/depth_registered", + camera_info_topic="/camera/rgb/camera_info"): + + rospy.init_node('camera_publisher', anonymous=True) + + self.device = device + self.width = width + self.height = height + self.fps = fps + + # Initialize camera + self.cap = cv2.VideoCapture(device) + if not self.cap.isOpened(): + rospy.logerr(f"Failed to open camera device {device}") + return + + # Set camera properties + self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) + self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) + self.cap.set(cv2.CAP_PROP_FPS, fps) + + # Publishers + self.rgb_pub = rospy.Publisher(rgb_topic, Image, queue_size=1) + self.depth_pub = rospy.Publisher(depth_topic, Image, queue_size=1) + self.camera_info_pub = rospy.Publisher(camera_info_topic, CameraInfo, queue_size=1) + + # Camera info message (basic calibration) + self.camera_info = self._create_camera_info() + + rospy.loginfo(f"Camera publisher initialized:") + rospy.loginfo(f" Device: {device}") + rospy.loginfo(f" Resolution: {width}x{height}") + rospy.loginfo(f" FPS: {fps}") + rospy.loginfo(f" RGB Topic: {rgb_topic}") + rospy.loginfo(f" Depth Topic: {depth_topic}") + rospy.loginfo(f" Camera Info Topic: {camera_info_topic}") + + def _create_camera_info(self): + """Create a basic camera info message.""" + cam_info = CameraInfo() + cam_info.width = self.width + cam_info.height = self.height + + # Basic camera matrix (estimated for webcam) + fx = fy = self.width * 0.8 # Rough estimate + cx = self.width / 2.0 + cy = self.height / 2.0 + + cam_info.K = [fx, 0, cx, + 0, fy, cy, + 0, 0, 1] + + cam_info.D = [0, 0, 0, 0, 0] # No distortion + + cam_info.R = [1, 0, 0, + 0, 1, 0, + 0, 0, 1] + + cam_info.P = [fx, 0, cx, 0, + 0, fy, cy, 0, + 0, 0, 1, 0] + + return cam_info + + def cv2_to_ros_image(self, cv_image, encoding="bgr8"): + """Convert OpenCV image to ROS Image message.""" + ros_image = Image() + ros_image.header.stamp = rospy.Time.now() + ros_image.header.frame_id = "camera_link" + ros_image.height = cv_image.shape[0] + ros_image.width = cv_image.shape[1] + ros_image.encoding = encoding + ros_image.is_bigendian = False + ros_image.step = cv_image.shape[1] * cv_image.shape[2] if len(cv_image.shape) == 3 else cv_image.shape[1] + ros_image.data = cv_image.tobytes() + return ros_image + + def create_mock_depth(self, rgb_image): + """Create a mock depth image from RGB (for demonstration).""" + # Convert to grayscale and use as depth + gray = cv2.cvtColor(rgb_image, cv2.COLOR_BGR2GRAY) + # Scale to simulate depth (closer objects are brighter) + depth = 255 - gray + return depth.astype(np.uint8) + + def run(self): + """Main publishing loop.""" + rate = rospy.Rate(self.fps) + + rospy.loginfo("Starting camera publishing loop...") + + while not rospy.is_shutdown(): + ret, frame = self.cap.read() + if not ret: + rospy.logwarn("Failed to capture frame") + continue + + # Create timestamp + timestamp = rospy.Time.now() + + # Publish RGB image + rgb_msg = self.cv2_to_ros_image(frame, "bgr8") + rgb_msg.header.stamp = timestamp + self.rgb_pub.publish(rgb_msg) + + # Create and publish mock depth image + depth_frame = self.create_mock_depth(frame) + depth_msg = self.cv2_to_ros_image(depth_frame, "mono8") + depth_msg.header.stamp = timestamp + self.depth_pub.publish(depth_msg) + + # Publish camera info + self.camera_info.header.stamp = timestamp + self.camera_info.header.frame_id = "camera_link" + self.camera_info_pub.publish(self.camera_info) + + rate.sleep() + + def __del__(self): + """Cleanup resources.""" + if hasattr(self, 'cap') and self.cap.isOpened(): + self.cap.release() + + +def main(): + parser = argparse.ArgumentParser(description="Camera Publisher for ROS") + parser.add_argument("--device", type=int, default=0, help="Camera device index") + parser.add_argument("--width", type=int, default=640, help="Image width") + parser.add_argument("--height", type=int, default=480, help="Image height") + parser.add_argument("--fps", type=int, default=30, help="Frame rate") + parser.add_argument("--rgb-topic", default="/camera/rgb/image_rect_color", help="RGB topic") + parser.add_argument("--depth-topic", default="/camera/depth/depth_registered", help="Depth topic") + parser.add_argument("--camera-info-topic", default="/camera/rgb/camera_info", help="Camera info topic") + + args = parser.parse_args() + + try: + publisher = CameraPublisher( + device=args.device, + width=args.width, + height=args.height, + fps=args.fps, + rgb_topic=args.rgb_topic, + depth_topic=args.depth_topic, + camera_info_topic=args.camera_info_topic + ) + publisher.run() + except rospy.ROSInterruptException: + rospy.loginfo("Camera publisher interrupted") + except KeyboardInterrupt: + rospy.loginfo("Camera publisher stopped by user") + except Exception as e: + rospy.logerr(f"Camera publisher error: {e}") + + +if __name__ == "__main__": + main() +''' + + with open(script_path, "w") as f: + f.write(script_content) + + # Make script executable + script_path.chmod(0o755) + console.print(f"[green]Created camera publisher script at: {script_path}[/green]") + + +if __name__ == "__main__": + autonomy_app() diff --git a/scripts/commands/test.py b/scripts/commands/test.py deleted file mode 100644 index e3cd6a1..0000000 --- a/scripts/commands/test.py +++ /dev/null @@ -1,906 +0,0 @@ -import os -import signal -import subprocess -import sys -import time -from abc import ABC, abstractmethod -from datetime import datetime -from pathlib import Path -from typing import Dict, List, Optional - -import typer - -from .utils import get_building_path - -test_app = typer.Typer() - -# Global state to store volume setting -_volume_mode = False - - -@test_app.callback() -def test_callback( - volume: bool = typer.Option( - False, "--volume", "-v", help="Use volume directory for tests" - ) -): - """Test suite management and execution""" - global _volume_mode - _volume_mode = volume - - -# === Strategy Pattern for Test Execution === - - -class TestExecutionStrategy(ABC): - @abstractmethod - def execute(self, test_case: "TestCase", env: Dict, runner: "HydrusTestManager"): - pass - - -class UnitTestStrategy(TestExecutionStrategy): - def execute(self, test_case, env, runner): - runner._run_unified_test( - test_case.name, - test_case.path, - test_case.args, - test_case.timeout, - test_case.infinite_loop, - test_case.description, - env, - ) - - -class RosTestStrategy(TestExecutionStrategy): - def execute(self, test_case, env, runner): - runner._run_rostest(test_case.name, str(test_case.path), env) - - -class ScriptTestStrategy(TestExecutionStrategy): - def execute(self, test_case, env, runner): - runner._run_unified_test( - test_case.name, - test_case.path, - test_case.args, - test_case.timeout, - test_case.infinite_loop, - test_case.description, - env, - ) - - -# === TestCase Definition === - - -class TestCase: - def __init__( - self, - name: str, - path: Path, - args: List[str], - timeout: int, - infinite_loop: bool, - description: str, - strategy: TestExecutionStrategy, - ): - self.name = name - self.path = path - self.args = args - self.timeout = timeout - self.infinite_loop = infinite_loop - self.description = description - self.strategy = strategy - - -# === Test Builders === - - -class TestBuilder(ABC): - @abstractmethod - def build_tests(self) -> List[TestCase]: - pass - - -class UnitTestBuilder(TestBuilder): - def __init__(self, ros_dir: Path): - self.ros_dir = ros_dir - - def build_tests(self) -> List[TestCase]: - return [ - TestCase( - name="Tagging Mission Unit Test", - path=self.ros_dir - / "src/hydrus-software-stack/autonomy/src/mission_planner/tagging_mission_test.py", - args=[], - timeout=30, - infinite_loop=False, - description="Unit test for tagging mission", - strategy=UnitTestStrategy(), - ) - ] - - -class RosIntegrationTestBuilder(TestBuilder): - def __init__(self, ros_dir: Path): - self.ros_dir = ros_dir - - def build_tests(self) -> List[TestCase]: - return [ - TestCase( - name="Controller Tests", - path=Path("controller.test"), - args=[], - timeout=600, - infinite_loop=False, - description="ROS controller integration tests using rostest", - strategy=RosTestStrategy(), - ), - TestCase( - name="Slalom Integration Tests", - path=self.ros_dir - / "src/hydrus-software-stack/autonomy/src/mission_planner/test_slalom_integration.py", - args=[], - timeout=300, - infinite_loop=False, - description="Slalom mission integration tests", - strategy=UnitTestStrategy(), - ), - TestCase( - name="Gate Mission Tests", - path=self.ros_dir - / "src/hydrus-software-stack/autonomy/src/mission_planner/test_gate_mission.py", - args=[], - timeout=300, - infinite_loop=False, - description="Gate mission functionality tests", - strategy=UnitTestStrategy(), - ), - TestCase( - name="DVL Driver Tests", - path=self.ros_dir - / "src/hydrus-software-stack/devices/DVL/Wayfinder/driver_test.py", - args=[], - timeout=60, - infinite_loop=False, - description="DVL driver functionality tests", - strategy=UnitTestStrategy(), - ), - ] - - -class ScriptTestBuilder(TestBuilder): - def __init__(self, ros_dir: Path): - self.ros_dir = ros_dir - - def build_tests(self) -> List[TestCase]: - return [ - TestCase( - name="API Server", - path=self.ros_dir - / "src/hydrus-software-stack/autonomy/src/api_server.py", - args=[], - timeout=2, # Increased slightly for safety - infinite_loop=True, - description="API Server script that runs indefinitely to handle API requests.", - strategy=ScriptTestStrategy(), - ), - TestCase( - name="Controllers Node", - path=self.ros_dir - / "src/hydrus-software-stack/autonomy/src/controllers.py", - args=[], - timeout=2, - infinite_loop=True, - description="Controllers Node script that runs indefinitely. Handles controller logic for the robot.", - strategy=ScriptTestStrategy(), - ), - TestCase( - name="Computer Vision Node", - path=self.ros_dir - / "src/hydrus-software-stack/autonomy/src/cv_publishers.py", - args=[], - timeout=2, - infinite_loop=True, - description="Computer vision Node script that runs indefinitely for image processing.", - strategy=ScriptTestStrategy(), - ), - TestCase( - name="Controller Monitor Node", - path=self.ros_dir - / "src/hydrus-software-stack/autonomy/scripts/controller/controller_monitor.py", - args=[], - timeout=2, - infinite_loop=True, - description="Controller Monitor Node script that runs indefinitely to manage missions.", - strategy=ScriptTestStrategy(), - ), - ] - - -# === Test Director === - - -class TestDirector: - def __init__(self, builder: TestBuilder): - self.builder = builder - - def construct_tests(self) -> List[TestCase]: - return self.builder.build_tests() - - -class HydrusTestManager: - def __init__(self, volume: bool = False, debug_mode: bool = False): - self.volume = volume - self.debug_mode = debug_mode - self.ros_dir = get_building_path(self.volume) - # Test tracking - self.total_tests = 0 - self.passed_tests = 0 - self.failed_tests = 0 - # Process tracking - self.roscore_pid: Optional[int] = None - - # Logging setup - self.test_logs_dir = self.ros_dir / "src/hydrus-software-stack/test_logs" - self.test_logs_dir.mkdir(exist_ok=True) - - # Create timestamped log file - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - self.log_file = self.test_logs_dir / f"test_run_{timestamp}.log" - self.current_test_log = None - - def _run_command( - self, - cmd: List[str], - timeout: Optional[int] = None, - check: bool = True, - capture_output: bool = False, - env: Optional[Dict] = None, - cwd: Optional[Path] = None, - ) -> subprocess.CompletedProcess: - """ - Execute a shell command with comprehensive error handling and logging. - - This method provides a robust wrapper around subprocess.run() with enhanced - error reporting, timeout handling, and execution context logging. It's designed - to execute commands in a controlled environment with clear feedback about - what's happening and why failures occur. - - Args: - cmd (List[str]): Command and arguments to execute as a list of strings. - Example: ['python3', '-m', 'pytest', 'tests/'] - timeout (Optional[int], optional): Maximum execution time in seconds. - If None, no timeout is applied. - Defaults to None. - check (bool, optional): If True, raises CalledProcessError for non-zero - exit codes. If False, allows commands to fail - without raising exceptions. Defaults to True. - capture_output (bool, optional): If True, captures stdout and stderr - for programmatic access. If False, - output goes directly to terminal. - Defaults to False. - env (Optional[Dict], optional): Environment variables for the command. - If None, inherits current environment. - Defaults to None. - cwd (Optional[Path], optional): Working directory for command execution. - If None, uses current working directory. - Defaults to None. - """ - if env is None: - env = os.environ.copy() - - # Always print the command being executed - cmd_str = " ".join(cmd) - cwd_str = f" (in {cwd})" if cwd else "" - print(f"๐Ÿ”ง Executing command: {cmd_str}{cwd_str}") - - try: - return subprocess.run( - cmd, - timeout=timeout, - check=check, - capture_output=capture_output, - env=env, - cwd=cwd, - text=True, - ) - except FileNotFoundError as e: - print(f"โŒ Command not found: {cmd_str}") - print(f" Error: {e}") - raise - except subprocess.TimeoutExpired: - print(f"โฐ Command timed out after {timeout} seconds: {cmd_str}") - raise - except subprocess.CalledProcessError as e: - if check: - print(f"โŒ Command failed: {cmd_str}") - print(f" Exit code: {e.returncode}") - if capture_output and e.stderr: - print(f" Error: {e.stderr}") - raise - - def _build_workspace(self): - """Build the catkin workspace""" - print("Building workspace...") - env = os.environ.copy() - - # Build command that sources ROS environment first, then runs catkin_make - cmake_command = "source /opt/ros/noetic/setup.bash && catkin_make --cmake-args -DCMAKE_BUILD_TYPE=Release -DPYTHON_EXECUTABLE=/usr/bin/python3 -DPYTHON_INCLUDE_DIR=/usr/include/python3.9 -DPYTHON_LIBRARY=/usr/lib/aarch64-linux-gnu/libpython3.9.so" - - cmake_args = ["bash", "-c", cmake_command] - - try: - self._run_command(cmake_args, check=False, cwd=self.ros_dir, env=env) - except subprocess.CalledProcessError: - print("Build failed, but continuing...") - - def _is_process_running(self, pid: int) -> bool: - """Check if a process is running""" - try: - os.kill(pid, 0) - return True - except OSError: - return False - - def _log_message(self, message: str, also_print: bool = True): - """Log a message to the main log file and optionally print to console""" - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - log_entry = f"[{timestamp}] {message}\n" - - # Write to main log file - with open(self.log_file, "a") as f: - f.write(log_entry) - - # Optionally print to console - if also_print: - print(message) - - def _signal_handler(self, signum, frame): - """Handle interrupt signals gracefully""" - print(f"\n๐Ÿ›‘ Received signal {signum}. Cleaning up...") - self._cleanup_roscore() - print("Cleanup completed. Exiting.") - sys.exit(1) - - def _start_roscore(self, env: Dict[str, str]): - """Start roscore in background""" - print("Starting roscore...") - - try: - process = subprocess.Popen( - ["roscore"], - env=env, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - self.roscore_pid = process.pid - time.sleep(5) - - # Check if roscore is running - if not self._is_process_running(self.roscore_pid): - print("Failed to start roscore") - sys.exit(1) - - print("Roscore started successfully") - - except Exception as e: - print(f"Failed to start roscore: {e}") - sys.exit(1) - - def _cleanup_roscore(self): - """Clean up roscore process""" - if self.roscore_pid and self._is_process_running(self.roscore_pid): - print("Cleaning up roscore...") - try: - os.kill(self.roscore_pid, signal.SIGTERM) - time.sleep(2) - - # Force kill if still running - if self._is_process_running(self.roscore_pid): - os.kill(self.roscore_pid, signal.SIGKILL) - print("Forcefully killed roscore") - else: - print("Roscore stopped gracefully") - - except OSError: - pass # Process was already dead - - def _run_rostest(self, test_name: str, test_file: str, env: Dict[str, str]): - """Run rostest with proper timeout and error handling""" - print("") - print("----------------------------------------") - print(f"Running rostest: {test_name}") - print("----------------------------------------") - - self.total_tests += 1 - - try: - bash_cmd = [ - "bash", - "-c", - f"source {self.ros_dir}/devel/setup.bash 2>/dev/null || true; rostest autonomy {test_file}", - ] - - self._run_command(bash_cmd, timeout=600, capture_output=False, env=env) - print(f"โœ… PASSED: {test_name}") - self.passed_tests += 1 - - except (subprocess.CalledProcessError, subprocess.TimeoutExpired): - print(f"โŒ FAILED: {test_name}") - self.failed_tests += 1 - - def _run_unified_test( - self, - test_name: str, - test_path: Path, - args: List[str], - timeout: int, - infinite_loop: bool, - description: str, - env: Dict[str, str], - ): - """Run a unified test using the strategy pattern""" - print("") - print("----------------------------------------") - print(f"Running test: {test_name}") - print(f"Description: {description}") - print("----------------------------------------") - - self.total_tests += 1 - - # Create individual test log file - test_log_file = ( - self.test_logs_dir / f"{test_name.replace(' ', '_').lower()}.log" - ) - - # Log test start - self._log_message(f"=== Starting Test: {test_name} ===") - self._log_message(f"Description: {description}") - self._log_message(f"Test path: {test_path}") - self._log_message(f"Args: {args}") - self._log_message(f"Timeout: {timeout}s") - self._log_message(f"Infinite loop: {infinite_loop}") - - try: - if test_path.suffix == ".py": - # Python test - need to source ROS environment for autonomy package - # Check if this is a ROS-dependent test - if any( - ros_keyword in str(test_path) - for ros_keyword in [ - "autonomy", - "mission_planner", - "api_server", - "controllers", - "cv_publishers", - ] - ): - # Run with sourced ROS environment - cmd_script = f"source /opt/ros/noetic/setup.bash && source {self.ros_dir}/devel/setup.bash && python3 {test_path} {' '.join(args)}" - cmd = ["bash", "-c", cmd_script] - else: - # Regular Python test - cmd = ["python3", str(test_path)] + args - else: - # Shell script or other executable - cmd = [str(test_path)] + args - - # Log the command - cmd_str = " ".join(cmd) - self._log_message(f"Command: {cmd_str}") - - # Execute test with timeout unless it's an infinite loop test - test_timeout = timeout if not infinite_loop else timeout - - with open(test_log_file, "w") as f: - try: - subprocess.run( - cmd, - timeout=test_timeout, - check=True, - stdout=f, - stderr=subprocess.STDOUT, - env=env, - cwd=self.ros_dir, - text=True, - ) - - if infinite_loop: - # For infinite loop tests, timeout is expected - print(f"โœ… PASSED: {test_name} (ran for expected {timeout}s)") - self.passed_tests += 1 - self._log_message(f"Test passed (infinite loop): {test_name}") - else: - print(f"โœ… PASSED: {test_name}") - self.passed_tests += 1 - self._log_message(f"Test passed: {test_name}") - - except subprocess.TimeoutExpired: - if infinite_loop: - # This is expected for infinite loop tests - print(f"โœ… PASSED: {test_name} (timed out as expected)") - self.passed_tests += 1 - self._log_message( - f"Test passed (expected timeout): {test_name}" - ) - else: - print(f"โฐ TIMEOUT: {test_name} (exceeded {timeout}s)") - self.failed_tests += 1 - self._log_message(f"Test timed out: {test_name}") - - except subprocess.CalledProcessError as e: - print(f"โŒ FAILED: {test_name} (exit code: {e.returncode})") - self.failed_tests += 1 - self._log_message(f"Test failed: {test_name} with exit code {e.returncode}") - - except FileNotFoundError: - print(f"โŒ FAILED: {test_name} (test file not found: {test_path})") - cmd_str = " ".join([str(test_path)] + args) - - # Write failure details to individual test log - with open(test_log_file, "w") as f: - f.write("Test failed: File not found\n") - f.write(f"Path: {test_path}\n") - f.write(f"Command: {cmd_str}\n") - - self.failed_tests += 1 - - # Log test completion - self._log_message(f"=== Completed Test: {test_name} ===\n") - - def execute_test_cases(self, test_cases: List[TestCase], env: Dict[str, str]): - """Execute a list of test cases using their strategies""" - for test_case in test_cases: - test_case.strategy.execute(test_case, env, self) - - def _run_unit_tests(self, env: Dict[str, str]): - """Run unit tests using strategy pattern""" - print("\n๐Ÿ”ฌ Running Unit Tests...") - - # Create unit test builder and director - unit_builder = UnitTestBuilder(self.ros_dir) - director = TestDirector(unit_builder) - unit_tests = director.construct_tests() - - # Execute tests - self.execute_test_cases(unit_tests, env) - - def _run_ros_integration_tests(self, env: Dict[str, str]): - """Run ROS integration tests using strategy pattern""" - print("\n๐Ÿ”— Running ROS Integration Tests...") - - # Start roscore for ROS tests - self._start_roscore(env) - - # Create ROS test builder and director - ros_builder = RosIntegrationTestBuilder(self.ros_dir) - director = TestDirector(ros_builder) - ros_tests = director.construct_tests() - - # Execute tests - self.execute_test_cases(ros_tests, env) - - def _run_script_tests(self, env: Dict[str, str]): - """Run script tests using strategy pattern""" - print("\n๐Ÿ“œ Running Script Tests...") - - # Create script test builder and director - script_builder = ScriptTestBuilder(self.ros_dir) - director = TestDirector(script_builder) - script_tests = director.construct_tests() - - # Execute tests - self.execute_test_cases(script_tests, env) - - def _print_summary(self) -> int: - """Print test results summary and return exit code""" - print("") - print("==========================================") - print("TEST RESULTS SUMMARY") - print("==========================================") - print(f"Total tests run: {self.total_tests}") - print(f"โœ… Passed: {self.passed_tests}") - print(f"โŒ Failed: {self.failed_tests}") - - if self.failed_tests == 0: - print("\n๐ŸŽ‰ All tests passed!") - return 0 - else: - success_rate = ( - (self.passed_tests / self.total_tests) * 100 - if self.total_tests > 0 - else 0 - ) - print(f"\n๐Ÿ“Š Success rate: {success_rate:.1f}%") - print(f"๐Ÿ“ Detailed logs available in: {self.test_logs_dir}") - return 1 - - def main(self): - """Main execution function""" - print("==========================================") - print("Running Hydrus Autonomy Test Suite") - print("==========================================") - - # Setup signal handlers - signal.signal(signal.SIGINT, self._signal_handler) - signal.signal(signal.SIGTERM, self._signal_handler) - env = os.environ.copy() - - try: - # Build workspace and setup environment - self._build_workspace() - # Run unit tests - self._run_unit_tests(env) - - # Run ROS integration tests - self._run_ros_integration_tests(env) - - # Run script tests - self._run_script_tests(env) - - finally: - # Always clean up - self._cleanup_roscore() - - # Print final results and exit - exit_code = self._print_summary() - sys.exit(exit_code) - - def _find_integration_test_by_name(self, test_name: str) -> Optional[TestCase]: - """Find an integration test by its name""" - ros_builder = RosIntegrationTestBuilder(self.ros_dir) - director = TestDirector(ros_builder) - ros_tests = director.construct_tests() - - for test in ros_tests: - if test.name.lower() == test_name.lower(): - return test - return None - - def _list_integration_test_names(self) -> List[str]: - """Get a list of all integration test names""" - ros_builder = RosIntegrationTestBuilder(self.ros_dir) - director = TestDirector(ros_builder) - ros_tests = director.construct_tests() - - return [test.name for test in ros_tests] - - def _run_specific_integration_test(self, test_name: str, env: Dict[str, str]): - """Run a specific integration test by name""" - test_case = self._find_integration_test_by_name(test_name) - - if not test_case: - available_tests = self._list_integration_test_names() - print(f"โŒ Integration test '{test_name}' not found.") - print("Available integration tests:") - for name in available_tests: - print(f" โ€ข {name}") - return - - print(f"\n๐Ÿ”— Running specific integration test: {test_name}") - - # Start roscore for ROS tests - self._start_roscore(env) - - # Execute the specific test - self.execute_test_cases([test_case], env) - - -# === Typer Commands === - - -@test_app.command("all") -def run_all_tests( - debug: bool = typer.Option(False, "--debug", "-d", help="Enable debug mode"), -): - """Run all tests in the Hydrus software stack.""" - typer.echo("๐Ÿš€ Running all tests in the Hydrus software stack...") - - test_manager = HydrusTestManager(volume=_volume_mode, debug_mode=debug) - test_manager.main() - - -@test_app.command("unit") -def run_unit_tests_cmd( - debug: bool = typer.Option(False, "--debug", "-d", help="Enable debug mode"), -): - """Run only unit tests.""" - typer.echo("๐Ÿ”ฌ Running unit tests...") - - test_manager = HydrusTestManager(volume=_volume_mode, debug_mode=debug) - env = os.environ.copy() - - try: - test_manager._build_workspace() - test_manager._run_unit_tests(env) - finally: - test_manager._cleanup_roscore() - - exit_code = test_manager._print_summary() - raise typer.Exit(exit_code) - - -@test_app.command("integration") -def run_integration_tests_cmd( - debug: bool = typer.Option(False, "--debug", "-d", help="Enable debug mode"), -): - """Run only ROS integration tests.""" - typer.echo("๐Ÿ”— Running ROS integration tests...") - - test_manager = HydrusTestManager(volume=_volume_mode, debug_mode=debug) - env = os.environ.copy() - - try: - test_manager._build_workspace() - test_manager._run_ros_integration_tests(env) - finally: - test_manager._cleanup_roscore() - - exit_code = test_manager._print_summary() - raise typer.Exit(exit_code) - - -@test_app.command("integration-test") -def run_specific_integration_test( - test_name: str = typer.Argument(..., help="Name of the integration test to run"), - debug: bool = typer.Option(False, "--debug", "-d", help="Enable debug mode"), -): - """Run a specific integration test by name.""" - typer.echo(f"๐Ÿ”— Running specific integration test: {test_name}") - - test_manager = HydrusTestManager(volume=_volume_mode, debug_mode=debug) - env = os.environ.copy() - - try: - test_manager._build_workspace() - test_manager._run_specific_integration_test(test_name, env) - finally: - test_manager._cleanup_roscore() - - exit_code = test_manager._print_summary() - raise typer.Exit(exit_code) - - -@test_app.command("scripts") -def run_script_tests_cmd( - debug: bool = typer.Option(False, "--debug", "-d", help="Enable debug mode"), -): - """Run only script tests.""" - typer.echo("๐Ÿ“œ Running script tests...") - - test_manager = HydrusTestManager(volume=_volume_mode, debug_mode=debug) - env = os.environ.copy() - - try: - test_manager._build_workspace() - test_manager._run_script_tests(env) - finally: - test_manager._cleanup_roscore() - - exit_code = test_manager._print_summary() - raise typer.Exit(exit_code) - - -@test_app.command("custom") -def run_custom_test( - test_name: str = typer.Argument(..., help="Name of the test"), - test_path: str = typer.Argument(..., help="Path to the test file"), - debug: bool = typer.Option(False, "--debug", "-d", help="Enable debug mode"), - timeout: int = typer.Option(300, "--timeout", "-t", help="Test timeout in seconds"), - args: List[str] = typer.Option( - [], "--args", "-a", help="Additional arguments for the test" - ), -): - """Run a custom test with specified parameters.""" - typer.echo(f"๐Ÿงช Running custom test: {test_name}") - - test_manager = HydrusTestManager(volume=_volume_mode, debug_mode=debug) - env = os.environ.copy() - - # Create a custom test case - custom_test = TestCase( - name=test_name, - path=Path(test_path), - args=args, - timeout=timeout, - infinite_loop=False, - description=f"Custom test: {test_name}", - strategy=UnitTestStrategy(), - ) - - try: - test_manager._build_workspace() - test_manager.execute_test_cases([custom_test], env) - finally: - test_manager._cleanup_roscore() - - exit_code = test_manager._print_summary() - raise typer.Exit(exit_code) - - -@test_app.command("list") -def list_available_tests( - integration_only: bool = typer.Option( - False, "--integration-only", "-i", help="Show only integration tests" - ), -): - """List all available tests.""" - if integration_only: - typer.echo("๐Ÿ“‹ Available Integration Tests:") - else: - typer.echo("๐Ÿ“‹ Available tests:") - - test_manager = HydrusTestManager(volume=_volume_mode) - - if not integration_only: - # List unit tests - unit_builder = UnitTestBuilder(test_manager.ros_dir) - director = TestDirector(unit_builder) - unit_tests = director.construct_tests() - - typer.echo("\n๐Ÿ”ฌ Unit Tests:") - for test in unit_tests: - typer.echo(f" โ€ข {test.name}: {test.description}") - - # List ROS integration tests - ros_builder = RosIntegrationTestBuilder(test_manager.ros_dir) - director = TestDirector(ros_builder) - ros_tests = director.construct_tests() - - if integration_only: - typer.echo("\n๐Ÿ”— Integration Tests:") - else: - typer.echo("\n๐Ÿ”— ROS Integration Tests:") - for test in ros_tests: - typer.echo(f" โ€ข {test.name}: {test.description}") - - if not integration_only: - # List script tests - script_builder = ScriptTestBuilder(test_manager.ros_dir) - director = TestDirector(script_builder) - script_tests = director.construct_tests() - - typer.echo("\n๐Ÿ“œ Script Tests:") - for test in script_tests: - typer.echo(f" โ€ข {test.name}: {test.description}") - - -@test_app.command("validate") -def validate_test_environment(): - """Validate the test environment setup.""" - typer.echo("๐Ÿ” Validating test environment...") - - test_manager = HydrusTestManager(volume=_volume_mode) - - # Check ROS workspace - if not test_manager.ros_dir.exists(): - typer.echo(f"โŒ ROS workspace not found: {test_manager.ros_dir}") - raise typer.Exit(1) - else: - typer.echo(f"โœ… ROS workspace found: {test_manager.ros_dir}") - - # Check test logs directory - if not test_manager.test_logs_dir.exists(): - typer.echo(f"โŒ Test logs directory not found: {test_manager.test_logs_dir}") - raise typer.Exit(1) - else: - typer.echo(f"โœ… Test logs directory found: {test_manager.test_logs_dir}") - - # Check for test files - unit_builder = UnitTestBuilder(test_manager.ros_dir) - director = TestDirector(unit_builder) - unit_tests = director.construct_tests() - - missing_tests = [] - for test in unit_tests: - if not test.path.exists(): - missing_tests.append(test.path) - - if missing_tests: - typer.echo("โš ๏ธ Missing test files:") - for path in missing_tests: - typer.echo(f" โ€ข {path}") - else: - typer.echo("โœ… All test files found") - - typer.echo("โœ… Test environment validation complete") diff --git a/scripts/commands/test/__init__.py b/scripts/commands/test/__init__.py new file mode 100644 index 0000000..3ce1e0d --- /dev/null +++ b/scripts/commands/test/__init__.py @@ -0,0 +1,13 @@ +""" +Test command module for Hydrus software stack. + +This module contains all test-related functionality including: +- Test execution and management +- Test configuration loading from distributed .hss files +- Test logging and result archiving +- Test caching functionality +""" + +from .test import test_app + +__all__ = ["test_app"] diff --git a/scripts/commands/test/test.py b/scripts/commands/test/test.py new file mode 100644 index 0000000..d3f78ea --- /dev/null +++ b/scripts/commands/test/test.py @@ -0,0 +1,258 @@ +import os +from pathlib import Path +from typing import Dict + +import typer +import yaml + +from ..utils import get_building_path +from .test_cases import RosTestStrategy, ScriptTestStrategy +from .test_config_loader import ( + DistributedTestConfigLoader, + TestConfig, + get_strategy_class, +) +from .test_manager import HydrusTestManager + +# Create the test app +test_app = typer.Typer() + +# Global state to store volume setting +_volume_mode = False + + +def _is_directory_target(target: str, repo_root: Path) -> bool: + """Determine if the target is a directory path.""" + # Check if it's a directory path + if target in [".", "./", ""]: + return True + + # Check if it's an absolute directory path + if target.startswith("/"): + abs_path = Path(target) + return abs_path.exists() and abs_path.is_dir() + + # Check if it's a relative directory path from repo root + rel_path = repo_root / target + return rel_path.exists() and rel_path.is_dir() + + +def _run_tests_in_directory( + target: str, + repo_root: Path, + config_loader: DistributedTestConfigLoader, + test_manager: HydrusTestManager, + env: Dict[str, str], +): + """Run all tests found in the specified directory.""" + # Determine search directory + if target == "." or target == "./": + search_dir = Path.cwd() + try: + search_dir.relative_to(repo_root) + except ValueError: + search_dir = repo_root + elif target.startswith("/"): + search_dir = Path(target) + if not str(search_dir).startswith(str(repo_root)): + typer.echo(f"โŒ Directory must be within repository: {repo_root}") + raise typer.Exit(1) + else: + search_dir = repo_root / target + + if not search_dir.exists(): + typer.echo(f"โŒ Directory not found: {search_dir}") + raise typer.Exit(1) + + typer.echo(f"๐Ÿ” Running tests in directory: {search_dir}") + + # Find all .hss files in the directory and subdirectories + hss_files = list(search_dir.rglob("*.hss")) + + if not hss_files: + typer.echo(f"โŒ No .hss files found in {search_dir}") + return + + typer.echo(f"๐Ÿ“ Found {len(hss_files)} .hss files") + + # Collect all test cases from the directory + all_test_cases = [] + + for hss_file in hss_files: + try: + with open(hss_file, "r") as f: + hss_content = yaml.safe_load(f) + + if not hss_content or "tests" not in hss_content: + continue + + # Convert tests to TestCase objects + for test_config_data in hss_content["tests"]: + if test_config_data.get("enabled", True): # Only run enabled tests + test_config = TestConfig( + name=test_config_data.get("name", "Unnamed Test"), + path=test_config_data.get("path", ""), + args=test_config_data.get("args", []), + timeout=test_config_data.get("timeout", 300), + infinite_loop=test_config_data.get("infinite_loop", False), + description=test_config_data.get("description", ""), + strategy=test_config_data.get("strategy", "UnitTestStrategy"), + enabled=test_config_data.get("enabled", True), + test_type=test_config_data.get("test_type", "python"), + source_file=str(hss_file), + ) + + strategy_class = get_strategy_class(test_config.strategy) + test_case = test_config.to_test_case( + test_manager.ros_dir, strategy_class + ) + all_test_cases.append(test_case) + + except Exception as e: + typer.echo(f"โš ๏ธ Warning: Error loading {hss_file.name}: {e}") + + if not all_test_cases: + typer.echo("โŒ No enabled tests found in the specified directory") + return + + typer.echo(f"๐Ÿš€ Running {len(all_test_cases)} tests from directory") + + # Check if any tests need ROS + needs_ros = any( + isinstance(test_case.strategy, (RosTestStrategy, ScriptTestStrategy)) + or any( + ros_keyword in str(test_case.path) + for ros_keyword in [ + "autonomy", + "mission_planner", + "api_server", + "controllers", + "cv_publishers", + ] + ) + for test_case in all_test_cases + ) + + if needs_ros: + test_manager._start_roscore(env) + + # Execute all test cases + test_manager.execute_test_cases(all_test_cases, env) + + +def _run_specific_test_by_name( + test_name: str, + config_loader: DistributedTestConfigLoader, + test_manager: HydrusTestManager, + env: Dict[str, str], +): + """Run a specific test by searching for it by name across all .hss files.""" + typer.echo(f"๐Ÿ” Searching for test: {test_name}") + + # Search across all test types + unit_tests = config_loader.load_unit_tests() + integration_tests = config_loader.load_integration_tests() + script_tests = config_loader.load_script_tests() + + all_tests = unit_tests.tests + integration_tests.tests + script_tests.tests + + # Find the matching test + matching_test = None + for test_config in all_tests: + if test_config.name.lower() == test_name.lower(): + matching_test = test_config + break + + if not matching_test: + typer.echo(f"โŒ Test '{test_name}' not found.") + + # Show available tests + typer.echo("\nAvailable tests:") + for test_config in all_tests: + if test_config.enabled: + strategy_icon = ( + "๐Ÿ”ฌ" + if "unit" in test_config.strategy.lower() + else "๐Ÿ”—" + if "ros" in test_config.strategy.lower() + else "๐Ÿ“œ" + ) + typer.echo(f" {strategy_icon} {test_config.name}") + + raise typer.Exit(1) + + if not matching_test.enabled: + typer.echo(f"โŒ Test '{test_name}' is disabled.") + raise typer.Exit(1) + + typer.echo(f"โœ… Found test: {matching_test.name}") + typer.echo(f"๐Ÿ“ Description: {matching_test.description}") + + # Convert to TestCase and execute + strategy_class = get_strategy_class(matching_test.strategy) + test_case = matching_test.to_test_case(test_manager.ros_dir, strategy_class) + + # Check if test needs ROS + needs_ros = isinstance( + test_case.strategy, (RosTestStrategy, ScriptTestStrategy) + ) or any( + ros_keyword in str(test_case.path) + for ros_keyword in [ + "autonomy", + "mission_planner", + "api_server", + "controllers", + "cv_publishers", + ] + ) + + if needs_ros: + test_manager._start_roscore(env) + + # Execute the test + test_manager.execute_test_cases([test_case], env) + + +# === Main Command === + + +@test_app.command() +def run( + target: str = typer.Argument(".", help="Directory path or test name to run"), + debug: bool = typer.Option(False, "--debug", "-d", help="Enable debug mode"), + volume: bool = typer.Option( + False, "--volume", "-v", help="Use volume directory for tests" + ), +): + """Run tests automatically detecting the type and scope based on the target.""" + global _volume_mode + _volume_mode = volume + + config_loader = DistributedTestConfigLoader() + test_manager = HydrusTestManager(volume=_volume_mode, debug_mode=debug) + env = os.environ.copy() + + # Get the repository root + hydrus_root = os.environ.get("HYDRUS_ROOT") + if hydrus_root: + repo_root = Path(hydrus_root) + else: + building_path = get_building_path(_volume_mode) + repo_root = building_path.parent + + try: + test_manager._build_workspace() + + # Determine if target is a directory or test name + if _is_directory_target(target, repo_root): + # Handle directory target + _run_tests_in_directory(target, repo_root, config_loader, test_manager, env) + else: + # Handle test name target + _run_specific_test_by_name(target, config_loader, test_manager, env) + + finally: + test_manager._cleanup_roscore() + + exit_code = test_manager._print_summary() + raise typer.Exit(exit_code) diff --git a/scripts/commands/test/test_cache.py b/scripts/commands/test/test_cache.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/commands/test/test_cases.py b/scripts/commands/test/test_cases.py new file mode 100644 index 0000000..1afa955 --- /dev/null +++ b/scripts/commands/test/test_cases.py @@ -0,0 +1,65 @@ +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Dict, List + +# === Strategy Pattern for Test Execution === + + +class TestExecutionStrategy(ABC): + @abstractmethod + def execute(self, test_case: "TestCase", env: Dict, runner): + pass + + +class UnitTestStrategy(TestExecutionStrategy): + def execute(self, test_case, env, runner): + runner._run_unified_test( + test_case.name, + test_case.path, + test_case.args, + test_case.timeout, + test_case.infinite_loop, + test_case.description, + env, + ) + + +class RosTestStrategy(TestExecutionStrategy): + def execute(self, test_case, env, runner): + runner._run_rostest(test_case.name, str(test_case.path), env) + + +class ScriptTestStrategy(TestExecutionStrategy): + def execute(self, test_case, env, runner): + runner._run_unified_test( + test_case.name, + test_case.path, + test_case.args, + test_case.timeout, + test_case.infinite_loop, + test_case.description, + env, + ) + + +# === TestCase Definition === + + +class TestCase: + def __init__( + self, + name: str, + path: Path, + args: List[str], + timeout: int, + infinite_loop: bool, + description: str, + strategy: TestExecutionStrategy, + ): + self.name = name + self.path = path + self.args = args + self.timeout = timeout + self.infinite_loop = infinite_loop + self.description = description + self.strategy = strategy diff --git a/scripts/commands/test/test_config_loader.py b/scripts/commands/test/test_config_loader.py new file mode 100644 index 0000000..e378829 --- /dev/null +++ b/scripts/commands/test/test_config_loader.py @@ -0,0 +1,294 @@ +#!/usr/bin/env python3 +""" +test_config_loader.py + +Distributed test configuration loader for the Hydrus test system. +Searches for .hss files throughout the repository and loads test configurations. +""" + +import os +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, List, Optional + +import yaml + + +@dataclass +class TestConfig: + name: str + path: str + args: List[str] + timeout: int + infinite_loop: bool + description: str + strategy: str + enabled: bool + test_type: str + source_file: Optional[str] = None # Track which .hss file this came from + dependencies: Optional[List[str]] = None # Explicit dependencies + + def to_test_case(self, ros_dir: Path, strategy_class): + """Convert TestConfig to TestCase object.""" + from .test_cases import TestCase + + # Handle absolute vs relative paths + if self.path.startswith("/"): + test_path = Path(self.path) + else: + # If source_file is available, make path relative to its directory + if self.source_file: + source_dir = Path(self.source_file).parent + test_path = source_dir / self.path + else: + # Fallback to old behavior + test_path = ros_dir / "src/hydrus-software-stack" / self.path + + return TestCase( + name=self.name, + path=test_path, + args=self.args, + timeout=self.timeout, + infinite_loop=self.infinite_loop, + description=self.description, + strategy=strategy_class(), + ) + + +@dataclass +class TestSuite: + name: str + description: str + strategy: str + tests: List[TestConfig] + source_files: List[str] # Track which files contributed to this suite + + +class DistributedTestConfigLoader: + """ + Loads test configurations from .hss files distributed throughout the repository. + Searches for .hss files in specific directories and aggregates them by test type. + """ + + def __init__(self, repository_root: Optional[Path] = None): + # First try to get root from environment variable + hydrus_root = os.environ.get("HYDRUS_ROOT") + if hydrus_root: + self.repository_root = Path(hydrus_root) + print(f"Using HYDRUS_ROOT environment variable: {self.repository_root}") + elif repository_root: + self.repository_root = repository_root + print(f"Using provided repository root: {self.repository_root}") + else: + # Fallback: try to detect based on the location of this script + script_path = Path(__file__).resolve() + # Go up from scripts/commands/test_config_loader.py to find the project root + potential_root = script_path.parent.parent.parent + if (potential_root / "hydrus-cli").exists() or ( + potential_root / "autonomy" + ).exists(): + self.repository_root = potential_root + print(f"Auto-detected repository root: {self.repository_root}") + else: + raise RuntimeError( + "Could not determine repository root. Please set HYDRUS_ROOT environment variable." + ) + + self.search_paths = [ + "autonomy", + "devices", + "simulator", + "scripts", + "test_configs", # Keep existing central configs + "test", # Root test directory + ] + + def find_hss_files(self) -> List[Path]: + """Find all .hss files in the repository.""" + hss_files = [] + + for search_path in self.search_paths: + search_dir = self.repository_root / search_path + if search_dir.exists(): + # Recursively find all .hss files + hss_files.extend(search_dir.rglob("*.hss")) + + return hss_files + + def _load_hss_file(self, file_path: Path) -> Dict: + """Load an .hss file and return its contents.""" + try: + with open(file_path, "r") as file: + data = yaml.safe_load(file) + return data if data is not None else {} + except Exception as e: + print(f"Warning: Failed to load .hss file {file_path}: {e}") + return {} + + def _parse_test_configs(self, data: Dict, source_file: Path) -> List[TestConfig]: + """Parse test configurations from .hss data.""" + test_configs = [] + + for test_data in data.get("tests", []): + test_config = TestConfig( + name=test_data["name"], + path=test_data["path"], + args=test_data.get("args", []), + timeout=test_data.get("timeout", 60), + infinite_loop=test_data.get("infinite_loop", False), + description=test_data.get("description", ""), + strategy=test_data.get("strategy", "UnitTestStrategy"), + enabled=test_data.get("enabled", True), + test_type=test_data.get("test_type", "python"), + source_file=str(source_file), + dependencies=test_data.get("dependencies", None), + ) + test_configs.append(test_config) + + return test_configs + + def load_tests_by_strategy(self, target_strategy: str) -> TestSuite: + """Load all tests that match a specific strategy from distributed .hss files.""" + hss_files = self.find_hss_files() + all_test_configs = [] + source_files = [] + + suite_name = f"{target_strategy.title()} Tests" + suite_description = ( + f"Tests loaded from distributed .hss files with strategy: {target_strategy}" + ) + + for hss_file in hss_files: + try: + data = self._load_hss_file(hss_file) + if not data: + continue + + # Check if this file contains tests for our target strategy + file_strategy = data.get("strategy", "") + test_configs = self._parse_test_configs(data, hss_file) + + # Filter tests that match our target strategy (either file-level or test-level) + matching_tests = [] + for test_config in test_configs: + if ( + file_strategy == target_strategy + or test_config.strategy.lower().replace("teststrategy", "") + == target_strategy.lower().replace("teststrategy", "") + or test_config.test_type == target_strategy + ): + matching_tests.append(test_config) + + if matching_tests: + all_test_configs.extend(matching_tests) + source_files.append(str(hss_file)) + + # Use the first file's metadata if available + if data.get("name") and len(all_test_configs) == len( + matching_tests + ): + suite_name = data.get("name", suite_name) + suite_description = data.get("description", suite_description) + + except Exception as e: + print(f"Warning: Error processing {hss_file}: {e}") + continue + + return TestSuite( + name=str(suite_name), + description=str(suite_description), + strategy=target_strategy, + tests=all_test_configs, + source_files=source_files, + ) + + def load_unit_tests(self) -> TestSuite: + """Load unit test configurations from distributed .hss files.""" + return self.load_tests_by_strategy("unit") + + def load_integration_tests(self) -> TestSuite: + """Load integration test configurations from distributed .hss files.""" + return self.load_tests_by_strategy("integration") + + def load_script_tests(self) -> TestSuite: + """Load script test configurations from distributed .hss files.""" + return self.load_tests_by_strategy("script") + + def get_enabled_tests(self, test_suite: TestSuite) -> List[TestConfig]: + """Filter and return only enabled tests.""" + return [test for test in test_suite.tests if test.enabled] + + def validate_test_config(self, test_config: TestConfig) -> bool: + """Validate that a test configuration is valid.""" + # Handle absolute vs relative paths + if test_config.path.startswith("/"): + test_path = Path(test_config.path) + else: + if test_config.source_file: + source_dir = Path(test_config.source_file).parent + test_path = source_dir / test_config.path + else: + test_path = ( + self.repository_root + / "src/hydrus-software-stack" + / test_config.path + ) + + if not test_path.exists(): + print(f"Warning: Test file does not exist: {test_path}") + return False + + if test_config.timeout <= 0: + print( + f"Warning: Invalid timeout for test {test_config.name}: {test_config.timeout}" + ) + return False + + return True + + def list_all_hss_files(self) -> Dict[str, List[Path]]: + """List all .hss files organized by directory.""" + hss_files = self.find_hss_files() + organized = {} + + for hss_file in hss_files: + # Get relative path from repository root + rel_path = hss_file.relative_to(self.repository_root) + directory = str(rel_path.parent) + + if directory not in organized: + organized[directory] = [] + organized[directory].append(rel_path) + + return organized + + +# Backward compatibility class +class TestConfigLoader(DistributedTestConfigLoader): + """Legacy class name for backward compatibility.""" + + def __init__(self, config_dir: Path): + # Convert old usage to new usage + # Assume config_dir is in repository, find repository root + repository_root = config_dir + while repository_root.parent != repository_root: + if (repository_root / ".git").exists() or ( + repository_root / "setup.py" + ).exists(): + break + repository_root = repository_root.parent + + super().__init__(repository_root) + + +def get_strategy_class(strategy_name: str): + """Get the strategy class based on the strategy name.""" + from .test_cases import RosTestStrategy, ScriptTestStrategy, UnitTestStrategy + + strategy_map = { + "UnitTestStrategy": UnitTestStrategy, + "RosTestStrategy": RosTestStrategy, + "ScriptTestStrategy": ScriptTestStrategy, + } + + return strategy_map.get(strategy_name, UnitTestStrategy) diff --git a/scripts/commands/test/test_logger.py b/scripts/commands/test/test_logger.py new file mode 100644 index 0000000..dba4b10 --- /dev/null +++ b/scripts/commands/test/test_logger.py @@ -0,0 +1,397 @@ +#!/usr/bin/env python3 +""" +test_logger.py + +Advanced logging system for Hydrus test execution with console output and archive creation. +""" + +import logging +import queue +import subprocess +import threading +import zipfile +from contextlib import contextmanager +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional, TextIO + + +class TestLogCapture: + """Captures and manages logs for individual test processes.""" + + def __init__(self, test_name: str, log_dir: Path): + self.test_name = test_name + self.log_dir = log_dir + self.log_file = log_dir / f"{self._sanitize_name(test_name)}.log" + self.start_time: Optional[datetime] = None + self.end_time: Optional[datetime] = None + self.return_code: Optional[int] = None + self.log_queue = queue.Queue() + self.console_output = [] + + # Ensure log directory exists + self.log_dir.mkdir(parents=True, exist_ok=True) + + # Setup logging + self.logger = logging.getLogger(f"test.{test_name}") + self.logger.setLevel(logging.DEBUG) + + # File handler + file_handler = logging.FileHandler(self.log_file, mode="w", encoding="utf-8") + file_formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + file_handler.setFormatter(file_formatter) + self.logger.addHandler(file_handler) + + # Console handler for immediate output + console_handler = logging.StreamHandler() + console_formatter = logging.Formatter( + f"[{test_name}] %(levelname)s: %(message)s" + ) + console_handler.setFormatter(console_formatter) + self.logger.addHandler(console_handler) + + def _sanitize_name(self, name: str) -> str: + """Sanitize test name for use as filename.""" + return "".join(c for c in name if c.isalnum() or c in (" ", "-", "_")).rstrip() + + def start_capture(self): + """Start capturing logs for this test.""" + self.start_time = datetime.now() + self.logger.info(f"Starting test: {self.test_name}") + self.logger.info(f"Start time: {self.start_time.isoformat()}") + + def log_info(self, message: str): + """Log an info message.""" + self.logger.info(message) + self.console_output.append(f"INFO: {message}") + + def log_warning(self, message: str): + """Log a warning message.""" + self.logger.warning(message) + self.console_output.append(f"WARNING: {message}") + + def log_error(self, message: str): + """Log an error message.""" + self.logger.error(message) + self.console_output.append(f"ERROR: {message}") + + def log_debug(self, message: str): + """Log a debug message.""" + self.logger.debug(message) + self.console_output.append(f"DEBUG: {message}") + + def log_command_output(self, line: str): + """Log output from a command.""" + # Remove ANSI color codes for log file + import re + + clean_line = re.sub(r"\x1b\[[0-9;]*m", "", line.strip()) + if clean_line: + self.logger.info(f"STDOUT: {clean_line}") + self.console_output.append(clean_line) + + def log_command_error(self, line: str): + """Log error output from a command.""" + import re + + clean_line = re.sub(r"\x1b\[[0-9;]*m", "", line.strip()) + if clean_line: + self.logger.error(f"STDERR: {clean_line}") + self.console_output.append(f"ERROR: {clean_line}") + + def end_capture(self, return_code: int = 0): + """End capturing logs for this test.""" + self.end_time = datetime.now() + self.return_code = return_code + duration = ( + (self.end_time - self.start_time).total_seconds() if self.start_time else 0 + ) + + self.logger.info(f"Test completed: {self.test_name}") + self.logger.info(f"End time: {self.end_time.isoformat()}") + self.logger.info(f"Duration: {duration:.2f} seconds") + self.logger.info(f"Return code: {return_code}") + + if return_code == 0: + self.logger.info("Test PASSED") + else: + self.logger.error("Test FAILED") + + def get_summary(self) -> Dict: + """Get a summary of this test execution.""" + duration = 0 + if self.start_time and self.end_time: + duration = (self.end_time - self.start_time).total_seconds() + + return { + "test_name": self.test_name, + "start_time": self.start_time.isoformat() if self.start_time else None, + "end_time": self.end_time.isoformat() if self.end_time else None, + "duration": duration, + "return_code": self.return_code, + "status": "PASSED" if self.return_code == 0 else "FAILED", + "log_file": str(self.log_file), + "log_size": self.log_file.stat().st_size if self.log_file.exists() else 0, + } + + +class TestLogManager: + """Manages logging for all test executions and creates archives.""" + + def __init__(self, session_name: Optional[str] = None): + self.session_name = ( + session_name or f"test_session_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + ) + self.base_log_dir = Path.cwd() / "test_logs" / self.session_name + self.test_captures: Dict[str, TestLogCapture] = {} + self.session_start_time = datetime.now() + self.session_end_time: Optional[datetime] = None + + # Create session log directory + self.base_log_dir.mkdir(parents=True, exist_ok=True) + + # Setup session logger + self.session_logger = logging.getLogger("test_session") + self.session_logger.setLevel(logging.INFO) + + session_log_file = self.base_log_dir / "session.log" + session_handler = logging.FileHandler( + session_log_file, mode="w", encoding="utf-8" + ) + session_formatter = logging.Formatter( + "%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + ) + session_handler.setFormatter(session_formatter) + self.session_logger.addHandler(session_handler) + + # Console handler for session + console_handler = logging.StreamHandler() + console_formatter = logging.Formatter("%(levelname)s: %(message)s") + console_handler.setFormatter(console_formatter) + self.session_logger.addHandler(console_handler) + + self.session_logger.info(f"Starting test session: {self.session_name}") + self.session_logger.info(f"Log directory: {self.base_log_dir}") + + def create_test_capture(self, test_name: str) -> TestLogCapture: + """Create a new test log capture.""" + if test_name in self.test_captures: + self.session_logger.warning(f"Test capture already exists for: {test_name}") + + capture = TestLogCapture(test_name, self.base_log_dir) + self.test_captures[test_name] = capture + return capture + + def get_test_capture(self, test_name: str) -> Optional[TestLogCapture]: + """Get an existing test log capture.""" + return self.test_captures.get(test_name) + + @contextmanager + def capture_test(self, test_name: str): + """Context manager for capturing a test's logs.""" + capture = self.create_test_capture(test_name) + capture.start_capture() + try: + yield capture + except Exception as e: + capture.log_error(f"Test execution failed with exception: {str(e)}") + capture.end_capture(return_code=1) + raise + else: + capture.end_capture(return_code=0) + + def end_session(self): + """End the test session.""" + self.session_end_time = datetime.now() + duration = (self.session_end_time - self.session_start_time).total_seconds() + + # Log session summary + total_tests = len(self.test_captures) + passed_tests = sum( + 1 for capture in self.test_captures.values() if capture.return_code == 0 + ) + failed_tests = total_tests - passed_tests + + self.session_logger.info(f"Test session completed: {self.session_name}") + self.session_logger.info(f"Duration: {duration:.2f} seconds") + self.session_logger.info(f"Total tests: {total_tests}") + self.session_logger.info(f"Passed: {passed_tests}") + self.session_logger.info(f"Failed: {failed_tests}") + + # Create summary file + self._create_summary_file() + + return self.get_session_summary() + + def _create_summary_file(self): + """Create a summary file for the test session.""" + summary_file = self.base_log_dir / "summary.json" + import json + + summary = self.get_session_summary() + with open(summary_file, "w") as f: + json.dump(summary, f, indent=2, default=str) + + def get_session_summary(self) -> Dict: + """Get a summary of the entire test session.""" + duration = 0 + if self.session_end_time: + duration = (self.session_end_time - self.session_start_time).total_seconds() + + test_summaries = [ + capture.get_summary() for capture in self.test_captures.values() + ] + + return { + "session_name": self.session_name, + "start_time": self.session_start_time.isoformat(), + "end_time": self.session_end_time.isoformat() + if self.session_end_time + else None, + "duration": duration, + "total_tests": len(self.test_captures), + "passed_tests": sum(1 for s in test_summaries if s["status"] == "PASSED"), + "failed_tests": sum(1 for s in test_summaries if s["status"] == "FAILED"), + "log_directory": str(self.base_log_dir), + "tests": test_summaries, + } + + def create_archive(self, archive_path: Optional[Path] = None) -> Path: + """Create a zip archive of all test logs.""" + if archive_path is None: + archive_path = Path.cwd() / f"{self.session_name}_logs.zip" + + self.session_logger.info(f"Creating log archive: {archive_path}") + + with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zipf: + # Add all files in the log directory + for file_path in self.base_log_dir.rglob("*"): + if file_path.is_file(): + arcname = file_path.relative_to(self.base_log_dir.parent) + zipf.write(file_path, arcname) + + archive_size = archive_path.stat().st_size + self.session_logger.info( + f"Archive created: {archive_path} ({archive_size} bytes)" + ) + + return archive_path + + def print_session_report(self): + """Print a detailed session report to console.""" + summary = self.get_session_summary() + + print("\n" + "=" * 80) + print("TEST SESSION REPORT") + print("=" * 80) + print(f"Session: {summary['session_name']}") + print(f"Duration: {summary['duration']:.2f} seconds") + print(f"Total Tests: {summary['total_tests']}") + print(f"Passed: {summary['passed_tests']}") + print(f"Failed: {summary['failed_tests']}") + print(f"Log Directory: {summary['log_directory']}") + + if summary["tests"]: + print("\nTEST DETAILS:") + print("-" * 80) + for test in summary["tests"]: + status_symbol = "โœ“" if test["status"] == "PASSED" else "โœ—" + print( + f"{status_symbol} {test['test_name']:<40} {test['duration']:>8.2f}s {test['status']}" + ) + + print("=" * 80) + + +class ProcessLogCapture: + """Captures output from a subprocess in real-time.""" + + def __init__(self, test_capture: TestLogCapture): + self.test_capture = test_capture + self.stdout_thread: Optional[threading.Thread] = None + self.stderr_thread: Optional[threading.Thread] = None + + def _stream_reader(self, stream: TextIO, log_func): + """Read from a stream and log each line.""" + try: + for line in iter(stream.readline, ""): + if line: + log_func(line.rstrip()) + except Exception as e: + self.test_capture.log_error(f"Error reading stream: {e}") + + def start_capture(self, process: subprocess.Popen): + """Start capturing output from a process.""" + if process.stdout: + self.stdout_thread = threading.Thread( + target=self._stream_reader, + args=(process.stdout, self.test_capture.log_command_output), + ) + self.stdout_thread.daemon = True + self.stdout_thread.start() + + if process.stderr: + self.stderr_thread = threading.Thread( + target=self._stream_reader, + args=(process.stderr, self.test_capture.log_command_error), + ) + self.stderr_thread.daemon = True + self.stderr_thread.start() + + def wait_for_completion(self, timeout: Optional[float] = None): + """Wait for all capture threads to complete.""" + if self.stdout_thread: + self.stdout_thread.join(timeout) + if self.stderr_thread: + self.stderr_thread.join(timeout) + + +def run_command_with_logging( + command: List[str], + test_capture: TestLogCapture, + timeout: Optional[float] = None, + cwd: Optional[Path] = None, + env: Optional[Dict[str, str]] = None, +) -> int: + """Run a command with comprehensive logging.""" + test_capture.log_info(f"Executing command: {' '.join(command)}") + if cwd: + test_capture.log_info(f"Working directory: {cwd}") + + try: + # Start the process + process = subprocess.Popen( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + cwd=cwd, + env=env, + bufsize=1, + universal_newlines=True, + ) + + # Start capturing output + log_capture = ProcessLogCapture(test_capture) + log_capture.start_capture(process) + + # Wait for completion + try: + return_code = process.wait(timeout=timeout) + log_capture.wait_for_completion(timeout=5) # Give threads time to finish + + test_capture.log_info(f"Command completed with return code: {return_code}") + return return_code + + except subprocess.TimeoutExpired: + test_capture.log_error(f"Command timed out after {timeout} seconds") + process.kill() + log_capture.wait_for_completion(timeout=5) + return -1 + + except Exception as e: + test_capture.log_error(f"Failed to execute command: {e}") + return -1 diff --git a/scripts/commands/test/test_manager.py b/scripts/commands/test/test_manager.py new file mode 100644 index 0000000..91b875b --- /dev/null +++ b/scripts/commands/test/test_manager.py @@ -0,0 +1,322 @@ +import os +import signal +import subprocess +import sys +import time +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional + +from ..utils import get_building_path +from .test_cases import TestCase +from .test_logger import TestLogManager, run_command_with_logging + + +class HydrusTestManager: + def __init__(self, volume: bool = False, debug_mode: bool = False): + self.volume = volume + self.debug_mode = debug_mode + self.ros_dir = get_building_path(self.volume) + # Test tracking + self.total_tests = 0 + self.passed_tests = 0 + self.failed_tests = 0 + # Process tracking + self.roscore_pid: Optional[int] = None + + # Initialize test logging system + self.log_manager = TestLogManager() + + # Legacy logging setup for compatibility + self.test_logs_dir = self.ros_dir / "src/hydrus-software-stack/test_logs" + self.test_logs_dir.mkdir(exist_ok=True) + + # Create timestamped log file + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + self.log_file = self.test_logs_dir / f"test_run_{timestamp}.log" + + def _run_command( + self, + cmd: List[str], + timeout: Optional[int] = None, + check: bool = True, + capture_output: bool = False, + env: Optional[Dict] = None, + cwd: Optional[Path] = None, + ) -> subprocess.CompletedProcess: + """Run a command with optional timeout and error handling""" + if self.debug_mode: + print(f"DEBUG: Running command: {' '.join(cmd)}") + + try: + result = subprocess.run( + cmd, + timeout=timeout, + check=check, + capture_output=capture_output, + text=True, + env=env, + cwd=cwd, + ) + return result + except subprocess.TimeoutExpired as e: + print(f"โฐ Command timed out after {timeout} seconds: {' '.join(cmd)}") + if self.debug_mode: + print(f"DEBUG: Timeout details: {e}") + raise + except subprocess.CalledProcessError as e: + if self.debug_mode: + print(f"DEBUG: Command failed with exit code {e.returncode}") + if e.stdout: + print(f"DEBUG: stdout: {e.stdout}") + if e.stderr: + print(f"DEBUG: stderr: {e.stderr}") + raise + + def _build_workspace(self): + """Build the catkin workspace""" + print("\n๐Ÿ”จ Building catkin workspace...") + try: + bash_cmd = [ + "bash", + "-c", + f"source /opt/ros/noetic/setup.bash && cd {self.ros_dir} && catkin_make", + ] + self._run_command(bash_cmd, timeout=300) + print("โœ… Workspace built successfully") + except subprocess.CalledProcessError: + print("โŒ Failed to build workspace") + sys.exit(1) + except subprocess.TimeoutExpired: + print("โŒ Workspace build timed out") + sys.exit(1) + + def _is_process_running(self, pid: int) -> bool: + """Check if a process is running""" + try: + os.kill(pid, 0) + return True + except OSError: + return False + + def _signal_handler(self, signum, frame): + """Handle interrupt signals gracefully""" + print("\n๐Ÿ›‘ Received interrupt signal. Cleaning up...") + self._cleanup_roscore() + sys.exit(1) + + def _start_roscore(self, env: Dict[str, str]): + """Start roscore in background""" + print("๐Ÿค– Starting roscore...") + try: + # Source ROS environment and start roscore + bash_cmd = [ + "bash", + "-c", + f"source /opt/ros/noetic/setup.bash && source {self.ros_dir}/devel/setup.bash && roscore", + ] + + roscore_process = subprocess.Popen( + bash_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env + ) + self.roscore_pid = roscore_process.pid + + # Give roscore time to start + time.sleep(3) + + if self._is_process_running(self.roscore_pid): + print(f"โœ… roscore started (PID: {self.roscore_pid})") + else: + print("โŒ Failed to start roscore") + self.roscore_pid = None + + except Exception as e: + print(f"โŒ Error starting roscore: {e}") + self.roscore_pid = None + + def _cleanup_roscore(self): + """Clean up roscore process""" + if self.roscore_pid and self._is_process_running(self.roscore_pid): + print(f"๐Ÿงน Cleaning up roscore (PID: {self.roscore_pid})") + try: + os.kill(self.roscore_pid, signal.SIGTERM) + time.sleep(2) + if self._is_process_running(self.roscore_pid): + os.kill(self.roscore_pid, signal.SIGKILL) + print("โœ… roscore cleaned up") + except OSError: + print("โš ๏ธ roscore process already terminated") + finally: + self.roscore_pid = None + + def _run_rostest(self, test_name: str, test_file: str, env: Dict[str, str]): + """Run rostest with proper timeout and error handling""" + print("") + print("----------------------------------------") + print(f"Running rostest: {test_name}") + print("----------------------------------------") + + self.total_tests += 1 + + # Use the new logging system + with self.log_manager.capture_test(test_name) as test_capture: + test_capture.log_info(f"Test file: {test_file}") + test_capture.log_info("Test type: rostest") + + try: + bash_cmd = [ + "bash", + "-c", + f"source /opt/ros/noetic/setup.bash && source {self.ros_dir}/devel/setup.bash && rostest autonomy {test_file}", + ] + + cmd_str = " ".join(bash_cmd) + test_capture.log_info(f"Command: {cmd_str}") + + # Use the new logging system with command execution + return_code = run_command_with_logging( + bash_cmd, test_capture, timeout=600, cwd=self.ros_dir, env=env + ) + + if return_code == 0: + print(f"โœ… PASSED: {test_name}") + self.passed_tests += 1 + else: + print(f"โŒ FAILED: {test_name} (exit code: {return_code})") + self.failed_tests += 1 + + except Exception as e: + test_capture.log_error(f"Unexpected error: {e}") + print(f"โŒ FAILED: {test_name} (error: {e})") + self.failed_tests += 1 + + def _run_unified_test( + self, + test_name: str, + test_path: Path, + args: List[str], + timeout: int, + infinite_loop: bool, + description: str, + env: Dict[str, str], + ): + """Run a unified test using the new logging system""" + print("") + print("----------------------------------------") + print(f"Running test: {test_name}") + print(f"Description: {description}") + print("----------------------------------------") + + self.total_tests += 1 + + # Use the new logging system + with self.log_manager.capture_test(test_name) as test_capture: + test_capture.log_info(f"Test path: {test_path}") + test_capture.log_info(f"Args: {args}") + test_capture.log_info(f"Timeout: {timeout}s") + test_capture.log_info(f"Infinite loop: {infinite_loop}") + + try: + if test_path.suffix == ".py": + # Python test - need to source ROS environment for autonomy package + if any( + ros_keyword in str(test_path) + for ros_keyword in [ + "autonomy", + "mission_planner", + "api_server", + "controllers", + "cv_publishers", + ] + ): + # Run with sourced ROS environment + cmd_script = f"source /opt/ros/noetic/setup.bash && source {self.ros_dir}/devel/setup.bash && python3 {test_path} {' '.join(args)}" + cmd = ["bash", "-c", cmd_script] + else: + # Regular Python test + cmd = ["python3", str(test_path)] + args + else: + # Shell script or other executable + cmd = [str(test_path)] + args + + # Log the command + cmd_str = " ".join(cmd) + test_capture.log_info(f"Command: {cmd_str}") + + # Execute test with timeout unless it's an infinite loop test + test_timeout = timeout if not infinite_loop else timeout + + # Use the new logging system with command execution + return_code = run_command_with_logging( + cmd, test_capture, timeout=test_timeout, cwd=self.ros_dir, env=env + ) + + if return_code == 0: + print(f"โœ… PASSED: {test_name}") + self.passed_tests += 1 + elif return_code == -1 and infinite_loop: + # Timeout is expected for infinite loop tests + print(f"โœ… PASSED: {test_name} (timed out as expected)") + self.passed_tests += 1 + else: + print(f"โŒ FAILED: {test_name} (exit code: {return_code})") + self.failed_tests += 1 + + except FileNotFoundError: + test_capture.log_error(f"Test file not found: {test_path}") + print(f"โŒ FAILED: {test_name} (test file not found: {test_path})") + self.failed_tests += 1 + except Exception as e: + test_capture.log_error(f"Unexpected error: {e}") + print(f"โŒ FAILED: {test_name} (error: {e})") + self.failed_tests += 1 + + def execute_test_cases(self, test_cases: List[TestCase], env: Dict[str, str]): + """Execute a list of test cases using their strategies""" + for test_case in test_cases: + test_case.strategy.execute(test_case, env, self) + + def _print_summary(self) -> int: + """Print test results summary, create log archive, and return exit code""" + print("") + print("==========================================") + print("TEST RESULTS SUMMARY") + print("==========================================") + print(f"Total tests run: {self.total_tests}") + print(f"โœ… Passed: {self.passed_tests}") + print(f"โŒ Failed: {self.failed_tests}") + + # End the logging session + self.log_manager.end_session() + + # Print detailed session report + self.log_manager.print_session_report() + + # Create log archive + try: + archive_path = self.log_manager.create_archive() + print(f"๐Ÿ“ฆ Log archive created: {archive_path}") + + # For CI/CD integration, also create a fixed name archive + ci_archive_path = Path.cwd() / "test_logs_latest.zip" + ci_archive = self.log_manager.create_archive(ci_archive_path) + print(f"๐Ÿ“ฆ CI/CD archive: {ci_archive}") + + except Exception as e: + print(f"โš ๏ธ Warning: Failed to create log archive: {e}") + + # Legacy logging + print(f"๐Ÿ“ Detailed logs available in: {self.log_manager.base_log_dir}") + print(f"๐Ÿ“ Legacy logs available in: {self.test_logs_dir}") + + if self.failed_tests == 0: + print("\n๐ŸŽ‰ All tests passed!") + return 0 + else: + success_rate = ( + (self.passed_tests / self.total_tests) * 100 + if self.total_tests > 0 + else 0 + ) + print(f"\n๐Ÿ“Š Success rate: {success_rate:.1f}%") + return 1 diff --git a/scripts/tests.hss b/scripts/tests.hss new file mode 100644 index 0000000..8285e4a --- /dev/null +++ b/scripts/tests.hss @@ -0,0 +1,36 @@ +# Script Tests Configuration +# Tests for running script nodes indefinitely +name: "Script Tests" +description: "Tests for long-running script nodes" +strategy: "script" + +tests: + - name: "API Server Script" + path: "../autonomy/src/api_server.py" + args: [] + timeout: 2 + infinite_loop: true + description: "API server script test" + strategy: "ScriptTestStrategy" + enabled: true + test_type: "script" + + - name: "Controllers Script" + path: "../autonomy/src/controllers.py" + args: [] + timeout: 2 + infinite_loop: true + description: "Controllers node script test" + strategy: "ScriptTestStrategy" + enabled: true + test_type: "script" + + - name: "CV Publishers Script" + path: "../autonomy/src/cv_publishers.py" + args: [] + timeout: 2 + infinite_loop: true + description: "Computer vision publishers script test" + strategy: "ScriptTestStrategy" + enabled: true + test_type: "script" diff --git a/test_logs/test_session_20250723_042043/session.log b/test_logs/test_session_20250723_042043/session.log new file mode 100644 index 0000000..f6ff660 --- /dev/null +++ b/test_logs/test_session_20250723_042043/session.log @@ -0,0 +1,2 @@ +2025-07-23 04:20:43 - INFO - Starting test session: test_session_20250723_042043 +2025-07-23 04:20:43 - INFO - Log directory: /home/catkin_ws/src/hydrus-software-stack/test_logs/test_session_20250723_042043