Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
326 changes: 145 additions & 181 deletions apps/skit/src/websocket_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use crate::session::Session;
use crate::state::{AppState, BroadcastEvent};
use opentelemetry::global;
use streamkit_api::{
Event as ApiEvent, EventPayload, MessageType, RequestPayload, ResponsePayload,
Event as ApiEvent, EventPayload, MessageType, RequestPayload, ResponsePayload, ValidationError,
ValidationErrorType,
};
use streamkit_core::control::{EngineControlMessage, NodeControlMessage};
use streamkit_core::registry::NodeDefinition;
Expand All @@ -34,6 +35,76 @@ fn can_access_session(session: &Session, role_name: &str, perms: &Permissions) -
session.created_by.as_ref().is_none_or(|creator| creator == role_name)
}

/// Validate a single AddNode operation against permission and security rules.
///
/// Returns `Some(error_message)` if the operation is not allowed, `None` if it passes.
/// This is the single source of truth for AddNode validation, used by `handle_add_node`,
/// `handle_validate_batch`, and `handle_apply_batch`.
fn validate_add_node_op(
kind: &str,
params: Option<&serde_json::Value>,
perms: &Permissions,
security_config: &crate::config::SecurityConfig,
) -> Option<String> {
// Reject oneshot-only marker nodes on the dynamic control plane.
if kind == "streamkit::http_input" || kind == "streamkit::http_output" {
return Some(format!(
"Node type '{kind}' is oneshot-only and cannot be used in dynamic sessions"
));
}

// Check if the node type is allowed.
if !perms.is_node_allowed(kind) {
return Some(format!("Permission denied: node type '{kind}' not allowed"));
}

// If this is a plugin node, enforce the plugin allowlist too.
if kind.starts_with("plugin::") && !perms.is_plugin_allowed(kind) {
return Some(format!("Permission denied: plugin '{kind}' not allowed"));
}

// Security: validate file_reader paths.
if kind == "core::file_reader" {
let Some(path) = params.and_then(|p| p.get("path")).and_then(serde_json::Value::as_str)
else {
return Some(
"Invalid file_reader params: expected params.path to be a string".to_string(),
);
};
if let Err(e) = file_security::validate_file_path(path, security_config) {
return Some(format!("Invalid file path: {e}"));
}
}

// Security: validate file_writer paths.
if kind == "core::file_writer" {
let Some(path) = params.and_then(|p| p.get("path")).and_then(serde_json::Value::as_str)
else {
return Some(
"Invalid file_writer params: expected params.path to be a string".to_string(),
);
};
if let Err(e) = file_security::validate_write_path(path, security_config) {
return Some(format!("Invalid write path: {e}"));
}
}

// Security: validate script_path (if present) for core::script nodes.
if kind == "core::script" {
if let Some(path) =
params.and_then(|p| p.get("script_path")).and_then(serde_json::Value::as_str)
{
if !path.trim().is_empty() {
if let Err(e) = file_security::validate_file_path(path, security_config) {
return Some(format!("Invalid script_path: {e}"));
}
}
}
}

None
}

