From 0aac25d3fd6a389760e9045b3ba7ff11ee7cbaae Mon Sep 17 00:00:00 2001 From: Kam Date: Wed, 8 Apr 2026 13:56:37 +1000 Subject: [PATCH] feat(server): align vision pipeline with normalized events --- config.json | 5 ++++ config.rtsp.example.json | 5 ++++ packaging/config.example.json | 5 ++++ packaging/config.rtsp.example.json | 5 ++++ src/server/internal/config.cpp | 9 +++++++ src/server/internal/config.h | 8 +++++++ src/server/internal/media.cpp | 32 +++++++++++++++++++++++-- src/server/internal/media.h | 2 ++ src/server/internal/visionartifacts.cpp | 27 ++++++++++++++------- web/src/app.js | 10 +++++++- 10 files changed, 96 insertions(+), 12 deletions(-) diff --git a/config.json b/config.json index 6f0de08..4f5be6e 100644 --- a/config.json +++ b/config.json @@ -37,6 +37,11 @@ "everyNthFrame": 6, "minIntervalUsec": 200000, "queueDepth": 8, + "normalize": { + "width": 0, + "height": 0, + "pixelFmt": "" + }, "snapshots": { "enabled": false, "dir": "./recordings/snapshots", diff --git a/config.rtsp.example.json b/config.rtsp.example.json index 1132437..4842063 100644 --- a/config.rtsp.example.json +++ b/config.rtsp.example.json @@ -33,6 +33,11 @@ "everyNthFrame": 6, "minIntervalUsec": 200000, "queueDepth": 8, + "normalize": { + "width": 0, + "height": 0, + "pixelFmt": "" + }, "snapshots": { "enabled": true, "dir": "./recordings/snapshots", diff --git a/packaging/config.example.json b/packaging/config.example.json index 34defeb..0038cf2 100644 --- a/packaging/config.example.json +++ b/packaging/config.example.json @@ -37,6 +37,11 @@ "everyNthFrame": 6, "minIntervalUsec": 200000, "queueDepth": 8, + "normalize": { + "width": 0, + "height": 0, + "pixelFmt": "" + }, "snapshots": { "enabled": false, "dir": "./recordings/snapshots", diff --git a/packaging/config.rtsp.example.json b/packaging/config.rtsp.example.json index 8fe9fec..f10fb45 100644 --- a/packaging/config.rtsp.example.json +++ b/packaging/config.rtsp.example.json @@ -37,6 +37,11 @@ "everyNthFrame": 6, 
"minIntervalUsec": 200000, "queueDepth": 8, + "normalize": { + "width": 0, + "height": 0, + "pixelFmt": "" + }, "snapshots": { "enabled": true, "dir": "./recordings/snapshots", diff --git a/src/server/internal/config.cpp b/src/server/internal/config.cpp index fc11959..4342766 100644 --- a/src/server/internal/config.cpp +++ b/src/server/internal/config.cpp @@ -129,6 +129,15 @@ ConfigLoadResult loadConfigResult(const std::string& path) c.vision.everyNthFrame = v.value("everyNthFrame", c.vision.everyNthFrame); c.vision.minIntervalUsec = v.value("minIntervalUsec", c.vision.minIntervalUsec); c.vision.queueDepth = v.value("queueDepth", c.vision.queueDepth); + if (v.contains("normalize")) { + auto& normalize = v["normalize"]; + c.vision.normalize.width = + normalize.value("width", c.vision.normalize.width); + c.vision.normalize.height = + normalize.value("height", c.vision.normalize.height); + c.vision.normalize.pixelFmt = + normalize.value("pixelFmt", c.vision.normalize.pixelFmt); + } if (v.contains("motion")) { auto& motion = v["motion"]; c.vision.motionGridWidth = motion.value("gridWidth", c.vision.motionGridWidth); diff --git a/src/server/internal/config.h b/src/server/internal/config.h index 6698627..534534d 100644 --- a/src/server/internal/config.h +++ b/src/server/internal/config.h @@ -30,6 +30,13 @@ struct Config struct VisionConfig { + struct NormalizerConfig + { + int width = 0; + int height = 0; + std::string pixelFmt; + }; + struct SnapshotConfig { bool enabled = false; @@ -49,6 +56,7 @@ struct Config uint32_t everyNthFrame = 6; int64_t minIntervalUsec = 200000; int queueDepth = 8; + NormalizerConfig normalize; uint32_t motionGridWidth = 32; uint32_t motionGridHeight = 18; uint32_t motionWarmupFrames = 2; diff --git a/src/server/internal/media.cpp b/src/server/internal/media.cpp index 85f7be3..b1657f0 100644 --- a/src/server/internal/media.cpp +++ b/src/server/internal/media.cpp @@ -31,6 +31,12 @@ std::string intelligenceSourceLabel(const Config& config, const 
std::string& pee return config.source.empty() ? peerId : config.source; } + +std::string intelligenceStreamId(const std::string& peerId) +{ + return "stream/" + peerId; +} + } // namespace @@ -425,6 +431,12 @@ json::Value MediaSession::intelligenceStatus() const vision["sampledFrames"] = stats.forwarded; vision["sampledDropped"] = stats.dropped; } + if (_visionNormalizer) { + const auto stats = _visionNormalizer->stats(); + vision["normalizedFrames"] = stats.emitted; + vision["normalizerDropped"] = stats.dropped; + vision["normalizerConverted"] = stats.converted; + } if (_visionQueue) { vision["queueDepth"] = static_cast(_visionQueue->size()); vision["queueDropped"] = static_cast(_visionQueue->dropped()); @@ -577,6 +589,8 @@ void MediaSession::startStreaming() if (_visionSampler) _visionSampler->reset(); + if (_visionNormalizer) + _visionNormalizer->reset(); if (_visionDetector) _visionDetector->reset(); if (_visionArtifacts) @@ -605,6 +619,7 @@ void MediaSession::setupIntelligence() return; const auto sourceLabel = intelligenceSourceLabel(_config, _peerId); + const auto streamId = intelligenceStreamId(_peerId); if (_config.vision.enabled) { _visionArtifacts = std::make_unique( @@ -623,9 +638,18 @@ void MediaSession::setupIntelligence() .everyNthFrame = _config.vision.everyNthFrame, .minIntervalUsec = _config.vision.minIntervalUsec, }); + _visionNormalizer = std::make_shared( + vision::FrameNormalizerConfig{ + .sourceId = sourceLabel, + .streamId = streamId, + .width = _config.vision.normalize.width, + .height = _config.vision.normalize.height, + .pixelFmt = _config.vision.normalize.pixelFmt, + }); _visionQueue = std::make_shared(_config.vision.queueDepth); _visionDetector = std::make_unique(vision::MotionDetectorConfig{ .source = sourceLabel, + .streamId = streamId, .detectorName = "motion", .gridWidth = _config.vision.motionGridWidth, .gridHeight = _config.vision.motionGridHeight, @@ -644,12 +668,16 @@ void MediaSession::setupIntelligence() 
_visionSampler->process(packet); }; _visionSampler->emitter += [this](IPacket& packet) { - auto* frame = dynamic_cast(&packet); + if (_visionNormalizer) + _visionNormalizer->process(packet); + }; + _visionNormalizer->emitter += [this](IPacket& packet) { + auto* frame = dynamic_cast(&packet); if (frame && _visionQueue) _visionQueue->process(*frame); }; _visionQueue->emitter += [this](IPacket& packet) { - auto* frame = dynamic_cast(&packet); + auto* frame = dynamic_cast(&packet); if (frame && _visionDetector) _visionDetector->process(*frame); }; diff --git a/src/server/internal/media.h b/src/server/internal/media.h index cbcccdb..adce1fd 100644 --- a/src/server/internal/media.h +++ b/src/server/internal/media.h @@ -13,6 +13,7 @@ #include "icy/speech/voiceactivitydetector.h" #include "icy/symple/server.h" #include "icy/vision/detectionqueue.h" +#include "icy/vision/framenormalizer.h" #include "icy/vision/framesampler.h" #include "icy/vision/motiondetector.h" #include "icy/webrtc/peersession.h" @@ -113,6 +114,7 @@ class MediaSession : public std::enable_shared_from_this std::shared_ptr _videoEncoder; std::shared_ptr _audioEncoder; std::shared_ptr _visionSampler; + std::shared_ptr _visionNormalizer; std::shared_ptr _visionQueue; std::unique_ptr _visionDetector; std::shared_ptr _speechQueue; diff --git a/src/server/internal/visionartifacts.cpp b/src/server/internal/visionartifacts.cpp index 00cb2c3..3467806 100644 --- a/src/server/internal/visionartifacts.cpp +++ b/src/server/internal/visionartifacts.cpp @@ -23,6 +23,14 @@ double fpsFromWindow(uint64_t frames, int64_t firstUsec, int64_t lastUsec) (static_cast(frames - 1) * 1000000.0L) / durationUsec); } + +int64_t eventFrameTimeUsec(const vision::VisionEvent& event) +{ + if (event.frame.ptsUsec > 0) + return event.frame.ptsUsec; + return event.emittedAtUsec; +} + } // namespace @@ -152,22 +160,23 @@ void VisionArtifacts::onFrame(const av::PlanarVideoPacket& packet) VisionArtifactResult VisionArtifacts::onEvent(const 
vision::VisionEvent& event) { std::lock_guard lock(_mutex); + const int64_t frameTimeUsec = eventFrameTimeUsec(event); VisionArtifactResult result; - result.latencyUsec = latencyForFrameLocked(event.frame.timeUsec); + result.latencyUsec = latencyForFrameLocked(frameTimeUsec); _lastLatencyUsec = result.latencyUsec; if (_config.snapshotsEnabled) { - const int64_t sinceLastSnapshot = event.frame.timeUsec - _lastSnapshotTimeUsec; + const int64_t sinceLastSnapshot = frameTimeUsec - _lastSnapshotTimeUsec; if (_lastSnapshotTimeUsec == 0 || sinceLastSnapshot >= _config.snapshotMinIntervalUsec) { - if (auto* frame = bestFrameLocked(event.frame.timeUsec)) { - const auto path = makeSnapshotPathLocked(event.frame.timeUsec); + if (auto* frame = bestFrameLocked(frameTimeUsec)) { + const auto path = makeSnapshotPathLocked(frameTimeUsec); if (writeSnapshotLocked(*frame, path)) { result.snapshotPath = path; result.snapshotUrl = artifactUrlFor( fs::makePath("snapshots", fs::filename(path))); - _lastSnapshotTimeUsec = event.frame.timeUsec; + _lastSnapshotTimeUsec = frameTimeUsec; _lastSnapshotPath = result.snapshotPath; _lastSnapshotUrl = result.snapshotUrl; ++_snapshotsWritten; @@ -180,14 +189,14 @@ VisionArtifactResult VisionArtifacts::onEvent(const vision::VisionEvent& event) } if (_config.clipsEnabled) { - if (_clip && event.frame.timeUsec <= _clip->deadlineUsec) { + if (_clip && frameTimeUsec <= _clip->deadlineUsec) { _clip->deadlineUsec = std::max( _clip->deadlineUsec, - event.frame.timeUsec + _config.clipPostRollUsec); + frameTimeUsec + _config.clipPostRollUsec); } else { finishClipLocked(); - startClipLocked(event.frame.timeUsec); - flushBufferedFramesLocked(event.frame.timeUsec - _config.clipPreRollUsec); + startClipLocked(frameTimeUsec); + flushBufferedFramesLocked(frameTimeUsec - _config.clipPreRollUsec); } if (_clip) { diff --git a/web/src/app.js b/web/src/app.js index 12bc150..9400a82 100644 --- a/web/src/app.js +++ b/web/src/app.js @@ -461,7 +461,15 @@ function 
buildArtifactLinks (event) { } function formatEventTime (event) { - const usec = Number(event?.time || event?.audio?.time || event?.frame?.time || 0) + const usec = Number( /* || rather than ??: a zero or empty timestamp is a placeholder and must fall through to the next candidate, matching the server-side fallback from frame.ptsUsec to emittedAtUsec */ + event?.frame?.ptsUsec || + event?.audio?.timeUsec || + event?.time || + event?.audio?.time || + event?.frame?.time || + event?.emittedAtUsec || + 0 + ) if (!Number.isFinite(usec) || usec <= 0) { return 'live' }