Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
0df4ddc
feat(engine): make AddNode non-blocking with async node creation
streamkit-devin Apr 11, 2026
399c7b5
fix(engine): address review findings in async node creation
streamkit-devin Apr 11, 2026
38d6ef6
fix(engine): replace cancelled_creations with creation_id generation …
streamkit-devin Apr 11, 2026
b429367
style(engine): apply cargo fmt
streamkit-devin Apr 11, 2026
e02edd8
fix(engine): zero Creating gauge on RemoveNode while Creating
streamkit-devin Apr 11, 2026
a17b6cb
fix(test): accept Creating and Failed states in session_destroy test
streamkit-devin Apr 11, 2026
164d97e
fix(engine): close gauge gap, prevent leaked pending connections, imp…
streamkit-devin Apr 11, 2026
6055fc6
fix(test): remove Failed from accepted states in session_destroy test
streamkit-devin Apr 11, 2026
6493d4f
fix(engine): queue TuneNode for Creating nodes, reject duplicate AddN…
streamkit-devin Apr 11, 2026
3c7312d
fix(engine): queue TuneNode for Creating nodes, reject duplicate AddN…
streamkit-devin Apr 11, 2026
6f0ec93
fix(test): accept Failed state in session_destroy test with rationale
streamkit-devin Apr 11, 2026
e7b5ee9
fix(test): use registered audio::gain nodes in session_destroy test
streamkit-devin Apr 11, 2026
561d27c
test(engine): add test for TuneNode queuing during Creating state
streamkit-devin Apr 11, 2026
5f877f3
style: format TuneTrackingSlowNode factory call
streamkit-devin Apr 11, 2026
3dbe16b
fix(engine,skit): batch duplicate check, node_kinds cleanup, test nit
streamkit-devin Apr 11, 2026
7b91d3c
fix: update rand 0.10.0 → 0.10.1 (RUSTSEC-2026-0097)
streamkit-devin Apr 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions apps/skit/src/websocket_handlers.rs
Comment thread
staging-devin-ai-integration[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,11 @@ async fn handle_add_node(

{
let mut pipeline = session.pipeline.lock().await;
if pipeline.nodes.contains_key(&node_id) {
return Some(ResponsePayload::Error {
message: format!("Node '{node_id}' already exists in the pipeline"),
});
}
Comment thread
staging-devin-ai-integration[bot] marked this conversation as resolved.
pipeline.nodes.insert(
node_id.clone(),
streamkit_api::Node { kind: kind.clone(), params: params.clone(), state: None },
Expand Down Expand Up @@ -1224,6 +1229,33 @@ async fn handle_apply_batch(
});
}

// Pre-validate duplicate node_ids against the pipeline model.
// Simulate the batch's Add/Remove sequence so that Remove→Add for
// the same ID within the batch is allowed, but duplicate Adds
// (without intervening Remove) are rejected before any mutation.
{
let pipeline = session.pipeline.lock().await;
let mut live_ids: std::collections::HashSet<&str> =
pipeline.nodes.keys().map(String::as_str).collect();
for op in &operations {
match op {
streamkit_api::BatchOperation::AddNode { node_id, .. } => {
if !live_ids.insert(node_id.as_str()) {
return Some(ResponsePayload::Error {
message: format!(
"Batch rejected: node '{node_id}' already exists in the pipeline"
),
});
}
},
streamkit_api::BatchOperation::RemoveNode { node_id } => {
live_ids.remove(node_id.as_str());
},
_ => {},
}
}
} // Pipeline lock released after pre-validation

// Validate permissions for all operations
for op in &operations {
if let streamkit_api::BatchOperation::AddNode { kind, params, .. } = op {
Comment on lines 1259 to 1261
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Missing oneshot and plugin allowlist checks in batch operations

The handle_add_node function (line 432-453) rejects oneshot-only nodes (streamkit::http_input, streamkit::http_output) and enforces the plugin allowlist (perms.is_plugin_allowed). Neither handle_apply_batch (lines 1259-1320) nor handle_validate_batch (lines 1135-1191) perform these checks. This means a user could add oneshot-only marker nodes or bypass the plugin allowlist via the batch API. This is a pre-existing issue (the validation loops are unchanged in this PR), but it's worth noting since the developer was actively working on batch validation (adding the duplicate node_id pre-check).

(Refers to lines 1259-1320)

Staging: Open in Devin

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged — the missing oneshot and plugin allowlist checks in the batch path are a pre-existing issue, not introduced by this PR. The batch validation loop at lines 1259-1320 was unchanged; I only added the duplicate node_id pre-check. Fixing the missing oneshot/allowlist checks in the batch path would be a good follow-up but is out of scope for this PR.

Expand Down
25 changes: 12 additions & 13 deletions apps/skit/tests/session_lifecycle_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,19 +458,15 @@ async fn test_session_destroy_shuts_down_pipeline() {

println!("✅ Session created: {}", session_id);

// Add a source node (silence generator)
// Add a source node (audio::gain is a registered core node with in/out pins)
let add_source_request = Request {
message_type: MessageType::Request,
correlation_id: Some("add-source".to_string()),
payload: RequestPayload::AddNode {
session_id: session_id.clone(),
node_id: "source".to_string(),
kind: "silence".to_string(),
params: Some(json!({
"duration_ms": 10000, // 10 seconds
"sample_rate": 48000,
"channels": 2
})),
kind: "audio::gain".to_string(),
params: Some(json!({"gain": 1.0})),
},
};

Expand All @@ -488,15 +484,15 @@ async fn test_session_destroy_shuts_down_pipeline() {

println!("✅ Added source node");

// Add a gain node
// Add a second gain node
let add_gain_request = Request {
message_type: MessageType::Request,
correlation_id: Some("add-gain".to_string()),
payload: RequestPayload::AddNode {
session_id: session_id.clone(),
node_id: "gain".to_string(),
kind: "gain".to_string(),
params: Some(json!({"gain": 1.0})),
kind: "audio::gain".to_string(),
params: Some(json!({"gain": 0.5})),
},
};

Expand Down Expand Up @@ -562,18 +558,21 @@ async fn test_session_destroy_shuts_down_pipeline() {
ResponsePayload::Pipeline { pipeline } => {
assert_eq!(pipeline.nodes.len(), 2);

// Check that nodes are in Running state (not Failed or Stopped)
// Both nodes use "audio::gain" which is a registered built-in
// node type. With async creation they may still be in
// Creating state, or have progressed to Initializing/Ready.
for (node_id, node) in &pipeline.nodes {
if let Some(state) = &node.state {
println!("Node '{}' state: {:?}", node_id, state);
assert!(
matches!(
state,
streamkit_core::NodeState::Initializing
streamkit_core::NodeState::Creating
| streamkit_core::NodeState::Initializing
| streamkit_core::NodeState::Ready
| streamkit_core::NodeState::Running
),
"Node '{}' should be initializing/ready/running, got: {:?}",
"Node '{}' should be creating/initializing/ready/running, got: {:?}",
node_id,
state
);
Expand Down
11 changes: 11 additions & 0 deletions crates/core/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
//! Nodes transition through these states during their lifecycle:
//!
//! ```text
//! Creating
//! ↓
//! Initializing
//! ↓
//! Ready ──────────┐
Expand Down Expand Up @@ -89,6 +91,8 @@ impl From<String> for StopReason {
/// Nodes transition through these states during their lifecycle:
///
/// ```text
/// Creating
/// ↓
/// Initializing
/// ↓
/// Ready ──────────┐
Expand All @@ -105,6 +109,8 @@ impl From<String> for StopReason {
/// ```
///
/// ### Valid Transitions:
/// - `Creating` → `Initializing` (node factory completed successfully)
/// - `Creating` → `Failed` (node factory returned an error)
/// - `Initializing` → `Ready` (source nodes) or `Running` (processing nodes)
/// - `Ready` → `Running` (when pipeline is ready)
/// - `Running` → `Recovering` (temporary issues, will retry)
Expand All @@ -120,6 +126,11 @@ impl From<String> for StopReason {
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[ts(export)]
pub enum NodeState {
/// Node is being created by the factory (e.g., loading ONNX models via FFI).
/// This state is set immediately when `AddNode` is received, before the
/// (potentially slow) constructor runs in a background task.
Creating,

/// Node is starting up and performing initialization.
/// Examples: Opening connections, loading resources, validating configuration.
Initializing,
Expand Down
Loading
Loading