diff --git a/Software/web-server/config_manager.py b/Software/web-server/config_manager.py index f6520d19..e3fd9ec2 100644 --- a/Software/web-server/config_manager.py +++ b/Software/web-server/config_manager.py @@ -829,6 +829,7 @@ def _is_calibration_field(self, key: str) -> bool: "calibration.", "kAutoCalibration", "_ENCLOSURE_", + "kDAC_setting", ] return any(pattern in key for pattern in calibration_patterns) diff --git a/Software/web-server/configurations.json b/Software/web-server/configurations.json index efd8c8ba..29d64250 100644 --- a/Software/web-server/configurations.json +++ b/Software/web-server/configurations.json @@ -266,7 +266,7 @@ }, "gs_config.strobing.kConnectionBoardVersion": { "category": "System", - "subcategory": "advanced", + "subcategory": "basic", "basicSubcategory": "System", "displayName": "Connection Board Version", "description": "The original Ver. 1.0 board inverts the external shutter signal, so this setting tells the system whether to pre-invert the signal or not.", @@ -281,9 +281,20 @@ "passedVia": "json", "passedTo": "both" }, + "gs_config.strobing.kDAC_setting": { + "category": "Strobing", + "subcategory": "advanced", + "displayName": "Strobe DAC Setting", + "description": "Calibrated DAC value for strobe LED current control. Set by the strobe calibration tool.", + "type": "number", + "min": 0, + "max": 255, + "default": null, + "internal": true + }, "gs_config.system.kEnclosureVersion": { "category": "System", - "subcategory": "advanced", + "subcategory": "basic", "basicSubcategory": "System", "displayName": "Enclosure Version", "description": "The enclosure version (type) can affect various aspects of the system. For example, the distance of the reference ball in the calibration rig, as different enclosures position the cameras at different locations.", diff --git a/Software/web-server/requirements.txt b/Software/web-server/requirements.txt index bea8e344..cf32276e 100644 --- a/Software/web-server/requirements.txt +++ b/Software/web-server/requirements.txt @@ -8,3 +8,5 @@ pillow==11.3.0 pyyaml==6.0.3 aiofiles==25.1.0 websockets==15.0.1 +spidev>=3.6 +gpiozero>=2.0 diff --git a/Software/web-server/server.py b/Software/web-server/server.py index 43cdf7dc..b284b443 100644 --- a/Software/web-server/server.py +++ b/Software/web-server/server.py @@ -29,6 +29,7 @@ from managers import ConnectionManager, ShotDataStore from parsers import ShotDataParser from pitrac_manager import PiTracProcessManager +from strobe_calibration_manager import StrobeCalibrationManager from testing_tools_manager import TestingToolsManager logger = logging.getLogger(__name__) @@ -46,6 +47,7 @@ def __init__(self): self.pitrac_manager = PiTracProcessManager(self.config_manager) self.calibration_manager = CalibrationManager(self.config_manager) self.testing_manager = TestingToolsManager(self.config_manager) + self.strobe_calibration_manager = StrobeCalibrationManager(self.config_manager) self.mq_conn: Optional[stomp.Connection] = None self.listener: Optional[ActiveMQListener] = None self.reconnect_task: Optional[asyncio.Task] = None @@ -436,6 +438,62 @@ async def upload_test_image(file: UploadFile = File(...)) -> Dict[str, Any]: logger.error(f"Error uploading test image: {e}") return {"status": "error", "message": str(e)} + # Strobe calibration endpoints + @self.app.get("/api/strobe-calibration/status") + async def strobe_calibration_status() -> Dict[str, Any]: + """Get current strobe calibration status""" + return self.strobe_calibration_manager.get_status() + + @self.app.get("/api/strobe-calibration/settings") + async def strobe_calibration_settings() -> Dict[str, Any]: + """Get saved strobe calibration settings""" + return await self.strobe_calibration_manager.get_saved_settings() + + @self.app.post("/api/strobe-calibration/start") + async def start_strobe_calibration(request: Request) -> Dict[str, Any]: + """Start strobe calibration as a background task""" + body = await request.json() + led_type = body.get("led_type", "v3") + target_current = body.get("target_current") + overwrite = body.get("overwrite", False) + + task = asyncio.create_task( + self.strobe_calibration_manager.start_calibration( + led_type=led_type, + target_current=target_current, + overwrite=overwrite, + ) + ) + self.background_tasks.add(task) + task.add_done_callback(self.background_tasks.discard) + + return {"status": "started"} + + @self.app.post("/api/strobe-calibration/cancel") + async def cancel_strobe_calibration() -> Dict[str, Any]: + """Cancel a running strobe calibration""" + self.strobe_calibration_manager.cancel() + return {"status": "ok"} + + @self.app.get("/api/strobe-calibration/diagnostics") + async def strobe_calibration_diagnostics() -> Dict[str, Any]: + """Read strobe hardware diagnostics""" + return await self.strobe_calibration_manager.read_diagnostics() + + @self.app.post("/api/strobe-calibration/set-dac") + async def strobe_set_dac(request: Request) -> Dict[str, Any]: + """Manually set the DAC value""" + body = await request.json() + value = body.get("value") + if value is None or not isinstance(value, int): + return {"status": "error", "message": "Integer 'value' is required"} + return await self.strobe_calibration_manager.set_dac_manual(value) + + @self.app.get("/api/strobe-calibration/dac-start") + async def strobe_dac_start() -> Dict[str, Any]: + """Get the safe DAC starting value""" + return await self.strobe_calibration_manager.get_dac_start() + @self.app.get("/api/cameras/detect") async def detect_cameras() -> Dict[str, Any]: """Auto-detect connected cameras""" diff --git a/Software/web-server/static/css/calibration.css b/Software/web-server/static/css/calibration.css index 7b0f31a6..53916be9 100644 --- a/Software/web-server/static/css/calibration.css +++ b/Software/web-server/static/css/calibration.css @@ -582,29 +582,121 @@ color: var(--error); } +/* Strobe Calibration */ +.strobe-calibration { + background: var(--bg-card); + border-radius: 12px; + padding: 2rem; + margin-bottom: 2rem; + box-shadow: var(--shadow-md); +} + +.strobe-header { + margin-bottom: 1.5rem; +} + +.strobe-header h3 { + color: var(--text-primary); + margin-bottom: 1rem; +} + +.strobe-status-bar { + display: flex; + gap: 2rem; + background: var(--bg-hover); + padding: 0.75rem 1rem; + border-radius: 6px; + border: 1px solid var(--border-color); +} + +.strobe-controls { + margin-bottom: 1.5rem; +} + +.strobe-control-row { + display: flex; + align-items: center; + gap: 1rem; + margin-bottom: 1rem; +} + +.strobe-label { + color: var(--text-secondary); + font-weight: 600; + min-width: 80px; +} + +.strobe-select { + padding: 0.5rem 1rem; + border: 1px solid var(--border-color); + border-radius: 6px; + background: var(--bg-hover); + color: var(--text-primary); + font-size: 0.95rem; + cursor: pointer; +} + +.strobe-calibration .progress-bar { + margin-bottom: 0.5rem; +} + +.strobe-calibration h4 { + color: var(--text-primary); + margin-bottom: 1rem; +} + +#strobe-result-area .result-card { + border-width: 2px; + border-style: solid; + border-color: var(--border-color); + transition: border-color 0.3s ease; +} + +#strobe-diagnostics-area { + margin-top: 1.5rem; +} + +#strobe-diagnostics-area h4 { + color: var(--text-primary); + margin-bottom: 1rem; +} + +#strobe-diagnostics-warning { + margin-top: 1rem; +} + /* Mobile Responsive */ @media (max-width: 768px) { .main-content { padding: 1rem; } - + .calibration-wizard { padding: 1rem; } - + .wizard-steps { margin-bottom: 2rem; } - + .step-label { font-size: 0.8rem; } - + .camera-options { grid-template-columns: 1fr; } - + .result-cards { grid-template-columns: 1fr; } + + .strobe-status-bar { + flex-direction: column; + gap: 0.5rem; + } + + .strobe-control-row { + flex-wrap: wrap; + } } \ No newline at end of file diff --git a/Software/web-server/static/js/calibration.js b/Software/web-server/static/js/calibration.js index ec23052f..be282f21 100644 --- a/Software/web-server/static/js/calibration.js +++ b/Software/web-server/static/js/calibration.js @@ -14,7 +14,8 @@ class CalibrationManager { camera1: false, camera2: false }; - this.calibrationResults = {}; // Store results from calibration API + this.calibrationResults = {}; + this.strobePollingTimer = null; this.init(); this.setupPageCleanup(); @@ -25,6 +26,8 @@ class CalibrationManager { await this.loadCalibrationData(); + await this.loadStrobeSettings(); + this.setupEventListeners(); this.startStatusPolling(); @@ -52,6 +55,11 @@ class CalibrationManager { this.statusPollInterval = null; } + if (this.strobePollingTimer) { + clearTimeout(this.strobePollingTimer); + this.strobePollingTimer = null; + } + this.cameraPollIntervals.forEach((intervalId) => { clearInterval(intervalId); }); @@ -634,6 +642,265 @@ class CalibrationManager { } }, 5000); } + + // -- Strobe Calibration -- + + async loadStrobeSettings() { + try { + const [settingsRes, configRes] = await Promise.all([ + fetch('/api/strobe-calibration/settings'), + fetch('/api/config?key=gs_config.strobing.kConnectionBoardVersion') + ]); + + const calBtn = document.getElementById('strobe-calibrate-btn'); + const diagBtn = document.getElementById('strobe-diagnostics-btn'); + const controls = document.getElementById('strobe-controls'); + const warning = document.getElementById('strobe-board-warning'); + + let boardVersion = null; + if (configRes.ok) { + const config = await configRes.json(); + boardVersion = config.data; + } + + const boardEl = document.getElementById('strobe-board-version'); + boardEl.textContent = boardVersion ? 'V' + boardVersion : 'Not set'; + + const isV3 = boardVersion !== null && parseInt(boardVersion) === 3; + if (!isV3) { + calBtn.disabled = true; + diagBtn.disabled = true; + controls.style.opacity = '0.5'; + warning.style.display = 'block'; + } else { + calBtn.disabled = false; + diagBtn.disabled = false; + controls.style.opacity = '1'; + warning.style.display = 'none'; + } + + if (settingsRes.ok) { + const settings = await settingsRes.json(); + const dacEl = document.getElementById('strobe-saved-dac'); + if (settings.dac_setting !== null && settings.dac_setting !== undefined) { + dacEl.textContent = '0x' + parseInt(settings.dac_setting).toString(16).toUpperCase().padStart(2, '0'); + if (isV3) calBtn.textContent = 'Recalibrate'; + } else { + dacEl.textContent = 'Not calibrated'; + if (isV3) calBtn.textContent = 'Calibrate'; + } + } + } catch (error) { + console.error('Error loading strobe settings:', error); + } + } + + async startStrobeCalibration() { + const btn = document.getElementById('strobe-calibrate-btn'); + const cancelBtn = document.getElementById('strobe-cancel-btn'); + const originalText = btn.textContent; + + const ledType = document.getElementById('strobe-led-type').value; + const targetCurrent = ledType === 'v3' ? 10.0 : 9.0; + + try { + btn.disabled = true; + btn.textContent = 'Starting...'; + cancelBtn.style.display = ''; + + // Reset and show progress area, hide stale results + document.getElementById('strobe-progress-area').style.display = 'block'; + document.getElementById('strobe-result-area').style.display = 'none'; + document.getElementById('strobe-progress-fill').style.width = '0%'; + document.getElementById('strobe-progress-fill').style.background = ''; + document.getElementById('strobe-progress-message').textContent = 'Starting...'; + document.getElementById('strobe-state').textContent = 'Running'; + + const response = await fetch('/api/strobe-calibration/start', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + led_type: ledType, + target_current: targetCurrent, + overwrite: true + }) + }); + + if (!response.ok) { + const err = await response.json().catch(() => ({})); + throw new Error(err.error || 'Failed to start calibration'); + } + + btn.textContent = 'Calibrating...'; + this.pollStrobeStatus(); + } catch (error) { + console.error('Error starting strobe calibration:', error); + btn.disabled = false; + btn.textContent = originalText; + cancelBtn.style.display = 'none'; + document.getElementById('strobe-state').textContent = 'Failed'; + document.getElementById('strobe-progress-message').textContent = error.message; + } + } + + pollStrobeStatus() { + this.strobePollingTimer = setTimeout(async () => { + try { + const response = await fetch('/api/strobe-calibration/status'); + if (!response.ok) { + this.pollStrobeStatus(); + return; + } + + const status = await response.json(); + const progressFill = document.getElementById('strobe-progress-fill'); + const progressMsg = document.getElementById('strobe-progress-message'); + + if (status.progress !== undefined) { + progressFill.style.width = status.progress + '%'; + } + if (status.message) { + progressMsg.textContent = status.message; + } + + if (status.state === 'complete' || status.state === 'completed') { + this.onStrobeCalibrationDone(status); + } else if (status.state === 'failed' || status.state === 'error') { + this.onStrobeCalibrationFailed(status); + } else if (status.state === 'cancelled') { + this.onStrobeCalibrationCancelled(); + } else { + this.pollStrobeStatus(); + } + } catch (error) { + console.error('Error polling strobe status:', error); + this.pollStrobeStatus(); + } + }, 500); + } + + onStrobeCalibrationDone(status) { + const btn = document.getElementById('strobe-calibrate-btn'); + const cancelBtn = document.getElementById('strobe-cancel-btn'); + btn.disabled = false; + btn.textContent = 'Recalibrate'; + cancelBtn.style.display = 'none'; + + document.getElementById('strobe-progress-fill').style.width = '100%'; + document.getElementById('strobe-state').textContent = 'Complete'; + + // Show results + const resultArea = document.getElementById('strobe-result-area'); + const resultCard = document.getElementById('strobe-result-card'); + resultCard.style.borderColor = 'var(--success)'; + document.getElementById('strobe-result-title').textContent = 'Calibration Successful'; + + if (status.dac_setting !== undefined) { + document.getElementById('strobe-result-dac').textContent = + '0x' + status.dac_setting.toString(16).toUpperCase().padStart(2, '0'); + } + if (status.led_current !== undefined) { + document.getElementById('strobe-result-current').textContent = status.led_current.toFixed(2) + ' A'; + } + if (status.ldo_voltage !== undefined) { + document.getElementById('strobe-result-ldo').textContent = status.ldo_voltage.toFixed(2) + ' V'; + } + + resultArea.style.display = 'block'; + + // Refresh the saved DAC display + this.loadStrobeSettings(); + } + + onStrobeCalibrationFailed(status) { + const cancelBtn = document.getElementById('strobe-cancel-btn'); + cancelBtn.style.display = 'none'; + this.loadStrobeSettings(); + + document.getElementById('strobe-progress-fill').style.width = '100%'; + document.getElementById('strobe-progress-fill').style.background = 'var(--error)'; + document.getElementById('strobe-state').textContent = 'Failed'; + document.getElementById('strobe-progress-message').textContent = + status.message || 'Calibration failed'; + + // Show failure in result area + const resultArea = document.getElementById('strobe-result-area'); + const resultCard = document.getElementById('strobe-result-card'); + resultCard.style.borderColor = 'var(--error)'; + document.getElementById('strobe-result-title').textContent = 'Calibration Failed'; + document.getElementById('strobe-result-dac').textContent = '--'; + document.getElementById('strobe-result-current').textContent = '--'; + document.getElementById('strobe-result-ldo').textContent = '--'; + resultArea.style.display = 'block'; + } + + onStrobeCalibrationCancelled() { + const cancelBtn = document.getElementById('strobe-cancel-btn'); + cancelBtn.style.display = 'none'; + + document.getElementById('strobe-state').textContent = 'Idle'; + document.getElementById('strobe-progress-message').textContent = 'Cancelled by user'; + this.loadStrobeSettings(); + } + + async cancelStrobeCalibration() { + try { + const cancelBtn = document.getElementById('strobe-cancel-btn'); + cancelBtn.disabled = true; + cancelBtn.textContent = 'Cancelling...'; + + await fetch('/api/strobe-calibration/cancel', { method: 'POST' }); + // Polling loop will pick up the cancelled state + } catch (error) { + console.error('Error cancelling strobe calibration:', error); + } + } + + async readStrobeDiagnostics() { + const btn = document.getElementById('strobe-diagnostics-btn'); + const originalText = btn.textContent; + + try { + btn.disabled = true; + btn.textContent = 'Reading...'; + + const response = await fetch('/api/strobe-calibration/diagnostics'); + if (!response.ok) { + throw new Error('Failed to read diagnostics'); + } + + const data = await response.json(); + const grid = document.getElementById('strobe-diagnostics-grid'); + grid.innerHTML = ''; + + const items = [ + { label: 'LDO Voltage', value: data.ldo_voltage != null ? data.ldo_voltage.toFixed(2) + ' V' : '--' }, + { label: 'LED Current', value: data.led_current != null ? data.led_current.toFixed(2) + ' A' : '--' }, + { label: 'ADC CH0 Raw', value: data.adc_ch0_raw != null ? data.adc_ch0_raw : '--' }, + { label: 'ADC CH1 Raw', value: data.adc_ch1_raw != null ? data.adc_ch1_raw : '--' } + ]; + + items.forEach(item => { + this.addCalibrationDataItem(grid, item.label, item.value); + }); + + // Handle warnings + const warningEl = document.getElementById('strobe-diagnostics-warning'); + if (data.warning) { + warningEl.textContent = data.warning; + warningEl.style.display = 'block'; + } else { + warningEl.style.display = 'none'; + } + + document.getElementById('strobe-diagnostics-area').style.display = 'block'; + } catch (error) { + console.error('Error reading strobe diagnostics:', error); + } finally { + btn.disabled = false; + btn.textContent = originalText; + } + } } const calibration = new CalibrationManager(); diff --git a/Software/web-server/strobe_calibration_manager.py b/Software/web-server/strobe_calibration_manager.py new file mode 100644 index 00000000..a50c52c3 --- /dev/null +++ b/Software/web-server/strobe_calibration_manager.py @@ -0,0 +1,494 @@ +"""Strobe Calibration Manager for PiTrac Web Server + +Controls MCP4801 DAC and MCP3202 ADC over SPI1 to calibrate the IR strobe +LED current on the V3 Connector Board. +""" + +import asyncio +import gc +import logging +import os +import time +from typing import Any, Dict, Optional + +logger = logging.getLogger(__name__) + +try: + import spidev +except ImportError: + spidev = None + +try: + os.environ.setdefault('LG_WD', '/tmp') + from gpiozero import DigitalOutputDevice +except ImportError: + DigitalOutputDevice = None + + +class StrobeCalibrationManager: + """Manages strobe LED calibration via SPI hardware on the Connector Board""" + + # SPI bus 1 (auxiliary), CS0 = DAC, CS1 = ADC + SPI_BUS = 1 + SPI_DAC_DEVICE = 0 + SPI_ADC_DEVICE = 1 + SPI_MAX_SPEED_HZ = 1_000_000 + + # DIAG pin gates the strobe LED (BCM numbering) + DIAG_GPIO_PIN = 10 + + # MCP4801 8-bit DAC write command (1x gain, active output) + MCP4801_WRITE_CMD = 0x30 + + # MCP3202 12-bit ADC channel commands (single-ended) + ADC_CH0_CMD = 0x80 # LED current sense + ADC_CH1_CMD = 0xC0 # LDO voltage + + # DAC range + DAC_MIN = 0 + DAC_MAX = 0xFF + + # Safe fallback DAC value if calibration fails + SAFE_DAC_VALUE = 0x96 + + # If ADC CH0 reads above this with strobe off, something is wrong (blown MOSFET, shorted gate driver) + PREFLIGHT_CURRENT_THRESHOLD = 6 + + # LDO voltage bounds + LDO_MIN_V = 4.5 + LDO_MAX_V = 11.0 + + # Target LED currents (amps) + V3_TARGET_CURRENT = 10.0 + LEGACY_TARGET_CURRENT = 9.0 + HARD_CAP_CURRENT = 12.0 + + # Config key for persisting the result + DAC_CONFIG_KEY = "gs_config.strobing.kDAC_setting" + + def __init__(self, config_manager): + self.config_manager = config_manager + + self._spi_dac = None + self._spi_adc = None + self._diag_pin = None + + self._cancel_requested = False + + self.status: Dict[str, Any] = { + "state": "idle", + "progress": 0, + "message": "", + } + + # ------------------------------------------------------------------ + # Hardware lifecycle + # ------------------------------------------------------------------ + + def _open_hardware(self): + if spidev is None: + raise RuntimeError("spidev library not available -- not running on a Raspberry Pi?") + if DigitalOutputDevice is None: + raise RuntimeError("gpiozero library not available -- not running on a Raspberry Pi?") + + self._spi_dac = spidev.SpiDev() + self._spi_dac.open(self.SPI_BUS, self.SPI_DAC_DEVICE) + self._spi_dac.max_speed_hz = self.SPI_MAX_SPEED_HZ + self._spi_dac.mode = 0 + + self._spi_adc = spidev.SpiDev() + self._spi_adc.open(self.SPI_BUS, self.SPI_ADC_DEVICE) + self._spi_adc.max_speed_hz = self.SPI_MAX_SPEED_HZ + self._spi_adc.mode = 0 + + self._diag_pin = DigitalOutputDevice(self.DIAG_GPIO_PIN) + + def _close_hardware(self): + for name, resource in [("diag", self._diag_pin), + ("dac", self._spi_dac), + ("adc", self._spi_adc)]: + if resource is None: + continue + try: + if name == "diag": + resource.off() + time.sleep(0.1) + resource.close() + except Exception: + logger.debug(f"Error closing {name}", exc_info=True) + + self._diag_pin = None + self._spi_dac = None + self._spi_adc = None + + # ------------------------------------------------------------------ + # DAC / ADC primitives + # ------------------------------------------------------------------ + + def _set_dac(self, value: int): + """Write an 8-bit value to the MCP4801 DAC.""" + msb = self.MCP4801_WRITE_CMD | ((value >> 4) & 0x0F) + lsb = (value << 4) & 0xF0 + self._spi_dac.xfer2([msb, lsb]) + + def _read_adc(self, channel_cmd: int) -> int: + """Read a 12-bit value from the MCP3202 ADC.""" + response = self._spi_adc.xfer2([0x01, channel_cmd, 0x00]) + return ((response[1] & 0x0F) << 8) | response[2] + + def get_ldo_voltage(self) -> float: + """Read the LDO gate voltage via ADC CH1 (2k/1k resistor divider).""" + adc_value = self._read_adc(self.ADC_CH1_CMD) + return (3.3 / 4096) * adc_value * 3.0 + + def get_led_current(self) -> float: + """Pulse DIAG, read LED current sense via ADC CH0 (0.1 ohm sense resistor). + + Uses real-time scheduling and GC disable for deterministic timing. + DIAG is always turned off in the finally block. + """ + msg = [0x01, self.ADC_CH0_CMD, 0x00] + spi = self._spi_adc + diag = self._diag_pin + + gc.disable() + try: + try: + param = os.sched_param(os.sched_get_priority_max(os.SCHED_FIFO)) + os.sched_setscheduler(0, os.SCHED_FIFO, param) + except (PermissionError, AttributeError, OSError): + pass + + time.sleep(0) + + try: + diag.on() + response = spi.xfer2(msg) + finally: + diag.off() + try: + os.sched_setscheduler(0, os.SCHED_OTHER, os.sched_param(0)) + except (PermissionError, AttributeError, OSError): + pass + finally: + gc.enable() + + adc_value = ((response[1] & 0x0F) << 8) | response[2] + return (3.3 / 4096) * adc_value * 10.0 + + # ------------------------------------------------------------------ + # Calibration algorithm + # ------------------------------------------------------------------ + + def _find_dac_start(self): + """Sweep DAC 0->255, return last value where LDO stays >= LDO_MIN_V. + + Returns: + (dac_value, ldo_voltage) — dac_value is -1 if even DAC 0 is unsafe. + """ + dac_start = 0 + ldo = 0.0 + + for i in range(self.DAC_MAX + 1): + self._set_dac(i) + time.sleep(0.1) + ldo = self.get_ldo_voltage() + logger.debug(f"DAC={i:#04x}, LDO={ldo:.2f}V") + + if ldo < self.LDO_MIN_V: + dac_start = i - 1 + return dac_start, ldo + + dac_start = i + + return dac_start, ldo + + def _calibrate(self, target_current: float): + """Run full calibration: find safe start, sweep down to target, average. + + Returns: + (success, final_dac, led_current) + """ + # Pre-flight: check for current with strobe off — indicates blown MOSFET or gate driver + idle_adc = self._read_adc(self.ADC_CH0_CMD) + if idle_adc > self.PREFLIGHT_CURRENT_THRESHOLD: + self.status["message"] = f"Current detected with strobe off (ADC CH0={idle_adc}). Likely blown MOSFET or gate driver — check V3 Connector Board." + return False, -1, -1 + + # Phase 1: find safe starting DAC + dac_start, ldo = self._find_dac_start() + + if dac_start < 0: + self.status["message"] = f"DAC value of 0 is below minimum LDO voltage ({self.LDO_MIN_V:.2f}V): {ldo:.2f}V. This indicates a problem with the controller board." + return False, -1, -1 + + logger.debug(f"Calibrating: target={target_current}A, dac_start={dac_start:#04x}") + + # Phase 2: sweep from dac_start downward, looking for target crossing + final_dac = self.DAC_MIN + crossed = False + + for dac in range(dac_start, self.DAC_MIN - 1, -1): + if self._cancel_requested: + logger.info("Calibration cancelled by user") + return False, -1, -1 + + self._set_dac(dac) + time.sleep(0.1) + + steps_done = dac_start - dac + total_steps = dac_start - self.DAC_MIN + 1 + if total_steps > 0: + self.status["progress"] = int(20 + (steps_done / total_steps) * 60) + + ldo = self.get_ldo_voltage() + + if ldo < self.LDO_MIN_V: + logger.debug(f"LDO {ldo:.2f}V below min at DAC={dac:#04x}, skipping") + final_dac = dac + continue + + if ldo > self.LDO_MAX_V: + self.status["message"] = f"LDO voltage ({ldo:.2f}V) above maximum ({self.LDO_MAX_V}V). Stopping calibration, as something is wrong." + return False, -1, -1 + + led_current = self.get_led_current() + logger.debug(f"DAC={dac:#04x}, current={led_current:.2f}A") + + if led_current > self.HARD_CAP_CURRENT: + self.status["message"] = f"LED current ({led_current:.2f}A) exceeds hard cap ({self.HARD_CAP_CURRENT}A). This strongly indicates the LED is shorted." + return False, -1, -1 + + if led_current > target_current: + logger.debug(f"Crossed target at DAC={dac:#04x} ({led_current:.2f}A)") + final_dac = dac + 1 + crossed = True + break + + final_dac = dac + + if not crossed: + self.status["message"] = f"Reached MIN_DAC without reaching target ({target_current}A). This generally indicates a problem." + return False, -1, -1 + if final_dac >= self.DAC_MAX: + self.status["message"] = "MAX_DAC resulted in current above target. This generally indicates a problem." + return False, -1, -1 + + # Phase 3: average readings at the final setting to refine + led_current = 0.0 + n_avg = 10 + + while True: + if self._cancel_requested: + return False, -1, -1 + + self._set_dac(final_dac) + time.sleep(0.1) + + ldo = self.get_ldo_voltage() + if ldo < self.LDO_MIN_V: + final_dac -= 1 + break + + current_sum = 0.0 + for _ in range(n_avg): + current_sum += self.get_led_current() + time.sleep(0.1) + led_current = current_sum / n_avg + + if led_current > target_current: + final_dac += 1 + if final_dac > self.DAC_MAX: + logger.error("Averaging loop exceeded DAC_MAX") + return False, -1, -1 + else: + break + + self.status["progress"] = 90 + logger.debug(f"Calibration result: DAC={final_dac:#04x}, current={led_current:.2f}A") + return True, final_dac, led_current + + # ------------------------------------------------------------------ + # Public async API (called from web server endpoints) + # ------------------------------------------------------------------ + + async def start_calibration(self, led_type: str = "v3", + target_current: Optional[float] = None, + overwrite: bool = False) -> Dict[str, Any]: + """Run full strobe calibration. Blocking I/O is offloaded to a thread.""" + + if self.status.get("state") == "calibrating": + return {"status": "error", "message": "Calibration already in progress"} + + # Validate board version + board_version = self.config_manager.get_config("gs_config.strobing.kConnectionBoardVersion") + if board_version is None or int(board_version) != 3: + msg = f"Board version must be V3 (got {board_version})" + self.status = {"state": "error", "progress": 0, "message": msg} + return {"status": "error", "message": msg} + + # Check for existing calibration + existing = self.config_manager.get_config(self.DAC_CONFIG_KEY) + if existing is not None and not overwrite: + msg = "Calibration already exists. Set overwrite=true to replace." + self.status = {"state": "error", "progress": 0, "message": msg} + return {"status": "error", "message": msg} + + # Resolve target + if target_current is not None: + target = target_current + elif led_type == "legacy": + target = self.LEGACY_TARGET_CURRENT + else: + target = self.V3_TARGET_CURRENT + + self._cancel_requested = False + self.status = {"state": "calibrating", "progress": 0, "message": "Starting calibration"} + + loop = asyncio.get_event_loop() + try: + result = await loop.run_in_executor(None, self._run_calibration_sync, target) + return result + except Exception as e: + logger.error(f"Calibration error: {e}") + self.status = {"state": "error", "progress": 0, "message": str(e)} + return {"status": "error", "message": str(e)} + + def _run_calibration_sync(self, target: float) -> Dict[str, Any]: + """Synchronous calibration wrapper — runs in executor thread.""" + try: + self._open_hardware() + + success, final_dac, led_current = self._calibrate(target) + + if success and final_dac > 0: + self.config_manager.set_config(self.DAC_CONFIG_KEY, final_dac) + self.status = { + "state": "complete", "progress": 100, + "message": f"DAC=0x{final_dac:02X}, current={led_current:.2f}A", + "dac_setting": final_dac, + "led_current": round(led_current, 2), + } + return self.status + elif self._cancel_requested: + self.status = {"state": "cancelled", "progress": 0, + "message": "Calibration cancelled by user"} + return self.status + else: + self._set_dac(self.SAFE_DAC_VALUE) + reason = self.status.get("message", "Calibration failed") + self.status = {"state": "failed", "progress": 0, + "message": f"{reason} DAC set to safe fallback."} + return self.status + + except Exception as e: + logger.error(f"Calibration exception: {e}") + self.status = {"state": "error", "progress": 0, "message": str(e)} + return {"status": "error", "message": str(e)} + finally: + self._close_hardware() + + def cancel(self): + """Request cancellation of a running calibration.""" + self._cancel_requested = True + + def get_status(self) -> Dict[str, Any]: + """Return a snapshot of the current calibration status.""" + return dict(self.status) + + async def read_diagnostics(self) -> Dict[str, Any]: + """Read LDO voltage, LED current, and raw ADC values.""" + loop = asyncio.get_event_loop() + try: + return await loop.run_in_executor(None, self._read_diagnostics_sync) + except Exception as e: + return {"status": "error", "message": str(e)} + + def _read_diagnostics_sync(self) -> Dict[str, Any]: + try: + self._open_hardware() + + ldo = self.get_ldo_voltage() + adc_ch0 = self._read_adc(self.ADC_CH0_CMD) + adc_ch1 = self._read_adc(self.ADC_CH1_CMD) + + result: Dict[str, Any] = { + "ldo_voltage": round(ldo, 2), + "adc_ch0_raw": adc_ch0, + "adc_ch1_raw": adc_ch1, + } + + if ldo >= self.LDO_MIN_V: + led_current = self.get_led_current() + result["led_current"] = round(led_current, 2) + else: + result["led_current"] = None + result["warning"] = f"LDO unsafe ({ldo:.2f}V) — skipped LED current read" + + return result + + except Exception as e: + logger.error(f"Diagnostics error: {e}") + return {"status": "error", "message": str(e)} + finally: + self._close_hardware() + + async def set_dac_manual(self, value: int) -> Dict[str, Any]: + """Set DAC to a specific value and report LDO voltage.""" + if value < self.DAC_MIN or value > self.DAC_MAX: + return {"status": "error", + "message": f"DAC value must be {self.DAC_MIN}-{self.DAC_MAX}"} + + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, self._set_dac_manual_sync, value) + + def _set_dac_manual_sync(self, value: int) -> Dict[str, Any]: + try: + self._open_hardware() + self._set_dac(value) + time.sleep(0.1) + ldo = self.get_ldo_voltage() + + result: Dict[str, Any] = { + "status": "success", + "dac_value": value, + "ldo_voltage": round(ldo, 2), + } + + if ldo < self.LDO_MIN_V: + result["warning"] = f"LDO voltage {ldo:.2f}V is below minimum {self.LDO_MIN_V}V" + + return result + except Exception as e: + return {"status": "error", "message": str(e)} + finally: + self._close_hardware() + + async def get_dac_start(self) -> Dict[str, Any]: + """Run the safe-start sweep and return the boundary DAC value.""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, self._get_dac_start_sync) + + def _get_dac_start_sync(self) -> Dict[str, Any]: + try: + self._open_hardware() + dac_start, ldo = self._find_dac_start() + + if dac_start >= 0: + self._set_dac(dac_start) + time.sleep(0.1) + ldo = self.get_ldo_voltage() + + return { + "dac_start": dac_start, + "ldo_voltage": round(ldo, 2), + } + except Exception as e: + return {"status": "error", "message": str(e)} + finally: + self._close_hardware() + + async def get_saved_settings(self) -> Dict[str, Any]: + """Read the saved kDAC_setting from config.""" + value = self.config_manager.get_config(self.DAC_CONFIG_KEY) + return {"dac_setting": value} diff --git a/Software/web-server/templates/calibration.html b/Software/web-server/templates/calibration.html index 71c9a37c..49b891ba 100644 --- a/Software/web-server/templates/calibration.html +++ b/Software/web-server/templates/calibration.html @@ -285,6 +285,88 @@

Next Steps:

+
+
+

Strobe Calibration

+
+
+ Board: + Loading... +
+
+ Saved DAC: + Loading... +
+
+ State: + Idle +
+
+
+ + + +
+
+ + +
+
+ + + +
+
+ + + + + + +
+

Current Calibration Data

diff --git a/Software/web-server/tests/test_strobe_calibration_api.py b/Software/web-server/tests/test_strobe_calibration_api.py new file mode 100644 index 00000000..c0a05116 --- /dev/null +++ b/Software/web-server/tests/test_strobe_calibration_api.py @@ -0,0 +1,168 @@ +"""Tests for strobe calibration API endpoints""" + +import pytest +from unittest.mock import Mock, AsyncMock, patch + + +@pytest.mark.unit +class TestStrobeCalibrationAPI: + """Test suite for strobe calibration API endpoints""" + + @pytest.fixture + def mock_strobe_manager(self): + """Create a mock strobe calibration manager""" + manager = Mock() + manager.get_status = Mock( + return_value={"state": "idle", "progress": 0, "message": ""} + ) + manager.get_saved_settings = AsyncMock( + return_value={"dac_setting": None} + ) + manager.start_calibration = AsyncMock( + return_value={"status": "success", "dac_setting": 150, "led_current": 9.85} + ) + manager.cancel = Mock() + manager.read_diagnostics = AsyncMock( + return_value={ + "ldo_voltage": 7.42, + "adc_ch0_raw": 1234, + "adc_ch1_raw": 2048, + "led_current": 9.5, + } + ) + manager.set_dac_manual = AsyncMock( + return_value={"status": "success", "dac_value": 128, "ldo_voltage": 6.8} + ) + manager.get_dac_start = AsyncMock( + return_value={"dac_start": 200, "ldo_voltage": 5.1} + ) + return manager + + def test_get_status(self, client, server_instance, mock_strobe_manager): + """Status endpoint returns expected shape""" + server_instance.strobe_calibration_manager = mock_strobe_manager + + response = client.get("/api/strobe-calibration/status") + assert response.status_code == 200 + + data = response.json() + assert "state" in data + assert "progress" in data + assert "message" in data + assert data["state"] == "idle" + + def test_get_settings(self, client, server_instance, mock_strobe_manager): + """Settings endpoint returns saved DAC value""" + server_instance.strobe_calibration_manager = mock_strobe_manager + + response = client.get("/api/strobe-calibration/settings") + assert response.status_code == 200 + + data = response.json() + assert "dac_setting" in data + + def test_start_calibration(self, client, server_instance, mock_strobe_manager): + """Start endpoint accepts parameters and returns immediately""" + server_instance.strobe_calibration_manager = mock_strobe_manager + + response = client.post( + "/api/strobe-calibration/start", + json={"led_type": "v3", "target_current": 10.0, "overwrite": True}, + ) + assert response.status_code == 200 + + data = response.json() + assert data["status"] == "started" + + def test_start_calibration_defaults(self, client, server_instance, mock_strobe_manager): + """Start endpoint works with empty body (all defaults)""" + server_instance.strobe_calibration_manager = mock_strobe_manager + + response = client.post("/api/strobe-calibration/start", json={}) + assert response.status_code == 200 + assert response.json()["status"] == "started" + + def test_cancel(self, client, server_instance, mock_strobe_manager): + """Cancel endpoint calls manager.cancel and returns ok""" + server_instance.strobe_calibration_manager = mock_strobe_manager + + response = client.post("/api/strobe-calibration/cancel") + assert response.status_code == 200 + + data = response.json() + assert data["status"] == "ok" + mock_strobe_manager.cancel.assert_called_once() + + def test_get_diagnostics(self, client, server_instance, mock_strobe_manager): + """Diagnostics endpoint returns hardware readings""" + server_instance.strobe_calibration_manager = mock_strobe_manager + + response = client.get("/api/strobe-calibration/diagnostics") + assert response.status_code == 200 + + data = response.json() + assert "ldo_voltage" in data + assert "led_current" in data + + def test_set_dac_valid(self, client, server_instance, mock_strobe_manager): + """Set-dac endpoint accepts a valid integer""" + server_instance.strobe_calibration_manager = mock_strobe_manager + + response = client.post( + "/api/strobe-calibration/set-dac", json={"value": 128} + ) + assert response.status_code == 200 + + data = response.json() + assert data["status"] == "success" + mock_strobe_manager.set_dac_manual.assert_called_once_with(128) + + def test_set_dac_missing_value(self, client, server_instance, mock_strobe_manager): + """Set-dac rejects request with no value""" + server_instance.strobe_calibration_manager = mock_strobe_manager + + response = client.post("/api/strobe-calibration/set-dac", json={}) + assert response.status_code == 200 + + data = response.json() + assert data["status"] == "error" + assert "value" in data["message"].lower() + + def test_set_dac_non_integer(self, client, server_instance, mock_strobe_manager): + """Set-dac rejects a non-integer value""" + server_instance.strobe_calibration_manager = mock_strobe_manager + + response = client.post( + "/api/strobe-calibration/set-dac", json={"value": "abc"} + ) + assert response.status_code == 200 + + data = response.json() + assert data["status"] == "error" + + def test_get_dac_start(self, client, server_instance, mock_strobe_manager): + """Dac-start endpoint returns boundary value""" + server_instance.strobe_calibration_manager = mock_strobe_manager + + response = client.get("/api/strobe-calibration/dac-start") + assert response.status_code == 200 + + data = response.json() + assert "dac_start" in data + assert "ldo_voltage" in data + + def test_status_during_calibration(self, client, server_instance, mock_strobe_manager): + """Status reflects in-progress calibration""" + mock_strobe_manager.get_status.return_value = { + "state": "calibrating", + "progress": 45, + "message": "Sweeping DAC", + } + server_instance.strobe_calibration_manager = mock_strobe_manager + + response = client.get("/api/strobe-calibration/status") + assert response.status_code == 200 + + data = response.json() + assert data["state"] == "calibrating" + assert data["progress"] == 45 diff --git a/Software/web-server/tests/test_strobe_calibration_manager.py b/Software/web-server/tests/test_strobe_calibration_manager.py new file mode 100644 index 00000000..e5acba75 --- /dev/null +++ b/Software/web-server/tests/test_strobe_calibration_manager.py @@ -0,0 +1,987 @@ +"""Tests for StrobeCalibrationManager""" + +import asyncio +import gc +import os +import time +import pytest +from unittest.mock import Mock, MagicMock, patch, PropertyMock + + +# --------------------------------------------------------------------------- +# Increment 1: Skeleton + hardware open/close +# --------------------------------------------------------------------------- + +class TestStrobeCalibrationInit: + """Initialization and default state""" + + def test_init_stores_config_manager(self): + from strobe_calibration_manager import StrobeCalibrationManager + + cm = Mock() + mgr = StrobeCalibrationManager(cm) + assert mgr.config_manager is cm + + def test_init_status_is_idle(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + assert mgr.status["state"] == "idle" + assert mgr.status["progress"] == 0 + assert mgr.status["message"] == "" + + def test_init_hardware_refs_are_none(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + assert mgr._spi_dac is None + assert mgr._spi_adc is None + assert mgr._diag_pin is None + + def test_init_cancel_flag_false(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + assert mgr._cancel_requested is False + + +class TestOpenHardware: + """_open_hardware sets up SPI and GPIO""" + + @patch("strobe_calibration_manager.spidev") + @patch("strobe_calibration_manager.DigitalOutputDevice") + def test_open_creates_spi_and_gpio(self, mock_led_cls, mock_spidev_mod): + from strobe_calibration_manager import StrobeCalibrationManager + + mock_dac = MagicMock() + mock_adc = MagicMock() + mock_spidev_mod.SpiDev.side_effect = [mock_dac, mock_adc] + + mgr = StrobeCalibrationManager(Mock()) + mgr._open_hardware() + + assert mgr._spi_dac is mock_dac + mock_dac.open.assert_called_once_with(1, 0) + assert mock_dac.max_speed_hz == 1_000_000 + assert mock_dac.mode == 0 + + assert mgr._spi_adc is mock_adc + mock_adc.open.assert_called_once_with(1, 1) + assert mock_adc.max_speed_hz == 1_000_000 + assert mock_adc.mode == 0 + + mock_led_cls.assert_called_once_with(10) + assert mgr._diag_pin is mock_led_cls.return_value + + @patch("strobe_calibration_manager.spidev", None) + def test_open_raises_when_spidev_missing(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + with pytest.raises(RuntimeError, match="spidev"): + mgr._open_hardware() + + +class TestCloseHardware: + """_close_hardware tears down SPI and GPIO safely""" + + def test_close_calls_close_on_all(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mock_dac = MagicMock() + mock_adc = MagicMock() + mock_diag = MagicMock() + mgr._spi_dac = mock_dac + mgr._spi_adc = mock_adc + mgr._diag_pin = mock_diag + + mgr._close_hardware() + + mock_dac.close.assert_called_once() + mock_adc.close.assert_called_once() + mock_diag.close.assert_called_once() + assert mgr._spi_dac is None + assert mgr._spi_adc is None + assert mgr._diag_pin is None + + def test_close_tolerates_none_refs(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + # all refs are None by default -- should not raise + mgr._close_hardware() + + def test_close_turns_diag_off_first(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + pin = MagicMock() + mgr._diag_pin = pin + + mgr._close_hardware() + # off() should be called before close() + pin.off.assert_called_once() + pin.close.assert_called_once() + + +# --------------------------------------------------------------------------- +# Increment 2: DAC/ADC primitives +# --------------------------------------------------------------------------- + +class TestSetDac: + """_set_dac encodes value into MCP4801 protocol and sends via SPI""" + + def test_set_dac_zero(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._spi_dac = MagicMock() + + mgr._set_dac(0) + mgr._spi_dac.xfer2.assert_called_once_with([0x30, 0x00]) + + def test_set_dac_max(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._spi_dac = MagicMock() + + mgr._set_dac(0xFF) + # 0x30 | (0xFF >> 4 & 0x0F) = 0x30 | 0x0F = 0x3F + # (0xFF << 4) & 0xF0 = 0xF0 + mgr._spi_dac.xfer2.assert_called_once_with([0x3F, 0xF0]) + + def test_set_dac_midrange(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._spi_dac = MagicMock() + + # 0x96 = 150 + # msb: 0x30 | (0x96 >> 4 & 0x0F) = 0x30 | 0x09 = 0x39 + # lsb: (0x96 << 4) & 0xF0 = 0x60 + mgr._set_dac(0x96) + mgr._spi_dac.xfer2.assert_called_once_with([0x39, 0x60]) + + +class TestReadAdc: + """_read_adc sends command and parses 12-bit response""" + + def test_read_adc_parses_12bit(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._spi_adc = MagicMock() + # response: first byte ignored, upper nibble in byte 1, full byte 2 + # 0x0A << 8 | 0xBC = 0xABC = 2748 + mgr._spi_adc.xfer2.return_value = [0x00, 0x0A, 0xBC] + + result = mgr._read_adc(0x80) + mgr._spi_adc.xfer2.assert_called_once_with([0x01, 0x80, 0x00]) + assert result == 2748 + + def test_read_adc_masks_upper_nibble(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._spi_adc = MagicMock() + # byte 1 has extra bits above nibble -- should be masked + mgr._spi_adc.xfer2.return_value = [0xFF, 0xFF, 0xFF] + + result = mgr._read_adc(0xC0) + # (0xFF & 0x0F) << 8 | 0xFF = 0x0FFF = 4095 + assert result == 4095 + + def test_read_adc_zero(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._spi_adc = MagicMock() + mgr._spi_adc.xfer2.return_value = [0x00, 0x00, 0x00] + + assert mgr._read_adc(0x80) == 0 + + +class TestGetLdoVoltage: + """get_ldo_voltage reads CH1 and applies resistor divider formula""" + + def test_ldo_voltage_calculation(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._spi_adc = MagicMock() + # pick adc value that gives a nice voltage + # LDO = (3.3 / 4096) * adc * 3.0 + # for adc = 2048: (3.3/4096)*2048*3 = 4.95 + mgr._spi_adc.xfer2.return_value = [0x00, 0x08, 0x00] # 2048 + + voltage = mgr.get_ldo_voltage() + assert abs(voltage - 4.95) < 0.01 + + def test_ldo_voltage_zero(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._spi_adc = MagicMock() + mgr._spi_adc.xfer2.return_value = [0x00, 0x00, 0x00] + + assert mgr.get_ldo_voltage() == 0.0 + + +class TestGetLedCurrent: + """get_led_current pulses DIAG, reads CH0, converts to amps""" + + def test_led_current_calculation(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._spi_adc = MagicMock() + mgr._diag_pin = MagicMock() + # LED current = (3.3 / 4096) * adc * 10.0 + # for adc = 1240: (3.3/4096)*1240*10 = ~9.98 + mgr._spi_adc.xfer2.return_value = [0x00, 0x04, 0xD8] # 0x04D8 = 1240 + + with patch("strobe_calibration_manager.gc"), \ + patch("strobe_calibration_manager.time.sleep"): + current = mgr.get_led_current() + + expected = (3.3 / 4096) * 1240 * 10.0 + assert abs(current - expected) < 0.01 + + def test_led_current_always_turns_diag_off(self): + """Even if SPI read raises, DIAG must be turned off""" + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._spi_adc = MagicMock() + mgr._spi_adc.xfer2.side_effect = RuntimeError("SPI failure") + mgr._diag_pin = MagicMock() + + with patch("strobe_calibration_manager.gc"), \ + patch("strobe_calibration_manager.time.sleep"): + with pytest.raises(RuntimeError): + mgr.get_led_current() + + mgr._diag_pin.off.assert_called_once() + + def test_led_current_pulses_diag_on_then_off(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._spi_adc = MagicMock() + mgr._spi_adc.xfer2.return_value = [0x00, 0x04, 0x00] + mgr._diag_pin = MagicMock() + + call_order = [] + mgr._diag_pin.on.side_effect = lambda: call_order.append("on") + mgr._diag_pin.off.side_effect = lambda: call_order.append("off") + + with patch("strobe_calibration_manager.gc"), \ + patch("strobe_calibration_manager.time.sleep"): + mgr.get_led_current() + + assert call_order == ["on", "off"] + + +# --------------------------------------------------------------------------- +# Increment 3: Calibration algorithm +# --------------------------------------------------------------------------- + +def _make_mgr_with_hw(): + """Helper: create a manager with mocked hardware attached.""" + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._spi_dac = MagicMock() + mgr._spi_adc = MagicMock() + mgr._spi_adc.xfer2.return_value = [0x00, 0x00, 0x00] + mgr._diag_pin = MagicMock() + return mgr + + +class TestFindDacStart: + """_find_dac_start sweeps DAC 0->255, returns last value where LDO >= 4.5V""" + + @patch("strobe_calibration_manager.time.sleep") + def test_finds_boundary(self, _sleep): + mgr = _make_mgr_with_hw() + + # LDO drops below 4.5V at DAC=5, so start should be 4 + voltages = [8.0, 7.5, 6.5, 5.5, 4.8, 4.3] + call_idx = [0] + + def fake_ldo(): + v = voltages[min(call_idx[0], len(voltages) - 1)] + call_idx[0] += 1 + return v + + mgr.get_ldo_voltage = fake_ldo + + dac_start, ldo = mgr._find_dac_start() + assert dac_start == 4 + + @patch("strobe_calibration_manager.time.sleep") + def test_dac_zero_already_unsafe(self, _sleep): + mgr = _make_mgr_with_hw() + + mgr.get_ldo_voltage = lambda: 3.0 # always below 4.5V + + dac_start, ldo = mgr._find_dac_start() + assert dac_start == -1 + + @patch("strobe_calibration_manager.time.sleep") + def test_never_drops_returns_255(self, _sleep): + mgr = _make_mgr_with_hw() + + mgr.get_ldo_voltage = lambda: 9.0 # always safe + + dac_start, ldo = mgr._find_dac_start() + assert dac_start == 255 + + @patch("strobe_calibration_manager.time.sleep") + def test_sets_dac_at_each_step(self, _sleep): + mgr = _make_mgr_with_hw() + + calls = [] + mgr._set_dac = lambda v: calls.append(v) + mgr.get_ldo_voltage = lambda: 3.0 # immediately unsafe + + mgr._find_dac_start() + # Should have set DAC=0, then LDO was bad, so start=-1 + assert calls[0] == 0 + + +class TestCalibrate: + """_calibrate runs all 3 phases: find_start, main sweep, averaging""" + + @patch("strobe_calibration_manager.time.sleep") + def test_preflight_fails_when_current_detected_with_strobe_off(self, _sleep): + mgr = _make_mgr_with_hw() + mgr._spi_adc.xfer2.return_value = [0x00, 0x00, 0x10] # ADC=16, above threshold of 6 + + success, dac, current = mgr._calibrate(10.0) + assert success is False + + @patch("strobe_calibration_manager.time.sleep") + def test_succeeds_with_realistic_data(self, _sleep): + mgr = _make_mgr_with_hw() + + # Phase 1: LDO drops at DAC=100 so start=99 + phase1_voltages = [9.0] * 100 + [4.0] + phase1_idx = [0] + + def phase1_ldo(): + v = phase1_voltages[min(phase1_idx[0], len(phase1_voltages) - 1)] + phase1_idx[0] += 1 + return v + + # Phase 2: sweep DAC from 99 down, LED current rises as DAC decreases. + # Current crosses 10A (V3 target) at DAC=50 + phase2_ldo = [7.0] # always safe during phase 2 + phase2_currents = {} + for d in range(100): + # current goes from ~5A at DAC=99 up to ~12A at DAC=0 + phase2_currents[d] = 5.0 + (99 - d) * (7.0 / 99) + + # Phase 3: averaging returns just under target + avg_current = [9.8] + + # Wire up the mock methods + phase = [1] + ldo_call = [0] + current_call = [0] + current_dac_val = [99] + + def smart_set_dac(v): + current_dac_val[0] = v + + def smart_ldo(): + if phase[0] == 1: + return phase1_ldo() + return phase2_ldo[0] + + def smart_current(): + return phase2_currents.get(current_dac_val[0], 5.0) + + def smart_avg_current(): + return avg_current[0] + + # Patch _find_dac_start to return a known start + mgr._find_dac_start = lambda: (99, 7.0) + mgr._set_dac = smart_set_dac + mgr.get_ldo_voltage = lambda: 7.0 # always safe + mgr.get_led_current = smart_current + phase[0] = 2 + + success, final_dac, current = mgr._calibrate(10.0) + assert success is True + assert final_dac > 0 + assert final_dac <= 99 + + @patch("strobe_calibration_manager.time.sleep") + def test_fails_when_dac_zero_unsafe(self, _sleep): + mgr = _make_mgr_with_hw() + + mgr._find_dac_start = lambda: (-1, 3.0) + + success, dac, current = mgr._calibrate(10.0) + assert success is False + assert dac == -1 + + @patch("strobe_calibration_manager.time.sleep") + def test_fails_when_ldo_too_high(self, _sleep): + mgr = _make_mgr_with_hw() + + mgr._find_dac_start = lambda: (200, 7.0) + mgr._set_dac = lambda v: None + mgr.get_ldo_voltage = lambda: 12.0 # above LDO_MAX_V of 11.0 + + success, dac, current = mgr._calibrate(10.0) + assert success is False + + @patch("strobe_calibration_manager.time.sleep") + def test_fails_when_min_dac_reached(self, _sleep): + mgr = _make_mgr_with_hw() + + mgr._find_dac_start = lambda: (10, 7.0) + mgr._set_dac = lambda v: None + mgr.get_ldo_voltage = lambda: 7.0 + mgr.get_led_current = lambda: 5.0 # never reaches target + + success, dac, current = mgr._calibrate(10.0) + assert success is False + + @patch("strobe_calibration_manager.time.sleep") + def test_cancel_during_main_sweep(self, _sleep): + mgr = _make_mgr_with_hw() + + mgr._find_dac_start = lambda: (200, 7.0) + mgr._set_dac = lambda v: None + mgr.get_ldo_voltage = lambda: 7.0 + mgr.get_led_current = lambda: 5.0 + + # Cancel after a few iterations + counter = [0] + original_set_dac = mgr._set_dac + def counting_set_dac(v): + counter[0] += 1 + if counter[0] > 5: + mgr._cancel_requested = True + original_set_dac(v) + mgr._set_dac = counting_set_dac + + success, dac, current = mgr._calibrate(10.0) + assert success is False + + @patch("strobe_calibration_manager.time.sleep") + def test_hard_cap_rejects_excessive_current(self, _sleep): + """If LED current ever exceeds HARD_CAP_CURRENT, calibration should fail""" + mgr = _make_mgr_with_hw() + + mgr._find_dac_start = lambda: (50, 7.0) + mgr._set_dac = lambda v: None + mgr.get_ldo_voltage = lambda: 7.0 + mgr.get_led_current = lambda: 13.0 # above 12A hard cap + + success, dac, current = mgr._calibrate(10.0) + assert success is False + + @patch("strobe_calibration_manager.time.sleep") + def test_skips_ldo_below_min_during_sweep(self, _sleep): + """When LDO drops below min during sweep, that step is skipped""" + mgr = _make_mgr_with_hw() + + mgr._find_dac_start = lambda: (5, 7.0) + dac_calls = [] + mgr._set_dac = lambda v: dac_calls.append(v) + + # LDO goes below min at DAC=4, then back up for DAC=3..0 + ldo_values = {5: 7.0, 4: 4.0, 3: 7.0, 2: 7.0, 1: 7.0, 0: 7.0} + mgr.get_ldo_voltage = lambda: ldo_values.get(dac_calls[-1] if dac_calls else 5, 7.0) + mgr.get_led_current = lambda: 5.0 # never reaches target + + success, dac, current = mgr._calibrate(10.0) + # Should have continued past the low LDO step + assert success is False # reaches MIN_DAC + assert 4 in dac_calls # did try DAC=4 + + @patch("strobe_calibration_manager.time.sleep") + def test_averaging_steps_up_when_still_over_target(self, _sleep): + """Phase 3: if average current still above target, increment DAC""" + mgr = _make_mgr_with_hw() + + dac_val = [0] + + def track_dac(v): + dac_val[0] = v + + mgr._find_dac_start = lambda: (50, 7.0) + mgr._set_dac = track_dac + mgr.get_ldo_voltage = lambda: 7.0 + + def smart_current(): + d = dac_val[0] + # Sweep phase: crosses target at DAC=25 (current > 10A) + if d <= 25: + return 10.5 + # Averaging phase at DAC=26: first time over target, second time under + if d == 26: + # Return slightly above target so it steps up to 27 + smart_current._avg_call_count = getattr(smart_current, '_avg_call_count', 0) + 1 + if smart_current._avg_call_count <= 10: + return 10.1 + return 9.8 + if d == 27: + return 9.8 + return 5.0 + + mgr.get_led_current = smart_current + + success, final_dac, current = mgr._calibrate(10.0) + assert success is True + # Should have stepped up from 26 to 27 during averaging + assert final_dac == 27 + + @patch("strobe_calibration_manager.time.sleep") + def test_updates_status_progress(self, _sleep): + """Status dict should be updated during calibration sweep""" + mgr = _make_mgr_with_hw() + + mgr._find_dac_start = lambda: (10, 7.0) + mgr._set_dac = lambda v: None + mgr.get_ldo_voltage = lambda: 7.0 + + # Have it cross the target at DAC=5 + current_dac = [10] + original_set = mgr._set_dac + def tracking_set(v): + current_dac[0] = v + mgr._set_dac = tracking_set + + def current_fn(): + if current_dac[0] <= 5: + return 10.5 # above target + return 5.0 + mgr.get_led_current = current_fn + + progress_values = [] + original_status = mgr.status + class StatusProxy(dict): + def __setitem__(self, k, v): + super().__setitem__(k, v) + if k == "progress": + progress_values.append(v) + proxy = StatusProxy(original_status) + mgr.status = proxy + + mgr._calibrate(10.0) + # Should have updated progress at least once + assert len(progress_values) > 0 + + +# --------------------------------------------------------------------------- +# Increment 4: Async public API +# --------------------------------------------------------------------------- + +class TestStartCalibration: + """start_calibration validates inputs, checks board version, runs _calibrate""" + + @pytest.mark.asyncio + @patch("strobe_calibration_manager.time.sleep") + async def test_rejects_non_v3_board(self, _sleep): + from strobe_calibration_manager import StrobeCalibrationManager + + cm = Mock() + cm.get_config.side_effect = lambda key=None: { + "gs_config.strobing.kConnectionBoardVersion": "2", + "gs_config.strobing.kDAC_setting": None, + }.get(key) + + mgr = StrobeCalibrationManager(cm) + + result = await mgr.start_calibration(led_type="v3") + assert result["status"] == "error" + assert "version" in result["message"].lower() or "V3" in result["message"] + + @pytest.mark.asyncio + @patch("strobe_calibration_manager.time.sleep") + async def test_rejects_overwrite_without_flag(self, _sleep): + from strobe_calibration_manager import StrobeCalibrationManager + + cm = Mock() + cm.get_config.side_effect = lambda key=None: { + "gs_config.strobing.kConnectionBoardVersion": "3", + "gs_config.strobing.kDAC_setting": 150, + }.get(key) + + mgr = StrobeCalibrationManager(cm) + + result = await mgr.start_calibration(led_type="v3", overwrite=False) + assert result["status"] == "error" + assert "exist" in result["message"].lower() or "overwrite" in result["message"].lower() + + @pytest.mark.asyncio + @patch("strobe_calibration_manager.time.sleep") + async def test_allows_overwrite_with_flag(self, _sleep): + from strobe_calibration_manager import StrobeCalibrationManager + + cm = Mock() + cm.get_config.side_effect = lambda key=None: { + "gs_config.strobing.kConnectionBoardVersion": "3", + "gs_config.strobing.kDAC_setting": 150, + }.get(key) + cm.set_config.return_value = (True, "ok", False) + + mgr = StrobeCalibrationManager(cm) + + # Mock out the hardware and calibration + mgr._open_hardware = Mock() + mgr._close_hardware = Mock() + mgr._calibrate = Mock(return_value=(True, 0x80, 9.5)) + + result = await mgr.start_calibration(led_type="v3", overwrite=True) + assert result["state"] == "complete" + + @pytest.mark.asyncio + @patch("strobe_calibration_manager.time.sleep") + async def test_uses_v3_target_current(self, _sleep): + from strobe_calibration_manager import StrobeCalibrationManager + + cm = Mock() + cm.get_config.side_effect = lambda key=None: { + "gs_config.strobing.kConnectionBoardVersion": "3", + "gs_config.strobing.kDAC_setting": None, + }.get(key) + cm.set_config.return_value = (True, "ok", False) + + mgr = StrobeCalibrationManager(cm) + mgr._open_hardware = Mock() + mgr._close_hardware = Mock() + + captured_target = [] + def fake_calibrate(target): + captured_target.append(target) + return (True, 0x80, 9.5) + mgr._calibrate = fake_calibrate + + await mgr.start_calibration(led_type="v3") + assert captured_target[0] == 10.0 + + @pytest.mark.asyncio + @patch("strobe_calibration_manager.time.sleep") + async def test_uses_legacy_target_current(self, _sleep): + from strobe_calibration_manager import StrobeCalibrationManager + + cm = Mock() + cm.get_config.side_effect = lambda key=None: { + "gs_config.strobing.kConnectionBoardVersion": "3", + "gs_config.strobing.kDAC_setting": None, + }.get(key) + cm.set_config.return_value = (True, "ok", False) + + mgr = StrobeCalibrationManager(cm) + mgr._open_hardware = Mock() + mgr._close_hardware = Mock() + + captured_target = [] + def fake_calibrate(target): + captured_target.append(target) + return (True, 0x80, 8.5) + mgr._calibrate = fake_calibrate + + await mgr.start_calibration(led_type="legacy") + assert captured_target[0] == 9.0 + + @pytest.mark.asyncio + @patch("strobe_calibration_manager.time.sleep") + async def test_uses_custom_target(self, _sleep): + from strobe_calibration_manager import StrobeCalibrationManager + + cm = Mock() + cm.get_config.side_effect = lambda key=None: { + "gs_config.strobing.kConnectionBoardVersion": "3", + "gs_config.strobing.kDAC_setting": None, + }.get(key) + cm.set_config.return_value = (True, "ok", False) + + mgr = StrobeCalibrationManager(cm) + mgr._open_hardware = Mock() + mgr._close_hardware = Mock() + + captured_target = [] + def fake_calibrate(target): + captured_target.append(target) + return (True, 0x80, 7.5) + mgr._calibrate = fake_calibrate + + await mgr.start_calibration(led_type="v3", target_current=7.5) + assert captured_target[0] == 7.5 + + @pytest.mark.asyncio + @patch("strobe_calibration_manager.time.sleep") + async def test_saves_result_on_success(self, _sleep): + from strobe_calibration_manager import StrobeCalibrationManager + + cm = Mock() + cm.get_config.side_effect = lambda key=None: { + "gs_config.strobing.kConnectionBoardVersion": "3", + "gs_config.strobing.kDAC_setting": None, + }.get(key) + cm.set_config.return_value = (True, "ok", False) + + mgr = StrobeCalibrationManager(cm) + mgr._open_hardware = Mock() + mgr._close_hardware = Mock() + mgr._calibrate = Mock(return_value=(True, 0x80, 9.5)) + + await mgr.start_calibration(led_type="v3") + cm.set_config.assert_called_once_with("gs_config.strobing.kDAC_setting", 0x80) + + @pytest.mark.asyncio + @patch("strobe_calibration_manager.time.sleep") + async def test_sets_safe_dac_on_failure(self, _sleep): + from strobe_calibration_manager import StrobeCalibrationManager + + cm = Mock() + cm.get_config.side_effect = lambda key=None: { + "gs_config.strobing.kConnectionBoardVersion": "3", + "gs_config.strobing.kDAC_setting": None, + }.get(key) + + mgr = StrobeCalibrationManager(cm) + mgr._open_hardware = Mock() + mgr._close_hardware = Mock() + mgr._calibrate = Mock(return_value=(False, -1, -1)) + dac_calls = [] + mgr._set_dac = lambda v: dac_calls.append(v) + + result = await mgr.start_calibration(led_type="v3") + assert result["state"] == "failed" + assert 0x96 in dac_calls + + @pytest.mark.asyncio + @patch("strobe_calibration_manager.time.sleep") + async def test_always_closes_hardware(self, _sleep): + from strobe_calibration_manager import StrobeCalibrationManager + + cm = Mock() + cm.get_config.side_effect = lambda key=None: { + "gs_config.strobing.kConnectionBoardVersion": "3", + "gs_config.strobing.kDAC_setting": None, + }.get(key) + + mgr = StrobeCalibrationManager(cm) + mgr._open_hardware = Mock() + mgr._close_hardware = Mock() + mgr._calibrate = Mock(side_effect=RuntimeError("kaboom")) + + result = await mgr.start_calibration(led_type="v3") + assert result["status"] == "error" + mgr._close_hardware.assert_called_once() + + @pytest.mark.asyncio + @patch("strobe_calibration_manager.time.sleep") + async def test_status_transitions(self, _sleep): + from strobe_calibration_manager import StrobeCalibrationManager + + cm = Mock() + cm.get_config.side_effect = lambda key=None: { + "gs_config.strobing.kConnectionBoardVersion": "3", + "gs_config.strobing.kDAC_setting": None, + }.get(key) + cm.set_config.return_value = (True, "ok", False) + + mgr = StrobeCalibrationManager(cm) + mgr._open_hardware = Mock() + mgr._close_hardware = Mock() + mgr._calibrate = Mock(return_value=(True, 0x80, 9.5)) + + assert mgr.status["state"] == "idle" + await mgr.start_calibration(led_type="v3") + assert mgr.status["state"] == "complete" + assert mgr.status["progress"] == 100 + + +class TestCancel: + """cancel sets the flag and resets status""" + + def test_cancel_sets_flag(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr.status["state"] = "calibrating" + mgr.cancel() + assert mgr._cancel_requested is True + + def test_cancel_when_idle(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr.cancel() + # should not error, just a no-op + assert mgr._cancel_requested is True + + +class TestGetStatus: + """get_status returns a copy of the status dict""" + + def test_returns_status_copy(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr.status["state"] = "calibrating" + mgr.status["progress"] = 42 + mgr.status["message"] = "sweeping" + + result = mgr.get_status() + assert result["state"] == "calibrating" + assert result["progress"] == 42 + assert result["message"] == "sweeping" + # should be a copy + result["state"] = "mutated" + assert mgr.status["state"] == "calibrating" + + +class TestReadDiagnostics: + """read_diagnostics bundles LDO + current + raw ADC reads""" + + @pytest.mark.asyncio + async def test_returns_all_readings(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._open_hardware = Mock() + mgr._close_hardware = Mock() + mgr.get_ldo_voltage = Mock(return_value=7.5) + mgr.get_led_current = Mock(return_value=9.2) + mgr._read_adc = Mock(side_effect=[1234, 2345]) + + result = await mgr.read_diagnostics() + + assert result["ldo_voltage"] == 7.5 + assert result["led_current"] == 9.2 + assert result["adc_ch0_raw"] == 1234 + assert result["adc_ch1_raw"] == 2345 + + @pytest.mark.asyncio + async def test_skips_current_when_ldo_unsafe(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._open_hardware = Mock() + mgr._close_hardware = Mock() + mgr.get_ldo_voltage = Mock(return_value=3.0) + mgr._read_adc = Mock(side_effect=[100, 200]) + + result = await mgr.read_diagnostics() + + assert result["ldo_voltage"] == 3.0 + assert result["led_current"] is None + assert "unsafe" in result.get("warning", "").lower() + + @pytest.mark.asyncio + async def test_closes_hardware_on_error(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._open_hardware = Mock() + mgr._close_hardware = Mock() + mgr.get_ldo_voltage = Mock(side_effect=RuntimeError("SPI gone")) + + result = await mgr.read_diagnostics() + assert result["status"] == "error" + mgr._close_hardware.assert_called_once() + + +class TestSetDacManual: + """set_dac_manual validates range and returns LDO check""" + + @pytest.mark.asyncio + async def test_rejects_out_of_range(self): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + + result = await mgr.set_dac_manual(256) + assert result["status"] == "error" + + result = await mgr.set_dac_manual(-1) + assert result["status"] == "error" + + @pytest.mark.asyncio + @patch("strobe_calibration_manager.time.sleep") + async def test_sets_dac_and_checks_ldo(self, _sleep): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._open_hardware = Mock() + mgr._close_hardware = Mock() + dac_calls = [] + mgr._set_dac = lambda v: dac_calls.append(v) + mgr.get_ldo_voltage = Mock(return_value=7.5) + + result = await mgr.set_dac_manual(0x80) + assert result["status"] == "success" + assert 0x80 in dac_calls + assert result["ldo_voltage"] == 7.5 + + @pytest.mark.asyncio + @patch("strobe_calibration_manager.time.sleep") + async def test_warns_when_ldo_below_min(self, _sleep): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._open_hardware = Mock() + mgr._close_hardware = Mock() + mgr._set_dac = lambda v: None + mgr.get_ldo_voltage = Mock(return_value=3.5) + + result = await mgr.set_dac_manual(0x10) + assert "warning" in result + + +class TestGetDacStart: + """get_dac_start runs the safe-start sweep""" + + @pytest.mark.asyncio + @patch("strobe_calibration_manager.time.sleep") + async def test_returns_start_and_ldo(self, _sleep): + from strobe_calibration_manager import StrobeCalibrationManager + + mgr = StrobeCalibrationManager(Mock()) + mgr._open_hardware = Mock() + mgr._close_hardware = Mock() + mgr._find_dac_start = Mock(return_value=(99, 7.0)) + mgr._set_dac = Mock() + mgr.get_ldo_voltage = Mock(return_value=7.0) + + result = await mgr.get_dac_start() + assert result["dac_start"] == 99 + assert result["ldo_voltage"] == 7.0 + + +class TestGetSavedSettings: + """get_saved_settings reads kDAC_setting from config""" + + @pytest.mark.asyncio + async def test_returns_saved_value(self): + from strobe_calibration_manager import StrobeCalibrationManager + + cm = Mock() + cm.get_config.return_value = 150 + + mgr = StrobeCalibrationManager(cm) + result = await mgr.get_saved_settings() + assert result["dac_setting"] == 150 + + @pytest.mark.asyncio + async def test_returns_none_when_unset(self): + from strobe_calibration_manager import StrobeCalibrationManager + + cm = Mock() + cm.get_config.return_value = None + + mgr = StrobeCalibrationManager(cm) + result = await mgr.get_saved_settings() + assert result["dac_setting"] is None diff --git a/docs/assets/images/enclosure_assembly/PiTrac_V3_Connection_Guide.svg b/docs/assets/images/enclosure_assembly/PiTrac_V3_Connection_Guide.svg new file mode 100644 index 00000000..e26cf64a --- /dev/null +++ b/docs/assets/images/enclosure_assembly/PiTrac_V3_Connection_Guide.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/docs/hardware/parts-list.md b/docs/hardware/parts-list.md index b04f2810..cb2bfcf6 100644 --- a/docs/hardware/parts-list.md +++ b/docs/hardware/parts-list.md @@ -18,10 +18,10 @@ This document provides a comprehensive list of all components needed to build a | Quantity | Hardware | Purpose | Link | |----------|----------|---------|------| -| 1-2 | Raspberry Pi 5 (8 GB recommended) | Main embedded computer | https://vilros.com/products/raspberry-pi-5?variant=40065551302750 -| 1-2 | Active Cooler Kit | Required to keep temps low and timing consistent | https://a.co/d/dsl7saU -| 1-2 | MicroSD card (64 GB recommended) | For RPi5 filesystem | https://www.amazon.com/Amazon-Basics-microSDXC-Memory-Adapter/dp/B08TJTB8XS -| 1-2 | 1ft USB-C to USB-C Cable | For powering RPi5 from connector board | https://www.amazon.com/Anker-Charging-MacBook-Samsung-Nintendo/dp/B09H2DMR4K +| 1 | Raspberry Pi 5 (8 GB recommended) | Main embedded computer | https://vilros.com/products/raspberry-pi-5?variant=40065551302750 +| 1 | Active Cooler Kit | Required to keep temps low and timing consistent | https://a.co/d/dsl7saU +| 1 | MicroSD card (64 GB recommended) | For RPi5 filesystem | https://www.amazon.com/Amazon-Basics-microSDXC-Memory-Adapter/dp/B08TJTB8XS +| 1 | 1ft USB-C to USB-C Cable | For powering RPi5 from connector board | https://www.amazon.com/Anker-Charging-MacBook-Samsung-Nintendo/dp/B09H2DMR4K ## Camera and Lighting Hardware diff --git a/docs/hardware/pcb-assembly.md b/docs/hardware/pcb-assembly.md index 938f97c5..91c9dea7 100644 --- a/docs/hardware/pcb-assembly.md +++ b/docs/hardware/pcb-assembly.md @@ -217,7 +217,7 @@ After assembly, must run current calibration before you will be able to capture ### Pi GPIO -- **J3:** 8-pin GPIO header for control signals (see assembly guide for pinout) +- **J3:** 8-pin GPIO header for control signals (see below for Connection Guide for Raspberry Pi 5) ### LED Output @@ -228,6 +228,31 @@ After assembly, must run current calibration before you will be able to capture - **J6 (USB-A):** Originally for LED strip, but Pi5 has USB 2.0 ports already - this is redundant +## Connection Guide for Raspberry Pi 5 + +| V3 Connector Board | Raspberry Pi 5 | +|----------|----------| +| GND | GND (Pin 39) | +| DIAG | GPIO 10 (Pin 19) | +| CS0 | GPIO 18 (Pin 12) | +| MOSI | GPIO 20 (Pin 38) | +| MISO | GPIO 19 (Pin 35) | +| CLK | GPIO 21 (Pin 40) | +| CS1 | GPIO 17 (Pin 11) | +| V3P3 | 3V3 (Pin 1) | + +| Global Shutter Camera 2 | Raspberry Pi 5 | +|----------|----------| +| Trig+ | GPIO 25 (Pin 22) | +| Trig- | GND (Pin 20) | + +| V3 Connector Board | V3 IRLED Board | +|----------|----------| +| VIR+ | VIR+ | +| VIR- | VIR- | + + ![PiTrac V3 Connection Guide]({{ '/assets/images/enclosure_assembly/PiTrac_V3_Connection_Guide.svg' | relative_url }}) + ## Configuring PiTrac You'll need to tell PiTrac which version of the Connection Board you are using. This is done in the Configuration screen in the UI. diff --git a/packaging/build.sh b/packaging/build.sh index 1d61539f..d9ee8c71 100755 --- a/packaging/build.sh +++ b/packaging/build.sh @@ -99,7 +99,7 @@ build_dev() { # Core libraries (libcamera-dev pulls in correct runtime version) for pkg in libcamera-dev libcamera-tools libfmt-dev libssl-dev \ - libmsgpack-cxx-dev \ + libmsgpack-cxx-dev liblgpio-dev \ libapr1 libaprutil1 libapr1-dev libaprutil1-dev; do if ! dpkg -l | grep -q "^ii $pkg"; then missing_deps+=("$pkg") @@ -145,7 +145,7 @@ build_dev() { fi done - # Python runtime dependencies for CLI tool + # Python runtime dependencies for pkg in python3 python3-pip python3-yaml python3-opencv python3-numpy; do if ! dpkg -l | grep -q "^ii $pkg"; then missing_deps+=("$pkg") @@ -287,6 +287,9 @@ build_dev() { log_info "Force rebuild requested - cleaning build directory..." rm -rf build fi + + # Add back from PostINT for hardening + create_pkgconfig_files # Only run meson setup if build directory doesn't exist or force rebuild was requested if [[ ! -d "build" ]] || [[ "$FORCE_REBUILD" == "true" ]] || [[ "$FORCE_REBUILD" == "force" ]]; then @@ -487,6 +490,9 @@ EOF INSTALL_USER="${SUDO_USER:-$(whoami)}" + # Add back from PostINT for hardening + usermod -a -G video,gpio,i2c,spi,dialout "$INSTALL_USER" 2>/dev/null || true + # Install Python web server (always update) log_info "Installing/Updating PiTrac web server..." WEB_SERVER_DIR="$REPO_ROOT/Software/web-server" diff --git a/packaging/scripts/configure-cameras.sh b/packaging/scripts/configure-cameras.sh index 0422461e..be330e34 100755 --- a/packaging/scripts/configure-cameras.sh +++ b/packaging/scripts/configure-cameras.sh @@ -148,10 +148,9 @@ configure_boot_config() { log_info " Slot 2 type: ${slot2_type:-none}" log_info " Has InnoMaker: $has_innomaker" - # Skip if no cameras detected + # Issue warning if no cameras detected, but continue to configure base system parameters if [[ "$num_cameras" -eq 0 ]]; then - log_warn "No cameras detected, skipping config.txt configuration" - return 0 + log_warn "No cameras detected. Camera-specific overlays will be skipped, but base system parameters will be configured." fi backup_config_txt "$config_path" @@ -188,6 +187,13 @@ dtparam=spi=on" log_info " dtparam=spi=on already exists, skipping" fi + if ! grep -q "^dtoverlay=spi1-2cs" "$config_path"; then + config_block="$config_block +dtoverlay=spi1-2cs" + else + log_info " dtoverlay=spi1-2cs already exists, skipping" + fi + if ! grep -q "^force_turbo=" "$config_path"; then config_block="$config_block force_turbo=1" @@ -347,48 +353,43 @@ main() { log_info "Detecting connected cameras..." local camera_json + local num_cameras=0 - if camera_json=$(sudo python3 /usr/lib/pitrac/web-server/camera_detector.py --json 2>/dev/null); then - if echo "$camera_json" | python3 -c "import sys, json; data=json.load(sys.stdin); sys.exit(0 if data.get('success', False) else 1)" 2>/dev/null; then - local num_cameras=$(echo "$camera_json" | python3 -c "import sys, json; data=json.load(sys.stdin); print(len(data.get('cameras', [])))") + # camera_detector.py exits non-zero when no cameras found, so ignore exit code + camera_json=$(sudo python3 /usr/lib/pitrac/web-server/camera_detector.py --json 2>/dev/null) || true - if [[ "$num_cameras" -eq 0 ]]; then - log_warn "No cameras detected - skipping camera configuration" - log_info "Camera configuration can be done manually later if needed" - exit 0 - fi - - log_success "Successfully detected ${num_cameras} camera(s)" + if echo "$camera_json" | python3 -c "import sys, json; json.load(sys.stdin)" 2>/dev/null; then + num_cameras=$(echo "$camera_json" | python3 -c "import sys, json; data=json.load(sys.stdin); print(len(data.get('cameras', [])))") + if [[ "$num_cameras" -gt 0 ]]; then + log_success "Detected ${num_cameras} camera(s)" echo "$camera_json" | python3 -c " import sys, json data = json.load(sys.stdin) for cam in data.get('cameras', []): print(f\" Camera {cam['index']}: {cam['description']} on {cam['port']} (Type {cam['pitrac_type']})\") " + else + log_warn "No cameras detected - camera overlays will be skipped" + fi + else + log_warn "Camera detection returned no usable output" + camera_json='{"cameras":[]}' + fi - configure_boot_config "$camera_json" - - if [[ -n "${SUDO_USER:-}" ]]; then - user_home=$(eval echo ~${SUDO_USER}) - else - user_home="${HOME}" - fi - configure_user_settings "$camera_json" "${user_home}/.pitrac/config/user_settings.json" - - log_success "Camera configuration completed successfully" - log_warn "Please reboot the system for camera configuration to take effect" + configure_boot_config "$camera_json" + if [[ "$num_cameras" -gt 0 ]]; then + if [[ -n "${SUDO_USER:-}" ]]; then + user_home=$(eval echo ~${SUDO_USER}) else - log_error "Camera detection failed" - log_info "You can manually configure cameras later if needed" - exit 0 + user_home="${HOME}" fi - else - log_warn "Could not run camera detection - skipping camera configuration" - log_info "This may be normal on non-Pi systems or if cameras are not connected" - exit 0 + configure_user_settings "$camera_json" "${user_home}/.pitrac/config/user_settings.json" fi + + log_success "Configuration completed successfully" + log_warn "Please reboot the system for changes to take effect" } main "$@" \ No newline at end of file diff --git a/packaging/src/lib/pitrac-common-functions.sh b/packaging/src/lib/pitrac-common-functions.sh index 4b88c252..2cc130d0 100755 --- a/packaging/src/lib/pitrac-common-functions.sh +++ b/packaging/src/lib/pitrac-common-functions.sh @@ -197,35 +197,6 @@ EOF fi log_success "Created lgpio.pc" fi - - # Create msgpack-cxx.pc if it doesn't exist - if [[ ! -f /usr/lib/pkgconfig/msgpack-cxx.pc ]]; then - log_info "Creating msgpack-cxx.pc pkg-config file..." - if [[ -n "$need_sudo" ]]; then - sudo tee /usr/lib/pkgconfig/msgpack-cxx.pc > /dev/null << 'EOF' -prefix=/usr -exec_prefix=${prefix} -includedir=${prefix}/include - -Name: msgpack-cxx -Description: MessagePack implementation for C++ -Version: 4.1.3 -Cflags: -I${includedir} -EOF - else - cat > /usr/lib/pkgconfig/msgpack-cxx.pc << 'EOF' -prefix=/usr -exec_prefix=${prefix} -includedir=${prefix}/include - -Name: msgpack-cxx -Description: MessagePack implementation for C++ -Version: 4.1.3 -Cflags: -I${includedir} -EOF - fi - log_success "Created msgpack-cxx.pc" - fi } # Get the actual user (not root) who is installing @@ -468,7 +439,16 @@ install_python_dependencies() { fi log_info "Installing Python dependencies for web server..." - + + # gpiozero (in requirements.txt) needs these system GPIO backends on Raspberry Pi OS. + # They're not available on PyPI so we install them via apt. + for pkg in python3-lgpio python3-rpi-lgpio; do + if ! dpkg -l | grep -q "^ii $pkg"; then + log_info "Installing system GPIO backend: $pkg" + INITRD=No apt-get install -y "$pkg" 2>/dev/null || log_warn "Could not install $pkg" + fi + done + if [[ $EUID -eq 0 ]]; then if pip3 install -r "$web_server_dir/requirements.txt" --break-system-packages --ignore-installed 2>/dev/null; then log_success "Python dependencies installed successfully" @@ -718,16 +698,12 @@ install_dependencies_from_apt() { # ======================================================================== # PiTrac custom dependency packages (from pitraclm.github.io/packages) # ======================================================================== - # Comment out a line to skip installing that package from the PiTrac repo - # and use the system version instead (if available). Useful for testing - # whether a system-provided package works as a drop-in replacement. - # - # Example: To test with system lgpio instead of custom, comment out - # the liblgpio1 and liblgpio-dev lines below, then run a fresh build. + # lgpio is NOT here — system liblgpio1 is used instead. + # python3-lgpio/python3-rpi-lgpio depend on the RPi Foundation version + # and break if a custom build with a different version string is installed. + # liblgpio-dev is in the system deps block in build.sh. # ======================================================================== local packages=( - "liblgpio1" # GPIO library - "liblgpio-dev" # GPIO development headers "libmsgpack-cxx-dev" # MessagePack C++ (header-only) "libactivemq-cpp" # ActiveMQ C++ client runtime "libactivemq-cpp-dev" # ActiveMQ C++ client headers