diff --git a/.github/workflows/web-server-tests.yml b/.github/workflows/web-server-tests.yml index 2ff37d11..f9bb5695 100644 --- a/.github/workflows/web-server-tests.yml +++ b/.github/workflows/web-server-tests.yml @@ -33,6 +33,9 @@ jobs: restore-keys: | ${{ runner.os }}-pip- + - name: Install system dependencies + run: sudo apt-get update && sudo apt-get install -y libcap-dev + - name: Install dependencies working-directory: Software/web-server run: | diff --git a/Software/web-server/calibration_manager.py b/Software/web-server/calibration_manager.py index 1b224f0d..0a48e2bb 100644 --- a/Software/web-server/calibration_manager.py +++ b/Software/web-server/calibration_manager.py @@ -27,15 +27,17 @@ class CalibrationManager: """Manages calibration processes for PiTrac cameras""" - def __init__(self, config_manager, pitrac_binary: str = "/usr/lib/pitrac/pitrac_lm"): + def __init__(self, config_manager, camera_stream_manager=None, pitrac_binary: str = "/usr/lib/pitrac/pitrac_lm"): """ Initialize calibration manager Args: config_manager: Configuration manager instance + camera_stream_manager: Optional camera stream manager to stop streams before calibration pitrac_binary: Path to pitrac_lm binary """ self.config_manager = config_manager + self.camera_stream_manager = camera_stream_manager self.pitrac_binary = pitrac_binary self.current_processes: Dict[str, asyncio.subprocess.Process] = {} self._process_lock = asyncio.Lock() @@ -309,8 +311,13 @@ async def check_ball_location(self, camera: str = "camera1") -> Dict[str, Any]: camera: Which camera to use ("camera1" or "camera2") Returns: - Dict with status and ball location info + Dict with status, ball location info, and image path for display """ + # Stop any active camera stream before running calibration + if self.camera_stream_manager and self.camera_stream_manager.is_streaming(): + logger.info(f"Stopping camera stream before ball location check for {camera}") + self.camera_stream_manager.stop_stream() + logger.info(f"Starting ball location check for {camera}") self.calibration_status[camera] = { @@ -348,6 +355,13 @@ async def check_ball_location(self, camera: str = "camera1") -> Dict[str, Any]: if camera_gain is None: camera_gain = 6.0 + # Create output filename for the captured image + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + output_file = f"ball_location_{camera}_{timestamp}.png" + images_dir = Path.home() / "LM_Shares" / "Images" + images_dir.mkdir(parents=True, exist_ok=True) + output_path = images_dir / output_file + cmd.extend( [ f"--search_center_x={search_x}", @@ -355,6 +369,7 @@ async def check_ball_location(self, camera: str = "camera1") -> Dict[str, Any]: f"--logging_level={logging_level}", "--artifact_save_level=all", f"--camera_gain={camera_gain}", + f"--output_filename={output_path}", ] ) cmd.extend(self._build_cli_args_from_metadata(camera)) @@ -368,10 +383,19 @@ async def check_ball_location(self, camera: str = "camera1") -> Dict[str, Any]: self.calibration_status[camera]["message"] = "Ball detected" if ball_info else "Ball not found" self.calibration_status[camera]["progress"] = 100 + image_url = None + if output_path.exists(): + image_url = f"/api/images/{output_file}" + logger.info(f"Ball location image saved: {output_path}") + else: + logger.warning(f"Ball location image not found at: {output_path}") + return { "status": "success", "ball_found": bool(ball_info), "ball_info": ball_info, + "image_url": image_url, + "image_path": str(output_path) if output_path.exists() else None, "output": result.get("output", ""), } @@ -392,6 +416,10 @@ async def run_auto_calibration(self, camera: str = "camera1") -> Dict[str, Any]: Dict with calibration results """ + # Stop any active camera stream before running calibration + if self.camera_stream_manager and self.camera_stream_manager.is_streaming(): + logger.info(f"Stopping camera stream before auto calibration for {camera}") + self.camera_stream_manager.stop_stream() generated_config_path = self.config_manager.generate_golf_sim_config() logger.info(f"Generated config file at: {generated_config_path}") @@ -864,6 +892,11 @@ async def run_manual_calibration(self, camera: str = "camera1") -> Dict[str, Any Returns: Dict with calibration results """ + # Stop any active camera stream before running calibration + if self.camera_stream_manager and self.camera_stream_manager.is_streaming(): + logger.info(f"Stopping camera stream before manual calibration for {camera}") + self.camera_stream_manager.stop_stream() + logger.info(f"Starting manual calibration for {camera}") self.calibration_status[camera] = { diff --git a/Software/web-server/camera_stream_manager.py b/Software/web-server/camera_stream_manager.py new file mode 100644 index 00000000..255f1ef0 --- /dev/null +++ b/Software/web-server/camera_stream_manager.py @@ -0,0 +1,212 @@ +""" +Camera Stream Manager for PiTrac Web Server + +Manages live camera preview streams using picamera2 for calibration workflow. +Only one camera stream can be active at a time to prevent resource conflicts. +""" + +import io +import logging +import os +from threading import Condition +from typing import Dict, Optional, Generator + +# Conditional import based on test environment +if os.environ.get("TESTING") == "true": + from tests.utils.mock_picamera import MockPicamera2 as Picamera2 + from tests.utils.mock_picamera import MockJpegEncoder as JpegEncoder + from tests.utils.mock_picamera import MockFileOutput as FileOutput +else: + from picamera2 import Picamera2 + from picamera2.encoders import JpegEncoder + from picamera2.outputs import FileOutput + +logger = logging.getLogger(__name__) + + +class StreamingOutput(io.BufferedIOBase): + """Buffer for MJPEG frames with thread-safe access""" + + def __init__(self): + self.frame = None + self.condition = Condition() + + def write(self, buf): + """Called by picamera2 encoder with each new frame""" + with self.condition: + self.frame = buf + self.condition.notify_all() + + +class CameraStreamManager: + """Manages camera streaming for live preview during calibration + + Only allows one camera stream at a time to prevent resource conflicts. + Automatically stops streams when calibration starts or page navigation occurs. + """ + + def __init__(self, config_manager): + """Initialize camera stream manager + + Args: + config_manager: Configuration manager instance for camera settings + """ + self.config_manager = config_manager + self.active_camera: Optional[str] = None + self.picam2: Optional[Picamera2] = None + self.output: Optional[StreamingOutput] = None + + def start_stream(self, camera: str) -> Dict[str, str]: + """Start streaming for specified camera + + Args: + camera: Camera identifier ("camera1" or "camera2") + + Returns: + Dict with status and camera ID + + Raises: + ValueError: If camera ID is invalid + RuntimeError: If camera cannot be initialized + """ + if camera not in ["camera1", "camera2"]: + raise ValueError(f"Invalid camera ID: {camera}") + + # Stop any existing stream first (only one at a time) + if self.active_camera: + logger.info(f"Stopping existing stream for {self.active_camera} before starting {camera}") + self.stop_stream() + + try: + # Map camera to picamera2 index + # Camera1 is typically index 0, Camera2 is index 1 + camera_index = 0 if camera == "camera1" else 1 + + logger.info(f"Starting stream for {camera} (picamera2 index {camera_index})") + + # Initialize picamera2 + self.picam2 = Picamera2(camera_index) + + # Configure for 640x480 streaming (good balance of quality/performance) + config = self.picam2.create_video_configuration( + main={"size": (640, 480), "format": "RGB888"} + ) + self.picam2.configure(config) + + # Create streaming output buffer + self.output = StreamingOutput() + + # Start recording JPEG frames to the output buffer + self.picam2.start_recording(JpegEncoder(), FileOutput(self.output)) + + self.active_camera = camera + logger.info(f"Successfully started stream for {camera}") + + return {"status": "started", "camera": camera} + + except Exception as e: + logger.error(f"Failed to start stream for {camera}: {e}", exc_info=True) + # Cleanup on failure + if self.picam2: + try: + self.picam2.close() + except Exception: + pass + self.picam2 = None + self.output = None + self.active_camera = None + raise RuntimeError(f"Failed to start camera stream: {e}") + + def stop_stream(self) -> Dict[str, str]: + """Stop the active camera stream + + Returns: + Dict with status and which camera was stopped + """ + if not self.active_camera: + return {"status": "no_stream_active"} + + camera = self.active_camera + logger.info(f"Stopping stream for {camera}") + + try: + if self.picam2: + self.picam2.stop_recording() + self.picam2.close() + self.picam2 = None + + self.output = None + self.active_camera = None + + logger.info(f"Successfully stopped stream for {camera}") + return {"status": "stopped", "camera": camera} + + except Exception as e: + logger.error(f"Error stopping stream for {camera}: {e}", exc_info=True) + # Force cleanup even on error + self.picam2 = None + self.output = None + self.active_camera = None + return {"status": "error", "camera": camera, "message": str(e)} + + def generate_frames(self) -> Generator[bytes, None, None]: + """Generate MJPEG frames for streaming + + Yields: + MJPEG frame boundaries with JPEG data + + Raises: + RuntimeError: If no stream is active + """ + if not self.active_camera or not self.output: + raise RuntimeError("No active camera stream") + + try: + logger.debug(f"Starting frame generation for {self.active_camera}") + while True: + # Wait for new frame from camera + with self.output.condition: + self.output.condition.wait() + frame = self.output.frame + + # Yield MJPEG formatted frame + yield ( + b'--FRAME\r\n' + b'Content-Type: image/jpeg\r\n' + b'Content-Length: ' + str(len(frame)).encode() + b'\r\n' + b'\r\n' + frame + b'\r\n' + ) + + except GeneratorExit: + # Client disconnected - this is normal + logger.debug(f"Client disconnected from {self.active_camera} stream") + pass + except Exception as e: + logger.error(f"Error generating frames: {e}", exc_info=True) + raise + + def is_streaming(self, camera: Optional[str] = None) -> bool: + """Check if a stream is active + + Args: + camera: Optional specific camera to check. If None, checks any stream. + + Returns: + True if specified camera (or any camera) is streaming + """ + if camera: + return self.active_camera == camera + return self.active_camera is not None + + def get_active_camera(self) -> Optional[str]: + """Get the currently active camera stream + + Returns: + Camera ID if streaming, None otherwise + """ + return self.active_camera + + def cleanup(self): + """Cleanup all resources - call on shutdown""" + logger.info("Cleaning up camera stream manager") + self.stop_stream() diff --git a/Software/web-server/requirements.txt b/Software/web-server/requirements.txt index bea8e344..83c77815 100644 --- a/Software/web-server/requirements.txt +++ b/Software/web-server/requirements.txt @@ -8,3 +8,4 @@ pillow==11.3.0 pyyaml==6.0.3 aiofiles==25.1.0 websockets==15.0.1 +picamera2>=0.3.12 diff --git a/Software/web-server/server.py b/Software/web-server/server.py index 43cdf7dc..c7b89c9a 100644 --- a/Software/web-server/server.py +++ b/Software/web-server/server.py @@ -10,12 +10,13 @@ import stomp import yaml from fastapi import FastAPI, File, Request, UploadFile, WebSocket, WebSocketDisconnect -from fastapi.responses import FileResponse, HTMLResponse, Response +from fastapi.responses import FileResponse, HTMLResponse, Response, StreamingResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from calibration_manager import CalibrationManager from camera_detector import CameraDetector +from camera_stream_manager import CameraStreamManager from config_manager import ConfigurationManager from constants import ( CONFIG_FILE, @@ -44,7 +45,8 @@ def __init__(self): self.parser = ShotDataParser() self.config_manager = ConfigurationManager() self.pitrac_manager = PiTracProcessManager(self.config_manager) - self.calibration_manager = CalibrationManager(self.config_manager) + self.camera_stream_manager = CameraStreamManager(self.config_manager) + self.calibration_manager = CalibrationManager(self.config_manager, self.camera_stream_manager) self.testing_manager = TestingToolsManager(self.config_manager) self.mq_conn: Optional[stomp.Connection] = None self.listener: Optional[ActiveMQListener] = None @@ -360,6 +362,102 @@ async def stop_calibration() -> Dict[str, Any]: """Stop any running calibration process""" return await self.calibration_manager.stop_calibration() + @self.app.post("/api/camera/{camera_id}/stream/start") + async def start_camera_stream(camera_id: str) -> Dict[str, Any]: + """Start live camera preview for calibration + + Args: + camera_id: Camera identifier (camera1 or camera2) + + Returns: + Dict with status and camera ID + """ + if camera_id not in ["camera1", "camera2"]: + return {"status": "error", "message": "Invalid camera ID"} + + # Stop pitrac_lm if running (can't access cameras while it's running) + if self.pitrac_manager.is_running(): + logger.info(f"Stopping pitrac_lm before starting camera {camera_id} preview") + stop_result = await self.pitrac_manager.stop() + if stop_result.get("status") == "error": + return { + "status": "error", + "message": f"Failed to stop pitrac_lm: {stop_result.get('message')}", + } + + # Stop any active calibration processes + if self.calibration_manager.current_processes: + logger.info(f"Stopping calibration processes before starting camera {camera_id} preview") + await self.calibration_manager.stop_calibration() + + # Start the camera stream + try: + result = self.camera_stream_manager.start_stream(camera_id) + return result + except Exception as e: + logger.error(f"Failed to start camera stream: {e}", exc_info=True) + return {"status": "error", "message": str(e)} + + @self.app.post("/api/camera/{camera_id}/stream/stop") + async def stop_camera_stream(camera_id: str) -> Dict[str, Any]: + """Stop live camera preview + + Args: + camera_id: Camera identifier (camera1 or camera2) + + Returns: + Dict with status + """ + try: + result = self.camera_stream_manager.stop_stream() + return result + except Exception as e: + logger.error(f"Failed to stop camera stream: {e}", exc_info=True) + return {"status": "error", "message": str(e)} + + @self.app.get("/api/camera/{camera_id}/stream") + async def stream_camera_feed(camera_id: str) -> StreamingResponse: + """MJPEG stream endpoint for live camera preview + + Args: + camera_id: Camera identifier (camera1 or camera2) + + Returns: + StreamingResponse with MJPEG video stream + """ + if camera_id not in ["camera1", "camera2"]: + return Response(status_code=400, content="Invalid camera ID") + + if not self.camera_stream_manager.is_streaming(camera_id): + return Response(status_code=404, content=f"Camera {camera_id} stream not started") + + try: + return StreamingResponse( + self.camera_stream_manager.generate_frames(), + media_type="multipart/x-mixed-replace; boundary=FRAME", + headers={ + "Cache-Control": "no-cache, private", + "Pragma": "no-cache", + "Age": "0", + "X-Accel-Buffering": "no", # Disable nginx buffering if behind proxy + }, + ) + except Exception as e: + logger.error(f"Error streaming camera feed: {e}", exc_info=True) + return Response(status_code=500, content=str(e)) + + @self.app.get("/api/camera/status") + async def get_camera_stream_status() -> Dict[str, Any]: + """Get current camera stream status + + Returns: + Dict with active camera and streaming status + """ + return { + "is_streaming": self.camera_stream_manager.is_streaming(), + "active_camera": self.camera_stream_manager.get_active_camera(), + } + @self.app.get("/testing", response_class=HTMLResponse) async def testing_page(request: Request) -> Response: """Serve testing tools UI page""" @@ -839,6 +937,13 @@ async def shutdown_event(self) -> None: except Exception as e: logger.error(f"Error disconnecting from ActiveMQ: {e}") + # Stop any active camera streams + try: + self.camera_stream_manager.cleanup() + logger.info("Camera stream manager cleaned up") + except Exception as e: + logger.error(f"Error cleaning up camera stream manager: {e}") + for ws in self.connection_manager.connections: try: await ws.close() diff --git a/Software/web-server/static/js/calibration.js b/Software/web-server/static/js/calibration.js index ec23052f..7272e03a 100644 --- a/Software/web-server/static/js/calibration.js +++ b/Software/web-server/static/js/calibration.js @@ -15,6 +15,7 @@ class CalibrationManager { camera2: false }; this.calibrationResults = {}; // Store results from calibration API + this.activeStream = null; // Track which camera stream is active this.init(); this.setupPageCleanup(); @@ -46,7 +47,7 @@ class CalibrationManager { /** * Cleanup all intervals and resources */ - cleanup() { + async cleanup() { if (this.statusPollInterval) { clearInterval(this.statusPollInterval); this.statusPollInterval = null; @@ -56,6 +57,17 @@ class CalibrationManager { clearInterval(intervalId); }); this.cameraPollIntervals.clear(); + + // Stop any active camera streams + if (this.activeStream) { + try { + await fetch(`/api/camera/${this.activeStream}/stream/stop`, { + method: 'POST' + }); + } catch (error) { + console.error('Error stopping camera stream during cleanup:', error); + } + } } setupEventListeners() { @@ -214,6 +226,11 @@ class CalibrationManager { return; } + // Stop any active camera stream before calibrating + if (this.activeStream) { + await this.toggleLivePreview(this.activeStream, null); + } + const button = event?.target || event?.currentTarget; const originalText = button?.textContent || 'Calibrate'; @@ -274,7 +291,7 @@ class CalibrationManager { } const button = event?.target || event?.currentTarget; - const originalText = button?.textContent || 'Check Ball Location'; + const originalText = button?.textContent || 'Verify Ball Placement'; try { if (button) { @@ -290,6 +307,17 @@ class CalibrationManager { const result = await response.json(); const statusDiv = document.getElementById(`${camera}-ball-status`); + if (result.image_url) { + const img = document.getElementById(`${camera}-image`); + img.src = result.image_url + '?t=' + Date.now(); // Cache bust + img.style.display = 'block'; + + const placeholder = img.parentElement.querySelector('.camera-placeholder'); + if (placeholder) { + placeholder.style.display = 'none'; + } + } + if (result.ball_found) { statusDiv.className = 'ball-status success'; statusDiv.textContent = `Ball detected at position (${result.ball_info?.x || 0}, ${result.ball_info?.y || 0})`; @@ -321,6 +349,115 @@ class CalibrationManager { } } + /** + * Toggle live camera preview on/off + * @param {string} camera - Camera identifier (camera1 or camera2) + * @param {Event} event - The click event from the button (optional) + */ + async toggleLivePreview(camera, event) { + if (!this.validateCameraName(camera)) { + this.showMessage(`Invalid camera name: ${camera}`, 'error'); + return; + } + + event?.preventDefault(); + + const btn = document.getElementById(`${camera}-preview-btn`); + const calibrateBtn = document.getElementById(`${camera}-calibrate-btn`); + const img = document.getElementById(`${camera}-stream`); + const placeholder = document.getElementById(`${camera}-placeholder`); + + // If this camera is currently streaming, stop it + if (this.activeStream === camera) { + try { + btn.disabled = true; + btn.textContent = 'Stopping...'; + + const response = await fetch(`/api/camera/${camera}/stream/stop`, { + method: 'POST' + }); + + if (response.ok) { + // Hide stream, show placeholder + img.style.display = 'none'; + img.src = ''; + placeholder.style.display = 'block'; + + // Update button + btn.textContent = 'Start Live Preview'; + btn.className = 'btn btn-primary'; + + // Re-enable calibration button + if (calibrateBtn) { + calibrateBtn.disabled = false; + } + + this.activeStream = null; + + // Clear ball status + const statusDiv = document.getElementById(`${camera}-ball-status`); + if (statusDiv) { + statusDiv.className = 'ball-status'; + statusDiv.textContent = ''; + } + } else { + const result = await response.json(); + this.showMessage(`Failed to stop preview: ${result.message || 'Unknown error'}`, 'error'); + } + } catch (error) { + console.error('Error stopping camera preview:', error); + this.showMessage('Error stopping camera preview', 'error'); + } finally { + btn.disabled = false; + } + } + // Start streaming + else { + try { + btn.disabled = true; + btn.textContent = 'Starting...'; + + const response = await fetch(`/api/camera/${camera}/stream/start`, { + method: 'POST' + }); + + if (response.ok) { + const result = await response.json(); + + // Set stream URL with cache-busting parameter + img.src = `/api/camera/${camera}/stream?t=${Date.now()}`; + img.style.display = 'block'; + placeholder.style.display = 'none'; + + // Update button + btn.textContent = 'Stop Live Preview'; + btn.className = 'btn btn-danger'; + + // Disable calibration button while preview is active + if (calibrateBtn) { + calibrateBtn.disabled = true; + } + + this.activeStream = camera; + + const statusDiv = document.getElementById(`${camera}-ball-status`); + if (statusDiv) { + statusDiv.className = 'ball-status info'; + statusDiv.textContent = 'Live preview active. Position ball and click "Stop Live Preview" before calibrating.'; + } + } else { + const result = await response.json(); + this.showMessage(`Failed to start preview: ${result.message || 'Unknown error'}`, 'error'); + } + } catch (error) { + console.error('Error starting camera preview:', error); + this.showMessage('Error starting camera preview', 'error'); + } finally { + btn.disabled = false; + } + } + } + selectMethod(method) { this.calibrationMethod = method; diff --git a/Software/web-server/templates/calibration.html b/Software/web-server/templates/calibration.html index 71c9a37c..1d264065 100644 --- a/Software/web-server/templates/calibration.html +++ b/Software/web-server/templates/calibration.html @@ -94,17 +94,17 @@

