From d08520c1591893e26e305fd55666afc182202a3a Mon Sep 17 00:00:00 2001 From: Lev Neiman Date: Mon, 2 Mar 2026 17:22:28 -0500 Subject: [PATCH 1/3] Make runtime resilient for dev/CI environments 1. Skip VLM plugin init when run_mode=never (no-op analyzer) - Avoids requiring OPENAI_API_KEY when VLM analysis is disabled - Returns safe defaults (low risk, skipped) for any analyze() calls 2. Graceful fallback when RTSP session-limit preflight fails - Session-limit check can fail in constrained environments - Falls back to single-stream defaults instead of crashing - Other preflight stages (probe, negotiation) still fail-fast 3. Update MediaMTX config for dev fake camera - Add maxReaders, readTimeout, writeTimeout, writeQueueSize --- dev/fake-camera/mediamtx.yml | 4 ++ src/homesec/plugins/analyzers/__init__.py | 28 ++++++++++- src/homesec/sources/rtsp/core.py | 57 ++++++++++++++++++----- 3 files changed, 77 insertions(+), 12 deletions(-) diff --git a/dev/fake-camera/mediamtx.yml b/dev/fake-camera/mediamtx.yml index 767cb709..743a9683 100644 --- a/dev/fake-camera/mediamtx.yml +++ b/dev/fake-camera/mediamtx.yml @@ -1,6 +1,9 @@ rtspAddress: :8099 rtpAddress: :8100 rtcpAddress: :8101 +readTimeout: 20s +writeTimeout: 20s +writeQueueSize: 2048 authMethod: internal authInternalUsers: - user: admin @@ -15,3 +18,4 @@ authInternalUsers: paths: live: + maxReaders: 10 diff --git a/src/homesec/plugins/analyzers/__init__.py b/src/homesec/plugins/analyzers/__init__.py index a784aba3..e5547dc7 100644 --- a/src/homesec/plugins/analyzers/__init__.py +++ b/src/homesec/plugins/analyzers/__init__.py @@ -3,15 +3,37 @@ from __future__ import annotations import logging +from pathlib import Path from typing import cast from homesec.interfaces import VLMAnalyzer -from homesec.models.vlm import VLMConfig +from homesec.models.enums import RiskLevel, VLMRunMode +from homesec.models.filter import FilterResult +from homesec.models.vlm import AnalysisResult, VLMConfig from homesec.plugins.registry import PluginType, load_plugin logger = logging.getLogger(__name__) +class _NoopVLM(VLMAnalyzer): + """Placeholder VLM that returns a safe default when run_mode is 'never'.""" + + async def analyze( + self, video_path: Path, filter_result: FilterResult, config: VLMConfig + ) -> AnalysisResult: + return AnalysisResult( + risk_level=RiskLevel.LOW, + activity_type="skipped", + summary="VLM analysis disabled (run_mode=never)", + ) + + async def ping(self) -> bool: + return True + + async def shutdown(self, timeout: float | None = None) -> None: + pass + + def load_analyzer(config: VLMConfig) -> VLMAnalyzer: """Load and instantiate a VLM analyzer plugin. @@ -25,6 +47,10 @@ def load_analyzer(config: VLMConfig) -> VLMAnalyzer: ValueError: If backend not found in registry ValidationError: If config validation fails """ + if config.run_mode == VLMRunMode.NEVER: + logger.info("VLM run_mode is 'never'; using no-op analyzer") + return _NoopVLM() + return cast( VLMAnalyzer, load_plugin( diff --git a/src/homesec/sources/rtsp/core.py b/src/homesec/sources/rtsp/core.py index cf921430..e7bd57de 100644 --- a/src/homesec/sources/rtsp/core.py +++ b/src/homesec/sources/rtsp/core.py @@ -30,12 +30,13 @@ from homesec.sources.rtsp.hardware import HardwareAccelConfig, HardwareAccelDetector from homesec.sources.rtsp.motion import MotionDetector from homesec.sources.rtsp.preflight import ( + CameraPreflightDiagnostics, CameraPreflightOutcome, PreflightError, RTSPStartupPreflight, ) from homesec.sources.rtsp.recorder import FfmpegRecorder, Recorder -from homesec.sources.rtsp.recording_profile import MotionProfile +from homesec.sources.rtsp.recording_profile import MotionProfile, RecordingProfile from homesec.sources.rtsp.url_derivation import derive_detect_rtsp_url from homesec.sources.rtsp.utils import ( _build_timeout_attempts, @@ -407,22 +408,56 @@ def _run_startup_preflight(self) -> None: case CameraPreflightOutcome() as outcome: self._apply_preflight_outcome(outcome) case PreflightError() as err: - logger.error( - "RTSP startup preflight failed: camera=%s key=%s stage=%s reason=%s", - self.camera_name, - err.camera_key, - err.stage, - err.message, - ) - raise RuntimeError( - f"RTSP preflight failed for {self.camera_name} at {err.stage}: {err.message}" - ) from err + if err.stage == "session_limit": + logger.warning( + "RTSP session-limit preflight failed for %s; falling back to single-stream defaults", + self.camera_name, + ) + fallback = self._build_fallback_preflight_outcome(err.camera_key) + self._apply_preflight_outcome(fallback) + else: + logger.error( + "RTSP startup preflight failed: camera=%s key=%s stage=%s reason=%s", + self.camera_name, + err.camera_key, + err.stage, + err.message, + ) + raise RuntimeError( + f"RTSP preflight failed for {self.camera_name} at {err.stage}: {err.message}" + ) from err case _: raise RuntimeError( "RTSP startup preflight returned unexpected result type: " f"{type(result).__name__}" ) + def _build_fallback_preflight_outcome(self, camera_key: str) -> CameraPreflightOutcome: + motion_profile = MotionProfile(input_url=self.rtsp_url, ffmpeg_input_args=[]) + recording_profile = RecordingProfile( + input_url=self.rtsp_url, + ffmpeg_input_args=[], + container="mp4", + video_mode="copy", + audio_mode="copy", + ffmpeg_output_args=["-movflags", "+frag_keyframe+empty_moov+faststart"], + ) + diagnostics = CameraPreflightDiagnostics( + attempted_urls=[self.rtsp_url], + probes=[], + selected_motion_url=self.rtsp_url, + selected_recording_url=self.rtsp_url, + selected_recording_profile=recording_profile.profile_id(), + session_mode="single_stream", + notes=["Session-limit preflight failed; using single-stream fallback defaults"], + ) + return CameraPreflightOutcome( + camera_key=camera_key, + motion_profile=motion_profile, + recording_profile=recording_profile, + diagnostics=diagnostics, + ) + def _apply_preflight_outcome(self, outcome: CameraPreflightOutcome) -> None: self._preflight_outcome = outcome self._motion_rtsp_url = outcome.motion_profile.input_url From 91e91ec802080a691331ad36ab551ddd3f9a92db Mon Sep 17 00:00:00 2001 From: Lev Neiman Date: Mon, 2 Mar 2026 15:28:26 -0800 Subject: [PATCH 2/3] refactor: move VLM never-mode handling to runtime assembly --- src/homesec/models/events.py | 2 +- src/homesec/pipeline/core.py | 24 +++++----- src/homesec/plugins/analyzers/__init__.py | 31 +------------ src/homesec/runtime/assembly.py | 34 +++++++++++++- tests/homesec/test_pipeline.py | 47 ++++++++++++++++++++ tests/homesec/test_pipeline_events.py | 54 ++++++++++++++++++++++- tests/homesec/test_runtime_assembly.py | 46 ++++++++++++++++++- 7 files changed, 193 insertions(+), 45 deletions(-) diff --git a/src/homesec/models/events.py b/src/homesec/models/events.py index 99246b90..4e4c08cd 100644 --- a/src/homesec/models/events.py +++ b/src/homesec/models/events.py @@ -142,7 +142,7 @@ class VLMFailedEvent(ClipEvent): class VLMSkippedEvent(ClipEvent): - """VLM analysis skipped (no trigger classes detected).""" + """VLM analysis skipped by runtime policy.""" event_type: Literal[EventType.VLM_SKIPPED] = EventType.VLM_SKIPPED reason: str diff --git a/src/homesec/pipeline/core.py b/src/homesec/pipeline/core.py index ec99fda6..ad193f05 100644 --- a/src/homesec/pipeline/core.py +++ b/src/homesec/pipeline/core.py @@ -15,6 +15,7 @@ from homesec.models.alert import Alert, AlertDecision from homesec.models.clip import Clip from homesec.models.config import Config +from homesec.models.enums import VLMRunMode from homesec.models.filter import FilterResult from homesec.models.vlm import AnalysisResult from homesec.notifiers.multiplex import NotifierEntry @@ -262,7 +263,8 @@ async def _process_clip(self, clip: Clip) -> None: # Stage 3: VLM (conditional) analysis_result: AnalysisResult | None = None vlm_failed = False - if self._should_run_vlm(filter_res): + vlm_skip_reason = self._vlm_skip_reason(filter_res) + if vlm_skip_reason is None: vlm_result = await self._vlm_stage(clip, filter_res) match vlm_result: case VLMError() as vlm_err: @@ -284,9 +286,9 @@ async def _process_clip(self, clip: Clip) -> None: else: await self._repository.record_vlm_skipped( clip.clip_id, - reason="no_trigger_classes", + reason=vlm_skip_reason, ) - logger.info("VLM skipped for %s: no trigger classes", clip.clip_id) + logger.info("VLM skipped for %s: %s", clip.clip_id, vlm_skip_reason) # Await upload after filter/VLM to maximize overlap upload_result = await upload_task @@ -641,17 +643,19 @@ def _format_error_type(exc: Exception) -> str: return type(exc.cause).__name__ return type(exc).__name__ - def _should_run_vlm(self, filter_result: FilterResult) -> bool: - """Check if VLM should run based on detected classes and config.""" + def _vlm_skip_reason(self, filter_result: FilterResult) -> str | None: + """Return skip reason when VLM should not run, otherwise None.""" run_mode = self._config.vlm.run_mode - if run_mode == "never": - return False - if run_mode == "always": - return True + if run_mode == VLMRunMode.NEVER: + return "run_mode_never" + if run_mode == VLMRunMode.ALWAYS: + return None detected = set(filter_result.detected_classes) trigger = set(self._config.vlm.trigger_classes) - return bool(detected & trigger) + if detected & trigger: + return None + return "no_trigger_classes" async def _apply_upload_result( self, diff --git a/src/homesec/plugins/analyzers/__init__.py b/src/homesec/plugins/analyzers/__init__.py index e5547dc7..01c963f7 100644 --- a/src/homesec/plugins/analyzers/__init__.py +++ b/src/homesec/plugins/analyzers/__init__.py @@ -2,37 +2,12 @@ from __future__ import annotations -import logging -from pathlib import Path from typing import cast from homesec.interfaces import VLMAnalyzer -from homesec.models.enums import RiskLevel, VLMRunMode -from homesec.models.filter import FilterResult -from homesec.models.vlm import AnalysisResult, VLMConfig +from homesec.models.vlm import VLMConfig from homesec.plugins.registry import PluginType, load_plugin -logger = logging.getLogger(__name__) - - -class _NoopVLM(VLMAnalyzer): - """Placeholder VLM that returns a safe default when run_mode is 'never'.""" - - async def analyze( - self, video_path: Path, filter_result: FilterResult, config: VLMConfig - ) -> AnalysisResult: - return AnalysisResult( - risk_level=RiskLevel.LOW, - activity_type="skipped", - summary="VLM analysis disabled (run_mode=never)", - ) - - async def ping(self) -> bool: - return True - - async def shutdown(self, timeout: float | None = None) -> None: - pass - def load_analyzer(config: VLMConfig) -> VLMAnalyzer: """Load and instantiate a VLM analyzer plugin. @@ -47,10 +22,6 @@ def load_analyzer(config: VLMConfig) -> VLMAnalyzer: ValueError: If backend not found in registry ValidationError: If config validation fails """ - if config.run_mode == VLMRunMode.NEVER: - logger.info("VLM run_mode is 'never'; using no-op analyzer") - return _NoopVLM() - return cast( VLMAnalyzer, load_plugin( diff --git a/src/homesec/runtime/assembly.py b/src/homesec/runtime/assembly.py index a425f776..5d73930a 100644 --- a/src/homesec/runtime/assembly.py +++ b/src/homesec/runtime/assembly.py @@ -5,8 +5,13 @@ import asyncio import logging from collections.abc import Awaitable, Callable +from pathlib import Path from typing import TYPE_CHECKING, Protocol +from homesec.interfaces import VLMAnalyzer +from homesec.models.enums import RiskLevel, VLMRunMode +from homesec.models.filter import FilterResult +from homesec.models.vlm import AnalysisResult, VLMConfig from homesec.notifiers.multiplex import NotifierEntry from homesec.pipeline import ClipPipeline from homesec.plugins.analyzers import load_analyzer @@ -22,7 +27,6 @@ Notifier, ObjectFilter, StorageBackend, - VLMAnalyzer, ) from homesec.models.config import Config @@ -34,6 +38,28 @@ async def shutdown(self, timeout: float = 30.0) -> None: """Release resources.""" +class _DisabledVLMAnalyzer(VLMAnalyzer): + """Runtime-only analyzer used when VLM run_mode is disabled.""" + + async def analyze( + self, video_path: Path, filter_result: FilterResult, config: VLMConfig + ) -> AnalysisResult: + _ = video_path + _ = filter_result + _ = config + return AnalysisResult( + risk_level=RiskLevel.LOW, + activity_type="skipped", + summary="VLM analysis disabled (run_mode=never)", + ) + + async def ping(self) -> bool: + return True + + async def shutdown(self, timeout: float | None = None) -> None: + _ = timeout + + class RuntimeAssembler: """Builds and manages the lifecycle of in-process runtime bundles.""" @@ -76,7 +102,11 @@ async def build_bundle(self, config: Config, generation: int) -> RuntimeBundle: await self._notifier_health_logger(notifier_entries) filter_plugin = load_filter(config.filter) - vlm_plugin = load_analyzer(config.vlm) + if config.vlm.run_mode == VLMRunMode.NEVER: + logger.info("VLM run_mode=never; runtime will use disabled analyzer") + vlm_plugin = _DisabledVLMAnalyzer() + else: + vlm_plugin = load_analyzer(config.vlm) alert_policy = self._alert_policy_factory(config) sources, sources_by_camera = self._source_factory(config) diff --git a/tests/homesec/test_pipeline.py b/tests/homesec/test_pipeline.py index 8397cc05..12b85646 100644 --- a/tests/homesec/test_pipeline.py +++ b/tests/homesec/test_pipeline.py @@ -283,6 +283,53 @@ async def test_run_mode_always_runs_vlm_regardless( assert state is not None assert state.analysis_result is not None + @pytest.mark.asyncio + async def test_run_mode_never_skips_vlm_with_explicit_reason( + self, base_config: Config, sample_clip: Clip, mocks: PipelineMocks + ) -> None: + """When run_mode=never, VLM is skipped even for trigger detections.""" + # Given run_mode=never and a trigger-class filter result + base_config = Config( + cameras=base_config.cameras, + storage=base_config.storage, + state_store=base_config.state_store, + notifiers=base_config.notifiers, + filter=base_config.filter, + vlm=base_config.vlm.model_copy(update={"run_mode": "never"}), + alert_policy=base_config.alert_policy, + ) + person_result = FilterResult( + detected_classes=["person"], + confidence=0.95, + model="mock", + sampled_frames=30, + ) + filter_mock = MockFilter(result=person_result) + mocks.filter = filter_mock + + pipeline = ClipPipeline( + config=base_config, + storage=mocks.storage, + repository=make_repository(base_config, mocks), + filter_plugin=filter_mock, + vlm_plugin=mocks.vlm, + notifier=mocks.notifier, + alert_policy=make_alert_policy(base_config), + retention_pruner=MockRetentionPruner(), + ) + + # When a clip is processed + pipeline.on_new_clip(sample_clip) + await pipeline.shutdown() + + # Then VLM is skipped with a run_mode-specific reason + events = await mocks.event_store.get_events(sample_clip.clip_id) + skipped_events = [event for event in events if event.event_type == "vlm_skipped"] + assert len(skipped_events) == 1 + assert skipped_events[0].reason == "run_mode_never" + assert all(event.event_type != "vlm_started" for event in events) + assert all(event.event_type != "vlm_completed" for event in events) + class TestClipPipelineErrorHandling: """Test error handling and partial failures.""" diff --git a/tests/homesec/test_pipeline_events.py b/tests/homesec/test_pipeline_events.py index 571e99fd..c557f07a 100644 --- a/tests/homesec/test_pipeline_events.py +++ b/tests/homesec/test_pipeline_events.py @@ -37,7 +37,12 @@ ) -def build_config(*, notify_on_motion: bool = False, notifier_count: int = 1) -> Config: +def build_config( + *, + notify_on_motion: bool = False, + notifier_count: int = 1, + run_mode: str = "trigger_only", +) -> Config: """Build a minimal config for pipeline tests.""" cameras = [ CameraConfig( @@ -68,6 +73,7 @@ def build_config(*, notify_on_motion: bool = False, notifier_count: int = 1) -> ), vlm=VLMConfig( backend="openai", + run_mode=run_mode, trigger_classes=["person", "car"], config=OpenAIConfig(api_key_env="OPENAI_API_KEY", model="gpt-4o"), ), @@ -318,6 +324,52 @@ async def test_pipeline_emits_vlm_skipped_event( await state_store.shutdown() +@pytest.mark.asyncio +async def test_pipeline_emits_vlm_skipped_event_for_run_mode_never( + postgres_dsn: str, tmp_path: Path, clean_test_db: None +) -> None: + # Given: A clip with trigger classes but run_mode forcing VLM disabled + state_store = PostgresStateStore(postgres_dsn) + await state_store.initialize() + event_store = state_store.create_event_store() + assert isinstance(event_store, PostgresEventStore) + config = build_config(run_mode="never") + repository = ClipRepository(state_store, event_store, retry=config.retry) + + filter_result = FilterResult( + detected_classes=["person"], + confidence=0.9, + model="mock", + sampled_frames=30, + ) + pipeline = ClipPipeline( + config=config, + storage=MockStorage(), + repository=repository, + filter_plugin=MockFilter(result=filter_result), + vlm_plugin=MockVLM(), + notifier=MockNotifier(), + alert_policy=make_alert_policy(config), + retention_pruner=MockRetentionPruner(), + ) + clip = make_clip(tmp_path, "test-clip-events-006") + + # When: A clip is processed + pipeline.on_new_clip(clip) + await pipeline.shutdown() + + # Then: VLM is skipped because run mode is disabled + events = await event_store.get_events(clip.clip_id) + event_types = {event.event_type for event in events} + assert "vlm_skipped" in event_types + assert "vlm_started" not in event_types + assert "vlm_completed" not in event_types + skipped_event = next(event for event in events if event.event_type == "vlm_skipped") + assert skipped_event.reason == "run_mode_never" + + await state_store.shutdown() + + @pytest.mark.asyncio async def test_pipeline_emits_upload_failed_event( postgres_dsn: str, tmp_path: Path, clean_test_db: None diff --git a/tests/homesec/test_runtime_assembly.py b/tests/homesec/test_runtime_assembly.py index f4954234..35fbe8e5 100644 --- a/tests/homesec/test_runtime_assembly.py +++ b/tests/homesec/test_runtime_assembly.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +from pathlib import Path from typing import Any, cast import pytest @@ -143,7 +144,7 @@ async def shutdown(self, timeout: float | None = None) -> None: _ = timeout -def _make_config() -> Config: +def _make_config(*, run_mode: str = "trigger_only") -> Config: return Config( cameras=[ CameraConfig( @@ -166,6 +167,7 @@ def _make_config() -> Config: ), vlm=VLMConfig( backend="openai", + run_mode=run_mode, config=OpenAIConfig(api_key_env="OPENAI_API_KEY", model="gpt-4o"), trigger_classes=["person"], ), @@ -242,6 +244,48 @@ def _raise_analyzer_error(_: object) -> object: assert filter_plugin.shutdown_called is True +@pytest.mark.asyncio +async def test_runtime_assembly_skips_analyzer_load_when_run_mode_never( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """run_mode=never should avoid analyzer plugin instantiation at startup.""" + # Given: Runtime assembly with VLM disabled and filter creation stubbed + notifier = _StubNotifier() + filter_plugin = _StubFilter() + assembler = _make_assembler(notifier) + config = _make_config(run_mode="never") + + monkeypatch.setattr("homesec.runtime.assembly.load_filter", lambda _cfg: filter_plugin) + + def _fail_if_called(_: object) -> object: + raise AssertionError("load_analyzer should not run when run_mode=never") + + monkeypatch.setattr("homesec.runtime.assembly.load_analyzer", _fail_if_called) + + # When: Building a runtime bundle + runtime = await assembler.build_bundle(config, generation=1) + + # Then: Runtime uses disabled analyzer semantics and bundle remains healthy + try: + assert await runtime.vlm_plugin.ping() is True + analysis = await runtime.vlm_plugin.analyze( + video_path=Path("/tmp/disabled-vlm-test.mp4"), + filter_result=FilterResult( + detected_classes=["person"], + confidence=0.9, + model="stub", + sampled_frames=1, + ), + config=config.vlm, + ) + assert analysis.activity_type == "skipped" + assert analysis.summary == "VLM analysis disabled (run_mode=never)" + finally: + await assembler.shutdown_bundle(runtime) + assert notifier.shutdown_called is True + assert filter_plugin.shutdown_called is True + + @pytest.mark.asyncio async def test_runtime_assembly_start_timeout_cleans_started_sources() -> None: """Startup timeout should fail fast and clean up already-started sources.""" From bcd1c18778d0819ba1f2b7b1481baa62cfdadfe9 Mon Sep 17 00:00:00 2001 From: Lev Neiman Date: Mon, 2 Mar 2026 16:21:16 -0800 Subject: [PATCH 3/3] refactor: tighten VLM skip policy and runtime wiring --- src/homesec/models/enums.py | 7 +++ src/homesec/models/events.py | 4 +- src/homesec/pipeline/core.py | 8 ++-- src/homesec/repository/clip_repository.py | 4 +- src/homesec/runtime/assembly.py | 32 ++----------- src/homesec/runtime/disabled_vlm.py | 32 +++++++++++++ src/homesec/sources/rtsp/core.py | 11 +---- tests/homesec/rtsp/test_runtime.py | 17 ++++++- tests/homesec/test_pipeline.py | 4 +- tests/homesec/test_pipeline_events.py | 5 +- tests/homesec/test_runtime_worker.py | 57 ++++++++++++++++++++++- 11 files changed, 129 insertions(+), 52 deletions(-) create mode 100644 src/homesec/runtime/disabled_vlm.py diff --git a/src/homesec/models/enums.py b/src/homesec/models/enums.py index cf94d44a..2e46ba2d 100644 --- a/src/homesec/models/enums.py +++ b/src/homesec/models/enums.py @@ -74,6 +74,13 @@ class VLMRunMode(StrEnum): NEVER = "never" +class VLMSkipReason(StrEnum): + """Reason why VLM analysis was skipped for a clip.""" + + RUN_MODE_NEVER = "run_mode_never" + NO_TRIGGER_CLASSES = "no_trigger_classes" + + class RiskLevel(IntEnum): """VLM risk assessment levels. diff --git a/src/homesec/models/events.py b/src/homesec/models/events.py index 4e4c08cd..9d9d4654 100644 --- a/src/homesec/models/events.py +++ b/src/homesec/models/events.py @@ -7,7 +7,7 @@ from pydantic import BaseModel, Field -from homesec.models.enums import EventType, RiskLevelField +from homesec.models.enums import EventType, RiskLevelField, VLMSkipReason from homesec.models.filter import FilterResult @@ -145,7 +145,7 @@ class VLMSkippedEvent(ClipEvent): """VLM analysis skipped by runtime policy.""" event_type: Literal[EventType.VLM_SKIPPED] = EventType.VLM_SKIPPED - reason: str + reason: VLMSkipReason class AlertDecisionMadeEvent(ClipEvent): diff --git a/src/homesec/pipeline/core.py b/src/homesec/pipeline/core.py index ad193f05..7d03bde9 100644 --- a/src/homesec/pipeline/core.py +++ b/src/homesec/pipeline/core.py @@ -15,7 +15,7 @@ from homesec.models.alert import Alert, AlertDecision from homesec.models.clip import Clip from homesec.models.config import Config -from homesec.models.enums import VLMRunMode +from homesec.models.enums import VLMRunMode, VLMSkipReason from homesec.models.filter import FilterResult from homesec.models.vlm import AnalysisResult from homesec.notifiers.multiplex import NotifierEntry @@ -643,11 +643,11 @@ def _format_error_type(exc: Exception) -> str: return type(exc.cause).__name__ return type(exc).__name__ - def _vlm_skip_reason(self, filter_result: FilterResult) -> str | None: + def _vlm_skip_reason(self, filter_result: FilterResult) -> VLMSkipReason | None: """Return skip reason when VLM should not run, otherwise None.""" run_mode = self._config.vlm.run_mode if run_mode == VLMRunMode.NEVER: - return "run_mode_never" + return VLMSkipReason.RUN_MODE_NEVER if run_mode == VLMRunMode.ALWAYS: return None @@ -655,7 +655,7 @@ def _vlm_skip_reason(self, filter_result: FilterResult) -> str | None: trigger = set(self._config.vlm.trigger_classes) if detected & trigger: return None - return "no_trigger_classes" + return VLMSkipReason.NO_TRIGGER_CLASSES async def _apply_upload_result( self, diff --git a/src/homesec/repository/clip_repository.py b/src/homesec/repository/clip_repository.py index b792233f..f61bb55b 100644 --- a/src/homesec/repository/clip_repository.py +++ b/src/homesec/repository/clip_repository.py @@ -10,7 +10,7 @@ from homesec.models.clip import Clip, ClipListCursor, ClipListPage, ClipStateData from homesec.models.config import RetryConfig -from homesec.models.enums import ClipStatus, RiskLevelField +from homesec.models.enums import ClipStatus, RiskLevelField, VLMSkipReason from homesec.models.events import ( AlertDecisionMadeEvent, ClipDeletedEvent, @@ -285,7 +285,7 @@ async def record_vlm_failed( ) ) - async def record_vlm_skipped(self, clip_id: str, reason: str) -> None: + async def record_vlm_skipped(self, clip_id: str, reason: VLMSkipReason) -> None: """Record VLM skipped event.""" await self._safe_append( VLMSkippedEvent( diff --git a/src/homesec/runtime/assembly.py b/src/homesec/runtime/assembly.py index 5d73930a..173f75ea 100644 --- a/src/homesec/runtime/assembly.py +++ b/src/homesec/runtime/assembly.py @@ -5,19 +5,16 @@ import asyncio import logging from collections.abc import Awaitable, Callable -from pathlib import Path from typing import TYPE_CHECKING, Protocol -from homesec.interfaces import VLMAnalyzer -from homesec.models.enums import RiskLevel, VLMRunMode -from homesec.models.filter import FilterResult -from homesec.models.vlm import AnalysisResult, VLMConfig +from homesec.models.enums import VLMRunMode from homesec.notifiers.multiplex import NotifierEntry from homesec.pipeline import ClipPipeline from homesec.plugins.analyzers import load_analyzer from homesec.plugins.filters import load_filter from homesec.repository import ClipRepository from homesec.retention import build_local_retention_pruner +from homesec.runtime.disabled_vlm import DisabledVLMAnalyzer from homesec.runtime.models import RuntimeBundle, config_signature if TYPE_CHECKING: @@ -27,6 +24,7 @@ Notifier, ObjectFilter, StorageBackend, + VLMAnalyzer, ) from homesec.models.config import Config @@ -38,28 +36,6 @@ async def shutdown(self, timeout: float = 30.0) -> None: """Release resources.""" -class _DisabledVLMAnalyzer(VLMAnalyzer): - """Runtime-only analyzer used when VLM run_mode is disabled.""" - - async def analyze( - self, video_path: Path, filter_result: FilterResult, config: VLMConfig - ) -> AnalysisResult: - _ = video_path - _ = filter_result - _ = config - return AnalysisResult( - risk_level=RiskLevel.LOW, - activity_type="skipped", - summary="VLM analysis disabled (run_mode=never)", - ) - - async def ping(self) -> bool: - return True - - async def shutdown(self, timeout: float | None = None) -> None: - _ = timeout - - class RuntimeAssembler: """Builds and manages the lifecycle of in-process runtime bundles.""" @@ -104,7 +80,7 @@ async def build_bundle(self, config: Config, generation: int) -> RuntimeBundle: filter_plugin = load_filter(config.filter) if config.vlm.run_mode == VLMRunMode.NEVER: logger.info("VLM run_mode=never; runtime will use disabled analyzer") - vlm_plugin = _DisabledVLMAnalyzer() + vlm_plugin = DisabledVLMAnalyzer() else: vlm_plugin = load_analyzer(config.vlm) alert_policy = self._alert_policy_factory(config) diff --git a/src/homesec/runtime/disabled_vlm.py b/src/homesec/runtime/disabled_vlm.py new file mode 100644 index 00000000..5f14064f --- /dev/null +++ b/src/homesec/runtime/disabled_vlm.py @@ -0,0 +1,32 @@ +"""Disabled VLM analyzer implementation for run_mode=never.""" + +from __future__ import annotations + +from pathlib import Path + +from homesec.interfaces import VLMAnalyzer +from homesec.models.enums import RiskLevel +from homesec.models.filter import FilterResult +from homesec.models.vlm import AnalysisResult, VLMConfig + + +class DisabledVLMAnalyzer(VLMAnalyzer): + """Runtime-only analyzer used when VLM is disabled.""" + + async def analyze( + self, video_path: Path, filter_result: FilterResult, config: VLMConfig + ) -> AnalysisResult: + _ = video_path + _ = filter_result + _ = config + return AnalysisResult( + risk_level=RiskLevel.LOW, + activity_type="skipped", + summary="VLM analysis disabled (run_mode=never)", + ) + + async def ping(self) -> bool: + return True + + async def shutdown(self, timeout: float | None = None) -> None: + _ = timeout diff --git a/src/homesec/sources/rtsp/core.py b/src/homesec/sources/rtsp/core.py index e7bd57de..d05c9905 100644 --- a/src/homesec/sources/rtsp/core.py +++ b/src/homesec/sources/rtsp/core.py @@ -36,7 +36,7 @@ RTSPStartupPreflight, ) from homesec.sources.rtsp.recorder import FfmpegRecorder, Recorder -from homesec.sources.rtsp.recording_profile import MotionProfile, RecordingProfile +from homesec.sources.rtsp.recording_profile import MotionProfile, build_default_recording_profile from homesec.sources.rtsp.url_derivation import derive_detect_rtsp_url from homesec.sources.rtsp.utils import ( _build_timeout_attempts, @@ -434,14 +434,7 @@ def _run_startup_preflight(self) -> None: def _build_fallback_preflight_outcome(self, camera_key: str) -> CameraPreflightOutcome: motion_profile = MotionProfile(input_url=self.rtsp_url, ffmpeg_input_args=[]) - recording_profile = RecordingProfile( - input_url=self.rtsp_url, - ffmpeg_input_args=[], - container="mp4", - video_mode="copy", - audio_mode="copy", - ffmpeg_output_args=["-movflags", "+frag_keyframe+empty_moov+faststart"], - ) + recording_profile = build_default_recording_profile(self.rtsp_url) diagnostics = CameraPreflightDiagnostics( attempted_urls=[self.rtsp_url], probes=[], diff --git a/tests/homesec/rtsp/test_runtime.py b/tests/homesec/rtsp/test_runtime.py index f05ebd8c..09bb91ab 100644 --- a/tests/homesec/rtsp/test_runtime.py +++ b/tests/homesec/rtsp/test_runtime.py @@ -15,7 +15,7 @@ from homesec.sources.rtsp.frame_pipeline import FfmpegFramePipeline from homesec.sources.rtsp.hardware import HardwareAccelConfig from homesec.sources.rtsp.recorder import FfmpegRecorder -from homesec.sources.rtsp.recording_profile import MotionProfile +from homesec.sources.rtsp.recording_profile import MotionProfile, build_default_recording_profile class FakeClock: @@ -296,6 +296,21 @@ def test_detect_stream_defaults_to_main(tmp_path: Path) -> None: assert not source._detect_stream_available +def test_session_limit_fallback_uses_default_recording_profile(tmp_path: Path) -> None: + """Session-limit fallback should reuse shared default recording profile builder.""" + # Given: an RTSP source with default startup configuration + config = _make_config(tmp_path, rtsp_url="rtsp://host/stream") + source = RTSPSource(config, camera_name="cam") + + # When: constructing the startup session-limit fallback outcome + outcome = source._build_fallback_preflight_outcome("cam-key") + + # Then: fallback uses the canonical default recording profile + expected_profile = build_default_recording_profile(source.rtsp_url) + assert outcome.recording_profile == expected_profile + assert outcome.diagnostics.selected_recording_profile == expected_profile.profile_id() + + def test_reconnect_retries_until_success(tmp_path: Path) -> None: """Reconnect should retry until a pipeline starts when attempts are infinite.""" # Given: a pipeline that fails twice then succeeds diff --git a/tests/homesec/test_pipeline.py b/tests/homesec/test_pipeline.py index 12b85646..ebab9060 100644 --- a/tests/homesec/test_pipeline.py +++ b/tests/homesec/test_pipeline.py @@ -21,7 +21,7 @@ StateStoreConfig, StorageConfig, ) -from homesec.models.enums import RiskLevel +from homesec.models.enums import RiskLevel, VLMSkipReason from homesec.models.filter import FilterConfig, FilterOverrides, FilterResult from homesec.models.vlm import AnalysisResult, VLMConfig from homesec.pipeline import ClipPipeline @@ -326,7 +326,7 @@ async def test_run_mode_never_skips_vlm_with_explicit_reason( events = await mocks.event_store.get_events(sample_clip.clip_id) skipped_events = [event for event in events if event.event_type == "vlm_skipped"] assert len(skipped_events) == 1 - assert skipped_events[0].reason == "run_mode_never" + assert skipped_events[0].reason == VLMSkipReason.RUN_MODE_NEVER assert all(event.event_type != "vlm_started" for event in events) assert all(event.event_type != "vlm_completed" for event in events) diff --git a/tests/homesec/test_pipeline_events.py b/tests/homesec/test_pipeline_events.py index c557f07a..80cdcb7b 100644 --- a/tests/homesec/test_pipeline_events.py +++ b/tests/homesec/test_pipeline_events.py @@ -18,6 +18,7 @@ StateStoreConfig, StorageConfig, ) +from homesec.models.enums import VLMSkipReason from homesec.models.filter import FilterConfig, FilterResult from homesec.models.vlm import AnalysisResult, VLMConfig from homesec.notifiers.multiplex import NotifierEntry @@ -319,7 +320,7 @@ async def test_pipeline_emits_vlm_skipped_event( assert "vlm_started" not in event_types assert "vlm_completed" not in event_types skipped_event = next(event for event in events if event.event_type == "vlm_skipped") - assert skipped_event.reason == "no_trigger_classes" + assert skipped_event.reason == VLMSkipReason.NO_TRIGGER_CLASSES await state_store.shutdown() @@ -365,7 +366,7 @@ async def test_pipeline_emits_vlm_skipped_event_for_run_mode_never( assert "vlm_started" not in event_types assert "vlm_completed" not in event_types skipped_event = next(event for event in events if event.event_type == "vlm_skipped") - assert skipped_event.reason == "run_mode_never" + assert skipped_event.reason == VLMSkipReason.RUN_MODE_NEVER await state_store.shutdown() diff --git a/tests/homesec/test_runtime_worker.py b/tests/homesec/test_runtime_worker.py index de2011b8..e0b25e50 100644 --- a/tests/homesec/test_runtime_worker.py +++ b/tests/homesec/test_runtime_worker.py @@ -2,9 +2,12 @@ from __future__ import annotations +import asyncio from argparse import Namespace from typing import Any, cast +import pytest + import homesec.runtime.worker as worker_module from homesec.models.config import ( AlertPolicyConfig, @@ -27,7 +30,7 @@ def close(self) -> None: return None -def _make_config(*, notifiers: list[NotifierConfig]) -> Config: +def _make_config(*, notifiers: list[NotifierConfig], run_mode: str = "trigger_only") -> Config: return Config( cameras=[ CameraConfig( @@ -39,7 +42,7 @@ def _make_config(*, notifiers: list[NotifierConfig]) -> Config: state_store=StateStoreConfig(dsn="postgresql://user:pass@localhost/homesec"), notifiers=notifiers, filter=FilterConfig(backend="yolo", config={}), - vlm=VLMConfig(backend="openai", config={}), + vlm=VLMConfig(backend="openai", run_mode=run_mode, config={}), alert_policy=AlertPolicyConfig(backend="default", config={}), ) @@ -122,3 +125,53 @@ def _unexpected_plugin_load(*_: object) -> object: # Then: Worker keeps notifications disabled and does not load plugins assert isinstance(notifier, worker_module._NoopNotifier) assert entries == [] + + +@pytest.mark.asyncio +async def test_runtime_worker_run_runtime_skips_analyzer_load_when_run_mode_never( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """run_mode=never should avoid analyzer plugin loading in worker startup.""" + # Given: Runtime worker with VLM disabled and analyzer loader guarded + config = _make_config(notifiers=[], run_mode="never") + service = _make_service(config) + stop_event = asyncio.Event() + stop_event.set() + + class _StubStorage: + async def shutdown(self, timeout: float | None = None) -> None: + _ = timeout + + class _StubStateStore: + async def shutdown(self, timeout: float | None = None) -> None: + _ = timeout + + class _StubEventStore: + async def shutdown(self, timeout: float | None = None) -> None: + _ = timeout + + class _StubFilter: + async def shutdown(self, timeout: float | None = None) -> None: + _ = timeout + + async def _fake_create_state_store(_config: Config) -> _StubStateStore: + return _StubStateStore() + + monkeypatch.setattr(worker_module, "discover_all_plugins", lambda: None) + monkeypatch.setattr(worker_module, "load_storage_plugin", lambda _cfg: _StubStorage()) + monkeypatch.setattr(service, "_create_state_store", _fake_create_state_store) + monkeypatch.setattr(service, "_create_event_store", lambda _state: _StubEventStore()) + monkeypatch.setattr(service, "_create_alert_policy", lambda _cfg: cast(Any, object())) + monkeypatch.setattr(service, "_create_sources", lambda _cfg: ([], {})) + monkeypatch.setattr("homesec.runtime.assembly.load_filter", lambda _cfg: _StubFilter()) + + def _fail_if_called(_: object) -> object: + raise AssertionError("load_analyzer should not be called for run_mode=never") + + monkeypatch.setattr("homesec.runtime.assembly.load_analyzer", _fail_if_called) + + # When: Running runtime worker startup/shutdown cycle + await service.run_runtime(stop_event) + + # Then: Worker completes lifecycle without invoking analyzer loader + assert service._runtime_bundle is None