Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dev/fake-camera/mediamtx.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
rtspAddress: :8099
rtpAddress: :8100
rtcpAddress: :8101
readTimeout: 20s
writeTimeout: 20s
writeQueueSize: 2048
authMethod: internal
authInternalUsers:
- user: admin
Expand All @@ -15,3 +18,4 @@ authInternalUsers:

paths:
live:
maxReaders: 10
7 changes: 7 additions & 0 deletions src/homesec/models/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ class VLMRunMode(StrEnum):
NEVER = "never"


class VLMSkipReason(StrEnum):
    """Reason why VLM analysis was skipped for a clip.

    Recorded on VLM-skipped events so downstream consumers can tell a
    configuration-level skip apart from a detection-level one.
    """

    # vlm.run_mode is "never": analysis is disabled by configuration.
    RUN_MODE_NEVER = "run_mode_never"
    # Filter detections did not intersect the configured trigger classes.
    NO_TRIGGER_CLASSES = "no_trigger_classes"


class RiskLevel(IntEnum):
"""VLM risk assessment levels.

Expand Down
6 changes: 3 additions & 3 deletions src/homesec/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from pydantic import BaseModel, Field

from homesec.models.enums import EventType, RiskLevelField
from homesec.models.enums import EventType, RiskLevelField, VLMSkipReason
from homesec.models.filter import FilterResult


Expand Down Expand Up @@ -142,10 +142,10 @@ class VLMFailedEvent(ClipEvent):


class VLMSkippedEvent(ClipEvent):
"""VLM analysis skipped (no trigger classes detected)."""
"""VLM analysis skipped by runtime policy."""

event_type: Literal[EventType.VLM_SKIPPED] = EventType.VLM_SKIPPED
reason: str
reason: VLMSkipReason


class AlertDecisionMadeEvent(ClipEvent):
Expand Down
24 changes: 14 additions & 10 deletions src/homesec/pipeline/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from homesec.models.alert import Alert, AlertDecision
from homesec.models.clip import Clip
from homesec.models.config import Config
from homesec.models.enums import VLMRunMode, VLMSkipReason
from homesec.models.filter import FilterResult
from homesec.models.vlm import AnalysisResult
from homesec.notifiers.multiplex import NotifierEntry
Expand Down Expand Up @@ -262,7 +263,8 @@ async def _process_clip(self, clip: Clip) -> None:
# Stage 3: VLM (conditional)
analysis_result: AnalysisResult | None = None
vlm_failed = False
if self._should_run_vlm(filter_res):
vlm_skip_reason = self._vlm_skip_reason(filter_res)
if vlm_skip_reason is None:
vlm_result = await self._vlm_stage(clip, filter_res)
match vlm_result:
case VLMError() as vlm_err:
Expand All @@ -284,9 +286,9 @@ async def _process_clip(self, clip: Clip) -> None:
else:
await self._repository.record_vlm_skipped(
clip.clip_id,
reason="no_trigger_classes",
reason=vlm_skip_reason,
)
logger.info("VLM skipped for %s: no trigger classes", clip.clip_id)
logger.info("VLM skipped for %s: %s", clip.clip_id, vlm_skip_reason)

# Await upload after filter/VLM to maximize overlap
upload_result = await upload_task
Expand Down Expand Up @@ -641,17 +643,19 @@ def _format_error_type(exc: Exception) -> str:
return type(exc.cause).__name__
return type(exc).__name__

def _should_run_vlm(self, filter_result: FilterResult) -> bool:
"""Check if VLM should run based on detected classes and config."""
def _vlm_skip_reason(self, filter_result: FilterResult) -> VLMSkipReason | None:
"""Return skip reason when VLM should not run, otherwise None."""
run_mode = self._config.vlm.run_mode
if run_mode == "never":
return False
if run_mode == "always":
return True
if run_mode == VLMRunMode.NEVER:
return VLMSkipReason.RUN_MODE_NEVER
if run_mode == VLMRunMode.ALWAYS:
return None

detected = set(filter_result.detected_classes)
trigger = set(self._config.vlm.trigger_classes)
return bool(detected & trigger)
if detected & trigger:
return None
return VLMSkipReason.NO_TRIGGER_CLASSES

async def _apply_upload_result(
self,
Expand Down
3 changes: 0 additions & 3 deletions src/homesec/plugins/analyzers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

from __future__ import annotations

import logging
from typing import cast

from homesec.interfaces import VLMAnalyzer
from homesec.models.vlm import VLMConfig
from homesec.plugins.registry import PluginType, load_plugin

logger = logging.getLogger(__name__)


def load_analyzer(config: VLMConfig) -> VLMAnalyzer:
"""Load and instantiate a VLM analyzer plugin.
Expand Down
4 changes: 2 additions & 2 deletions src/homesec/repository/clip_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from homesec.models.clip import Clip, ClipListCursor, ClipListPage, ClipStateData
from homesec.models.config import RetryConfig
from homesec.models.enums import ClipStatus, RiskLevelField
from homesec.models.enums import ClipStatus, RiskLevelField, VLMSkipReason
from homesec.models.events import (
AlertDecisionMadeEvent,
ClipDeletedEvent,
Expand Down Expand Up @@ -285,7 +285,7 @@ async def record_vlm_failed(
)
)