Verify Ball Placement

Camera 1 - Tee Camera (Top/Angled or Straight)

- -
-

Click "Calibrate Camera 1" to verify setup

+ +
+

Click "Start Live Preview" to view camera feed

- -
@@ -113,17 +113,17 @@

Camera 1 - Tee Camera (Top/Angled or Straight)

Camera 2 - Flight Camera (Bottom/Straight)

- -
-

Click "Calibrate Camera 2" to verify setup

+ +
+

Click "Start Live Preview" to view camera feed

- -
@@ -212,7 +212,7 @@

Process Log:

Calibration Complete

-
🎉
+

Calibration Results:

diff --git a/Software/web-server/tests/__init__.py b/Software/web-server/tests/__init__.py new file mode 100644 index 00000000..501aaf49 --- /dev/null +++ b/Software/web-server/tests/__init__.py @@ -0,0 +1 @@ +"""PiTrac web-server test package""" diff --git a/Software/web-server/tests/conftest.py b/Software/web-server/tests/conftest.py index 4bd20cdb..c8e7ae95 100644 --- a/Software/web-server/tests/conftest.py +++ b/Software/web-server/tests/conftest.py @@ -17,11 +17,11 @@ from parsers import ShotDataParser from server import PiTracServer -from utils.mock_factories import ( +from .utils.mock_factories import ( MockActiveMQFactory, MockWebSocketFactory, ) -from utils.test_helpers import ShotDataHelper +from .utils.test_helpers import ShotDataHelper @pytest.fixture @@ -94,7 +94,7 @@ def shot_data_instance(): @pytest.fixture def mock_home_dir(tmp_path): """Mock home directory for testing with simplified structure""" - from utils.test_helpers import ConfigTestHelper + from .utils.test_helpers import ConfigTestHelper home = ConfigTestHelper.create_temp_config_dir() diff --git a/Software/web-server/tests/test_calibration_manager.py b/Software/web-server/tests/test_calibration_manager.py index 0969e27d..9a63d925 100644 --- a/Software/web-server/tests/test_calibration_manager.py +++ b/Software/web-server/tests/test_calibration_manager.py @@ -40,7 +40,7 @@ def test_init_with_custom_binary(self): mock_config_manager.register_callback = Mock() custom_binary = "/custom/path/pitrac_lm" - manager = CalibrationManager(mock_config_manager, custom_binary) + manager = CalibrationManager(mock_config_manager, pitrac_binary=custom_binary) assert manager.pitrac_binary == custom_binary @@ -206,7 +206,7 @@ def mock_config_manager(self): def test_ball_location_command_single_mode(self, mock_config_manager): """Test command building for ball location in single mode""" - manager = CalibrationManager(mock_config_manager, "/test/pitrac_lm") + manager = CalibrationManager(mock_config_manager, pitrac_binary="/test/pitrac_lm") config = mock_config_manager.get_config() system_mode = config.get("system", {}).get("mode", "single") @@ -233,7 +233,7 @@ def test_ball_location_command_dual_mode(self): } mock_config_manager.generated_config_path = "/tmp/test_config.yaml" - manager = CalibrationManager(mock_config_manager, "/test/pitrac_lm") + manager = CalibrationManager(mock_config_manager, pitrac_binary="/test/pitrac_lm") config = mock_config_manager.get_config() system_mode = config.get("system", {}).get("mode", "single") @@ -362,7 +362,7 @@ async def test_capture_still_image_success(self): mock_config_manager.get_config.return_value = {"system": {"mode": "single"}} mock_config_manager.generated_config_path = "/tmp/test_config.yaml" - manager = CalibrationManager(mock_config_manager, "/test/pitrac_lm") + manager = CalibrationManager(mock_config_manager, pitrac_binary="/test/pitrac_lm") with patch("calibration_manager.asyncio.create_subprocess_exec") as mock_subprocess: mock_process = AsyncMock() @@ -387,7 +387,7 @@ async def test_capture_still_image_file_not_created(self): mock_config_manager.get_config.return_value = {"system": {"mode": "single"}} mock_config_manager.generated_config_path = "/tmp/test_config.yaml" - manager = CalibrationManager(mock_config_manager, "/test/pitrac_lm") + manager = CalibrationManager(mock_config_manager, pitrac_binary="/test/pitrac_lm") with patch("calibration_manager.asyncio.create_subprocess_exec") as mock_subprocess: mock_process = AsyncMock() @@ -410,7 +410,7 @@ async def test_capture_still_image_process_error(self): mock_config_manager.get_config.return_value = {"system": {"mode": "single"}} mock_config_manager.generated_config_path = "/tmp/test_config.yaml" - manager = CalibrationManager(mock_config_manager, "/test/pitrac_lm") + manager = CalibrationManager(mock_config_manager, pitrac_binary="/test/pitrac_lm") with patch("calibration_manager.asyncio.create_subprocess_exec") as mock_subprocess: mock_subprocess.side_effect = Exception("Camera not found") @@ -448,7 +448,7 @@ async def test_log_file_creation_during_calibration(self): mock_config_manager.get_config.return_value = {"system": {"mode": "single"}, "calibration": {}} mock_config_manager.generated_config_path = "/tmp/test_config.yaml" - manager = CalibrationManager(mock_config_manager, "/test/pitrac_lm") + manager = CalibrationManager(mock_config_manager, pitrac_binary="/test/pitrac_lm") with patch("calibration_manager.asyncio.create_subprocess_exec") as mock_subprocess: mock_process = AsyncMock() @@ -493,7 +493,7 @@ async def test_check_ball_location_timeout(self): mock_config_manager.get_config.return_value = {"system": {"mode": "single"}, "calibration": {}} mock_config_manager.generated_config_path = "/tmp/test_config.yaml" - manager = CalibrationManager(mock_config_manager, "/test/pitrac_lm") + manager = CalibrationManager(mock_config_manager, pitrac_binary="/test/pitrac_lm") with patch("calibration_manager.asyncio.create_subprocess_exec") as mock_subprocess: mock_process = AsyncMock() @@ -517,7 +517,7 @@ async def test_calibration_process_failure(self): mock_config_manager.get_config.return_value = {"system": {"mode": "single"}, "calibration": {}} mock_config_manager.generated_config_path = "/tmp/test_config.yaml" - manager = CalibrationManager(mock_config_manager, "/test/pitrac_lm") + manager = CalibrationManager(mock_config_manager, pitrac_binary="/test/pitrac_lm") with patch("calibration_manager.asyncio.create_subprocess_exec") as mock_subprocess: mock_process = AsyncMock() @@ -634,7 +634,7 @@ async def test_auto_calibration_success_workflow(self): mock_config_manager.generated_config_path = "/tmp/test_config.yaml" mock_config_manager.reload = Mock() - manager = CalibrationManager(mock_config_manager, "/test/pitrac_lm") + manager = CalibrationManager(mock_config_manager, pitrac_binary="/test/pitrac_lm") with patch("calibration_manager.asyncio.create_subprocess_exec") as mock_subprocess: mock_process = AsyncMock() @@ -680,7 +680,7 @@ async def test_manual_calibration_success_workflow(self): mock_config_manager.generated_config_path = "/tmp/test_config.yaml" mock_config_manager.reload = Mock() - manager = CalibrationManager(mock_config_manager, "/test/pitrac_lm") + manager = CalibrationManager(mock_config_manager, pitrac_binary="/test/pitrac_lm") with patch("calibration_manager.asyncio.create_subprocess_exec") as mock_subprocess: mock_process = AsyncMock() @@ -710,7 +710,7 @@ async def test_calibration_failure_no_results(self): mock_config_manager.get_config.return_value = {"system": {"mode": "single"}, "calibration": {}} mock_config_manager.generated_config_path = "/tmp/test_config.yaml" - manager = CalibrationManager(mock_config_manager, "/test/pitrac_lm") + manager = CalibrationManager(mock_config_manager, pitrac_binary="/test/pitrac_lm") with patch("calibration_manager.asyncio.create_subprocess_exec") as mock_subprocess: mock_process = AsyncMock() @@ -753,7 +753,7 @@ async def test_ball_location_detection_success(self): } mock_config_manager.generated_config_path = "/tmp/test_config.yaml" - manager = CalibrationManager(mock_config_manager, "/test/pitrac_lm") + manager = CalibrationManager(mock_config_manager, pitrac_binary="/test/pitrac_lm") with patch("calibration_manager.asyncio.create_subprocess_exec") as mock_subprocess: mock_process = AsyncMock() @@ -778,7 +778,7 @@ async def test_still_image_capture_with_dual_mode(self): mock_config_manager.get_config.return_value = {"system": {"mode": "dual"}} mock_config_manager.generated_config_path = "/tmp/test_config.yaml" - manager = CalibrationManager(mock_config_manager, "/test/pitrac_lm") + manager = CalibrationManager(mock_config_manager, pitrac_binary="/test/pitrac_lm") with patch("calibration_manager.asyncio.create_subprocess_exec") as mock_subprocess: mock_process = AsyncMock() @@ -813,7 +813,7 @@ async def test_full_calibration_workflow_success(self): mock_config_manager.generated_config_path = "/tmp/test_config.yaml" mock_config_manager.reload = Mock() - manager = CalibrationManager(mock_config_manager, "/test/pitrac_lm") + manager = CalibrationManager(mock_config_manager, pitrac_binary="/test/pitrac_lm") with patch("calibration_manager.asyncio.create_subprocess_exec") as mock_subprocess: mock_process = AsyncMock() @@ -860,7 +860,7 @@ async def test_calibration_workflow_with_failures(self): mock_config_manager.get_config.return_value = {"system": {"mode": "single"}, "calibration": {}} mock_config_manager.generated_config_path = "/tmp/test_config.yaml" - manager = CalibrationManager(mock_config_manager, "/test/pitrac_lm") + manager = CalibrationManager(mock_config_manager, pitrac_binary="/test/pitrac_lm") with patch("calibration_manager.asyncio.create_subprocess_exec") as mock_subprocess: mock_process = AsyncMock() diff --git a/Software/web-server/tests/test_config_api.py b/Software/web-server/tests/test_config_api.py index bd54d6f2..8be952e6 100644 --- a/Software/web-server/tests/test_config_api.py +++ b/Software/web-server/tests/test_config_api.py @@ -2,7 +2,7 @@ import pytest -from utils.mock_factories import MockConfigManagerFactory +from .utils.mock_factories import MockConfigManagerFactory @pytest.mark.unit diff --git a/Software/web-server/tests/utils/mock_picamera.py b/Software/web-server/tests/utils/mock_picamera.py new file mode 100644 index 00000000..311d4833 --- /dev/null +++ b/Software/web-server/tests/utils/mock_picamera.py @@ -0,0 +1,108 @@ +"""Mock picamera2 classes for testing without Pi hardware + +Provides mock implementations of picamera2 classes to enable testing +in non-Pi environments (WSL, CI, Mac, etc.) where the actual picamera2 +library cannot be installed. +""" +import logging + +logger = logging.getLogger(__name__) + + +class MockPicamera2: + """Mock for Picamera2 camera interface + + Provides the interface used by CameraStreamManager without requiring + actual Raspberry Pi camera hardware. + """ + + def __init__(self, camera_num=0): + """Initialize mock camera + + Args: + camera_num: Camera index (0 or 1) + """ + self.camera_num = camera_num + self.configuration = None + self.is_recording = False + self.encoder = None + self.output = None + logger.debug(f"MockPicamera2 initialized for camera {camera_num}") + + def create_video_configuration(self, main=None): + """Mock video configuration creation + + Args: + main: Main stream configuration dict + + Returns: + Configuration dict + """ + config = {"main": main or {"size": (640, 480), "format": "RGB888"}} + logger.debug(f"Created mock video config: {config}") + return config + + def configure(self, config): + """Mock configure camera + + Args: + config: Configuration dict to apply + """ + self.configuration = config + logger.debug(f"Configured mock camera with: {config}") + + def start_recording(self, encoder, output): + """Mock start recording + + Args: + encoder: Video encoder instance + output: Output destination + """ + self.encoder = encoder + self.output = output + self.is_recording = True + logger.debug(f"Mock camera {self.camera_num} started recording") + + def stop_recording(self): + """Mock stop recording""" + self.is_recording = False + logger.debug(f"Mock camera {self.camera_num} stopped recording") + + def close(self): + """Mock close camera""" + self.is_recording = False + self.encoder = None + self.output = None + logger.debug(f"Mock camera {self.camera_num} closed") + + +class MockJpegEncoder: + """Mock for JpegEncoder + + Provides the interface for JPEG video encoding without actual encoding. + """ + + def __init__(self, quality=85): + """Initialize mock encoder + + Args: + quality: JPEG quality (0-100) + """ + self.quality = quality + logger.debug(f"MockJpegEncoder created with quality {quality}") + + +class MockFileOutput: + """Mock for FileOutput that wraps a buffer + + Provides the interface for file output without actual file operations. + """ + + def __init__(self, output): + """Initialize mock file output + + Args: + output: Output buffer to wrap + """ + self.output = output + logger.debug("MockFileOutput created")