feat(engine,nodes): add cross-node control messaging via param_bridge#280
feat(engine,nodes): add cross-node control messaging via param_bridge#280streamer45 merged 26 commits intomainfrom
Conversation
Introduces a generalizable pattern for cross-node control messaging
within pipeline graphs, enabling any node to send UpdateParams to
sibling nodes by name.
Phase 1 — Engine control channel in NodeContext:
- Add engine_control_tx: Option<mpsc::Sender<EngineControlMessage>>
field to NodeContext, wired in DynamicEngine::initialize_node()
- Add tune_sibling() convenience method for sending TuneNode messages
- Set to None in oneshot/stateless pipelines (not supported)
Phase 2 — core::param_bridge node:
- Terminal node that bridges data-plane packets to control-plane
UpdateParams messages on a configured target node
- Three mapping modes:
* Auto: smart per-packet-type (Transcription/Text → properties.text,
Custom → forward data as-is)
* Template: user-supplied JSON with {{ text }} placeholders
* Raw: forward extracted payload unchanged
- Designed for best_effort side branches to never stall main data flow
Phase 3 — Compositor word-wrap:
- Add word_wrap: bool field to TextOverlayConfig (default false)
- When true, uses transform.rect.width as wrap boundary
- Backward compatible — existing overlays unchanged
Phase 4 — Demo pipeline + Slint subtitle component:
- samples/slint/system/subtitle.slint: semi-transparent panel with
word-wrapped text and fade animation
- samples/pipelines/dynamic/video_moq_webcam_subtitles.yml: webcam PiP
with Whisper STT → param_bridge → Slint subtitle overlay
Data flow: mic → opus_decoder → resampler → whisper → [best_effort] →
param_bridge → UpdateParams → slint → compositor layer
Signed-off-by: Devin AI <devin@streamkit.dev>
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
Fix template mode sending spurious UpdateParams with empty text when receiving unsupported packet types (Audio, Video, Binary). Now skips them consistently with auto and raw modes. Add comprehensive unit tests for all param_bridge helper functions: extract_text, auto_map, apply_template, raw_payload, and config validation (24 tests). Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
crates/core/src/node.rs
Outdated
| tx.send(crate::control::EngineControlMessage::TuneNode { | ||
| node_id: target_node_id.to_string(), | ||
| message: crate::control::NodeControlMessage::UpdateParams(params), | ||
| }) | ||
| .await | ||
| .map_err(|_| "Engine control channel closed".to_string()) |
There was a problem hiding this comment.
🚩 Blocking send in tune_sibling may delay shutdown by up to 2 seconds
The tune_sibling method at crates/core/src/node.rs:381-386 uses tx.send().await on the engine's main control channel (engine_control_tx). During engine shutdown (crates/engine/src/dynamic_actor.rs:1550-1639), the engine stops reading from control_rx while waiting for nodes to exit. If a param_bridge node is mid-send and the channel buffer (128 slots) happens to be full, the node blocks and cannot exit its loop until the engine's 2-second timeout fires and the task is aborted. This produces a misleading warning: "Node did not shut down within 2s, this indicates a bug".
In practice this is unlikely (the 128-capacity channel is rarely full), and the timeout+abort mechanism handles it correctly, but using try_send instead of send().await would make the param_bridge more shutdown-friendly and avoid the false warning. The sample YAML already marks the connection as best_effort, so dropping a single UpdateParams during shutdown is acceptable.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
There was a problem hiding this comment.
Valid observation. In practice the 128-slot buffer makes this unlikely, and the timeout+abort handles the edge case correctly. Using try_send is a reasonable improvement but would change the semantics slightly (silently dropping messages when the buffer is full during normal operation, not just shutdown). I'd defer this to human review — if the maintainer prefers try_send semantics here I'm happy to switch it.
There was a problem hiding this comment.
Acknowledged — this is unlikely in practice (128-capacity channel at STT rates of ~1-5/sec), and the timeout+abort handles it correctly. The param_bridge already monitors control_rx for Shutdown and breaks out of its loop, so the send().await would only block if the channel buffer were completely full at the exact moment of shutdown. Leaving as-is for now since try_send would silently drop the message without logging, and the current behavior is safe.
- Use let-else instead of if-let for template mode extract_text - Move test module to end of file (items_after_test_module) - Allow unwrap_used in test module (matches repo convention) - Remove unused variable in test Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
- tune_sibling() now returns Result<(), StreamKitError> instead of String
- Add optional debounce_ms config to coalesce rapid UpdateParams
- Make placeholder matching whitespace-insensitive ({{text}} and {{ text }})
- Document auto_map asymmetry (Slint-oriented default) in MappingMode doc
- Add extension path comment for future placeholders (language, confidence)
- Align error strings between early check and tune_sibling
- Register with StaticPins to fix schema endpoint ERROR log
- Fix sample pipeline: target_sample_rate (not sample_rate/channels),
model_path with tiny model, add debounce_ms to subtitle_bridge
- Add tests for debounce_ms config and whitespace-insensitive placeholders
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Fixes sequential replacement corruption when substituted text contains
the literal string '{{text}}'. Normalize '{{ text }}' → '{{text}}'
first, then replace once.
Adds regression test for this case.
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
'visible' is a built-in property on all Slint elements (including
Window). Declaring 'in property <bool> visible' causes a Slint
compilation error ('Cannot override property visible') that was
silently swallowed at the plugin FFI boundary, surfacing only as the
generic 'Plugin failed to create instance' message.
Rename to 'show' (consistent with lower_third.slint) and update the
sample pipeline template to match.
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Slint subtitle fix (1560532)Root cause identified: This error was invisible because the Fix: Renamed Verified with a standalone Slint compiler test — |
Add TelemetryEmitter to param_bridge that emits 'stt.result' events with text_preview when forwarding UpdateParams containing text. This surfaces transcribed text in the stream view's telemetry timeline. Also add a core::telemetry_tap node to the subtitle sample pipeline between whisper and param_bridge so raw STT results (with segments) appear in telemetry too. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Add Silero VAD configuration to the Whisper node (vad_threshold: 0.4, min_silence_duration_ms: 600) so silence is filtered before inference, improving transcription responsiveness. Replace telemetry_tap with core::telemetry_out (matching other dynamic pipelines like voice-agent-openai and speech-translate) to surface STT results in the stream view telemetry timeline via best_effort side branch. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Without this, the engine's shutdown_node() always hits the 5-second timeout and force-aborts the node because param_bridge never reads control_rx. This also prevented the pending debounce flush from executing on shutdown. Extracts control_rx from NodeContext before the loop to avoid borrow conflicts with recv_with_cancellation (which borrows context immutably). Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
…btitle transition - Dedup: skip UpdateParams identical to last-sent value to avoid redundant Slint re-renders during VAD boundary refinement. - Telemetry: extract text preview before mapping so it works regardless of the target node's JSON shape (decouples from properties.text). - Debounce reset: use 1-year duration instead of 24h to avoid spurious wakeup on long-running sessions (Duration::MAX overflows Instant). - Docs: add note about raw_payload weight with Transcription packets; explain one-time control_rx swap overhead. - Subtitle transition: fade-in + slide-up when text arrives, fade-out + slide-down when cleared (driven by active = show && text != ""). Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Let show remain an independent kill switch via controls/API. The Slint active property (show && text != "") already handles auto-hide when there is no text to display. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
- Subtitle: move transition to text element (fade + slide-up), not the
background overlay. Backdrop appears/disappears instantly.
- Fix connection_mode: was silently ignored at node level; use Map
variant syntax (in: {node, mode}) so best_effort is actually applied.
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
…m-bridge-subtitles
Parakeet TDT is ~10x faster than Whisper on CPU with competitive accuracy. Updates the subtitle pipeline to use plugin::native::parakeet with the INT8 model and built-in VAD. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
…t lang)
- Fix apply_template infinite loop when replacement text contains
{{ field }} patterns by advancing cursor past each substitution
- Restore --profile release-lto to profiling build/run justfile targets
- Regenerate official-plugins.json to match plugin.yml expected_size_bytes
- Read detected language from FFI result instead of hardcoding "en"
(Parakeet v3 supports 25 languages)
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
…m-bridge-subtitles
Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Move InstanceState out of slint_thread_main and extract handle_register, handle_render, apply_config_update, and apply_resize into separate functions/methods. Reduces cognitive complexity from 66 to well under the clippy threshold of 50. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
| - decoder.int8.onnx | ||
| - joiner.int8.onnx | ||
| - tokens.txt | ||
| expected_size_bytes: 661190513 | ||
| expected_size_bytes: 671145061 | ||
| license: CC-BY-4.0 | ||
| license_url: https://huggingface.co/nvidia/parakeet-tdt-0.6b-v2 | ||
| file_checksums: | ||
| encoder.int8.onnx: a32b12d17bbbc309d0686fbbcc2987b5e9b8333a7da83fa6b089f0a2acd651ab | ||
| decoder.int8.onnx: b6bb64963457237b900e496ee9994b59294526439fbcc1fecf705b31a15c6b4e | ||
| joiner.int8.onnx: 7946164367946e7f9f29a122407c3252b680dbae9a51343eb2488d057c3c43d2 | ||
| tokens.txt: ec182b70dd42113aff6c5372c75cac58c952443eb22322f57bbd7f53977d497d | ||
| license_url: https://huggingface.co/nvidia/parakeet-tdt-0.6b-v3 |
There was a problem hiding this comment.
🚩 Parakeet file_checksums removed from plugin manifest — integrity verification no longer possible
The file_checksums map was removed from plugins/native/parakeet/plugin.yml and marketplace/official-plugins.json for the v3 model. This means the model download pipeline can no longer verify file integrity after download. The v2 model had SHA256 checksums for each individual file (encoder, decoder, joiner, tokens). The v3 entry retains expected_size_bytes but has no per-file or aggregate checksum. This is likely intentional (checksums may not yet be computed for the new model files), but should be backfilled before the next release to maintain the integrity verification guarantee that other model entries provide.
(Refers to lines 13-26)
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
There was a problem hiding this comment.
Acknowledged — the file_checksums were removed as part of the v2→v3 model upgrade (checksums for the new v3 files haven't been computed yet). The expected_size_bytes still provides basic integrity checking. Checksums should be backfilled once the v3 model files are finalized and published, but this isn't blocking for the current PR.
- Update Requires comment to include download-silero-vad and download-tenvad-models alongside download-parakeet-models so a fresh checkout can run the demo without missing VAD assets. - Remove the stt_telemetry (core::telemetry_out) node since param_bridge::send_params() already emits stt.result telemetry, avoiding duplicate entries in the stream view timeline. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
| let language = if result.lang.is_null() { | ||
| None | ||
| } else { | ||
| let lang = unsafe { CStr::from_ptr(result.lang).to_string_lossy().into_owned() }; | ||
| if lang.is_empty() { None } else { Some(lang) } | ||
| }; |
There was a problem hiding this comment.
🚩 Parakeet result.lang field access assumes FFI struct layout matches sherpa-onnx v3
At plugins/native/parakeet/src/parakeet_node.rs:495-500, the code accesses result.lang from the FFI SherpaOnnxOfflineRecognizerResult struct. The lang field exists in the FFI definition at plugins/native/parakeet/src/ffi.rs:30, and the null check at line 495 guards against models that don't populate this field. However, correctness depends on the sherpa-onnx C library version matching the FFI struct layout exactly — if the actual shared library has a different struct layout (e.g., lang at a different offset), this would be undefined behavior. The model upgrade from v2 to v3 makes this worth verifying against the deployed sherpa-onnx version.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
There was a problem hiding this comment.
The FFI struct at ffi.rs:24-34 is #[repr(C)] and matches the upstream sherpa-onnx SherpaOnnxOfflineRecognizerResult layout exactly — lang has been present at this offset since the struct was extended to support multilingual models. The null check at line 495 guards against older models or library versions that leave the field unpopulated. The sherpa-onnx version used here is pinned via install-sherpa-onnx in the justfile, so the struct layout is guaranteed to match at build time.
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
…ated reads tokio::fs::File::write_all returns as soon as data is copied to an internal buffer and a blocking write task is spawned — it does NOT wait for the blocking write to complete. When the File is dropped without flushing, the last write may still be in-flight. A subsequent fs::read can then see a truncated file. This caused flaky E2E failures in the compositor-image-overlay upload test: the image crate's into_dimensions() would fail with 'unexpected end of file' because it was parsing a partially-written PNG. The plugin upload handler in server/mod.rs already had this fix; apply the same pattern to all asset upload functions (image, audio, font) in assets.rs and plugin_assets.rs. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
apps/skit/src/assets.rs
Outdated
| // Flush pending writes — tokio::fs::File::write_all returns as soon as | ||
| // data is copied to an internal buffer and a blocking write is spawned, | ||
| // so the last write may still be in-flight when the File is dropped. | ||
| if let Err(e) = file.flush().await { | ||
| let _ = fs::remove_file(file_path).await; | ||
| return Err(AssetsError::IoError(format!("Failed to flush file: {e}"))); | ||
| } |
There was a problem hiding this comment.
Is there a fundamental reason we need to repeat the same logic 4 times?
There was a problem hiding this comment.
Consolidated in 3478ae2 — the three nearly-identical functions (write_upload_stream_to_disk, write_image_upload_to_disk, write_font_upload_to_disk) are now a single stream_field_to_file(field, path, max_size) helper. The cleanup-on-error pattern (remove partial file) appears exactly once via an inner async block instead of being repeated 3–4 times per function.
Also fixed a missing flush() in the image upload path that the audio and font paths had but image did not.
License sidecar creation is extracted into create_license_sidecar() and called by the audio/font upload sites that need it.
Net: −167 lines, +76 lines (−91 total).
Extract stream_field_to_file() helper to replace three nearly-identical functions (write_upload_stream_to_disk, write_image_upload_to_disk, write_font_upload_to_disk). The cleanup-on-error pattern (remove partial file) now appears exactly once via an inner async block, instead of being repeated 3-4 times per function. Also fixes a missing flush() in the image upload path — the audio and font paths had the flush but image did not, which could cause the same truncated-read race condition that was fixed for those paths. Signed-off-by: streamkit-devin <devin@streamer45.com> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Signed-off-by: streamkit-devin <devin@streamer45.com> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Summary
Adds a
core::param_bridgenode that bridges data-plane packets to control-planeUpdateParamsmessages viatune_sibling(), enabling cross-node control within the pipeline graph. Includes a live subtitle demo pipeline using Parakeet TDT STT → param_bridge → Slint overlay.Key changes:
core::param_bridgenode — terminal node with three mapping modes (auto, template, raw), optional debounce, dedup of identical params, and telemetry emissionNodeContext::tune_sibling()sendsTuneNodevia the engine control channelbest_effortconnections — new connection mode that drops packets instead of blocking the sendervideo_moq_webcam_subtitles.ymlpipeline +subtitle.slintoverlay with text fade-in/slide-up transitionstt.resultevents (text extracted before mapping, independent of target JSON shape)control_rxforShutdownsignalstream_field_to_file()helper to replace three nearly-identical upload functions (write_upload_stream_to_disk,write_image_upload_to_disk,write_font_upload_to_disk). Cleanup-on-error (remove partial file) now appears exactly once via an inner async block. Also fixed a missingflush()in the image upload path.Review & Testing Checklist for Human
video_moq_webcam_subtitles.ymlpipeline and confirm text fade-in/slide-up on arrival, smooth text changes (no flicker), fade-out on clearbest_effortconnection mode is working — theconnection_modefield was previously at node level (silently ignored); now uses correct{node, mode}Map syntax insideneedsshowproperty works as independent kill switch — template no longer forcesshow: true, so togglingshowvia API should hide/show subtitles independentlyTest plan
just build-plugin-native-parakeet && just build-plugin-native-slint && just copy-plugins-nativejust download-parakeet-models && just download-silero-vad && just download-tenvad-modelsvideo_moq_webcam_subtitles.ymlstt.resulteventsNotes
stream_field_to_filerefactoring also fixed a missingflush()in the image upload path that audio and font paths had but image did notLink to Devin session: https://staging.itsdev.in/sessions/a750af18ee254481a97c4ac581ba129e
Requested by: @streamer45