diff --git a/apps/skit/src/assets.rs b/apps/skit/src/assets.rs index 9ebf5af1..baf57cb3 100644 --- a/apps/skit/src/assets.rs +++ b/apps/skit/src/assets.rs @@ -238,56 +238,85 @@ async fn list_assets( } /// Stream an uploaded multipart field to disk with size enforcement. -async fn write_upload_stream_to_disk( +/// +/// On any error the partially-written file is removed before returning. +/// Callers that need a REUSE license sidecar should create it after this +/// function succeeds. +async fn stream_field_to_file( mut field: axum::extract::multipart::Field<'_>, file_path: &std::path::Path, - extension: &str, + max_size: usize, ) -> Result { use tokio::fs::OpenOptions; - let mut file = OpenOptions::new() - .create_new(true) - .write(true) - .open(file_path) - .await - .map_err(|e| AssetsError::IoError(format!("Failed to create file: {e}")))?; - - let mut total_bytes: usize = 0; - loop { - match field.chunk().await { - Ok(Some(chunk)) => { - total_bytes = total_bytes.saturating_add(chunk.len()); - if total_bytes > MAX_AUDIO_FILE_SIZE { - let _ = fs::remove_file(file_path).await; - return Err(AssetsError::FileTooLarge(MAX_AUDIO_FILE_SIZE)); - } + let open_result = OpenOptions::new().create_new(true).write(true).open(file_path).await; - if let Err(e) = file.write_all(&chunk).await { - let _ = fs::remove_file(file_path).await; - return Err(AssetsError::IoError(format!("Failed to write file: {e}"))); - } - }, - Ok(None) => break, - Err(e) => { - let _ = fs::remove_file(file_path).await; - return Err(AssetsError::InvalidRequest(format!( - "Failed to read upload stream: {e}" - ))); - }, + let mut file = match open_result { + Ok(f) => f, + Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => { + return Err(AssetsError::FileExists( + file_path.file_name().and_then(|n| n.to_str()).unwrap_or("unknown").to_string(), + )); + }, + Err(e) => return Err(AssetsError::IoError(format!("Failed to create file: {e}"))), + }; + + // Inner block: any error triggers a single cleanup path below. + let result = async { + let mut total_bytes: usize = 0; + loop { + match field.chunk().await { + Ok(Some(chunk)) => { + total_bytes = total_bytes.saturating_add(chunk.len()); + if total_bytes > max_size { + return Err(AssetsError::FileTooLarge(max_size)); + } + file.write_all(&chunk) + .await + .map_err(|e| AssetsError::IoError(format!("Failed to write file: {e}")))?; + }, + Ok(None) => break, + Err(e) => { + return Err(AssetsError::InvalidRequest(format!( + "Failed to read upload stream: {e}" + ))); + }, + } } + + // Flush pending writes — tokio::fs::File::write_all returns as soon as + // data is copied to an internal buffer and a blocking write is spawned, + // so the last write may still be in-flight when the File is dropped. + file.flush() + .await + .map_err(|e| AssetsError::IoError(format!("Failed to flush file: {e}")))?; + + Ok(total_bytes) } + .await; - // Create default license file (best-effort). - let license_path = file_path.with_extension(format!("{extension}.license")); - // REUSE-IgnoreStart - let default_license = - "SPDX-FileCopyrightText: © 2025 User Upload\n\nSPDX-License-Identifier: CC0-1.0\n"; - // REUSE-IgnoreEnd - if let Err(e) = fs::write(&license_path, default_license).await { - warn!("Failed to create license file: {}", e); + if result.is_err() { + let _ = fs::remove_file(file_path).await; } - Ok(total_bytes) + result +} + +/// Create a default REUSE license sidecar next to the uploaded file. +fn create_license_sidecar( + file_path: &std::path::Path, + extension: &str, +) -> impl std::future::Future + Send + 'static { + let license_path = file_path.with_extension(format!("{extension}.license")); + async move { + // REUSE-IgnoreStart + let default_license = + "SPDX-FileCopyrightText: © 2025 User Upload\n\nSPDX-License-Identifier: CC0-1.0\n"; + // REUSE-IgnoreEnd + if let Err(e) = fs::write(&license_path, default_license).await { + warn!("Failed to create license file: {}", e); + } + } } /// Build AudioAsset response for uploaded file @@ -332,7 +361,8 @@ async fn process_upload( return Err(AssetsError::FileExists(filename)); } - let written_bytes = write_upload_stream_to_disk(field, &file_path, &extension).await?; + let written_bytes = stream_field_to_file(field, &file_path, MAX_AUDIO_FILE_SIZE).await?; + create_license_sidecar(&file_path, &extension).await; info!("Uploaded audio asset: {}", filename); @@ -664,55 +694,6 @@ async fn list_image_assets(perms: &RolePermissions) -> Result, A Ok(all_assets) } -/// Stream an uploaded multipart image field to disk with size enforcement. -/// -/// Uses `create_new(true)` so the call fails atomically if the file already -/// exists, avoiding the TOCTOU race of a separate `exists()` pre-check. -async fn write_image_upload_to_disk( - mut field: axum::extract::multipart::Field<'_>, - file_path: &std::path::Path, -) -> Result { - use tokio::fs::OpenOptions; - - let mut file = - OpenOptions::new().create_new(true).write(true).open(file_path).await.map_err(|e| { - if e.kind() == std::io::ErrorKind::AlreadyExists { - AssetsError::FileExists( - file_path.file_name().and_then(|n| n.to_str()).unwrap_or("unknown").to_string(), - ) - } else { - AssetsError::IoError(format!("Failed to create file: {e}")) - } - })?; - - let mut total_bytes: usize = 0; - loop { - match field.chunk().await { - Ok(Some(chunk)) => { - total_bytes = total_bytes.saturating_add(chunk.len()); - if total_bytes > MAX_IMAGE_FILE_SIZE { - let _ = fs::remove_file(file_path).await; - return Err(AssetsError::FileTooLarge(MAX_IMAGE_FILE_SIZE)); - } - - if let Err(e) = file.write_all(&chunk).await { - let _ = fs::remove_file(file_path).await; - return Err(AssetsError::IoError(format!("Failed to write file: {e}"))); - } - }, - Ok(None) => break, - Err(e) => { - let _ = fs::remove_file(file_path).await; - return Err(AssetsError::InvalidRequest(format!( - "Failed to read upload stream: {e}" - ))); - }, - } - } - - Ok(total_bytes) -} - /// Core image upload logic after permission check async fn process_image_upload( filename: String, @@ -729,7 +710,7 @@ async fn process_image_upload( let file_path = user_dir.join(&filename); - let written_bytes = write_image_upload_to_disk(field, &file_path).await?; + let written_bytes = stream_field_to_file(field, &file_path, MAX_IMAGE_FILE_SIZE).await?; // SVG validation: parse with resvg to check validity and extract dimensions. // Skip raster decode path entirely for SVGs. @@ -1209,63 +1190,6 @@ async fn list_font_assets(perms: &RolePermissions) -> Result, Ass Ok(all_assets) } -/// Stream an uploaded multipart font field to disk with size enforcement. -async fn write_font_upload_to_disk( - mut field: axum::extract::multipart::Field<'_>, - file_path: &std::path::Path, - extension: &str, -) -> Result { - use tokio::fs::OpenOptions; - - let mut file = - OpenOptions::new().create_new(true).write(true).open(file_path).await.map_err(|e| { - if e.kind() == std::io::ErrorKind::AlreadyExists { - AssetsError::FileExists( - file_path.file_name().and_then(|n| n.to_str()).unwrap_or("unknown").to_string(), - ) - } else { - AssetsError::IoError(format!("Failed to create file: {e}")) - } - })?; - - let mut total_bytes: usize = 0; - loop { - match field.chunk().await { - Ok(Some(chunk)) => { - total_bytes = total_bytes.saturating_add(chunk.len()); - if total_bytes > MAX_FONT_FILE_SIZE { - let _ = fs::remove_file(file_path).await; - return Err(AssetsError::FileTooLarge(MAX_FONT_FILE_SIZE)); - } - - if let Err(e) = file.write_all(&chunk).await { - let _ = fs::remove_file(file_path).await; - return Err(AssetsError::IoError(format!("Failed to write file: {e}"))); - } - }, - Ok(None) => break, - Err(e) => { - let _ = fs::remove_file(file_path).await; - return Err(AssetsError::InvalidRequest(format!( - "Failed to read upload stream: {e}" - ))); - }, - } - } - - // Create default license file (best-effort). - let license_path = file_path.with_extension(format!("{extension}.license")); - // REUSE-IgnoreStart - let default_license = - "SPDX-FileCopyrightText: © 2025 User Upload\n\nSPDX-License-Identifier: CC0-1.0\n"; - // REUSE-IgnoreEnd - if let Err(e) = fs::write(&license_path, default_license).await { - warn!("Failed to create license file: {}", e); - } - - Ok(total_bytes) -} - /// Core font upload logic after permission check async fn process_font_upload( filename: String, @@ -1281,7 +1205,8 @@ async fn process_font_upload( let file_path = user_dir.join(&filename); - let written_bytes = write_font_upload_to_disk(field, &file_path, &extension).await?; + let written_bytes = stream_field_to_file(field, &file_path, MAX_FONT_FILE_SIZE).await?; + create_license_sidecar(&file_path, &extension).await; // Validate that the uploaded file is actually a font by checking magic bytes. let header = match fs::read(&file_path).await { @@ -1306,7 +1231,7 @@ async fn process_font_upload( if !is_valid_font { let _ = fs::remove_file(&file_path).await; - // Also remove the license sidecar created by write_font_upload_to_disk. + // Also remove the license sidecar created by create_license_sidecar. let license_path = file_path.with_extension(format!("{extension}.license")); let _ = fs::remove_file(&license_path).await; return Err(AssetsError::InvalidFormat( diff --git a/apps/skit/src/plugin_assets.rs b/apps/skit/src/plugin_assets.rs index c749b259..f57f888f 100644 --- a/apps/skit/src/plugin_assets.rs +++ b/apps/skit/src/plugin_assets.rs @@ -836,6 +836,14 @@ async fn write_upload_to_disk( } } + // Flush pending writes — tokio::fs::File::write_all returns as soon as + // data is copied to an internal buffer and a blocking write is spawned, + // so the last write may still be in-flight when the File is dropped. + if let Err(e) = file.flush().await { + let _ = fs::remove_file(file_path).await; + return Err(PluginAssetError::IoError(format!("Failed to flush file: {e}"))); + } + Ok(total_bytes) } diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index df438925..e88d9339 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -333,6 +333,15 @@ pub struct NodeContext { /// Channel for the node to emit structured view data for frontend consumption. /// Like stats_tx, this is optional and best-effort. pub view_data_tx: Option>, + /// Optional sender for engine-level control messages. + /// + /// Allows nodes to send [`EngineControlMessage`] to the engine actor, + /// enabling cross-node control (e.g. sending `UpdateParams` to a sibling + /// node by name via [`EngineControlMessage::TuneNode`]). + /// + /// Only provided in dynamic pipelines. `None` in oneshot/static + /// pipelines where the graph is fixed at build time. + pub engine_control_tx: Option>, } impl NodeContext { @@ -348,6 +357,36 @@ impl NodeContext { }) } + /// Send an `UpdateParams` control message to a sibling node by name. + /// + /// This is a convenience wrapper around [`EngineControlMessage::TuneNode`] + /// that routes through the engine actor's control channel — the same path + /// the WebSocket/REST API uses. + /// + /// Only works in dynamic pipelines (where `engine_control_tx` is `Some`). + /// + /// # Errors + /// + /// Returns a [`StreamKitError::Runtime`] if the engine control channel is + /// unavailable (oneshot pipeline) or closed (engine shut down). + pub async fn tune_sibling( + &self, + target_node_id: &str, + params: serde_json::Value, + ) -> Result<(), StreamKitError> { + let tx = self.engine_control_tx.as_ref().ok_or_else(|| { + StreamKitError::Runtime( + "engine_control_tx not available (oneshot pipeline?)".to_string(), + ) + })?; + tx.send(crate::control::EngineControlMessage::TuneNode { + node_id: target_node_id.to_string(), + message: crate::control::NodeControlMessage::UpdateParams(params), + }) + .await + .map_err(|_| StreamKitError::Runtime("engine control channel closed".to_string())) + } + /// Receives a packet from the given receiver, respecting the cancellation token if present. /// Returns None if cancelled or if the channel is closed. /// diff --git a/crates/engine/src/dynamic_actor.rs b/crates/engine/src/dynamic_actor.rs index 97f076e0..59d0e170 100644 --- a/crates/engine/src/dynamic_actor.rs +++ b/crates/engine/src/dynamic_actor.rs @@ -152,6 +152,10 @@ pub struct DynamicEngine { pub(super) node_packets_errored_counter: opentelemetry::metrics::Counter, // Node state metric (1=running, 0=not running) pub(super) node_state_gauge: opentelemetry::metrics::Gauge, + /// Clone of the engine's own control sender, handed to every node via + /// [`NodeContext::engine_control_tx`] so that nodes can emit + /// [`EngineControlMessage::TuneNode`] to sibling nodes. + pub(super) engine_control_tx: mpsc::Sender, /// Sender half of the internal channel for background node creation results. /// Cloned into each spawned creation task. pub(super) node_created_tx: mpsc::Sender, @@ -691,6 +695,7 @@ impl DynamicEngine { video_pool: Some(self.video_pool.clone()), pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: Some(channels.view_data.clone()), + engine_control_tx: Some(self.engine_control_tx.clone()), }; // 5. Spawn Node diff --git a/crates/engine/src/graph_builder.rs b/crates/engine/src/graph_builder.rs index a3c542a6..9d3709ea 100644 --- a/crates/engine/src/graph_builder.rs +++ b/crates/engine/src/graph_builder.rs @@ -380,7 +380,8 @@ pub async fn wire_and_spawn_graph( audio_pool: audio_pool.clone(), video_pool: video_pool.clone(), pipeline_mode: streamkit_core::PipelineMode::Oneshot, - view_data_tx: None, // Stateless pipelines don't emit view data + view_data_tx: None, // Stateless pipelines don't emit view data + engine_control_tx: None, // Stateless pipelines don't support cross-node control }; tracing::debug!("Starting task for node '{}'", name); diff --git a/crates/engine/src/lib.rs b/crates/engine/src/lib.rs index f975c421..b1ff839d 100644 --- a/crates/engine/src/lib.rs +++ b/crates/engine/src/lib.rs @@ -155,6 +155,7 @@ impl Engine { #[cfg(feature = "dynamic")] pub fn start_dynamic_actor(&self, config: DynamicEngineConfig) -> DynamicEngineHandle { let (control_tx, control_rx) = mpsc::channel(DEFAULT_ENGINE_CONTROL_CAPACITY); + let engine_control_tx = control_tx.clone(); let (query_tx, query_rx) = mpsc::channel(DEFAULT_ENGINE_QUERY_CAPACITY); let node_input_capacity = config.node_input_capacity.unwrap_or(DEFAULT_NODE_INPUT_CAPACITY); @@ -239,6 +240,7 @@ impl Engine { .u64_gauge("node.state") .with_description("Node state (1=running, 0=stopped/failed)") .build(), + engine_control_tx, node_created_tx: nc_tx, node_created_rx: nc_rx, pending_connections: Vec::new(), diff --git a/crates/engine/src/tests/connection_types.rs b/crates/engine/src/tests/connection_types.rs index ddb962bf..ad90945a 100644 --- a/crates/engine/src/tests/connection_types.rs +++ b/crates/engine/src/tests/connection_types.rs @@ -18,6 +18,7 @@ use tokio::sync::mpsc; fn create_test_engine() -> DynamicEngine { let (control_tx, control_rx) = mpsc::channel(32); let (query_tx, query_rx) = mpsc::channel(32); + let engine_control_tx = control_tx.clone(); drop(control_tx); drop(query_tx); @@ -59,6 +60,7 @@ fn create_test_engine() -> DynamicEngine { node_state_gauge: meter.u64_gauge("test.state").build(), runtime_schemas: HashMap::new(), runtime_schema_subscribers: Vec::new(), + engine_control_tx, node_created_tx, node_created_rx, pending_connections: Vec::new(), diff --git a/crates/engine/src/tests/pipeline_activation.rs b/crates/engine/src/tests/pipeline_activation.rs index c7e78e3a..d79a9c5b 100644 --- a/crates/engine/src/tests/pipeline_activation.rs +++ b/crates/engine/src/tests/pipeline_activation.rs @@ -19,6 +19,7 @@ use tokio::sync::mpsc; fn create_test_engine() -> DynamicEngine { let (control_tx, control_rx) = mpsc::channel(32); let (query_tx, query_rx) = mpsc::channel(32); + let engine_control_tx = control_tx.clone(); drop(control_tx); drop(query_tx); @@ -60,6 +61,7 @@ fn create_test_engine() -> DynamicEngine { node_state_gauge: meter.u64_gauge("test.state").build(), runtime_schemas: HashMap::new(), runtime_schema_subscribers: Vec::new(), + engine_control_tx, node_created_tx: nc_tx, node_created_rx: nc_rx, pending_connections: Vec::new(), diff --git a/crates/nodes/src/audio/filters/resampler.rs b/crates/nodes/src/audio/filters/resampler.rs index 0b093328..4935bfac 100644 --- a/crates/nodes/src/audio/filters/resampler.rs +++ b/crates/nodes/src/audio/filters/resampler.rs @@ -776,6 +776,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Create node that downsamples from 48kHz to 24kHz @@ -856,6 +857,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; let config = AudioResamplerConfig { diff --git a/crates/nodes/src/core/file_read.rs b/crates/nodes/src/core/file_read.rs index 1f1a6844..e05d76df 100644 --- a/crates/nodes/src/core/file_read.rs +++ b/crates/nodes/src/core/file_read.rs @@ -246,6 +246,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Create and run node diff --git a/crates/nodes/src/core/file_write.rs b/crates/nodes/src/core/file_write.rs index eab8f075..4b301540 100644 --- a/crates/nodes/src/core/file_write.rs +++ b/crates/nodes/src/core/file_write.rs @@ -208,6 +208,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Create and run node @@ -292,6 +293,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Create and run node with small chunk size for testing diff --git a/crates/nodes/src/core/mod.rs b/crates/nodes/src/core/mod.rs index 98c0e0a6..2d75426e 100644 --- a/crates/nodes/src/core/mod.rs +++ b/crates/nodes/src/core/mod.rs @@ -13,6 +13,7 @@ pub mod json_serialize; #[cfg(feature = "object_store")] pub mod object_store_write; pub mod pacer; +pub mod param_bridge; mod passthrough; #[cfg(feature = "script")] pub mod script; @@ -193,6 +194,9 @@ pub fn register_core_nodes(registry: &mut NodeRegistry, constraints: &GlobalNode // --- Register TelemetryOut Node --- telemetry_out::register(registry); + // --- Register ParamBridge Node --- + param_bridge::register(registry); + // --- Register ObjectStoreWriteNode --- #[cfg(feature = "object_store")] { diff --git a/crates/nodes/src/core/object_store_write.rs b/crates/nodes/src/core/object_store_write.rs index 680625eb..aa8a61c4 100644 --- a/crates/nodes/src/core/object_store_write.rs +++ b/crates/nodes/src/core/object_store_write.rs @@ -799,6 +799,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // No credentials provided — should fail during init diff --git a/crates/nodes/src/core/pacer.rs b/crates/nodes/src/core/pacer.rs index 94a8a6ce..2f8a75fe 100644 --- a/crates/nodes/src/core/pacer.rs +++ b/crates/nodes/src/core/pacer.rs @@ -507,6 +507,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Create node with very fast speed to minimize test time diff --git a/crates/nodes/src/core/param_bridge.rs b/crates/nodes/src/core/param_bridge.rs new file mode 100644 index 00000000..40e7bd8d --- /dev/null +++ b/crates/nodes/src/core/param_bridge.rs @@ -0,0 +1,770 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +//! Parameter bridge node +//! +//! Accepts packets on its input and converts them into `UpdateParams` control +//! messages sent to a configured sibling node via +//! [`NodeContext::tune_sibling()`]. This enables cross-node control within the +//! pipeline graph — the same mechanism the WebSocket/REST API uses, but +//! initiated from inside the data flow. +//! +//! Three mapping modes are supported: +//! +//! - **Auto** — smart per-packet-type mapping (e.g. `Transcription.text` → +//! `{ "properties": { "text": "..." } }`). +//! - **Template** — a user-supplied JSON template with `{{ field }}` placeholders +//! replaced by values extracted from the incoming packet. +//! - **Raw** — forward the packet payload as-is (useful after a `core::script` +//! node that already produced the desired JSON shape). +//! +//! This is a terminal node (no output pins) and is designed for `best_effort` +//! side branches so it never stalls the main data flow. + +use async_trait::async_trait; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use serde_json::Value as JsonValue; +use streamkit_core::control::NodeControlMessage; +use streamkit_core::telemetry::TelemetryEmitter; +use streamkit_core::types::{Packet, PacketType}; +use streamkit_core::{ + state_helpers, InputPin, NodeContext, OutputPin, PinCardinality, ProcessorNode, StreamKitError, +}; + +/// How the bridge maps incoming packets to `UpdateParams` JSON. +#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub enum MappingMode { + /// Smart per-packet-type mapping. + /// + /// `Transcription` and `Text` packets are wrapped in + /// `{ "properties": { "text": "..." } }` — a shape that targets Slint + /// plugin nodes out of the box. `Custom` packets forward their `data` + /// field as-is (assumed to already be the correct `UpdateParams` shape). + /// + /// If you need a different output shape (e.g. targeting a compositor's + /// `text_overlays`), use `template` mode instead. + #[default] + Auto, + /// User-provided JSON template with `{{ text }}` placeholders. + Template, + /// Forward the extracted payload as-is (no transformation). + Raw, +} + +/// Configuration for the `core::param_bridge` node. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct ParamBridgeConfig { + /// The `node_id` of the sibling node to send `UpdateParams` to. + pub target_node: String, + + /// Mapping strategy. + #[serde(default)] + pub mode: MappingMode, + + /// JSON template used when `mode` is `template`. + /// + /// Placeholders like `{{ text }}` (or `{{text}}`) are replaced with values + /// extracted from the incoming packet. + /// + /// Currently only `{{ text }}` is supported. Future extensions could add + /// `{{ language }}`, `{{ confidence }}`, or arbitrary field paths. + #[serde(default)] + pub template: Option, + + /// Optional debounce window in milliseconds. + /// + /// When set, rapid `UpdateParams` messages are coalesced: only the most + /// recent value is sent after the window expires. This is useful for + /// targets like subtitles where intermediate transcription segments are + /// superseded by newer ones. + #[serde(default)] + pub debounce_ms: Option, +} + +pub struct ParamBridgeNode { + config: ParamBridgeConfig, +} + +impl ParamBridgeNode { + /// Creates a new `ParamBridgeNode` from configuration. + /// + /// # Errors + /// + /// Returns an error if the configuration parameters cannot be parsed. + pub fn new(params: Option<&serde_json::Value>) -> Result { + let config: ParamBridgeConfig = if let Some(p) = params { + serde_json::from_value(p.clone()) + .map_err(|e| StreamKitError::Configuration(format!("Invalid config: {e}")))? + } else { + return Err(StreamKitError::Configuration( + "param_bridge requires at least `target_node` in params".to_string(), + )); + }; + + if matches!(config.mode, MappingMode::Template) && config.template.is_none() { + return Err(StreamKitError::Configuration( + "param_bridge: `template` is required when mode is `template`".to_string(), + )); + } + + Ok(Self { config }) + } + + pub fn input_pins() -> Vec { + vec![InputPin { + name: "in".to_string(), + accepts_types: vec![PacketType::Any], + cardinality: PinCardinality::One, + }] + } +} + +/// Extract the text content from a packet (for auto/template modes). +fn extract_text(packet: &Packet) -> Option { + match packet { + Packet::Transcription(t) => Some(t.text.clone()), + Packet::Text(t) => Some(t.to_string()), + _ => None, + } +} + +/// Build `UpdateParams` JSON using the auto-mapping strategy. +fn auto_map(packet: &Packet) -> Option { + match packet { + Packet::Transcription(t) => Some(serde_json::json!({ "properties": { "text": t.text } })), + Packet::Text(t) => Some(serde_json::json!({ "properties": { "text": t.as_ref() } })), + Packet::Custom(c) => Some(c.data.clone()), + _ => { + tracing::debug!(packet_type = %packet_type_label(packet), "param_bridge auto: unsupported packet type, skipping"); + None + }, + } +} + +/// Replace `{{ field }}` placeholders in a JSON value tree using values +/// from a context object. +/// +/// When a string value consists entirely of a single placeholder +/// (e.g. `"{{ is_speech }}"`) the raw JSON value from the context is +/// substituted — preserving booleans, numbers, and nulls. When the +/// placeholder appears inside a longer string (e.g. +/// `"Hello {{ name }}"`) the context value is stringified. +/// +/// Transcription and Text packets produce a context with a single +/// `text` key. Custom packets use their full JSON `.data` object. +fn apply_template(template: &JsonValue, ctx: &JsonValue) -> JsonValue { + match template { + JsonValue::String(s) => { + // Fast path: check if the entire string is a single {{ field }}. + let trimmed = s.trim(); + if let Some(field) = parse_sole_placeholder(trimmed) { + if let Some(val) = lookup_ctx(ctx, field) { + return val.clone(); + } + } + // General path: replace all {{ field }} occurrences as strings. + // We track a cursor to advance past each replacement so that + // placeholders inside substituted text are never re-scanned + // (prevents infinite loops when replacement contains `{{ … }}`). + let mut result = s.clone(); + let mut cursor = 0; + while cursor < result.len() { + let Some(start) = result[cursor..].find("{{").map(|i| cursor + i) else { + break; + }; + let Some(end) = result[start..].find("}}") else { break }; + let end = start + end + 2; + let field = result[start + 2..end - 2].trim(); + let replacement = lookup_ctx(ctx, field).map_or_else(String::new, |v| match v { + JsonValue::String(s) => s.clone(), + other => other.to_string(), + }); + let replacement_len = replacement.len(); + result.replace_range(start..end, &replacement); + cursor = start + replacement_len; + } + JsonValue::String(result) + }, + JsonValue::Array(arr) => { + JsonValue::Array(arr.iter().map(|v| apply_template(v, ctx)).collect()) + }, + JsonValue::Object(map) => JsonValue::Object( + map.iter().map(|(k, v)| (k.clone(), apply_template(v, ctx))).collect(), + ), + other => other.clone(), + } +} + +/// If the string is exactly `{{ field }}` (or `{{field}}`), return the +/// field name; otherwise `None`. +fn parse_sole_placeholder(s: &str) -> Option<&str> { + let s = s.strip_prefix("{{")?; + let s = s.strip_suffix("}}")?; + // Ensure there are no nested braces. + if s.contains("{{") || s.contains("}}") { + return None; + } + Some(s.trim()) +} + +/// Look up a field name in a JSON context value. +fn lookup_ctx<'a>(ctx: &'a JsonValue, field: &str) -> Option<&'a JsonValue> { + match ctx { + JsonValue::Object(map) => map.get(field), + _ => None, + } +} + +/// Extract the raw JSON payload from a packet (for raw mode). +/// +/// **Note:** `Transcription` packets serialize the full `TranscriptionData` +/// struct (including per-segment timing and confidence). For transcriptions +/// with many segments this can produce a non-trivial JSON tree — prefer +/// `auto` or `template` mode for the subtitle use case. +fn raw_payload(packet: &Packet) -> Option { + match packet { + Packet::Custom(c) => Some(c.data.clone()), + Packet::Transcription(t) => serde_json::to_value(t.as_ref()).ok(), + Packet::Text(t) => Some(serde_json::json!({ "text": t.as_ref() })), + _ => { + tracing::debug!(packet_type = %packet_type_label(packet), "param_bridge raw: unsupported packet type, skipping"); + None + }, + } +} + +const fn packet_type_label(packet: &Packet) -> &'static str { + match packet { + Packet::Audio(_) => "Audio", + Packet::Video(_) => "Video", + Packet::Text(_) => "Text", + Packet::Transcription(_) => "Transcription", + Packet::Custom(_) => "Custom", + Packet::Binary { .. } => "Binary", + } +} + +#[async_trait] +impl ProcessorNode for ParamBridgeNode { + fn input_pins(&self) -> Vec { + Self::input_pins() + } + + fn output_pins(&self) -> Vec { + vec![] + } + + async fn run(self: Box, mut context: NodeContext) -> Result<(), StreamKitError> { + let node_id = context.output_sender.node_name().to_string(); + let target = &self.config.target_node; + + state_helpers::emit_initializing(&context.state_tx, &node_id); + + if context.engine_control_tx.is_none() { + tracing::error!( + node = %node_id, + "param_bridge requires engine_control_tx (only available in dynamic pipelines)" + ); + state_helpers::emit_failed( + &context.state_tx, + &node_id, + "engine_control_tx not available (oneshot pipeline?)", + ); + return Err(StreamKitError::Runtime( + "engine_control_tx not available (oneshot pipeline?)".to_string(), + )); + } + + let telemetry = TelemetryEmitter::new( + node_id.clone(), + context.session_id.clone(), + context.telemetry_tx.clone(), + ); + + // Take control_rx out of context so we can select on it alongside + // recv_with_cancellation (which borrows context immutably). The + // dummy channel is a one-time allocation that is never read — + // other nodes avoid this because they don't use + // recv_with_cancellation. + let mut control_rx = { + let (_, rx) = tokio::sync::mpsc::channel(1); + std::mem::replace(&mut context.control_rx, rx) + }; + + let mut input_rx = context.take_input("in")?; + state_helpers::emit_running(&context.state_tx, &node_id); + + let debounce = self.config.debounce_ms.map(tokio::time::Duration::from_millis); + + tracing::info!( + node = %node_id, + target_node = %target, + mode = ?self.config.mode, + debounce_ms = ?self.config.debounce_ms, + "param_bridge started" + ); + + // When debouncing is enabled we store the most recent params (and the + // pre-mapping text preview for telemetry) and only send after the + // window elapses without a new packet arriving. + let mut pending_params: Option<(JsonValue, Option)> = None; + + // Dedup: skip UpdateParams that are identical to the last-sent value. + // This avoids redundant Slint re-renders when Whisper emits duplicate + // segments during VAD boundary refinement. + let mut last_sent: Option = None; + let sleep = tokio::time::sleep(tokio::time::Duration::MAX); + tokio::pin!(sleep); + + loop { + tokio::select! { + biased; + + Some(ctrl) = control_rx.recv() => { + match ctrl { + NodeControlMessage::Shutdown => { + tracing::info!(node = %node_id, "param_bridge received shutdown"); + break; + }, + NodeControlMessage::UpdateParams(_) | NodeControlMessage::Start => {}, + } + } + + packet = context.recv_with_cancellation(&mut input_rx) => { + let Some(packet) = packet else { + break; + }; + + // Extract text preview for telemetry — done before mapping + // so it's independent of the target-specific JSON shape. + let text_preview = extract_text(&packet); + + let params = match &self.config.mode { + MappingMode::Auto => auto_map(&packet), + MappingMode::Template => { + // Build a context object for template substitution. + // Text-bearing packets get a `{ "text": "..." }` + // context; Custom packets use their full JSON data. + let ctx = if let Some(ref text) = text_preview { + serde_json::json!({ "text": text }) + } else if let Packet::Custom(c) = &packet { + c.data.clone() + } else { + tracing::debug!(packet_type = %packet_type_label(&packet), "param_bridge template: unsupported packet type, skipping"); + continue; + }; + self.config.template.as_ref().map(|tmpl| apply_template(tmpl, &ctx)) + }, + MappingMode::Raw => raw_payload(&packet), + }; + + let Some(params) = params else { + continue; + }; + + if let Some(d) = debounce { + pending_params = Some((params, text_preview)); + sleep.as_mut().reset(tokio::time::Instant::now() + d); + } else { + // Dedup: skip if identical to last sent params. + if last_sent.as_ref() == Some(¶ms) { + continue; + } + last_sent = Some(params.clone()); + Self::send_params(&context, &telemetry, &node_id, target, params, text_preview.as_deref()).await; + } + } + + () = &mut sleep, if pending_params.is_some() => { + if let Some((params, text_preview)) = pending_params.take() { + // Dedup: skip if identical to last sent params. + if last_sent.as_ref() != Some(¶ms) { + last_sent = Some(params.clone()); + Self::send_params(&context, &telemetry, &node_id, target, params, text_preview.as_deref()).await; + } + } + // Reset sleep to far future so it doesn't fire again. + // Cannot use Duration::MAX — Instant + Duration::MAX overflows. + sleep.as_mut().reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(365 * 24 * 3600)); + } + } + } + + // Flush any pending debounced params before shutting down. + if let Some((params, text_preview)) = pending_params.take() { + if last_sent.as_ref() != Some(¶ms) { + Self::send_params( + &context, + &telemetry, + &node_id, + target, + params, + text_preview.as_deref(), + ) + .await; + } + } + + state_helpers::emit_stopped(&context.state_tx, &node_id, "input_closed"); + tracing::info!(node = %node_id, "param_bridge stopped"); + Ok(()) + } +} + +impl ParamBridgeNode { + async fn send_params( + context: &NodeContext, + telemetry: &TelemetryEmitter, + node_id: &str, + target: &str, + params: JsonValue, + text_preview: Option<&str>, + ) { + tracing::debug!( + node = %node_id, + target_node = %target, + "param_bridge sending UpdateParams" + ); + + // Emit telemetry so the stream view can display forwarded text. + // text_preview is extracted from the packet before mapping, so it + // works regardless of the target node's expected JSON shape. + if let Some(text) = text_preview { + telemetry.emit( + "stt.result", + serde_json::json!({ + "text_preview": text, + "target_node": target, + }), + ); + } + + if let Err(e) = context.tune_sibling(target, params).await { + tracing::warn!( + node = %node_id, + target_node = %target, + error = %e, + "param_bridge failed to send UpdateParams" + ); + } + } +} + +pub fn register(registry: &mut streamkit_core::NodeRegistry) { + use schemars::schema_for; + use streamkit_core::registry::StaticPins; + + let schema = match serde_json::to_value(schema_for!(ParamBridgeConfig)) { + Ok(v) => v, + Err(e) => { + tracing::error!(error = %e, "Failed to serialize ParamBridgeConfig schema"); + return; + }, + }; + + registry.register_static_with_description( + "core::param_bridge", + |params| Ok(Box::new(ParamBridgeNode::new(params)?)), + schema, + StaticPins { inputs: ParamBridgeNode::input_pins(), outputs: vec![] }, + vec!["core".to_string(), "control".to_string()], + false, + "Bridges data-plane packets to control-plane UpdateParams messages. \ + Accepts any packet type and sends a mapped UpdateParams to a configured \ + target node, enabling cross-node control within the pipeline graph. \ + Supports auto, template, and raw mapping modes.", + ); +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use std::sync::Arc; + + use super::*; + use serde_json::json; + use streamkit_core::types::{ + CustomEncoding, CustomPacketData, TranscriptionData, TranscriptionSegment, + }; + + // ── extract_text ──────────────────────────────────────────────── + + #[test] + fn extract_text_from_transcription() { + let pkt = Packet::Transcription(Arc::new(TranscriptionData { + text: "hello world".into(), + segments: vec![], + language: None, + metadata: None, + })); + assert_eq!(extract_text(&pkt), Some("hello world".into())); + } + + #[test] + fn extract_text_from_text_packet() { + let pkt = Packet::Text("some text".into()); + assert_eq!(extract_text(&pkt), Some("some text".into())); + } + + #[test] + fn extract_text_from_empty_transcription() { + let pkt = Packet::Transcription(Arc::new(TranscriptionData { + text: String::new(), + segments: vec![], + language: None, + metadata: None, + })); + assert_eq!(extract_text(&pkt), Some(String::new())); + } + + #[test] + fn extract_text_returns_none_for_custom() { + let pkt = Packet::Custom(Arc::new(CustomPacketData { + type_id: "test".into(), + encoding: CustomEncoding::Json, + data: json!({"key": "value"}), + metadata: None, + })); + assert_eq!(extract_text(&pkt), None); + } + + // ── auto_map ──────────────────────────────────────────────────── + + #[test] + fn auto_map_transcription() { + let pkt = Packet::Transcription(Arc::new(TranscriptionData { + text: "hi".into(), + segments: vec![], + language: None, + metadata: None, + })); + let result = auto_map(&pkt).unwrap(); + assert_eq!(result, json!({ "properties": { "text": "hi" } })); + } + + #[test] + fn auto_map_text() { + let pkt = Packet::Text("hello".into()); + let result = auto_map(&pkt).unwrap(); + assert_eq!(result, json!({ "properties": { "text": "hello" } })); + } + + #[test] + fn auto_map_custom_forwards_data() { + let data = json!({"props": {"color": "red"}}); + let pkt = Packet::Custom(Arc::new(CustomPacketData { + type_id: "test".into(), + encoding: CustomEncoding::Json, + data: data.clone(), + metadata: None, + })); + assert_eq!(auto_map(&pkt).unwrap(), data); + } + + #[test] + fn auto_map_returns_none_for_unsupported() { + // Binary is unsupported in auto mode. + let pkt = Packet::Binary { data: bytes::Bytes::new(), content_type: None, metadata: None }; + assert!(auto_map(&pkt).is_none()); + } + + // ── apply_template ────────────────────────────────────────────── + + /// Helper: build a text-only context for template tests. + fn text_ctx(s: &str) -> JsonValue { + json!({ "text": s }) + } + + #[test] + fn apply_template_string_replacement() { + let tmpl = json!("prefix: {{ text }}"); + let result = apply_template(&tmpl, &text_ctx("hello")); + assert_eq!(result, json!("prefix: hello")); + } + + #[test] + fn apply_template_no_whitespace_placeholder() { + let tmpl = json!("prefix: {{text}}"); + let result = apply_template(&tmpl, &text_ctx("hello")); + assert_eq!(result, json!("prefix: hello")); + } + + #[test] + fn apply_template_nested_object() { + let tmpl = json!({ + "properties": { + "text": "{{ text }}", + "visible": true + } + }); + let result = apply_template(&tmpl, &text_ctx("subtitle line")); + assert_eq!( + result, + json!({ + "properties": { + "text": "subtitle line", + "visible": true + } + }) + ); + } + + #[test] + fn apply_template_array() { + let tmpl = json!(["{{ text }}", "static"]); + let result = apply_template(&tmpl, &text_ctx("dynamic")); + assert_eq!(result, json!(["dynamic", "static"])); + } + + #[test] + fn apply_template_no_placeholder() { + let tmpl = json!({"key": "no placeholder here"}); + let result = apply_template(&tmpl, &text_ctx("ignored")); + assert_eq!(result, json!({"key": "no placeholder here"})); + } + + #[test] + fn apply_template_empty_text() { + let tmpl = json!("{{ text }}"); + let result = apply_template(&tmpl, &text_ctx("")); + assert_eq!(result, json!("")); + } + + #[test] + fn apply_template_preserves_non_string_values() { + let tmpl = json!({"count": 42, "flag": true, "text": "{{ text }}"}); + let result = apply_template(&tmpl, &text_ctx("hello")); + assert_eq!(result, json!({"count": 42, "flag": true, "text": "hello"})); + } + + #[test] + fn apply_template_text_containing_placeholder_literal() { + // Regression: if substituted text contains "{{text}}", the replacement + // must NOT re-scan it (would cause infinite loop / double-replace). + let tmpl = json!("{{ text }}"); + let result = apply_template(&tmpl, &text_ctx("contains {{text}} marker")); + assert_eq!(result, json!("contains {{text}} marker")); + } + + #[test] + fn apply_template_no_infinite_loop_on_replacement_with_placeholder() { + // Regression: the general replacement path (not sole-placeholder fast + // path) must advance past each replacement to avoid re-scanning + // substituted text that itself contains {{ field }} patterns. + let tmpl = json!("Say: {{ text }}!"); + let result = apply_template(&tmpl, &text_ctx("hello {{text}} world")); + assert_eq!(result, json!("Say: hello {{text}} world!")); + } + + #[test] + fn apply_template_sole_placeholder_preserves_type() { + // When a placeholder is the entire value, the raw JSON type is kept. + let ctx = json!({ "is_speech": true, "score": 42 }); + assert_eq!(apply_template(&json!("{{ is_speech }}"), &ctx), json!(true)); + assert_eq!(apply_template(&json!("{{ score }}"), &ctx), json!(42)); + } + + #[test] + fn apply_template_custom_fields_in_object() { + let tmpl = json!({ "properties": { "speaking": "{{ is_speech }}" } }); + let ctx = json!({ "is_speech": true }); + assert_eq!(apply_template(&tmpl, &ctx), json!({ "properties": { "speaking": true } })); + } + + // ── raw_payload ───────────────────────────────────────────────── + + #[test] + fn raw_payload_custom() { + let data = json!({"properties": {"text": "direct"}}); + let pkt = Packet::Custom(Arc::new(CustomPacketData { + type_id: "test".into(), + encoding: CustomEncoding::Json, + data: data.clone(), + metadata: None, + })); + assert_eq!(raw_payload(&pkt).unwrap(), data); + } + + #[test] + fn raw_payload_text() { + let pkt = Packet::Text("raw text".into()); + assert_eq!(raw_payload(&pkt).unwrap(), json!({"text": "raw text"})); + } + + #[test] + fn raw_payload_transcription() { + let pkt = Packet::Transcription(Arc::new(TranscriptionData { + text: "hello".into(), + segments: vec![TranscriptionSegment { + text: "hello".into(), + start_time_ms: 0, + end_time_ms: 1000, + confidence: Some(0.95), + }], + language: Some("en".into()), + metadata: None, + })); + let result = raw_payload(&pkt).unwrap(); + assert_eq!(result["text"], "hello"); + assert_eq!(result["language"], "en"); + } + + #[test] + fn raw_payload_returns_none_for_unsupported() { + let pkt = Packet::Binary { data: bytes::Bytes::new(), content_type: None, metadata: None }; + assert!(raw_payload(&pkt).is_none()); + } + + // ── ParamBridgeNode::new (config validation) ──────────────────── + + #[test] + fn config_requires_params() { + assert!(ParamBridgeNode::new(None).is_err()); + } + + #[test] + fn config_requires_target_node() { + let params = json!({"mode": "auto"}); + assert!(ParamBridgeNode::new(Some(¶ms)).is_err()); + } + + #[test] + fn config_template_mode_requires_template() { + let params = json!({"target_node": "foo", "mode": "template"}); + assert!(ParamBridgeNode::new(Some(¶ms)).is_err()); + } + + #[test] + fn config_template_mode_with_template_ok() { + let params = json!({ + "target_node": "sub", + "mode": "template", + "template": {"properties": {"text": "{{ text }}"}} + }); + assert!(ParamBridgeNode::new(Some(¶ms)).is_ok()); + } + + #[test] + fn config_auto_mode_defaults() { + let params = json!({"target_node": "target"}); + let node = ParamBridgeNode::new(Some(¶ms)).unwrap(); + assert!(matches!(node.config.mode, MappingMode::Auto)); + } + + #[test] + fn config_rejects_unknown_fields() { + let params = json!({"target_node": "foo", "unknown_field": true}); + assert!(ParamBridgeNode::new(Some(¶ms)).is_err()); + } + + #[test] + fn config_debounce_ms() { + let params = json!({"target_node": "t", "debounce_ms": 100}); + let node = ParamBridgeNode::new(Some(¶ms)).unwrap(); + assert_eq!(node.config.debounce_ms, Some(100)); + } +} diff --git a/crates/nodes/src/test_utils.rs b/crates/nodes/src/test_utils.rs index 1fad8882..b47827d0 100644 --- a/crates/nodes/src/test_utils.rs +++ b/crates/nodes/src/test_utils.rs @@ -42,6 +42,7 @@ pub fn create_test_context( video_pool: None, pipeline_mode: streamkit_core::node::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; (context, mock_sender, state_rx) @@ -85,6 +86,7 @@ pub fn create_test_context_with_pin_mgmt( video_pool: None, pipeline_mode: streamkit_core::node::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; (context, mock_sender, state_rx, pin_mgmt_tx) diff --git a/crates/nodes/src/transport/http.rs b/crates/nodes/src/transport/http.rs index 9cedac26..0093a021 100644 --- a/crates/nodes/src/transport/http.rs +++ b/crates/nodes/src/transport/http.rs @@ -393,6 +393,7 @@ mod tests { video_pool: None, pipeline_mode: streamkit_core::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Create and run node with small chunk size for testing diff --git a/crates/nodes/src/transport/moq/peer/mod.rs b/crates/nodes/src/transport/moq/peer/mod.rs index 4e6a0f0c..c4ac31ad 100644 --- a/crates/nodes/src/transport/moq/peer/mod.rs +++ b/crates/nodes/src/transport/moq/peer/mod.rs @@ -1820,7 +1820,21 @@ impl MoqPeerNode { }, RouteOutcome::NoEntry(packet) => { // No dynamic channel — fall through to the static output sender. - output_sender.send(output_pin, packet).await.is_ok() + match output_sender.send(output_pin, packet).await { + Ok(()) => true, + Err(streamkit_core::OutputSendError::PinNotFound { .. }) => { + // The pin doesn't exist yet — the engine may still be + // wiring up a dynamic output pin for this track. Drop + // the packet but keep the track processor alive so it + // can deliver subsequent frames once the pin appears. + tracing::debug!( + output_pin, + "Output pin not yet available, dropping packet" + ); + true + }, + Err(_) => false, + } }, } } diff --git a/crates/nodes/src/video/compositor/config.rs b/crates/nodes/src/video/compositor/config.rs index 6d4e3648..5e7948fa 100644 --- a/crates/nodes/src/video/compositor/config.rs +++ b/crates/nodes/src/video/compositor/config.rs @@ -150,6 +150,13 @@ pub struct TextOverlayConfig { /// When omitted, the default system font (DejaVu Sans) is used. #[serde(default)] pub font_name: Option, + /// Enable word wrapping within the overlay's bounding rectangle. + /// + /// When `true`, text is wrapped at the width specified by + /// `transform.rect.width`. When `false` (the default), text only + /// breaks on explicit newlines — matching the historical behaviour. + #[serde(default)] + pub word_wrap: bool, } pub(crate) const fn default_opacity() -> f32 { diff --git a/crates/nodes/src/video/compositor/overlay.rs b/crates/nodes/src/video/compositor/overlay.rs index 74e9407c..2fe879ec 100644 --- a/crates/nodes/src/video/compositor/overlay.rs +++ b/crates/nodes/src/video/compositor/overlay.rs @@ -545,9 +545,9 @@ pub fn rasterize_text_overlay( }; let font_size = config.font_size.max(1) as f32; - // No word wrapping — text only breaks on explicit newlines. - // Passing 0 tells wrap_text_lines to split on '\n' only. - let wrap_width = 0; + // Word-wrap when enabled: use the overlay's bounding rect width. + // Otherwise only break on explicit newlines (wrap_width = 0). + let wrap_width = if config.word_wrap { config.transform.rect.width } else { 0 }; // Measure actual text dimensions so the bitmap is large enough to hold // the full rendered string without clipping. When a wrap width is set diff --git a/crates/nodes/src/video/compositor/tests.rs b/crates/nodes/src/video/compositor/tests.rs index c40bf05b..70aa9969 100644 --- a/crates/nodes/src/video/compositor/tests.rs +++ b/crates/nodes/src/video/compositor/tests.rs @@ -286,6 +286,7 @@ fn test_rasterize_text_overlay_produces_pixels() { color: [255, 255, 0, 255], font_size: 24, font_name: None, + word_wrap: false, }; let overlay = rasterize_text_overlay(&cfg, 7680, 10_000); // Bitmap is sized to the measured text extent, not the config rect. @@ -2232,6 +2233,7 @@ fn test_text_overlay_cache_reuses_arc_on_unchanged_config() { color: [255, 255, 255, 255], font_size: 24, font_name: None, + word_wrap: false, }; let limits = GlobalCompositorConfig::default(); let mut config = @@ -2350,6 +2352,7 @@ fn test_text_overlay_cache_handles_length_changes() { color: [255, 255, 255, 255], font_size: 24, font_name: None, + word_wrap: false, }; let limits = GlobalCompositorConfig::default(); let mut stats = NodeStatsTracker::new("test".to_string(), None); @@ -2509,6 +2512,7 @@ async fn test_compositor_output_format_runtime_change() { video_pool: None, pipeline_mode: streamkit_core::node::PipelineMode::Dynamic, view_data_tx: None, + engine_control_tx: None, }; // Start with no output_format (RGBA8). diff --git a/justfile b/justfile index daa2eafd..be8c6ee4 100644 --- a/justfile +++ b/justfile @@ -753,12 +753,12 @@ upload-parakeet-plugin: build-plugin-native-parakeet @curl -X POST -F "plugin=@{{plugins_target_dir}}/release/libparakeet.so" \ http://127.0.0.1:4545/api/v1/plugins -# Download Parakeet TDT models +# Download Parakeet TDT v3 models (25 languages, INT8) download-parakeet-models: - @echo "Downloading Parakeet TDT models (~631MB)..." - @mkdir -p models/sherpa-onnx-nemo-parakeet-tdt-0.6b-v2-int8 + @echo "Downloading Parakeet TDT v3 models (~671MB)..." + @mkdir -p models/sherpa-onnx-nemo-parakeet-tdt-0.6b-v3-int8 @HF_BASE="https://huggingface.co/streamkit/parakeet-models/resolve/main" && \ - MODEL_DIR="models/sherpa-onnx-nemo-parakeet-tdt-0.6b-v2-int8" && \ + MODEL_DIR="models/sherpa-onnx-nemo-parakeet-tdt-0.6b-v3-int8" && \ for f in encoder.int8.onnx decoder.int8.onnx joiner.int8.onnx tokens.txt; do \ if [ -f "$MODEL_DIR/$f" ]; then \ echo "✓ $f already exists"; \ @@ -767,7 +767,7 @@ download-parakeet-models: curl -L -o "$MODEL_DIR/$f" "$HF_BASE/$f" || exit 1; \ fi; \ done && \ - echo "✓ Parakeet TDT models ready at $MODEL_DIR (English)" + echo "✓ Parakeet TDT v3 models ready at $MODEL_DIR (25 languages)" # Setup Parakeet (install dependencies + download models) setup-parakeet: install-sherpa-onnx download-parakeet-models download-silero-vad diff --git a/marketplace/official-plugins.json b/marketplace/official-plugins.json index 128c63ac..9fd91e14 100644 --- a/marketplace/official-plugins.json +++ b/marketplace/official-plugins.json @@ -153,11 +153,11 @@ "artifact": "target/plugins/release/libparakeet.so", "description": "Fast speech-to-text using NVIDIA Parakeet TDT via sherpa-onnx", "license": "MPL-2.0", - "homepage": "https://huggingface.co/nvidia/parakeet-tdt-0.6b-v2", + "homepage": "https://huggingface.co/nvidia/parakeet-tdt-0.6b-v3", "models": [ { - "id": "parakeet-tdt-0.6b-v2-int8", - "name": "Parakeet TDT 0.6B v2 (English, INT8)", + "id": "parakeet-tdt-0.6b-v3-int8", + "name": "Parakeet TDT 0.6B v3 (25 languages, INT8)", "default": true, "source": "huggingface", "repo_id": "streamkit/parakeet-models", @@ -168,15 +168,9 @@ "joiner.int8.onnx", "tokens.txt" ], - "expected_size_bytes": 661190513, + "expected_size_bytes": 671145061, "license": "CC-BY-4.0", - "license_url": "https://huggingface.co/nvidia/parakeet-tdt-0.6b-v2", - "file_checksums": { - "encoder.int8.onnx": "a32b12d17bbbc309d0686fbbcc2987b5e9b8333a7da83fa6b089f0a2acd651ab", - "decoder.int8.onnx": "b6bb64963457237b900e496ee9994b59294526439fbcc1fecf705b31a15c6b4e", - "joiner.int8.onnx": "7946164367946e7f9f29a122407c3252b680dbae9a51343eb2488d057c3c43d2", - "tokens.txt": "ec182b70dd42113aff6c5372c75cac58c952443eb22322f57bbd7f53977d497d" - } + "license_url": "https://huggingface.co/nvidia/parakeet-tdt-0.6b-v3" }, { "id": "silero-vad", @@ -313,7 +307,7 @@ { "id": "slint", "name": "Slint", - "version": "0.2.0", + "version": "0.3.0", "node_kind": "slint", "kind": "native", "entrypoint": "libslint.so", @@ -366,7 +360,7 @@ { "id": "vad", "name": "VAD", - "version": "0.2.0", + "version": "0.3.0", "node_kind": "vad", "kind": "native", "entrypoint": "libvad.so", diff --git a/plugins/native/parakeet/Cargo.lock b/plugins/native/parakeet/Cargo.lock index 8a3deca7..70b8a36c 100644 --- a/plugins/native/parakeet/Cargo.lock +++ b/plugins/native/parakeet/Cargo.lock @@ -522,9 +522,7 @@ dependencies = [ name = "parakeet-plugin-native" version = "0.1.0" dependencies = [ - "cc", "ndarray", - "once_cell", "ort", "serde", "serde_json", diff --git a/plugins/native/parakeet/plugin.yml b/plugins/native/parakeet/plugin.yml index a13be801..f030083b 100644 --- a/plugins/native/parakeet/plugin.yml +++ b/plugins/native/parakeet/plugin.yml @@ -7,11 +7,11 @@ entrypoint: libparakeet.so artifact: target/plugins/release/libparakeet.so description: Fast speech-to-text using NVIDIA Parakeet TDT via sherpa-onnx license: MPL-2.0 -homepage: https://huggingface.co/nvidia/parakeet-tdt-0.6b-v2 +homepage: https://huggingface.co/nvidia/parakeet-tdt-0.6b-v3 repo: https://github.com/streamer45/streamkit models: -- id: parakeet-tdt-0.6b-v2-int8 - name: Parakeet TDT 0.6B v2 (English, INT8) +- id: parakeet-tdt-0.6b-v3-int8 + name: Parakeet TDT 0.6B v3 (25 languages, INT8) default: true source: huggingface repo_id: streamkit/parakeet-models @@ -21,14 +21,9 @@ models: - decoder.int8.onnx - joiner.int8.onnx - tokens.txt - expected_size_bytes: 661190513 + expected_size_bytes: 671145061 license: CC-BY-4.0 - license_url: https://huggingface.co/nvidia/parakeet-tdt-0.6b-v2 - file_checksums: - encoder.int8.onnx: a32b12d17bbbc309d0686fbbcc2987b5e9b8333a7da83fa6b089f0a2acd651ab - decoder.int8.onnx: b6bb64963457237b900e496ee9994b59294526439fbcc1fecf705b31a15c6b4e - joiner.int8.onnx: 7946164367946e7f9f29a122407c3252b680dbae9a51343eb2488d057c3c43d2 - tokens.txt: ec182b70dd42113aff6c5372c75cac58c952443eb22322f57bbd7f53977d497d + license_url: https://huggingface.co/nvidia/parakeet-tdt-0.6b-v3 - id: silero-vad name: Silero VAD (v6.2) default: true diff --git a/plugins/native/parakeet/src/config.rs b/plugins/native/parakeet/src/config.rs index 23a42bda..fbe75fa3 100644 --- a/plugins/native/parakeet/src/config.rs +++ b/plugins/native/parakeet/src/config.rs @@ -38,10 +38,15 @@ pub struct ParakeetConfig { /// Maximum segment duration before forcing transcription (seconds) #[serde(default = "default_max_segment_duration_secs")] pub max_segment_duration_secs: f32, + + /// Minimum speech duration to transcribe (milliseconds). + /// Segments shorter than this are discarded as noise. + #[serde(default = "default_min_speech_duration_ms")] + pub min_speech_duration_ms: u64, } fn default_model_dir() -> String { - "models/sherpa-onnx-nemo-parakeet-tdt-0.6b-v2-int8".to_string() + "models/sherpa-onnx-nemo-parakeet-tdt-0.6b-v3-int8".to_string() } const fn default_num_threads() -> i32 { @@ -72,6 +77,10 @@ const fn default_max_segment_duration_secs() -> f32 { 30.0 } +const fn default_min_speech_duration_ms() -> u64 { + 300 +} + impl Default for ParakeetConfig { fn default() -> Self { Self { @@ -83,6 +92,7 @@ impl Default for ParakeetConfig { vad_threshold: default_vad_threshold(), min_silence_duration_ms: default_min_silence_duration_ms(), max_segment_duration_secs: default_max_segment_duration_secs(), + min_speech_duration_ms: default_min_speech_duration_ms(), } } } diff --git a/plugins/native/parakeet/src/parakeet_node.rs b/plugins/native/parakeet/src/parakeet_node.rs index 3b24a2a0..fdfcd6b7 100644 --- a/plugins/native/parakeet/src/parakeet_node.rs +++ b/plugins/native/parakeet/src/parakeet_node.rs @@ -106,7 +106,7 @@ impl NativeProcessorNode for ParakeetNode { "model_dir": { "type": "string", "description": "Path to Parakeet TDT model directory (contains encoder.int8.onnx, decoder.int8.onnx, joiner.int8.onnx, tokens.txt). IMPORTANT: Input audio must be 16kHz mono f32.", - "default": "models/sherpa-onnx-nemo-parakeet-tdt-0.6b-v2-int8" + "default": "models/sherpa-onnx-nemo-parakeet-tdt-0.6b-v3-int8" }, "num_threads": { "type": "integer", @@ -151,6 +151,13 @@ impl NativeProcessorNode for ParakeetNode { "default": 30.0, "minimum": 5.0, "maximum": 120.0 + }, + "min_speech_duration_ms": { + "type": "integer", + "description": "Minimum speech duration to transcribe (milliseconds). Shorter segments are discarded as noise.", + "default": 300, + "minimum": 0, + "maximum": 5000 } } })) @@ -416,6 +423,24 @@ impl ParakeetNode { // Allow: Sample count / sample rate for duration calculation #[allow(clippy::cast_precision_loss)] let duration_secs = samples.len() as f32 / 16000.0; + + // Skip segments shorter than the minimum speech duration — + // these are typically noise bursts that produce hallucinated + // output from the recognizer. + #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] + let duration_ms = (duration_secs * 1000.0) as u64; + if duration_ms < self.config.min_speech_duration_ms { + plugin_info!( + self.logger, + "Skipping short segment: {} samples ({:.2}s < {}ms minimum)", + samples.len(), + duration_secs, + self.config.min_speech_duration_ms + ); + self.silence_frame_count = 0; + return Ok(()); + } + plugin_info!( self.logger, "Transcribing segment: {} samples ({:.2}s)", @@ -466,6 +491,14 @@ impl ParakeetNode { unsafe { CStr::from_ptr(result.text).to_string_lossy().into_owned() } }; + // Extract detected language from the FFI result (v3 model supports 25 languages). + let language = if result.lang.is_null() { + None + } else { + let lang = unsafe { CStr::from_ptr(result.lang).to_string_lossy().into_owned() }; + if lang.is_empty() { None } else { Some(lang) } + }; + // Cleanup unsafe { ffi::SherpaOnnxDestroyOfflineRecognizerResult(result_ptr); @@ -488,7 +521,7 @@ impl ParakeetNode { &Packet::Transcription(std::sync::Arc::new(TranscriptionData { text: segment.text.clone(), segments: vec![segment], - language: Some("en".to_string()), + language, metadata: None, })), )?; diff --git a/plugins/native/slint/Cargo.lock b/plugins/native/slint/Cargo.lock index 6ee6e5e5..112b52e7 100644 --- a/plugins/native/slint/Cargo.lock +++ b/plugins/native/slint/Cargo.lock @@ -3830,7 +3830,7 @@ dependencies = [ [[package]] name = "slint-plugin-native" -version = "0.2.0" +version = "0.3.0" dependencies = [ "pollster", "serde", diff --git a/plugins/native/slint/Cargo.toml b/plugins/native/slint/Cargo.toml index 7518e4e9..84271656 100644 --- a/plugins/native/slint/Cargo.toml +++ b/plugins/native/slint/Cargo.toml @@ -4,7 +4,7 @@ [package] name = "slint-plugin-native" -version = "0.2.0" +version = "0.3.0" edition = "2021" license = "MPL-2.0" diff --git a/plugins/native/slint/plugin.yml b/plugins/native/slint/plugin.yml index 2e1088aa..2b126a63 100644 --- a/plugins/native/slint/plugin.yml +++ b/plugins/native/slint/plugin.yml @@ -1,6 +1,6 @@ id: slint name: Slint -version: 0.2.0 +version: 0.3.0 node_kind: slint kind: native entrypoint: libslint.so diff --git a/plugins/native/slint/src/slint_thread.rs b/plugins/native/slint/src/slint_thread.rs index ee6310e8..879838c9 100644 --- a/plugins/native/slint/src/slint_thread.rs +++ b/plugins/native/slint/src/slint_thread.rs @@ -125,187 +125,73 @@ pub fn send_work(item: SlintWorkItem) -> Result<(), String> { // ── Slint thread main loop ────────────────────────────────────────────────── +/// A property pair discovered at registration: a source property +/// `{name}` and its corresponding `prev-{name}`. When the source +/// property changes in an `UpdateConfig`, the old value is +/// automatically written to the prev property. +struct TrackedProp { + /// Source property name (snake_case), e.g. `"text"`. + source: String, + /// Prev property name (snake_case), e.g. `"prev_text"`. + prev: String, + /// Type-appropriate default when the source has no prior value. + default_value: serde_json::Value, +} + +/// Per-instance state living on the shared Slint thread. +struct InstanceState { + instance: SlintInstance, + config: SlintConfig, + result_tx: std::sync::mpsc::SyncSender, + /// Cached straight-alpha RGBA8 output from the last render. + cached_frame: Option>, + /// Keyframe index that produced `cached_frame`. + cached_keyframe_idx: Option, + /// Set by `UpdateConfig` to force a re-render on the next frame. + dirty: bool, + /// Original configured dimensions from init. Used to compute + /// the DPI scale factor when upstream resize hints request + /// different physical dimensions — content is rendered at the + /// original logical proportions but at higher physical + /// resolution for crisper text and vector graphics. + original_width: u32, + original_height: u32, + /// Properties tracked for automatic previous-value injection. + /// Populated at registration when the component declares a + /// `prev-{name}` property alongside `{name}` with a matching + /// type (opt-in by the `.slint` author). + tracked_props: Vec, + /// Auto-incrementing revision counter. `Some(n)` when the + /// component declares a `revision` (number) property; bumped + /// whenever at least one tracked property changes value. + revision: Option, +} + /// Entry point for the shared Slint thread. /// /// Processes work items from all plugin instances. The platform backend is /// set once on this thread; all `SlintInstance` values live here. #[allow(clippy::needless_pass_by_value)] fn slint_thread_main(work_rx: std::sync::mpsc::Receiver) { - /// Per-instance state living on the shared thread. - struct InstanceState { - instance: SlintInstance, - config: SlintConfig, - result_tx: std::sync::mpsc::SyncSender, - /// Cached straight-alpha RGBA8 output from the last render. - cached_frame: Option>, - /// Keyframe index that produced `cached_frame`. - cached_keyframe_idx: Option, - /// Set by `UpdateConfig` to force a re-render on the next frame. - dirty: bool, - /// Original configured dimensions from init. Used to compute - /// the DPI scale factor when upstream resize hints request - /// different physical dimensions — content is rendered at the - /// original logical proportions but at higher physical - /// resolution for crisper text and vector graphics. - original_width: u32, - original_height: u32, - } - let mut instances: HashMap = HashMap::new(); let mut platform_set = false; while let Ok(work) = work_rx.recv() { match work { SlintWorkItem::Register { node_id, config, result_tx } => { - match create_slint_instance(&config, &mut platform_set) { - Ok(instance) => { - // Discover publicly declared properties from the compiled - // component. Only types the UI can render as controls - // (bool, number, string) are included. - let properties = - discover_properties(&instance.definition, &instance.component); - - tracing::info!( - node_id = %node_id, - slint_file = %config.slint_file, - discovered_properties = properties.len(), - "Created Slint instance", - ); - let _ = result_tx.send(SlintThreadResult::InitOk { properties }); - instances.insert( - node_id, - InstanceState { - instance, - original_width: config.width, - original_height: config.height, - config, - result_tx, - cached_frame: None, - cached_keyframe_idx: None, - dirty: true, - }, - ); - }, - Err(e) => { - tracing::error!( - node_id = %node_id, - error = %e, - "Failed to create Slint instance", - ); - let _ = result_tx.send(SlintThreadResult::InitErr(e)); - }, - } + handle_register(&mut instances, &mut platform_set, node_id, config, result_tx); }, SlintWorkItem::Render { node_id } => { - if let Some(state) = instances.get_mut(&node_id) { - // Pump Slint timers/animations (process-global) so Timer - // callbacks and CSS-like transitions advance even when the - // frame is served from cache. This call is idempotent and - // wall-clock-based, so running it N times per tick cycle - // (once per instance) is harmless. - slint::platform::update_timers_and_animations(); - - let rgba_data = if state.config.static_ui { - // ── Static UI path: cache the rendered frame ──────── - let kf_idx = if state.config.property_keyframes.is_empty() { - None - } else { - let interval = state.config.keyframe_interval.max(1); - Some( - (state.instance.frame_counter / interval) as usize - % state.config.property_keyframes.len(), - ) - }; - - let need_render = state.dirty - || state.cached_keyframe_idx != kf_idx - || state.cached_frame.is_none(); - - if need_render { - let data = render_slint_frame(&mut state.instance, &state.config); - state.cached_frame = Some(data); - state.cached_keyframe_idx = kf_idx; - state.dirty = false; - } else { - // Advance frame counter so keyframe boundaries - // are detected at the right time. - state.instance.frame_counter = - state.instance.frame_counter.wrapping_add(1); - } - // Clone from cache — avoids a redundant allocation - // compared to cloning before storing. - state.cached_frame.clone().unwrap_or_default() - } else { - // ── Dynamic UI path: always re-render ─────────────── - render_slint_frame(&mut state.instance, &state.config) - }; - - // Use try_send to avoid blocking: if the consumer is slow, - // drop the frame rather than stalling the shared thread. - match state.result_tx.try_send(SlintThreadResult::Frame { rgba_data }) { - Ok(()) => {}, - Err(std::sync::mpsc::TrySendError::Full(_)) => { - tracing::debug!( - node_id = %node_id, - "Result channel full, dropping frame", - ); - }, - Err(std::sync::mpsc::TrySendError::Disconnected(_)) => { - instances.remove(&node_id); - }, - } - } + handle_render(&mut instances, &node_id); }, SlintWorkItem::UpdateConfig { node_id, config } => { if let Some(state) = instances.get_mut(&node_id) { - state.config = config; - state.dirty = true; + state.apply_config_update(&node_id, config); } }, SlintWorkItem::Resize { node_id, width, height } => { if let Some(state) = instances.get_mut(&node_id) { - if state.instance.width != width || state.instance.height != height { - state.instance.width = width; - state.instance.height = height; - state.config.width = width; - state.config.height = height; - - // Compute DPI scale factor so content renders at - // original logical proportions but higher physical - // resolution. Text and vector graphics benefit - // from crisper rendering without changing apparent - // size. `min()` picks the axis that limits - // scaling (letterbox logic). - #[allow(clippy::cast_precision_loss)] - let scale = f32::min( - width as f32 / state.original_width.max(1) as f32, - height as f32 / state.original_height.max(1) as f32, - ) - .max(0.1); - - // Notify Slint of the new scale factor, then set - // the physical size. Order matters: scale must be - // applied first so Slint computes correct logical - // coordinates from the physical dimensions. - state.instance.component.window().dispatch_event( - slint::platform::WindowEvent::ScaleFactorChanged { - scale_factor: scale, - }, - ); - state.instance.window.set_size(PhysicalSize::new(width, height)); - - let pixel_count = (width as usize) * (height as usize); - state.instance.buffer = - vec![PremultipliedRgbaColor::default(); pixel_count]; - state.cached_frame = None; - state.dirty = true; - tracing::info!( - node_id = %node_id, - width, height, - scale_factor = %scale, - "Resized Slint instance via upstream hint", - ); - } + state.apply_resize(&node_id, width, height); } }, SlintWorkItem::Unregister { node_id } => { @@ -315,6 +201,184 @@ fn slint_thread_main(work_rx: std::sync::mpsc::Receiver) { } } +/// Handle a `Register` work item: compile the `.slint` file, discover +/// properties, and insert the instance into the map. +fn handle_register( + instances: &mut HashMap, + platform_set: &mut bool, + node_id: NodeId, + config: SlintConfig, + result_tx: std::sync::mpsc::SyncSender, +) { + match create_slint_instance(&config, platform_set) { + Ok(instance) => { + let properties = discover_properties(&instance.definition, &instance.component); + let tracked_props = discover_tracked_props(&instance.definition); + let revision = if instance.definition.properties().any(|(n, _)| n == "revision") { + Some(0i64) + } else { + None + }; + + tracing::info!( + node_id = %node_id, + slint_file = %config.slint_file, + discovered_properties = properties.len(), + tracked_pairs = tracked_props.len(), + has_revision = revision.is_some(), + "Created Slint instance", + ); + let _ = result_tx.send(SlintThreadResult::InitOk { properties }); + instances.insert( + node_id, + InstanceState { + instance, + original_width: config.width, + original_height: config.height, + config, + result_tx, + cached_frame: None, + cached_keyframe_idx: None, + dirty: true, + tracked_props, + revision, + }, + ); + }, + Err(e) => { + tracing::error!(node_id = %node_id, error = %e, "Failed to create Slint instance"); + let _ = result_tx.send(SlintThreadResult::InitErr(e)); + }, + } +} + +/// Handle a `Render` work item: produce a frame (from cache or fresh render) +/// and send it back on the result channel. +fn handle_render(instances: &mut HashMap, node_id: &NodeId) { + let Some(state) = instances.get_mut(node_id) else { return }; + + // Pump Slint timers/animations (process-global) so Timer callbacks + // and CSS-like transitions advance even when the frame is served + // from cache. + slint::platform::update_timers_and_animations(); + + let rgba_data = if state.config.static_ui { + // ── Static UI path: cache the rendered frame ──────── + let kf_idx = if state.config.property_keyframes.is_empty() { + None + } else { + let interval = state.config.keyframe_interval.max(1); + Some( + (state.instance.frame_counter / interval) as usize + % state.config.property_keyframes.len(), + ) + }; + + let need_render = + state.dirty || state.cached_keyframe_idx != kf_idx || state.cached_frame.is_none(); + + if need_render { + let data = render_slint_frame(&mut state.instance, &state.config); + state.cached_frame = Some(data); + state.cached_keyframe_idx = kf_idx; + state.dirty = false; + } else { + state.instance.frame_counter = state.instance.frame_counter.wrapping_add(1); + } + state.cached_frame.clone().unwrap_or_default() + } else { + // ── Dynamic UI path: always re-render ─────────────── + render_slint_frame(&mut state.instance, &state.config) + }; + + match state.result_tx.try_send(SlintThreadResult::Frame { rgba_data }) { + Ok(()) => {}, + Err(std::sync::mpsc::TrySendError::Full(_)) => { + tracing::debug!(node_id = %node_id, "Result channel full, dropping frame"); + }, + Err(std::sync::mpsc::TrySendError::Disconnected(_)) => { + instances.remove(node_id); + }, + } +} + +impl InstanceState { + /// Apply an `UpdateConfig`: inject previous values for tracked + /// properties, bump the revision counter, and mark the instance dirty. + fn apply_config_update(&mut self, node_id: &NodeId, mut config: SlintConfig) { + let mut any_changed = false; + for tp in &self.tracked_props { + let new_val = config.properties.get(&tp.source); + let old_val = self.config.properties.get(&tp.source); + if new_val.is_some() && new_val != old_val { + let prev = old_val.cloned().unwrap_or_else(|| tp.default_value.clone()); + tracing::debug!( + node_id = %node_id, + property = %tp.source, + prev = %prev, + new = %new_val.unwrap_or(&serde_json::Value::Null), + "Tracked property changed, injecting prev value", + ); + config.properties.insert(tp.prev.clone(), prev); + any_changed = true; + } + } + if any_changed { + if let Some(rev) = &mut self.revision { + *rev += 1; + tracing::debug!(node_id = %node_id, revision = *rev, "Bumped revision counter"); + config.properties.insert("revision".to_string(), serde_json::json!(*rev)); + } + } + tracing::debug!( + node_id = %node_id, + properties = ?config.properties.keys().collect::>(), + "UpdateConfig applied", + ); + self.config = config; + self.dirty = true; + } + + /// Apply a resize: update dimensions, recompute the DPI scale factor, + /// and reallocate the rendering buffer. + fn apply_resize(&mut self, node_id: &NodeId, width: u32, height: u32) { + if self.instance.width == width && self.instance.height == height { + return; + } + self.instance.width = width; + self.instance.height = height; + self.config.width = width; + self.config.height = height; + + // Compute DPI scale factor so content renders at original logical + // proportions but higher physical resolution. + #[allow(clippy::cast_precision_loss)] + let scale = f32::min( + width as f32 / self.original_width.max(1) as f32, + height as f32 / self.original_height.max(1) as f32, + ) + .max(0.1); + + // Scale must be applied first so Slint computes correct logical + // coordinates from the physical dimensions. + self.instance.component.window().dispatch_event( + slint::platform::WindowEvent::ScaleFactorChanged { scale_factor: scale }, + ); + self.instance.window.set_size(PhysicalSize::new(width, height)); + + let pixel_count = (width as usize) * (height as usize); + self.instance.buffer = vec![PremultipliedRgbaColor::default(); pixel_count]; + self.cached_frame = None; + self.dirty = true; + tracing::info!( + node_id = %node_id, + width, height, + scale_factor = %scale, + "Resized Slint instance via upstream hint", + ); + } +} + // ── Slint rendering internals ─────────────────────────────────────────────── /// Scope guard that clears the `CURRENT_WINDOW` thread-local on drop. @@ -482,6 +546,36 @@ fn discover_properties( .collect() } +/// Discover `prev-{name}` / `{name}` property pairs for automatic +/// previous-value injection. +/// +/// A pair is tracked when the component declares both `{name}` and +/// `prev-{name}` with the same `ValueType`. Only JSON-representable +/// types (string, number, bool) are supported. Property names are +/// returned in snake_case to match the StreamKit config convention. +fn discover_tracked_props(definition: &ComponentDefinition) -> Vec { + let prop_map: HashMap = definition.properties().collect(); + let mut tracked = Vec::new(); + for (name, vt) in &prop_map { + if let Some(source) = name.strip_prefix("prev-") { + if prop_map.get(source) == Some(vt) { + let default_value = match vt { + ValueType::String => serde_json::Value::String(String::new()), + ValueType::Number => serde_json::json!(0), + ValueType::Bool => serde_json::Value::Bool(false), + _ => continue, + }; + tracked.push(TrackedProp { + source: source.replace('-', "_"), + prev: name.replace('-', "_"), + default_value, + }); + } + } + } + tracked +} + /// Render a single frame from the Slint instance, returning raw RGBA8 data. /// /// Applies property keyframe cycling. Timer/animation pumping is handled @@ -519,13 +613,30 @@ fn render_slint_frame(instance: &mut SlintInstance, config: &SlintConfig) -> Vec // ── Private helpers ───────────────────────────────────────────────────────── /// Map JSON property values to Slint `Value` and set them on the component. +/// +/// `revision` is always set **last** so that any properties it drives +/// (e.g. crossfade layer selection via `use-a: Math.mod(revision, 2)`) +/// see up-to-date data values (`text`, `prev-text`, etc.) rather than +/// stale ones. Without this ordering, HashMap iteration might set +/// `revision` before `text`, creating a one-frame flash of old content. fn set_properties(component: &ComponentInstance, properties: &HashMap) { + let mut deferred_revision = None; for (key, json_val) in properties { + if key == "revision" { + deferred_revision = Some(json_val); + continue; + } let slint_val = json_to_slint_value(json_val); if let Err(e) = component.set_property(key, slint_val) { tracing::warn!(property = %key, error = %e, "Failed to set Slint property"); } } + if let Some(json_val) = deferred_revision { + let slint_val = json_to_slint_value(json_val); + if let Err(e) = component.set_property("revision", slint_val) { + tracing::warn!(property = "revision", error = %e, "Failed to set Slint property"); + } + } } /// Convert a JSON value to a Slint interpreter `Value`. diff --git a/plugins/native/vad/Cargo.lock b/plugins/native/vad/Cargo.lock index fcadbd5c..a401efdb 100644 --- a/plugins/native/vad/Cargo.lock +++ b/plugins/native/vad/Cargo.lock @@ -362,7 +362,7 @@ checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" [[package]] name = "vad-plugin-native" -version = "0.2.0" +version = "0.3.0" dependencies = [ "serde", "serde_json", diff --git a/plugins/native/vad/Cargo.toml b/plugins/native/vad/Cargo.toml index 786a8236..82b67b1c 100644 --- a/plugins/native/vad/Cargo.toml +++ b/plugins/native/vad/Cargo.toml @@ -4,7 +4,7 @@ [package] name = "vad-plugin-native" -version = "0.2.0" +version = "0.3.0" edition = "2021" license = "MPL-2.0" diff --git a/plugins/native/vad/plugin.yml b/plugins/native/vad/plugin.yml index fa071593..9e85e1c3 100644 --- a/plugins/native/vad/plugin.yml +++ b/plugins/native/vad/plugin.yml @@ -1,6 +1,6 @@ id: vad name: VAD -version: 0.2.0 +version: 0.3.0 node_kind: vad kind: native entrypoint: libvad.so diff --git a/plugins/native/vad/src/vad_node.rs b/plugins/native/vad/src/vad_node.rs index df8aee75..392654c3 100644 --- a/plugins/native/vad/src/vad_node.rs +++ b/plugins/native/vad/src/vad_node.rs @@ -133,6 +133,7 @@ impl VadNode { ) -> Result<(), String> { let data = serde_json::json!({ "event_type": event_type, + "is_speech": event_type == "speech_start", "timestamp_ms": timestamp_ms, "duration_ms": duration_ms }); diff --git a/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml new file mode 100644 index 00000000..a62933a6 --- /dev/null +++ b/samples/pipelines/dynamic/video_moq_webcam_subtitles.yml @@ -0,0 +1,201 @@ +# SPDX-FileCopyrightText: © 2025 StreamKit Contributors +# +# SPDX-License-Identifier: MPL-2.0 + +# Live subtitle demo: webcam PiP over colorbars with real-time Parakeet +# transcription rendered as a Slint subtitle overlay. +# +# Data flow (subtitles): +# mic → opus_decoder → resampler → parakeet → [best_effort] → +# subtitle_bridge → UpdateParams(text) → slint subtitle → compositor +# +# Data flow (speaking indicator): +# resampler → vad → [best_effort] → +# vad_bridge → UpdateParams(speaking) → slint subtitle +# +# (param_bridge also emits stt.result telemetry for stream view) +# +# Requires: +# - plugin::native::parakeet loaded (with Parakeet TDT model) +# - plugin::native::vad loaded (with TEN-VAD model) +# - plugin::native::slint loaded +# just build-plugin-native-parakeet && just build-plugin-native-vad && just build-plugin-native-slint && just copy-plugins-native +# just download-parakeet-models && just download-silero-vad && just download-tenvad-models + +name: Webcam PiP + Live Subtitles (MoQ) +description: >- + Composites the user's webcam as picture-in-picture over colorbars with + real-time Parakeet TDT transcription displayed as subtitles via a Slint + overlay. Demonstrates the param_bridge node for cross-node control + messaging. ~10x faster than Whisper on CPU. +mode: dynamic +client: + gateway_path: /moq/video + publish: + broadcast: input + tracks: + - kind: audio + source: microphone + - kind: video + source: camera + watch: + broadcast: output + audio: true + video: true + +nodes: + # --- Background --- + + colorbars_bg: + kind: video::colorbars + params: + width: 1280 + height: 720 + fps: 30 + pixel_format: rgba8 + + # --- MoQ transport (publish + subscribe) --- + + moq_peer: + kind: transport::moq::peer + params: + gateway_path: /moq/video + input_broadcasts: + - input + output_broadcast: output + allow_reconnect: true + needs: + in: opus_encoder + in_1: vp9_encoder + + # --- Audio → STT path --- + + opus_decoder: + kind: audio::opus::decoder + needs: + in: moq_peer.audio/data + + resampler: + kind: audio::resampler + params: + target_sample_rate: 16000 + needs: opus_decoder + + parakeet: + kind: plugin::native::parakeet + params: + model_dir: models/sherpa-onnx-nemo-parakeet-tdt-0.6b-v3-int8 + num_threads: 4 + use_vad: true + vad_model_path: models/silero_vad.onnx + vad_threshold: 0.5 + min_silence_duration_ms: 700 + needs: resampler + + # --- Real-time VAD → speaking indicator --- + + vad: + kind: plugin::native::vad + params: + output_mode: events + threshold: 0.5 + min_silence_duration_s: 0.3 + min_speech_duration_s: 0.15 + needs: resampler + + vad_bridge: + kind: core::param_bridge + params: + target_node: subtitles + mode: template + template: + properties: + speaking: "{{ is_speech }}" + needs: + in: + node: vad + mode: best_effort + + # --- Subtitle rendering (Slint) --- + + subtitles: + kind: plugin::native::slint + params: + width: 1280 + height: 120 + fps: 30 + slint_file: samples/slint/system/subtitle.slint + properties: + text: "" + show: true + speaking: false + + subtitle_bridge: + kind: core::param_bridge + params: + target_node: subtitles + mode: template + template: + properties: + text: "{{ text }}" + debounce_ms: 100 + needs: + in: + node: parakeet + mode: best_effort + + # --- Video decode + compositing --- + + vp9_decoder: + kind: video::vp9::decoder + needs: + in: moq_peer.video/hd + + compositor: + kind: video::compositor + params: + width: 1280 + height: 720 + num_inputs: 3 + output_format: nv12 + layers: + in_0: + opacity: 1.0 + z_index: 0 + in_1: + rect: + x: 880 + y: 20 + width: 380 + height: 285 + opacity: 0.9 + z_index: 1 + crop_zoom: 2.2 + crop_x: 0.5 + crop_y: 0.5 + in_2: + rect: + x: 0 + y: 600 + width: 1280 + height: 120 + opacity: 1.0 + z_index: 2 + needs: + - colorbars_bg + - vp9_decoder + - subtitles + + # --- Encode + output --- + + vp9_encoder: + kind: video::vp9::encoder + params: + keyframe_interval: 30 + needs: compositor + + # --- Audio loopback --- + + opus_encoder: + kind: audio::opus::encoder + needs: opus_decoder diff --git a/samples/pipelines/oneshot/parakeet-stt.yml b/samples/pipelines/oneshot/parakeet-stt.yml index a73af762..e06a4f76 100644 --- a/samples/pipelines/oneshot/parakeet-stt.yml +++ b/samples/pipelines/oneshot/parakeet-stt.yml @@ -24,7 +24,7 @@ steps: - kind: plugin::native::parakeet params: - model_dir: models/sherpa-onnx-nemo-parakeet-tdt-0.6b-v2-int8 + model_dir: models/sherpa-onnx-nemo-parakeet-tdt-0.6b-v3-int8 num_threads: 4 use_vad: true vad_model_path: models/silero_vad.onnx diff --git a/samples/slint/system/subtitle.slint b/samples/slint/system/subtitle.slint new file mode 100644 index 00000000..88a84d2f --- /dev/null +++ b/samples/slint/system/subtitle.slint @@ -0,0 +1,173 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +// Subtitle overlay for real-time transcription display. +// +// Crossfade: the Slint plugin's general-purpose previous-value +// tracking detects the `prev-text` / `text` pair and automatically +// injects the old value + bumps `revision` on each change. The two +// text layers alternate based on revision parity — purely reactive. +// +// Auto-fade: a Timer marks the subtitle as stale 5 s after the last +// revision bump. The panel stays visible while `speaking` is true +// (driven by the pipeline VAD) even if stale, so it only fades when +// the speaker actually goes silent AND no new text has arrived. +// +// Speaking indicator: driven by a pipeline VAD node via a second +// param_bridge that sets `speaking` in real time. +// +// Properties updated at runtime via param_bridge → UpdateParams: +// text, show, speaking +// Properties auto-managed by the Slint plugin (prev-value tracking): +// prev-text, revision + +export component Subtitle inherits Window { + // ── Pipeline-driven properties ───────────────────────────── + in property text: ""; + in property prev-text: ""; + in property revision: 0; + in property show: true; + in property speaking: false; + + // ── Crossfade state (reactive) ───────────────────────────── + + property use-a: Math.mod(revision, 2) == 0; + property a-text: use-a ? text : prev-text; + property b-text: !use-a ? text : prev-text; + + // ── Auto-fade after inactivity ───────────────────────────── + // + // `stale-revision` records the revision at which the fade timer + // last fired. When it equals the current revision, the text is + // stale. When revision bumps (new text), the mismatch restarts + // the timer naturally — no toggle hacks needed. + + property stale-revision: -1; + property has-text: show && text != ""; + property stale: has-text && stale-revision == revision; + + fade-timer := Timer { + interval: 5000ms; + running: root.has-text && !root.stale; + triggered => { + root.stale-revision = root.revision; + } + } + + // ── Derived visibility ───────────────────────────────────── + // + // The backdrop bar is always visible (controlled by `show`). + // Text fades 5 s after the last transcription. `speaking` + // only drives the visual indicators (dot / accent), NOT text + // visibility — otherwise stale text reappears when the VAD + // fires before the next transcription arrives. + + property text-visible: has-text && !stale; + property a-on: text-visible && a-text != "" && use-a; + property b-on: text-visible && b-text != "" && !use-a; + + width: 1280px; + height: 120px; + background: transparent; + + Rectangle { + width: 100%; + height: 100%; + background: transparent; + + // Backdrop panel — always visible when `show` is true. + Rectangle { + x: 60px; + y: root.show ? 14px : 22px; + width: root.width - 120px; + height: root.height - 28px; + border-radius: 12px; + background: #0c1018e0; + opacity: root.show ? 1.0 : 0.0; + border-width: 1px; + border-color: root.speaking ? #ffffff1a : #ffffff0e; + + animate opacity { duration: 280ms; easing: ease-out; } + animate y { duration: 380ms; easing: cubic-bezier(0.22, 1, 0.36, 1); } + animate border-color { duration: 200ms; easing: ease-out; } + + // Speaking indicator — glow ring (behind dot). + Rectangle { + x: 6px; + y: (parent.height - 26px) / 2; + width: 26px; + height: 26px; + border-radius: 13px; + background: root.speaking ? #4ade8020 : transparent; + + animate background { duration: 300ms; easing: ease-out; } + } + + // Speaking indicator — solid dot. + Rectangle { + x: 12px; + y: (parent.height - 14px) / 2; + width: 14px; + height: 14px; + border-radius: 7px; + background: root.speaking ? #4ade80 : #ffffff20; + + animate background { duration: 150ms; easing: ease-out; } + } + + // Accent highlight — wider and brighter when speaking. + Rectangle { + x: (parent.width - self.width) / 2; + y: 0px; + width: root.speaking ? parent.width * 0.5 : parent.width * 0.3; + height: 2px; + border-radius: 1px; + background: root.speaking ? #58a6ffa0 : #58a6ff50; + + animate width { duration: 400ms; delay: 60ms; easing: cubic-bezier(0.22, 1, 0.36, 1); } + animate background { duration: 250ms; easing: ease-out; } + } + + // Text layer A — active on even revisions. + Text { + x: 38px; + y: root.a-on ? 0px : 8px; + width: parent.width - 56px; + height: parent.height; + text: root.a-text; + color: #ffffffea; + font-size: 24px; + font-weight: 600; + wrap: word-wrap; + vertical-alignment: center; + horizontal-alignment: center; + letter-spacing: 0.3px; + opacity: root.a-on ? 1.0 : 0.0; + + animate opacity { duration: 180ms; easing: ease-in; } + animate y { duration: 300ms; easing: cubic-bezier(0.22, 1, 0.36, 1); } + } + + // Text layer B — active on odd revisions. + Text { + x: 38px; + y: root.b-on ? 0px : 8px; + width: parent.width - 56px; + height: parent.height; + text: root.b-text; + color: #ffffffea; + font-size: 24px; + font-weight: 600; + wrap: word-wrap; + vertical-alignment: center; + horizontal-alignment: center; + letter-spacing: 0.3px; + opacity: root.b-on ? 1.0 : 0.0; + + animate opacity { duration: 180ms; easing: ease-in; } + animate y { duration: 300ms; easing: cubic-bezier(0.22, 1, 0.36, 1); } + } + } + } +}