async def record_vlm_skipped(self, clip_id: str, reason: str) -> None:
async def record_vlm_skipped(self, clip_id: str, reason: VLMSkipReason) -> None:
"""Record VLM skipped event."""
await self._safe_append(
VLMSkippedEvent(
Expand Down
8 changes: 7 additions & 1 deletion src/homesec/runtime/assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING, Protocol

from homesec.models.enums import VLMRunMode
from homesec.notifiers.multiplex import NotifierEntry
from homesec.pipeline import ClipPipeline
from homesec.plugins.analyzers import load_analyzer
from homesec.plugins.filters import load_filter
from homesec.repository import ClipRepository
from homesec.retention import build_local_retention_pruner
from homesec.runtime.disabled_vlm import DisabledVLMAnalyzer
from homesec.runtime.models import RuntimeBundle, config_signature

if TYPE_CHECKING:
Expand Down Expand Up @@ -76,7 +78,11 @@ async def build_bundle(self, config: Config, generation: int) -> RuntimeBundle:
await self._notifier_health_logger(notifier_entries)

filter_plugin = load_filter(config.filter)
vlm_plugin = load_analyzer(config.vlm)
if config.vlm.run_mode == VLMRunMode.NEVER:
logger.info("VLM run_mode=never; runtime will use disabled analyzer")
vlm_plugin = DisabledVLMAnalyzer()
else:
vlm_plugin = load_analyzer(config.vlm)
alert_policy = self._alert_policy_factory(config)

sources, sources_by_camera = self._source_factory(config)
Expand Down
32 changes: 32 additions & 0 deletions src/homesec/runtime/disabled_vlm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Disabled VLM analyzer implementation for run_mode=never."""

from __future__ import annotations

from pathlib import Path

from homesec.interfaces import VLMAnalyzer
from homesec.models.enums import RiskLevel
from homesec.models.filter import FilterResult
from homesec.models.vlm import AnalysisResult, VLMConfig


class DisabledVLMAnalyzer(VLMAnalyzer):
    """Runtime-only analyzer used when VLM is disabled.

    Installed by the runtime assembly when ``run_mode=never`` so the
    pipeline always has a valid analyzer object; it performs no work.
    """

    async def analyze(
        self, video_path: Path, filter_result: FilterResult, config: VLMConfig
    ) -> AnalysisResult:
        """Return a canned low-risk result without inspecting the clip."""
        # All arguments are intentionally unused: VLM is disabled.
        del video_path, filter_result, config
        return AnalysisResult(
            risk_level=RiskLevel.LOW,
            activity_type="skipped",
            summary="VLM analysis disabled (run_mode=never)",
        )

    async def ping(self) -> bool:
        """Always report healthy; there is no backend to probe."""
        return True

    async def shutdown(self, timeout: float | None = None) -> None:
        """No-op: this analyzer holds no resources to release."""
        del timeout
50 changes: 39 additions & 11 deletions src/homesec/sources/rtsp/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@
from homesec.sources.rtsp.hardware import HardwareAccelConfig, HardwareAccelDetector
from homesec.sources.rtsp.motion import MotionDetector
from homesec.sources.rtsp.preflight import (
CameraPreflightDiagnostics,
CameraPreflightOutcome,
PreflightError,
RTSPStartupPreflight,
)
from homesec.sources.rtsp.recorder import FfmpegRecorder, Recorder
from homesec.sources.rtsp.recording_profile import MotionProfile
from homesec.sources.rtsp.recording_profile import MotionProfile, build_default_recording_profile
from homesec.sources.rtsp.url_derivation import derive_detect_rtsp_url
from homesec.sources.rtsp.utils import (
_build_timeout_attempts,
Expand Down Expand Up @@ -407,22 +408,49 @@ def _run_startup_preflight(self) -> None:
case CameraPreflightOutcome() as outcome:
self._apply_preflight_outcome(outcome)
case PreflightError() as err:
logger.error(
"RTSP startup preflight failed: camera=%s key=%s stage=%s reason=%s",
self.camera_name,
err.camera_key,
err.stage,
err.message,
)
raise RuntimeError(
f"RTSP preflight failed for {self.camera_name} at {err.stage}: {err.message}"
) from err
if err.stage == "session_limit":
logger.warning(
"RTSP session-limit preflight failed for %s; falling back to single-stream defaults",
self.camera_name,
)
fallback = self._build_fallback_preflight_outcome(err.camera_key)
self._apply_preflight_outcome(fallback)
else:
logger.error(
"RTSP startup preflight failed: camera=%s key=%s stage=%s reason=%s",
self.camera_name,
err.camera_key,
err.stage,
err.message,
)
raise RuntimeError(
f"RTSP preflight failed for {self.camera_name} at {err.stage}: {err.message}"
) from err
case _:
raise RuntimeError(
"RTSP startup preflight returned unexpected result type: "
f"{type(result).__name__}"
)

def _build_fallback_preflight_outcome(self, camera_key: str) -> CameraPreflightOutcome:
    """Construct a conservative single-stream preflight outcome.

    Used when the session-limit preflight stage fails: the camera falls
    back to its raw RTSP URL for both motion detection and recording,
    with diagnostics noting that defaults were substituted.
    """
    url = self.rtsp_url
    recording = build_default_recording_profile(url)
    return CameraPreflightOutcome(
        camera_key=camera_key,
        motion_profile=MotionProfile(input_url=url, ffmpeg_input_args=[]),
        recording_profile=recording,
        diagnostics=CameraPreflightDiagnostics(
            attempted_urls=[url],
            probes=[],
            selected_motion_url=url,
            selected_recording_url=url,
            selected_recording_profile=recording.profile_id(),
            session_mode="single_stream",
            notes=["Session-limit preflight failed; using single-stream fallback defaults"],
        ),
    )

def _apply_preflight_outcome(self, outcome: CameraPreflightOutcome) -> None:
self._preflight_outcome = outcome
self._motion_rtsp_url = outcome.motion_profile.input_url
Expand Down
17 changes: 16 additions & 1 deletion tests/homesec/rtsp/test_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from homesec.sources.rtsp.frame_pipeline import FfmpegFramePipeline
from homesec.sources.rtsp.hardware import HardwareAccelConfig
from homesec.sources.rtsp.recorder import FfmpegRecorder
from homesec.sources.rtsp.recording_profile import MotionProfile
from homesec.sources.rtsp.recording_profile import MotionProfile, build_default_recording_profile


class FakeClock:
Expand Down Expand Up @@ -296,6 +296,21 @@ def test_detect_stream_defaults_to_main(tmp_path: Path) -> None:
assert not source._detect_stream_available


def test_session_limit_fallback_uses_default_recording_profile(tmp_path: Path) -> None:
    """Session-limit fallback should reuse shared default recording profile builder."""
    # Given: an RTSP source built from the default startup configuration
    cfg = _make_config(tmp_path, rtsp_url="rtsp://host/stream")
    src = RTSPSource(cfg, camera_name="cam")

    # When: the startup session-limit fallback outcome is constructed
    fallback = src._build_fallback_preflight_outcome("cam-key")

    # Then: it carries the canonical default recording profile, and the
    # diagnostics record the matching profile identifier.
    expected = build_default_recording_profile(src.rtsp_url)
    assert fallback.recording_profile == expected
    assert fallback.diagnostics.selected_recording_profile == expected.profile_id()


def test_reconnect_retries_until_success(tmp_path: Path) -> None:
"""Reconnect should retry until a pipeline starts when attempts are infinite."""
# Given: a pipeline that fails twice then succeeds
Expand Down
49 changes: 48 additions & 1 deletion tests/homesec/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
StateStoreConfig,
StorageConfig,
)
from homesec.models.enums import RiskLevel
from homesec.models.enums import RiskLevel, VLMSkipReason
from homesec.models.filter import FilterConfig, FilterOverrides, FilterResult
from homesec.models.vlm import AnalysisResult, VLMConfig
from homesec.pipeline import ClipPipeline
Expand Down Expand Up @@ -283,6 +283,53 @@ async def test_run_mode_always_runs_vlm_regardless(
assert state is not None
assert state.analysis_result is not None

@pytest.mark.asyncio
async def test_run_mode_never_skips_vlm_with_explicit_reason(
    self, base_config: Config, sample_clip: Clip, mocks: PipelineMocks
) -> None:
    """When run_mode=never, VLM is skipped even for trigger detections."""
    # Given: run_mode=never combined with a filter result that would
    # normally trigger VLM analysis (a person detection).
    never_config = Config(
        cameras=base_config.cameras,
        storage=base_config.storage,
        state_store=base_config.state_store,
        notifiers=base_config.notifiers,
        filter=base_config.filter,
        vlm=base_config.vlm.model_copy(update={"run_mode": "never"}),
        alert_policy=base_config.alert_policy,
    )
    trigger_filter = MockFilter(
        result=FilterResult(
            detected_classes=["person"],
            confidence=0.95,
            model="mock",
            sampled_frames=30,
        )
    )
    mocks.filter = trigger_filter

    pipeline = ClipPipeline(
        config=never_config,
        storage=mocks.storage,
        repository=make_repository(never_config, mocks),
        filter_plugin=trigger_filter,
        vlm_plugin=mocks.vlm,
        notifier=mocks.notifier,
        alert_policy=make_alert_policy(never_config),
        retention_pruner=MockRetentionPruner(),
    )

    # When: a clip flows through the pipeline
    pipeline.on_new_clip(sample_clip)
    await pipeline.shutdown()

    # Then: exactly one vlm_skipped event with the run_mode-specific
    # reason, and no VLM start/completion events at all.
    events = await mocks.event_store.get_events(sample_clip.clip_id)
    skipped = [e for e in events if e.event_type == "vlm_skipped"]
    assert len(skipped) == 1
    assert skipped[0].reason == VLMSkipReason.RUN_MODE_NEVER
    assert not any(e.event_type == "vlm_started" for e in events)
    assert not any(e.event_type == "vlm_completed" for e in events)


class TestClipPipelineErrorHandling:
"""Test error handling and partial failures."""
Expand Down
Loading
Loading