Skip to content

Commit bc40048

Browse files
staging-devin-ai-integration[bot]streamkit-devinstreamer45
authored
feat(engine,nodes): add cross-node control messaging via param_bridge (#280)
* feat(engine,nodes): add cross-node control messaging via param_bridge Introduces a generalizable pattern for cross-node control messaging within pipeline graphs, enabling any node to send UpdateParams to sibling nodes by name. Phase 1 — Engine control channel in NodeContext: - Add engine_control_tx: Option<mpsc::Sender<EngineControlMessage>> field to NodeContext, wired in DynamicEngine::initialize_node() - Add tune_sibling() convenience method for sending TuneNode messages - Set to None in oneshot/stateless pipelines (not supported) Phase 2 — core::param_bridge node: - Terminal node that bridges data-plane packets to control-plane UpdateParams messages on a configured target node - Three mapping modes: * Auto: smart per-packet-type (Transcription/Text → properties.text, Custom → forward data as-is) * Template: user-supplied JSON with {{ text }} placeholders * Raw: forward extracted payload unchanged - Designed for best_effort side branches to never stall main data flow Phase 3 — Compositor word-wrap: - Add word_wrap: bool field to TextOverlayConfig (default false) - When true, uses transform.rect.width as wrap boundary - Backward compatible — existing overlays unchanged Phase 4 — Demo pipeline + Slint subtitle component: - samples/slint/system/subtitle.slint: semi-transparent panel with word-wrapped text and fade animation - samples/pipelines/dynamic/video_moq_webcam_subtitles.yml: webcam PiP with Whisper STT → param_bridge → Slint subtitle overlay Data flow: mic → opus_decoder → resampler → whisper → [best_effort] → param_bridge → UpdateParams → slint → compositor layer Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(nodes): skip unsupported packets in template mode, add unit tests Fix template mode sending spurious UpdateParams with empty text when receiving unsupported packet types (Audio, Video, Binary). Now skips them consistently with auto and raw modes. Add comprehensive unit tests for all param_bridge helper functions: extract_text, auto_map, apply_template, raw_payload, and config validation (24 tests). Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * style(nodes): fix clippy lints in param_bridge - Use let-else instead of if-let for template mode extract_text - Move test module to end of file (items_after_test_module) - Allow unwrap_used in test module (matches repo convention) - Remove unused variable in test Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(nodes): address review findings for param_bridge - tune_sibling() now returns Result<(), StreamKitError> instead of String - Add optional debounce_ms config to coalesce rapid UpdateParams - Make placeholder matching whitespace-insensitive ({{text}} and {{ text }}) - Document auto_map asymmetry (Slint-oriented default) in MappingMode doc - Add extension path comment for future placeholders (language, confidence) - Align error strings between early check and tune_sibling - Register with StaticPins to fix schema endpoint ERROR log - Fix sample pipeline: target_sample_rate (not sample_rate/channels), model_path with tiny model, add debounce_ms to subtitle_bridge - Add tests for debounce_ms config and whitespace-insensitive placeholders Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(nodes): normalize template placeholders before substitution Fixes sequential replacement corruption when substituted text contains the literal string '{{text}}'. Normalize '{{ text }}' → '{{text}}' first, then replace once. Adds regression test for this case. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(slint): rename reserved 'visible' property in subtitle.slint 'visible' is a built-in property on all Slint elements (including Window). Declaring 'in property <bool> visible' causes a Slint compilation error ('Cannot override property visible') that was silently swallowed at the plugin FFI boundary, surfacing only as the generic 'Plugin failed to create instance' message. Rename to 'show' (consistent with lower_third.slint) and update the sample pipeline template to match. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * feat(nodes): emit telemetry from param_bridge for stream view visibility Add TelemetryEmitter to param_bridge that emits 'stt.result' events with text_preview when forwarding UpdateParams containing text. This surfaces transcribed text in the stream view's telemetry timeline. Also add a core::telemetry_tap node to the subtitle sample pipeline between whisper and param_bridge so raw STT results (with segments) appear in telemetry too. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * feat(pipeline): add VAD filtering and telemetry_out to subtitle pipeline Add Silero VAD configuration to the Whisper node (vad_threshold: 0.4, min_silence_duration_ms: 600) so silence is filtered before inference, improving transcription responsiveness. Replace telemetry_tap with core::telemetry_out (matching other dynamic pipelines like voice-agent-openai and speech-translate) to surface STT results in the stream view telemetry timeline via best_effort side branch. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(nodes): handle control_rx Shutdown in param_bridge select loop Without this, the engine's shutdown_node() always hits the 5-second timeout and force-aborts the node because param_bridge never reads control_rx. This also prevented the pending debounce flush from executing on shutdown. Extracts control_rx from NodeContext before the loop to avoid borrow conflicts with recv_with_cancellation (which borrows context immutably). Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(param_bridge): dedup identical params, decouple telemetry, add subtitle transition - Dedup: skip UpdateParams identical to last-sent value to avoid redundant Slint re-renders during VAD boundary refinement. - Telemetry: extract text preview before mapping so it works regardless of the target node's JSON shape (decouples from properties.text). - Debounce reset: use 1-year duration instead of 24h to avoid spurious wakeup on long-running sessions (Duration::MAX overflows Instant). - Docs: add note about raw_payload weight with Transcription packets; explain one-time control_rx swap overhead. - Subtitle transition: fade-in + slide-up when text arrives, fade-out + slide-down when cleared (driven by active = show && text != ""). Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * style: cargo fmt Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(sample): remove show:true from subtitle template Let show remain an independent kill switch via controls/API. The Slint active property (show && text != "") already handles auto-hide when there is no text to display. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(sample): text transition effect + connection_mode syntax - Subtitle: move transition to text element (fade + slide-up), not the background overlay. Backdrop appears/disappears instantly. - Fix connection_mode: was silently ignored at node level; use Map variant syntax (in: {node, mode}) so best_effort is actually applied. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * feat(sample): switch subtitle demo from Whisper to Parakeet TDT Parakeet TDT is ~10x faster than Whisper on CPU with competitive accuracy. Updates the subtitle pipeline to use plugin::native::parakeet with the INT8 model and built-in VAD. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * misc improvements * misc improvements * fix: address Devin Review findings (template loop, profiling, parakeet lang) - Fix apply_template infinite loop when replacement text contains {{ field }} patterns by advancing cursor past each substitution - Restore --profile release-lto to profiling build/run justfile targets - Regenerate official-plugins.json to match plugin.yml expected_size_bytes - Read detected language from FFI result instead of hardcoding "en" (Parakeet v3 supports 25 languages) Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * style: cargo fmt slint plugin Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * refactor(slint): extract helpers to reduce cognitive complexity Move InstanceState out of slint_thread_main and extract handle_register, handle_render, apply_config_update, and apply_resize into separate functions/methods. Reduces cognitive complexity from 66 to well under the clippy threshold of 50. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(samples): fix VAD model setup and remove duplicate telemetry - Update Requires comment to include download-silero-vad and download-tenvad-models alongside download-parakeet-models so a fresh checkout can run the demo without missing VAD assets. - Remove the stt_telemetry (core::telemetry_out) node since param_bridge::send_params() already emits stt.result telemetry, avoiding duplicate entries in the stream view timeline. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(assets): flush tokio file after multipart upload to prevent truncated reads tokio::fs::File::write_all returns as soon as data is copied to an internal buffer and a blocking write task is spawned — it does NOT wait for the blocking write to complete. When the File is dropped without flushing, the last write may still be in-flight. A subsequent fs::read can then see a truncated file. This caused flaky E2E failures in the compositor-image-overlay upload test: the image crate's into_dimensions() would fail with 'unexpected end of file' because it was parsing a partially-written PNG. The plugin upload handler in server/mod.rs already had this fix; apply the same pattern to all asset upload functions (image, audio, font) in assets.rs and plugin_assets.rs. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * refactor(assets): consolidate duplicate upload streaming logic Extract stream_field_to_file() helper to replace three nearly-identical functions (write_upload_stream_to_disk, write_image_upload_to_disk, write_font_upload_to_disk). The cleanup-on-error pattern (remove partial file) now appears exactly once via an inner async block, instead of being repeated 3-4 times per function. Also fixes a missing flush() in the image upload path — the audio and font paths had the flush but image did not, which could cause the same truncated-read race condition that was fixed for those paths. Signed-off-by: streamkit-devin <devin@streamer45.com> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * style(assets): apply rustfmt formatting to stream_field_to_file Signed-off-by: streamkit-devin <devin@streamer45.com> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> --------- Signed-off-by: Devin AI <devin@streamkit.dev> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Signed-off-by: streamkit-devin <devin@streamer45.com> Co-authored-by: StreamKit Devin <devin@streamkit.dev> Co-authored-by: Claudio Costa <cstcld91@gmail.com>
1 parent 6594851 commit bc40048

38 files changed

+1663
-354
lines changed

apps/skit/src/assets.rs

Lines changed: 74 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -238,56 +238,85 @@ async fn list_assets(
238238
}
239239

240240
/// Stream an uploaded multipart field to disk with size enforcement.
241-
async fn write_upload_stream_to_disk(
241+
///
242+
/// On any error the partially-written file is removed before returning.
243+
/// Callers that need a REUSE license sidecar should create it after this
244+
/// function succeeds.
245+
async fn stream_field_to_file(
242246
mut field: axum::extract::multipart::Field<'_>,
243247
file_path: &std::path::Path,
244-
extension: &str,
248+
max_size: usize,
245249
) -> Result<usize, AssetsError> {
246250
use tokio::fs::OpenOptions;
247251

248-
let mut file = OpenOptions::new()
249-
.create_new(true)
250-
.write(true)
251-
.open(file_path)
252-
.await
253-
.map_err(|e| AssetsError::IoError(format!("Failed to create file: {e}")))?;
254-
255-
let mut total_bytes: usize = 0;
256-
loop {
257-
match field.chunk().await {
258-
Ok(Some(chunk)) => {
259-
total_bytes = total_bytes.saturating_add(chunk.len());
260-
if total_bytes > MAX_AUDIO_FILE_SIZE {
261-
let _ = fs::remove_file(file_path).await;
262-
return Err(AssetsError::FileTooLarge(MAX_AUDIO_FILE_SIZE));
263-
}
252+
let open_result = OpenOptions::new().create_new(true).write(true).open(file_path).await;
264253

265-
if let Err(e) = file.write_all(&chunk).await {
266-
let _ = fs::remove_file(file_path).await;
267-
return Err(AssetsError::IoError(format!("Failed to write file: {e}")));
268-
}
269-
},
270-
Ok(None) => break,
271-
Err(e) => {
272-
let _ = fs::remove_file(file_path).await;
273-
return Err(AssetsError::InvalidRequest(format!(
274-
"Failed to read upload stream: {e}"
275-
)));
276-
},
254+
let mut file = match open_result {
255+
Ok(f) => f,
256+
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
257+
return Err(AssetsError::FileExists(
258+
file_path.file_name().and_then(|n| n.to_str()).unwrap_or("unknown").to_string(),
259+
));
260+
},
261+
Err(e) => return Err(AssetsError::IoError(format!("Failed to create file: {e}"))),
262+
};
263+
264+
// Inner block: any error triggers a single cleanup path below.
265+
let result = async {
266+
let mut total_bytes: usize = 0;
267+
loop {
268+
match field.chunk().await {
269+
Ok(Some(chunk)) => {
270+
total_bytes = total_bytes.saturating_add(chunk.len());
271+
if total_bytes > max_size {
272+
return Err(AssetsError::FileTooLarge(max_size));
273+
}
274+
file.write_all(&chunk)
275+
.await
276+
.map_err(|e| AssetsError::IoError(format!("Failed to write file: {e}")))?;
277+
},
278+
Ok(None) => break,
279+
Err(e) => {
280+
return Err(AssetsError::InvalidRequest(format!(
281+
"Failed to read upload stream: {e}"
282+
)));
283+
},
284+
}
277285
}
286+
287+
// Flush pending writes — tokio::fs::File::write_all returns as soon as
288+
// data is copied to an internal buffer and a blocking write is spawned,
289+
// so the last write may still be in-flight when the File is dropped.
290+
file.flush()
291+
.await
292+
.map_err(|e| AssetsError::IoError(format!("Failed to flush file: {e}")))?;
293+
294+
Ok(total_bytes)
278295
}
296+
.await;
279297

280-
// Create default license file (best-effort).
281-
let license_path = file_path.with_extension(format!("{extension}.license"));
282-
// REUSE-IgnoreStart
283-
let default_license =
284-
"SPDX-FileCopyrightText: © 2025 User Upload\n\nSPDX-License-Identifier: CC0-1.0\n";
285-
// REUSE-IgnoreEnd
286-
if let Err(e) = fs::write(&license_path, default_license).await {
287-
warn!("Failed to create license file: {}", e);
298+
if result.is_err() {
299+
let _ = fs::remove_file(file_path).await;
288300
}
289301

290-
Ok(total_bytes)
302+
result
303+
}
304+
305+
/// Create a default REUSE license sidecar next to the uploaded file.
306+
fn create_license_sidecar(
307+
file_path: &std::path::Path,
308+
extension: &str,
309+
) -> impl std::future::Future<Output = ()> + Send + 'static {
310+
let license_path = file_path.with_extension(format!("{extension}.license"));
311+
async move {
312+
// REUSE-IgnoreStart
313+
let default_license =
314+
"SPDX-FileCopyrightText: © 2025 User Upload\n\nSPDX-License-Identifier: CC0-1.0\n";
315+
// REUSE-IgnoreEnd
316+
if let Err(e) = fs::write(&license_path, default_license).await {
317+
warn!("Failed to create license file: {}", e);
318+
}
319+
}
291320
}
292321

293322
/// Build AudioAsset response for uploaded file
@@ -332,7 +361,8 @@ async fn process_upload(
332361
return Err(AssetsError::FileExists(filename));
333362
}
334363

335-
let written_bytes = write_upload_stream_to_disk(field, &file_path, &extension).await?;
364+
let written_bytes = stream_field_to_file(field, &file_path, MAX_AUDIO_FILE_SIZE).await?;
365+
create_license_sidecar(&file_path, &extension).await;
336366

337367
info!("Uploaded audio asset: {}", filename);
338368

@@ -664,55 +694,6 @@ async fn list_image_assets(perms: &RolePermissions) -> Result<Vec<ImageAsset>, A
664694
Ok(all_assets)
665695
}
666696

667-
/// Stream an uploaded multipart image field to disk with size enforcement.
668-
///
669-
/// Uses `create_new(true)` so the call fails atomically if the file already
670-
/// exists, avoiding the TOCTOU race of a separate `exists()` pre-check.
671-
async fn write_image_upload_to_disk(
672-
mut field: axum::extract::multipart::Field<'_>,
673-
file_path: &std::path::Path,
674-
) -> Result<usize, AssetsError> {
675-
use tokio::fs::OpenOptions;
676-
677-
let mut file =
678-
OpenOptions::new().create_new(true).write(true).open(file_path).await.map_err(|e| {
679-
if e.kind() == std::io::ErrorKind::AlreadyExists {
680-
AssetsError::FileExists(
681-
file_path.file_name().and_then(|n| n.to_str()).unwrap_or("unknown").to_string(),
682-
)
683-
} else {
684-
AssetsError::IoError(format!("Failed to create file: {e}"))
685-
}
686-
})?;
687-
688-
let mut total_bytes: usize = 0;
689-
loop {
690-
match field.chunk().await {
691-
Ok(Some(chunk)) => {
692-
total_bytes = total_bytes.saturating_add(chunk.len());
693-
if total_bytes > MAX_IMAGE_FILE_SIZE {
694-
let _ = fs::remove_file(file_path).await;
695-
return Err(AssetsError::FileTooLarge(MAX_IMAGE_FILE_SIZE));
696-
}
697-
698-
if let Err(e) = file.write_all(&chunk).await {
699-
let _ = fs::remove_file(file_path).await;
700-
return Err(AssetsError::IoError(format!("Failed to write file: {e}")));
701-
}
702-
},
703-
Ok(None) => break,
704-
Err(e) => {
705-
let _ = fs::remove_file(file_path).await;
706-
return Err(AssetsError::InvalidRequest(format!(
707-
"Failed to read upload stream: {e}"
708-
)));
709-
},
710-
}
711-
}
712-
713-
Ok(total_bytes)
714-
}
715-
716697
/// Core image upload logic after permission check
717698
async fn process_image_upload(
718699
filename: String,
@@ -729,7 +710,7 @@ async fn process_image_upload(
729710

730711
let file_path = user_dir.join(&filename);
731712

732-
let written_bytes = write_image_upload_to_disk(field, &file_path).await?;
713+
let written_bytes = stream_field_to_file(field, &file_path, MAX_IMAGE_FILE_SIZE).await?;
733714

734715
// SVG validation: parse with resvg to check validity and extract dimensions.
735716
// Skip raster decode path entirely for SVGs.
@@ -1209,63 +1190,6 @@ async fn list_font_assets(perms: &RolePermissions) -> Result<Vec<FontAsset>, Ass
12091190
Ok(all_assets)
12101191
}
12111192

1212-
/// Stream an uploaded multipart font field to disk with size enforcement.
1213-
async fn write_font_upload_to_disk(
1214-
mut field: axum::extract::multipart::Field<'_>,
1215-
file_path: &std::path::Path,
1216-
extension: &str,
1217-
) -> Result<usize, AssetsError> {
1218-
use tokio::fs::OpenOptions;
1219-
1220-
let mut file =
1221-
OpenOptions::new().create_new(true).write(true).open(file_path).await.map_err(|e| {
1222-
if e.kind() == std::io::ErrorKind::AlreadyExists {
1223-
AssetsError::FileExists(
1224-
file_path.file_name().and_then(|n| n.to_str()).unwrap_or("unknown").to_string(),
1225-
)
1226-
} else {
1227-
AssetsError::IoError(format!("Failed to create file: {e}"))
1228-
}
1229-
})?;
1230-
1231-
let mut total_bytes: usize = 0;
1232-
loop {
1233-
match field.chunk().await {
1234-
Ok(Some(chunk)) => {
1235-
total_bytes = total_bytes.saturating_add(chunk.len());
1236-
if total_bytes > MAX_FONT_FILE_SIZE {
1237-
let _ = fs::remove_file(file_path).await;
1238-
return Err(AssetsError::FileTooLarge(MAX_FONT_FILE_SIZE));
1239-
}
1240-
1241-
if let Err(e) = file.write_all(&chunk).await {
1242-
let _ = fs::remove_file(file_path).await;
1243-
return Err(AssetsError::IoError(format!("Failed to write file: {e}")));
1244-
}
1245-
},
1246-
Ok(None) => break,
1247-
Err(e) => {
1248-
let _ = fs::remove_file(file_path).await;
1249-
return Err(AssetsError::InvalidRequest(format!(
1250-
"Failed to read upload stream: {e}"
1251-
)));
1252-
},
1253-
}
1254-
}
1255-
1256-
// Create default license file (best-effort).
1257-
let license_path = file_path.with_extension(format!("{extension}.license"));
1258-
// REUSE-IgnoreStart
1259-
let default_license =
1260-
"SPDX-FileCopyrightText: © 2025 User Upload\n\nSPDX-License-Identifier: CC0-1.0\n";
1261-
// REUSE-IgnoreEnd
1262-
if let Err(e) = fs::write(&license_path, default_license).await {
1263-
warn!("Failed to create license file: {}", e);
1264-
}
1265-
1266-
Ok(total_bytes)
1267-
}
1268-
12691193
/// Core font upload logic after permission check
12701194
async fn process_font_upload(
12711195
filename: String,
@@ -1281,7 +1205,8 @@ async fn process_font_upload(
12811205

12821206
let file_path = user_dir.join(&filename);
12831207

1284-
let written_bytes = write_font_upload_to_disk(field, &file_path, &extension).await?;
1208+
let written_bytes = stream_field_to_file(field, &file_path, MAX_FONT_FILE_SIZE).await?;
1209+
create_license_sidecar(&file_path, &extension).await;
12851210

12861211
// Validate that the uploaded file is actually a font by checking magic bytes.
12871212
let header = match fs::read(&file_path).await {
@@ -1306,7 +1231,7 @@ async fn process_font_upload(
13061231

13071232
if !is_valid_font {
13081233
let _ = fs::remove_file(&file_path).await;
1309-
// Also remove the license sidecar created by write_font_upload_to_disk.
1234+
// Also remove the license sidecar created by create_license_sidecar.
13101235
let license_path = file_path.with_extension(format!("{extension}.license"));
13111236
let _ = fs::remove_file(&license_path).await;
13121237
return Err(AssetsError::InvalidFormat(

apps/skit/src/plugin_assets.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -836,6 +836,14 @@ async fn write_upload_to_disk(
836836
}
837837
}
838838

839+
// Flush pending writes — tokio::fs::File::write_all returns as soon as
840+
// data is copied to an internal buffer and a blocking write is spawned,
841+
// so the last write may still be in-flight when the File is dropped.
842+
if let Err(e) = file.flush().await {
843+
let _ = fs::remove_file(file_path).await;
844+
return Err(PluginAssetError::IoError(format!("Failed to flush file: {e}")));
845+
}
846+
839847
Ok(total_bytes)
840848
}
841849

crates/core/src/node.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,15 @@ pub struct NodeContext {
333333
/// Channel for the node to emit structured view data for frontend consumption.
334334
/// Like stats_tx, this is optional and best-effort.
335335
pub view_data_tx: Option<mpsc::Sender<NodeViewDataUpdate>>,
336+
/// Optional sender for engine-level control messages.
337+
///
338+
/// Allows nodes to send [`EngineControlMessage`] to the engine actor,
339+
/// enabling cross-node control (e.g. sending `UpdateParams` to a sibling
340+
/// node by name via [`EngineControlMessage::TuneNode`]).
341+
///
342+
/// Only provided in dynamic pipelines. `None` in oneshot/static
343+
/// pipelines where the graph is fixed at build time.
344+
pub engine_control_tx: Option<mpsc::Sender<crate::control::EngineControlMessage>>,
336345
}
337346

338347
impl NodeContext {
@@ -348,6 +357,36 @@ impl NodeContext {
348357
})
349358
}
350359

360+
/// Send an `UpdateParams` control message to a sibling node by name.
361+
///
362+
/// This is a convenience wrapper around [`EngineControlMessage::TuneNode`]
363+
/// that routes through the engine actor's control channel — the same path
364+
/// the WebSocket/REST API uses.
365+
///
366+
/// Only works in dynamic pipelines (where `engine_control_tx` is `Some`).
367+
///
368+
/// # Errors
369+
///
370+
/// Returns a [`StreamKitError::Runtime`] if the engine control channel is
371+
/// unavailable (oneshot pipeline) or closed (engine shut down).
372+
pub async fn tune_sibling(
373+
&self,
374+
target_node_id: &str,
375+
params: serde_json::Value,
376+
) -> Result<(), StreamKitError> {
377+
let tx = self.engine_control_tx.as_ref().ok_or_else(|| {
378+
StreamKitError::Runtime(
379+
"engine_control_tx not available (oneshot pipeline?)".to_string(),
380+
)
381+
})?;
382+
tx.send(crate::control::EngineControlMessage::TuneNode {
383+
node_id: target_node_id.to_string(),
384+
message: crate::control::NodeControlMessage::UpdateParams(params),
385+
})
386+
.await
387+
.map_err(|_| StreamKitError::Runtime("engine control channel closed".to_string()))
388+
}
389+
351390
/// Receives a packet from the given receiver, respecting the cancellation token if present.
352391
/// Returns None if cancelled or if the channel is closed.
353392
///

crates/engine/src/dynamic_actor.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,10 @@ pub struct DynamicEngine {
152152
pub(super) node_packets_errored_counter: opentelemetry::metrics::Counter<u64>,
153153
// Node state metric (1=running, 0=not running)
154154
pub(super) node_state_gauge: opentelemetry::metrics::Gauge<u64>,
155+
/// Clone of the engine's own control sender, handed to every node via
156+
/// [`NodeContext::engine_control_tx`] so that nodes can emit
157+
/// [`EngineControlMessage::TuneNode`] to sibling nodes.
158+
pub(super) engine_control_tx: mpsc::Sender<EngineControlMessage>,
155159
/// Sender half of the internal channel for background node creation results.
156160
/// Cloned into each spawned creation task.
157161
pub(super) node_created_tx: mpsc::Sender<NodeCreatedEvent>,
@@ -691,6 +695,7 @@ impl DynamicEngine {
691695
video_pool: Some(self.video_pool.clone()),
692696
pipeline_mode: streamkit_core::PipelineMode::Dynamic,
693697
view_data_tx: Some(channels.view_data.clone()),
698+
engine_control_tx: Some(self.engine_control_tx.clone()),
694699
};
695700

696701
// 5. Spawn Node

crates/engine/src/graph_builder.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,8 @@ pub async fn wire_and_spawn_graph(
380380
audio_pool: audio_pool.clone(),
381381
video_pool: video_pool.clone(),
382382
pipeline_mode: streamkit_core::PipelineMode::Oneshot,
383-
view_data_tx: None, // Stateless pipelines don't emit view data
383+
view_data_tx: None, // Stateless pipelines don't emit view data
384+
engine_control_tx: None, // Stateless pipelines don't support cross-node control
384385
};
385386

386387
tracing::debug!("Starting task for node '{}'", name);

crates/engine/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ impl Engine {
155155
#[cfg(feature = "dynamic")]
156156
pub fn start_dynamic_actor(&self, config: DynamicEngineConfig) -> DynamicEngineHandle {
157157
let (control_tx, control_rx) = mpsc::channel(DEFAULT_ENGINE_CONTROL_CAPACITY);
158+
let engine_control_tx = control_tx.clone();
158159
let (query_tx, query_rx) = mpsc::channel(DEFAULT_ENGINE_QUERY_CAPACITY);
159160

160161
let node_input_capacity = config.node_input_capacity.unwrap_or(DEFAULT_NODE_INPUT_CAPACITY);
@@ -239,6 +240,7 @@ impl Engine {
239240
.u64_gauge("node.state")
240241
.with_description("Node state (1=running, 0=stopped/failed)")
241242
.build(),
243+
engine_control_tx,
242244
node_created_tx: nc_tx,
243245
node_created_rx: nc_rx,
244246
pending_connections: Vec::new(),

0 commit comments

Comments
 (0)