Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
19b8a7c
feat(engine,nodes): add cross-node control messaging via param_bridge
streamkit-devin Apr 9, 2026
587d05c
fix(nodes): skip unsupported packets in template mode, add unit tests
streamkit-devin Apr 9, 2026
966a7f7
style(nodes): fix clippy lints in param_bridge
streamkit-devin Apr 9, 2026
392bf8c
fix(nodes): address review findings for param_bridge
streamkit-devin Apr 9, 2026
7a39ace
fix(nodes): normalize template placeholders before substitution
streamkit-devin Apr 9, 2026
1560532
fix(slint): rename reserved 'visible' property in subtitle.slint
streamkit-devin Apr 9, 2026
8044c9d
feat(nodes): emit telemetry from param_bridge for stream view visibility
streamkit-devin Apr 9, 2026
2b91476
feat(pipeline): add VAD filtering and telemetry_out to subtitle pipeline
streamkit-devin Apr 9, 2026
81128d3
fix(nodes): handle control_rx Shutdown in param_bridge select loop
streamkit-devin Apr 9, 2026
e7550d8
fix(param_bridge): dedup identical params, decouple telemetry, add su…
streamkit-devin Apr 10, 2026
bcd8315
style: cargo fmt
streamkit-devin Apr 10, 2026
6d2350e
fix(sample): remove show:true from subtitle template
streamkit-devin Apr 10, 2026
4bd72ed
fix(sample): text transition effect + connection_mode syntax
streamkit-devin Apr 10, 2026
fe3847b
Merge remote-tracking branch 'origin/main' into devin/1775753343-para…
streamkit-devin Apr 10, 2026
db3a883
feat(sample): switch subtitle demo from Whisper to Parakeet TDT
streamkit-devin Apr 10, 2026
8b068ff
misc improvements
streamer45 Apr 11, 2026
00499f1
misc improvements
streamer45 Apr 11, 2026
c300c85
fix: address Devin Review findings (template loop, profiling, parakee…
streamkit-devin Apr 11, 2026
8c600e8
Merge remote-tracking branch 'origin/main' into devin/1775753343-para…
streamkit-devin Apr 11, 2026
9193aaf
style: cargo fmt slint plugin
streamkit-devin Apr 11, 2026
b699b76
refactor(slint): extract helpers to reduce cognitive complexity
streamkit-devin Apr 11, 2026
c711f2a
fix(samples): fix VAD model setup and remove duplicate telemetry
streamkit-devin Apr 11, 2026
63250fd
Merge branch 'main' into devin/1775753343-param-bridge-subtitles
streamkit-devin Apr 11, 2026
d0be2d5
fix(assets): flush tokio file after multipart upload to prevent trunc…
streamkit-devin Apr 12, 2026
3478ae2
refactor(assets): consolidate duplicate upload streaming logic
streamkit-devin Apr 12, 2026
88480c0
style(assets): apply rustfmt formatting to stream_field_to_file
streamkit-devin Apr 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 74 additions & 149 deletions apps/skit/src/assets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,56 +238,85 @@ async fn list_assets(
}

/// Stream an uploaded multipart field to disk with size enforcement.
async fn write_upload_stream_to_disk(
///
/// On any error the partially-written file is removed before returning.
/// Callers that need a REUSE license sidecar should create it after this
/// function succeeds.
async fn stream_field_to_file(
mut field: axum::extract::multipart::Field<'_>,
file_path: &std::path::Path,
extension: &str,
max_size: usize,
) -> Result<usize, AssetsError> {
use tokio::fs::OpenOptions;

let mut file = OpenOptions::new()
.create_new(true)
.write(true)
.open(file_path)
.await
.map_err(|e| AssetsError::IoError(format!("Failed to create file: {e}")))?;

let mut total_bytes: usize = 0;
loop {
match field.chunk().await {
Ok(Some(chunk)) => {
total_bytes = total_bytes.saturating_add(chunk.len());
if total_bytes > MAX_AUDIO_FILE_SIZE {
let _ = fs::remove_file(file_path).await;
return Err(AssetsError::FileTooLarge(MAX_AUDIO_FILE_SIZE));
}
let open_result = OpenOptions::new().create_new(true).write(true).open(file_path).await;

if let Err(e) = file.write_all(&chunk).await {
let _ = fs::remove_file(file_path).await;
return Err(AssetsError::IoError(format!("Failed to write file: {e}")));
}
},
Ok(None) => break,
Err(e) => {
let _ = fs::remove_file(file_path).await;
return Err(AssetsError::InvalidRequest(format!(
"Failed to read upload stream: {e}"
)));
},
let mut file = match open_result {
Ok(f) => f,
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
return Err(AssetsError::FileExists(
file_path.file_name().and_then(|n| n.to_str()).unwrap_or("unknown").to_string(),
));
},
Err(e) => return Err(AssetsError::IoError(format!("Failed to create file: {e}"))),
};

// Inner block: any error triggers a single cleanup path below.
let result = async {
let mut total_bytes: usize = 0;
loop {
match field.chunk().await {
Ok(Some(chunk)) => {
total_bytes = total_bytes.saturating_add(chunk.len());
if total_bytes > max_size {
return Err(AssetsError::FileTooLarge(max_size));
}
file.write_all(&chunk)
.await
.map_err(|e| AssetsError::IoError(format!("Failed to write file: {e}")))?;
},
Ok(None) => break,
Err(e) => {
return Err(AssetsError::InvalidRequest(format!(
"Failed to read upload stream: {e}"
)));
},
}
}

// Flush pending writes — tokio::fs::File::write_all returns as soon as
// data is copied to an internal buffer and a blocking write is spawned,
// so the last write may still be in-flight when the File is dropped.
file.flush()
.await
.map_err(|e| AssetsError::IoError(format!("Failed to flush file: {e}")))?;

Ok(total_bytes)
}
.await;

// Create default license file (best-effort).
let license_path = file_path.with_extension(format!("{extension}.license"));
// REUSE-IgnoreStart
let default_license =
"SPDX-FileCopyrightText: © 2025 User Upload\n\nSPDX-License-Identifier: CC0-1.0\n";
// REUSE-IgnoreEnd
if let Err(e) = fs::write(&license_path, default_license).await {
warn!("Failed to create license file: {}", e);
if result.is_err() {
let _ = fs::remove_file(file_path).await;
}

Ok(total_bytes)
result
}

/// Create a default REUSE license sidecar next to the uploaded file.
fn create_license_sidecar(
file_path: &std::path::Path,
extension: &str,
) -> impl std::future::Future<Output = ()> + Send + 'static {
let license_path = file_path.with_extension(format!("{extension}.license"));
async move {
// REUSE-IgnoreStart
let default_license =
"SPDX-FileCopyrightText: © 2025 User Upload\n\nSPDX-License-Identifier: CC0-1.0\n";
// REUSE-IgnoreEnd
if let Err(e) = fs::write(&license_path, default_license).await {
warn!("Failed to create license file: {}", e);
}
}
}

/// Build AudioAsset response for uploaded file
Expand Down Expand Up @@ -332,7 +361,8 @@ async fn process_upload(
return Err(AssetsError::FileExists(filename));
}

let written_bytes = write_upload_stream_to_disk(field, &file_path, &extension).await?;
let written_bytes = stream_field_to_file(field, &file_path, MAX_AUDIO_FILE_SIZE).await?;
create_license_sidecar(&file_path, &extension).await;

info!("Uploaded audio asset: {}", filename);

Expand Down Expand Up @@ -664,55 +694,6 @@ async fn list_image_assets(perms: &RolePermissions) -> Result<Vec<ImageAsset>, A
Ok(all_assets)
}

/// Stream an uploaded multipart image field to disk with size enforcement.
///
/// Uses `create_new(true)` so the call fails atomically if the file already
/// exists, avoiding the TOCTOU race of a separate `exists()` pre-check.
async fn write_image_upload_to_disk(
mut field: axum::extract::multipart::Field<'_>,
file_path: &std::path::Path,
) -> Result<usize, AssetsError> {
use tokio::fs::OpenOptions;

let mut file =
OpenOptions::new().create_new(true).write(true).open(file_path).await.map_err(|e| {
if e.kind() == std::io::ErrorKind::AlreadyExists {
AssetsError::FileExists(
file_path.file_name().and_then(|n| n.to_str()).unwrap_or("unknown").to_string(),
)
} else {
AssetsError::IoError(format!("Failed to create file: {e}"))
}
})?;

let mut total_bytes: usize = 0;
loop {
match field.chunk().await {
Ok(Some(chunk)) => {
total_bytes = total_bytes.saturating_add(chunk.len());
if total_bytes > MAX_IMAGE_FILE_SIZE {
let _ = fs::remove_file(file_path).await;
return Err(AssetsError::FileTooLarge(MAX_IMAGE_FILE_SIZE));
}

if let Err(e) = file.write_all(&chunk).await {
let _ = fs::remove_file(file_path).await;
return Err(AssetsError::IoError(format!("Failed to write file: {e}")));
}
},
Ok(None) => break,
Err(e) => {
let _ = fs::remove_file(file_path).await;
return Err(AssetsError::InvalidRequest(format!(
"Failed to read upload stream: {e}"
)));
},
}
}

Ok(total_bytes)
}

/// Core image upload logic after permission check
async fn process_image_upload(
filename: String,
Expand All @@ -729,7 +710,7 @@ async fn process_image_upload(

let file_path = user_dir.join(&filename);

let written_bytes = write_image_upload_to_disk(field, &file_path).await?;
let written_bytes = stream_field_to_file(field, &file_path, MAX_IMAGE_FILE_SIZE).await?;

// SVG validation: parse with resvg to check validity and extract dimensions.
// Skip raster decode path entirely for SVGs.
Expand Down Expand Up @@ -1209,63 +1190,6 @@ async fn list_font_assets(perms: &RolePermissions) -> Result<Vec<FontAsset>, Ass
Ok(all_assets)
}

/// Stream an uploaded multipart font field to disk with size enforcement.
async fn write_font_upload_to_disk(
mut field: axum::extract::multipart::Field<'_>,
file_path: &std::path::Path,
extension: &str,
) -> Result<usize, AssetsError> {
use tokio::fs::OpenOptions;

let mut file =
OpenOptions::new().create_new(true).write(true).open(file_path).await.map_err(|e| {
if e.kind() == std::io::ErrorKind::AlreadyExists {
AssetsError::FileExists(
file_path.file_name().and_then(|n| n.to_str()).unwrap_or("unknown").to_string(),
)
} else {
AssetsError::IoError(format!("Failed to create file: {e}"))
}
})?;

let mut total_bytes: usize = 0;
loop {
match field.chunk().await {
Ok(Some(chunk)) => {
total_bytes = total_bytes.saturating_add(chunk.len());
if total_bytes > MAX_FONT_FILE_SIZE {
let _ = fs::remove_file(file_path).await;
return Err(AssetsError::FileTooLarge(MAX_FONT_FILE_SIZE));
}

if let Err(e) = file.write_all(&chunk).await {
let _ = fs::remove_file(file_path).await;
return Err(AssetsError::IoError(format!("Failed to write file: {e}")));
}
},
Ok(None) => break,
Err(e) => {
let _ = fs::remove_file(file_path).await;
return Err(AssetsError::InvalidRequest(format!(
"Failed to read upload stream: {e}"
)));
},
}
}

// Create default license file (best-effort).
let license_path = file_path.with_extension(format!("{extension}.license"));
// REUSE-IgnoreStart
let default_license =
"SPDX-FileCopyrightText: © 2025 User Upload\n\nSPDX-License-Identifier: CC0-1.0\n";
// REUSE-IgnoreEnd
if let Err(e) = fs::write(&license_path, default_license).await {
warn!("Failed to create license file: {}", e);
}

Ok(total_bytes)
}

/// Core font upload logic after permission check
async fn process_font_upload(
filename: String,
Expand All @@ -1281,7 +1205,8 @@ async fn process_font_upload(

let file_path = user_dir.join(&filename);

let written_bytes = write_font_upload_to_disk(field, &file_path, &extension).await?;
let written_bytes = stream_field_to_file(field, &file_path, MAX_FONT_FILE_SIZE).await?;
create_license_sidecar(&file_path, &extension).await;

// Validate that the uploaded file is actually a font by checking magic bytes.
let header = match fs::read(&file_path).await {
Expand All @@ -1306,7 +1231,7 @@ async fn process_font_upload(

if !is_valid_font {
let _ = fs::remove_file(&file_path).await;
// Also remove the license sidecar created by write_font_upload_to_disk.
// Also remove the license sidecar created by create_license_sidecar.
let license_path = file_path.with_extension(format!("{extension}.license"));
let _ = fs::remove_file(&license_path).await;
return Err(AssetsError::InvalidFormat(
Expand Down
8 changes: 8 additions & 0 deletions apps/skit/src/plugin_assets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,14 @@ async fn write_upload_to_disk(
}
}

// Flush pending writes — tokio::fs::File::write_all returns as soon as
// data is copied to an internal buffer and a blocking write is spawned,
// so the last write may still be in-flight when the File is dropped.
if let Err(e) = file.flush().await {
let _ = fs::remove_file(file_path).await;
return Err(PluginAssetError::IoError(format!("Failed to flush file: {e}")));
}

Ok(total_bytes)
}

Expand Down
39 changes: 39 additions & 0 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,15 @@ pub struct NodeContext {
/// Channel for the node to emit structured view data for frontend consumption.
/// Like stats_tx, this is optional and best-effort.
pub view_data_tx: Option<mpsc::Sender<NodeViewDataUpdate>>,
/// Optional sender for engine-level control messages.
///
/// Allows nodes to send [`EngineControlMessage`] to the engine actor,
/// enabling cross-node control (e.g. sending `UpdateParams` to a sibling
/// node by name via [`EngineControlMessage::TuneNode`]).
///
/// Only provided in dynamic pipelines. `None` in oneshot/static
/// pipelines where the graph is fixed at build time.
pub engine_control_tx: Option<mpsc::Sender<crate::control::EngineControlMessage>>,
}

impl NodeContext {
Expand All @@ -348,6 +357,36 @@ impl NodeContext {
})
}

/// Send an `UpdateParams` control message to a sibling node by name.
///
/// This is a convenience wrapper around [`EngineControlMessage::TuneNode`]
/// that routes through the engine actor's control channel — the same path
/// the WebSocket/REST API uses.
///
/// Only works in dynamic pipelines (where `engine_control_tx` is `Some`).
///
/// # Errors
///
/// Returns a [`StreamKitError::Runtime`] if the engine control channel is
/// unavailable (oneshot pipeline) or closed (engine shut down).
pub async fn tune_sibling(
&self,
target_node_id: &str,
params: serde_json::Value,
) -> Result<(), StreamKitError> {
let tx = self.engine_control_tx.as_ref().ok_or_else(|| {
StreamKitError::Runtime(
"engine_control_tx not available (oneshot pipeline?)".to_string(),
)
})?;
tx.send(crate::control::EngineControlMessage::TuneNode {
node_id: target_node_id.to_string(),
message: crate::control::NodeControlMessage::UpdateParams(params),
})
.await
.map_err(|_| StreamKitError::Runtime("engine control channel closed".to_string()))
}

/// Receives a packet from the given receiver, respecting the cancellation token if present.
/// Returns None if cancelled or if the channel is closed.
///
Expand Down
5 changes: 5 additions & 0 deletions crates/engine/src/dynamic_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ pub struct DynamicEngine {
pub(super) node_packets_errored_counter: opentelemetry::metrics::Counter<u64>,
// Node state metric (1=running, 0=not running)
pub(super) node_state_gauge: opentelemetry::metrics::Gauge<u64>,
/// Clone of the engine's own control sender, handed to every node via
/// [`NodeContext::engine_control_tx`] so that nodes can emit
/// [`EngineControlMessage::TuneNode`] to sibling nodes.
pub(super) engine_control_tx: mpsc::Sender<EngineControlMessage>,
/// Sender half of the internal channel for background node creation results.
/// Cloned into each spawned creation task.
pub(super) node_created_tx: mpsc::Sender<NodeCreatedEvent>,
Expand Down Expand Up @@ -691,6 +695,7 @@ impl DynamicEngine {
video_pool: Some(self.video_pool.clone()),
pipeline_mode: streamkit_core::PipelineMode::Dynamic,
view_data_tx: Some(channels.view_data.clone()),
engine_control_tx: Some(self.engine_control_tx.clone()),
};

// 5. Spawn Node
Expand Down
3 changes: 2 additions & 1 deletion crates/engine/src/graph_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,8 @@ pub async fn wire_and_spawn_graph(
audio_pool: audio_pool.clone(),
video_pool: video_pool.clone(),
pipeline_mode: streamkit_core::PipelineMode::Oneshot,
view_data_tx: None, // Stateless pipelines don't emit view data
view_data_tx: None, // Stateless pipelines don't emit view data
engine_control_tx: None, // Stateless pipelines don't support cross-node control
};

tracing::debug!("Starting task for node '{}'", name);
Expand Down
2 changes: 2 additions & 0 deletions crates/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ impl Engine {
#[cfg(feature = "dynamic")]
pub fn start_dynamic_actor(&self, config: DynamicEngineConfig) -> DynamicEngineHandle {
let (control_tx, control_rx) = mpsc::channel(DEFAULT_ENGINE_CONTROL_CAPACITY);
let engine_control_tx = control_tx.clone();
let (query_tx, query_rx) = mpsc::channel(DEFAULT_ENGINE_QUERY_CAPACITY);

let node_input_capacity = config.node_input_capacity.unwrap_or(DEFAULT_NODE_INPUT_CAPACITY);
Expand Down Expand Up @@ -239,6 +240,7 @@ impl Engine {
.u64_gauge("node.state")
.with_description("Node state (1=running, 0=stopped/failed)")
.build(),
engine_control_tx,
node_created_tx: nc_tx,
node_created_rx: nc_rx,
pending_connections: Vec::new(),
Expand Down
Loading
Loading