pub async fn handle_request_payload(
payload: RequestPayload,
app_state: &AppState,
Expand Down Expand Up @@ -77,8 +148,8 @@ pub async fn handle_request_payload(
RequestPayload::GetPipeline { session_id } => {
handle_get_pipeline(session_id, app_state, perms, role_name).await
},
RequestPayload::ValidateBatch { session_id: _, operations } => {
Some(handle_validate_batch(&operations, app_state, perms))
RequestPayload::ValidateBatch { session_id, operations } => {
Some(handle_validate_batch(session_id, &operations, app_state, perms, role_name).await)
},
RequestPayload::ApplyBatch { session_id, operations } => {
handle_apply_batch(session_id, operations, app_state, perms, role_name).await
Expand Down Expand Up @@ -429,73 +500,10 @@ async fn handle_add_node(
});
}

// Reject oneshot-only marker nodes on the dynamic control plane.
if kind == "streamkit::http_input" || kind == "streamkit::http_output" {
return Some(ResponsePayload::Error {
message: format!(
"Node type '{kind}' is oneshot-only and cannot be used in dynamic sessions"
),
});
}

// Check if the node type is allowed
if !perms.is_node_allowed(&kind) {
return Some(ResponsePayload::Error {
message: format!("Permission denied: node type '{kind}' not allowed"),
});
}

// If this is a plugin node, enforce the plugin allowlist too.
if kind.starts_with("plugin::") && !perms.is_plugin_allowed(&kind) {
return Some(ResponsePayload::Error {
message: format!("Permission denied: plugin '{kind}' not allowed"),
});
}

// Security: validate file_reader paths on the control plane too (not just oneshot/HTTP).
if kind == "core::file_reader" {
let Some(path) =
params.as_ref().and_then(|p| p.get("path")).and_then(serde_json::Value::as_str)
else {
return Some(ResponsePayload::Error {
message: "Invalid file_reader params: expected params.path to be a string"
.to_string(),
});
};
if let Err(e) = file_security::validate_file_path(path, &app_state.config.security) {
return Some(ResponsePayload::Error { message: format!("Invalid file path: {e}") });
}
}

// Security: validate file_writer paths on the control plane too (avoid arbitrary file writes).
if kind == "core::file_writer" {
let Some(path) =
params.as_ref().and_then(|p| p.get("path")).and_then(serde_json::Value::as_str)
else {
return Some(ResponsePayload::Error {
message: "Invalid file_writer params: expected params.path to be a string"
.to_string(),
});
};
if let Err(e) = file_security::validate_write_path(path, &app_state.config.security) {
return Some(ResponsePayload::Error { message: format!("Invalid write path: {e}") });
}
}

// Security: validate script_path (if present) for core::script nodes.
if kind == "core::script" {
if let Some(path) =
params.as_ref().and_then(|p| p.get("script_path")).and_then(serde_json::Value::as_str)
{
if !path.trim().is_empty() {
if let Err(e) = file_security::validate_file_path(path, &app_state.config.security)
{
return Some(ResponsePayload::Error {
message: format!("Invalid script_path: {e}"),
});
}
}
}
if let Some(message) =
validate_add_node_op(&kind, params.as_ref(), perms, &app_state.config.security)
{
return Some(ResponsePayload::Error { message });
}

// Get session with SHORT lock hold to avoid blocking other operations
Expand Down Expand Up @@ -1120,10 +1128,12 @@ async fn handle_get_pipeline(
Some(ResponsePayload::Pipeline { pipeline: Box::new(api_pipeline) })
}

fn handle_validate_batch(
async fn handle_validate_batch(
session_id: String,
operations: &[streamkit_api::BatchOperation],
app_state: &AppState,
perms: &Permissions,
role_name: &str,
) -> ResponsePayload {
// Validate that user has permission for modify_sessions
if !perms.modify_sessions {
Expand All @@ -1132,67 +1142,73 @@ fn handle_validate_batch(
};
}

// Basic validation: check that all referenced node types are allowed
for op in operations {
if let streamkit_api::BatchOperation::AddNode { kind, params, .. } = op {
if !perms.is_node_allowed(kind) {
return ResponsePayload::Error {
message: format!("Permission denied: node type '{kind}' not allowed"),
};
}
// Verify session exists
let session = {
let session_manager = app_state.session_manager.lock().await;
session_manager.get_session_by_name_or_id(&session_id)
};

if kind == "core::file_reader" {
let path =
params.as_ref().and_then(|p| p.get("path")).and_then(serde_json::Value::as_str);
let Some(path) = path else {
return ResponsePayload::Error {
message: "Invalid file_reader params: expected params.path to be a string"
.to_string(),
};
};
if let Err(e) = file_security::validate_file_path(path, &app_state.config.security)
{
return ResponsePayload::Error { message: format!("Invalid file path: {e}") };
}
}
let Some(session) = session else {
return ResponsePayload::Error { message: format!("Session '{session_id}' not found") };
};

if kind == "core::file_writer" {
let path =
params.as_ref().and_then(|p| p.get("path")).and_then(serde_json::Value::as_str);
let Some(path) = path else {
return ResponsePayload::Error {
message: "Invalid file_writer params: expected params.path to be a string"
.to_string(),
};
};
if let Err(e) = file_security::validate_write_path(path, &app_state.config.security)
{
return ResponsePayload::Error { message: format!("Invalid write path: {e}") };
}
}
// Check ownership
if !can_access_session(&session, role_name, perms) {
return ResponsePayload::Error {
message: "Permission denied: you do not own this session".to_string(),
};
}

if kind == "core::script" {
if let Some(path) = params
.as_ref()
.and_then(|p| p.get("script_path"))
.and_then(serde_json::Value::as_str)
{
if !path.trim().is_empty() {
if let Err(e) =
file_security::validate_file_path(path, &app_state.config.security)
{
return ResponsePayload::Error {
message: format!("Invalid script_path: {e}"),
};
}
}
// Collect all validation errors so the caller sees every problem at once.
let mut errors: Vec<ValidationError> = Vec::new();

// Pre-validate duplicate node_ids against the pipeline model, mirroring
// the same simulation that handle_apply_batch performs.
let mut live_ids: std::collections::HashSet<String> =
session.pipeline.lock().await.nodes.keys().cloned().collect();
for op in operations {
match op {
streamkit_api::BatchOperation::AddNode { node_id, .. } => {
if !live_ids.insert(node_id.clone()) {
errors.push(ValidationError {
error_type: ValidationErrorType::Error,
message: format!(
"Batch rejected: node '{node_id}' already exists in the pipeline"
),
node_id: Some(node_id.clone()),
connection_id: None,
});
}
},
streamkit_api::BatchOperation::RemoveNode { node_id } => {
live_ids.remove(node_id.as_str());
},
_ => {},
}
}

// Validate all AddNode operations against permission and security rules.
for op in operations {
if let streamkit_api::BatchOperation::AddNode { node_id, kind, params, .. } = op {
if let Some(message) =
validate_add_node_op(kind, params.as_ref(), perms, &app_state.config.security)
{
errors.push(ValidationError {
error_type: ValidationErrorType::Error,
message,
node_id: Some(node_id.clone()),
connection_id: None,
});
}
}
}

info!(operation_count = operations.len(), "Validated batch operations");
ResponsePayload::ValidationResult { errors: Vec::new() }
info!(
operation_count = operations.len(),
error_count = errors.len(),
"Validated batch operations"
);
ResponsePayload::ValidationResult { errors }
Comment on lines +1190 to +1211
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Behavioral change: ValidateBatch now returns ValidationResult instead of Error for invalid operations

The old handle_validate_batch short-circuited on the first failing AddNode operation and returned ResponsePayload::Error { message }. The new code accumulates all errors into a Vec<ValidationError> and returns ResponsePayload::ValidationResult { errors } with a non-empty list. This changes the wire-level response shape from {"action": "error", "message": "..."} to {"action": "validationresult", "errors": [...]} for operation-level failures. Clients that were matching on the error action for validation failures would need updating. This is arguably the correct behavior for a validation endpoint (returning structured results), but it is a breaking change in the API contract.

Staging: Open in Devin

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged — this is an intentional behavioral change requested in code review. The reviewer asked for ValidateBatch to return structured ValidationResult { errors } instead of opaque Error { message }, so that (a) the response shape is consistent between success and failure paths, and (b) all errors are reported at once instead of failing on the first.

Pre-flight checks (session-not-found, ownership-denied, permission-denied) still return ResponsePayload::Error since they prevent any validation from running.

No existing clients consume ValidateBatch error responses today (the batch API is new), so the migration risk is minimal.

}

#[allow(clippy::significant_drop_tightening)]
Expand Down Expand Up @@ -1256,65 +1272,13 @@ async fn handle_apply_batch(
}
} // Pipeline lock released after pre-validation

// Validate permissions for all operations
// Validate permissions for all operations.
for op in &operations {
if let streamkit_api::BatchOperation::AddNode { kind, params, .. } = op {
if !perms.is_node_allowed(kind) {
return Some(ResponsePayload::Error {
message: format!("Permission denied: node type '{kind}' not allowed"),
});
}

if kind == "core::file_reader" {
let path =
params.as_ref().and_then(|p| p.get("path")).and_then(serde_json::Value::as_str);
let Some(path) = path else {
return Some(ResponsePayload::Error {
message: "Invalid file_reader params: expected params.path to be a string"
.to_string(),
});
};
if let Err(e) = file_security::validate_file_path(path, &app_state.config.security)
{
return Some(ResponsePayload::Error {
message: format!("Invalid file path: {e}"),
});
}
}

if kind == "core::file_writer" {
let path =
params.as_ref().and_then(|p| p.get("path")).and_then(serde_json::Value::as_str);
let Some(path) = path else {
return Some(ResponsePayload::Error {
message: "Invalid file_writer params: expected params.path to be a string"
.to_string(),
});
};
if let Err(e) = file_security::validate_write_path(path, &app_state.config.security)
{
return Some(ResponsePayload::Error {
message: format!("Invalid write path: {e}"),
});
}
}

if kind == "core::script" {
if let Some(path) = params
.as_ref()
.and_then(|p| p.get("script_path"))
.and_then(serde_json::Value::as_str)
{
if !path.trim().is_empty() {
if let Err(e) =
file_security::validate_file_path(path, &app_state.config.security)
{
return Some(ResponsePayload::Error {
message: format!("Invalid script_path: {e}"),
});
}
}
}
if let Some(message) =
validate_add_node_op(kind, params.as_ref(), perms, &app_state.config.security)
{
return Some(ResponsePayload::Error { message });
}
}
}
Expand Down
Loading
Loading