From 038a68603544ecb0af64898caf41acdeedf65b6e Mon Sep 17 00:00:00 2001 From: mahesh bhatiya Date: Wed, 30 Jul 2025 23:09:20 +0530 Subject: [PATCH] feat(plugin): stream real-time stdout from WASM plugins using Pipe and WebSocket - Replaced inherit_stdio() with wasmtime_wasi::Pipe to capture plugin output - Spawn plugin execution in async task to avoid blocking - Added tokio-based loop to read stdout line-by-line - Integrated WebSocket server to forward logs to frontend - Enables real-time plugin log streaming in DevOps panel --- sandcrate-backend/Cargo.toml | 7 +- sandcrate-backend/src/api.rs | 6 +- sandcrate-backend/src/auth.rs | 3 - sandcrate-backend/src/bin/execute_plugin.rs | 76 ++-- sandcrate-backend/src/lib.rs | 20 +- sandcrate-backend/src/plugin.rs | 94 ++++- sandcrate-backend/src/websocket.rs | 181 +++++++++ sandcrate-react/src/App.tsx | 3 - .../src/components/RealtimePluginExecutor.tsx | 357 ++++++++++++++++++ sandcrate-react/src/pages/Plugins.tsx | 30 +- test_websocket.py | 46 +++ test_websocket_simple.js | 61 +++ 12 files changed, 819 insertions(+), 65 deletions(-) create mode 100644 sandcrate-backend/src/websocket.rs create mode 100644 sandcrate-react/src/components/RealtimePluginExecutor.tsx create mode 100644 test_websocket.py create mode 100644 test_websocket_simple.js diff --git a/sandcrate-backend/Cargo.toml b/sandcrate-backend/Cargo.toml index 93d725b..4e2352a 100644 --- a/sandcrate-backend/Cargo.toml +++ b/sandcrate-backend/Cargo.toml @@ -3,7 +3,7 @@ name = "sandcrate-backend" edition = "2021" [dependencies] -axum = "0.7" +axum = { version = "0.7", features = ["ws"] } tokio = { version = "1", features = ["full"] } serde = { version = "1", features = ["derive"] } serde_json = "1" @@ -14,4 +14,7 @@ jsonwebtoken = "9" chrono = { version = "0.4", features = ["serde"] } tower = "0.4" tower-http = { version = "0.5", features = ["cors"] } -axum-extra = { version = "0.9", features = ["typed-header"] } \ No newline at end of file +axum-extra = { version = "0.9", features = ["typed-header"] } +tokio-tungstenite = "0.21" +futures-util = { version = "0.3", features = ["sink"] } +uuid = { version = "1.0", features = ["v4"] } \ No newline at end of file diff --git a/sandcrate-backend/src/api.rs b/sandcrate-backend/src/api.rs index cc2f02e..a0c77f7 100644 --- a/sandcrate-backend/src/api.rs +++ b/sandcrate-backend/src/api.rs @@ -58,7 +58,7 @@ struct ErrorResponse { async fn get_plugins( State(_config): State>, ) -> Json> { - let plugins_dir = FsPath::new("../assets/plugins"); + let plugins_dir = FsPath::new("assets/plugins"); let mut plugins = Vec::new(); if let Ok(entries) = fs::read_dir(plugins_dir) { @@ -113,7 +113,7 @@ async fn get_plugin( State(_config): State>, Path(plugin_id): Path, ) -> Response { - let plugins_dir = FsPath::new("../assets/plugins"); + let plugins_dir = FsPath::new("assets/plugins"); let plugin_path = plugins_dir.join(format!("{}.wasm", plugin_id)); if !plugin_path.exists() { @@ -184,7 +184,7 @@ async fn execute_plugin( let start_time = std::time::Instant::now(); // Find the plugin file - let plugins_dir = FsPath::new("../assets/plugins"); + let plugins_dir = FsPath::new("assets/plugins"); let plugin_path = plugins_dir.join(format!("{}.wasm", plugin_id)); if !plugin_path.exists() { diff --git a/sandcrate-backend/src/auth.rs b/sandcrate-backend/src/auth.rs index de373e9..5bd10b9 100644 --- a/sandcrate-backend/src/auth.rs +++ b/sandcrate-backend/src/auth.rs @@ -61,14 +61,11 @@ impl AuthConfig { } } -// Check if user has sudo/root privileges fn check_user_privileges(username: &str) -> (bool, String) { - // Check if user is root if username == "root" { return (true, "root".to_string()); } - // Check if user has sudo privileges let output = std::process::Command::new("sudo") .args(["-l", "-U", username]) .output(); diff --git a/sandcrate-backend/src/bin/execute_plugin.rs b/sandcrate-backend/src/bin/execute_plugin.rs index 18dfacc..6600080 100644 --- a/sandcrate-backend/src/bin/execute_plugin.rs +++ b/sandcrate-backend/src/bin/execute_plugin.rs @@ -1,47 +1,57 @@ -use std::env; +use std::fs; use std::path::Path; -use sandcrate_backend::plugin; +use wasmtime::*; +use wasmtime_wasi::WasiCtxBuilder; +use std::env; fn main() -> Result<(), Box> { let args: Vec = env::args().collect(); - if args.len() != 2 { - println!("Usage: {} ", args[0]); - println!("Example: {} sandcrate-plugin", args[0]); - return Ok(()); + eprintln!("Usage: {} ", args[0]); + std::process::exit(1); } - let plugin_name = &args[1]; - let plugin_path = format!("assets/plugins/{}.wasm", plugin_name); - - if !Path::new(&plugin_path).exists() { - println!("❌ Plugin '{}' not found at {}", plugin_name, plugin_path); - println!("Available plugins:"); - - let plugins = plugin::list_plugins(); - if plugins.is_empty() { - println!(" No plugins found in assets/plugins directory"); - } else { - for plugin in plugins { - println!(" - {}", plugin); - } - } - return Ok(()); - } + let plugin_path = &args[1]; - println!("🚀 Executing plugin: {}", plugin_name); - println!("📁 Path: {}", plugin_path); - println!("---"); + // Create a WASM engine + let engine = Engine::default(); - match plugin::run_plugin(&plugin_path) { - Ok(result) => { - println!("✅ Plugin executed successfully!"); - println!("📋 Result: {}", result); - } - Err(e) => { - println!("❌ Plugin execution failed: {}", e); + // Create a store with WASI context + let wasi = WasiCtxBuilder::new() + .inherit_stdio() + .inherit_args()? + .build(); + + let mut store = Store::new(&engine, wasi); + + // Read the WASM module + let wasm_bytes = fs::read(plugin_path)?; + let module = Module::new(&engine, &wasm_bytes)?; + + // Create a linker and add WASI + let mut linker = Linker::new(&engine); + wasmtime_wasi::add_to_linker(&mut linker, |s| s)?; + + // Instantiate the module + let instance = linker.instantiate(&mut store, &module)?; + + // Try to find and call a suitable function + let function_names = ["_start", "start", "main", "run"]; + + let mut executed = false; + + for func_name in &function_names { + if let Ok(func) = instance.get_typed_func::<(), ()>(&mut store, func_name) { + func.call(&mut store, ())?; + executed = true; + break; } } + if !executed { + eprintln!("No suitable entry function found in WASM module"); + std::process::exit(1); + } + Ok(()) } \ No newline at end of file diff --git a/sandcrate-backend/src/lib.rs b/sandcrate-backend/src/lib.rs index 280d370..2c1dd4c 100644 --- a/sandcrate-backend/src/lib.rs +++ b/sandcrate-backend/src/lib.rs @@ -1,24 +1,32 @@ mod api; mod auth; pub mod plugin; +mod websocket; use std::net::SocketAddr; use std::sync::Arc; use tokio::net::TcpListener; use tower_http::cors::CorsLayer; -use axum::Router; +use axum::{Router, routing::get}; -// Re-export plugin runner pub use plugin::run_plugin; +pub use websocket::{WebSocketManager, PluginExecutionSession}; #[tokio::main] pub async fn run_backend() { let auth_config = Arc::new(auth::AuthConfig::new()); + let ws_manager = Arc::new(websocket::WebSocketManager::new()); + + let api_router = api::routes().with_state(auth_config.clone()); + let auth_router = auth::auth_routes().with_state(auth_config.clone()); + let ws_router = Router::new() + .route("/plugins", get(websocket::plugin_execution_websocket)) + .with_state((auth_config, ws_manager)); let app = Router::new() - .nest("/api", api::routes()) - .nest("/auth", auth::auth_routes()) - .with_state(auth_config) + .nest("/api", api_router) + .nest("/auth", auth_router) + .nest("/ws", ws_router) .layer(CorsLayer::permissive()); let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); @@ -26,6 +34,8 @@ pub async fn run_backend() { println!("API endpoints:"); println!(" GET http://{}/api/plugins", addr); println!(" POST http://{}/auth/login", addr); + println!("WebSocket endpoints:"); + println!(" WS ws://{}/ws/plugins", addr); let listener = TcpListener::bind(addr).await.unwrap(); axum::serve(listener, app.into_make_service()) diff --git a/sandcrate-backend/src/plugin.rs b/sandcrate-backend/src/plugin.rs index b98462c..ed2b087 100644 --- a/sandcrate-backend/src/plugin.rs +++ b/sandcrate-backend/src/plugin.rs @@ -1,9 +1,14 @@ use std::fs; use std::path::Path; -use std::time::Duration; use wasmtime::*; use wasmtime_wasi::WasiCtxBuilder; use serde_json::Value; +use tokio::sync::broadcast; +use std::sync::Arc; +use tokio::sync::Mutex; +use std::io::{Read, Write}; + + pub fn list_plugins() -> Vec { let plugins_dir = Path::new("assets/plugins"); @@ -36,10 +41,8 @@ pub fn run_plugin_with_params( parameters: Option, _timeout: Option ) -> Result> { - // Create a WASM engine let engine = Engine::default(); - // Create a store with WASI context let wasi = WasiCtxBuilder::new() .inherit_stdio() .inherit_args()? @@ -47,19 +50,14 @@ pub fn run_plugin_with_params( let mut store = Store::new(&engine, wasi); - // Read the WASM module let wasm_bytes = fs::read(plugin_path)?; let module = Module::new(&engine, &wasm_bytes)?; - // Create a linker and add WASI let mut linker = Linker::new(&engine); wasmtime_wasi::add_to_linker(&mut linker, |s| s)?; - // Instantiate the module let instance = linker.instantiate(&mut store, &module)?; - // Try to find and call a suitable function - // Common function names for WASM modules: _start, start, main, run let function_names = ["_start", "start", "main", "run"]; let mut executed = false; @@ -81,6 +79,84 @@ pub fn run_plugin_with_params( Ok(result) } +pub async fn run_plugin_with_realtime_output( + plugin_path: &str, + _parameters: Option, + _timeout: Option, + ws_tx: broadcast::Sender, + session_id: &str, +) -> Result> { + let session_id = session_id.to_string(); + let plugin_id = Path::new(plugin_path) + .file_stem() + .and_then(|s| s.to_str()) + .unwrap_or("unknown") + .to_string(); + + let _ = ws_tx.send(crate::websocket::PluginExecutionSession { + id: session_id.clone(), + plugin_id: plugin_id.clone(), + status: "starting".to_string(), + output: "Plugin execution started".to_string(), + }); + + let output = tokio::process::Command::new("cargo") + .args(&["run", "--bin", "execute_plugin", "--quiet"]) + .arg(plugin_path) + .output() + .await?; + + let _ = ws_tx.send(crate::websocket::PluginExecutionSession { + id: session_id.clone(), + plugin_id: plugin_id.clone(), + status: "running".to_string(), + output: "Executing plugin...".to_string(), + }); + + if !output.stdout.is_empty() { + let stdout_str = String::from_utf8_lossy(&output.stdout); + for line in stdout_str.lines() { + if !line.trim().is_empty() { + let _ = ws_tx.send(crate::websocket::PluginExecutionSession { + id: session_id.clone(), + plugin_id: plugin_id.clone(), + status: "running".to_string(), + output: line.trim().to_string(), + }); + } + } + } + + if !output.stderr.is_empty() { + let stderr_str = String::from_utf8_lossy(&output.stderr); + for line in stderr_str.lines() { + if !line.trim().is_empty() { + let _ = ws_tx.send(crate::websocket::PluginExecutionSession { + id: session_id.clone(), + plugin_id: plugin_id.clone(), + status: "running".to_string(), + output: format!("ERROR: {}", line.trim()), + }); + } + } + } + + let result = if output.status.success() { + "Plugin executed successfully".to_string() + } else { + format!("Plugin execution failed with exit code: {}", output.status) + }; + + let _ = ws_tx.send(crate::websocket::PluginExecutionSession { + id: session_id.clone(), + plugin_id: plugin_id.clone(), + status: "completed".to_string(), + output: format!("Plugin completed: {}", result), + }); + + Ok(result) +} + pub fn get_plugin_info(plugin_path: &str) -> Result> { let path = Path::new(plugin_path); @@ -91,12 +167,10 @@ pub fn get_plugin_info(plugin_path: &str) -> Result = module .exports() .map(|export| export.name().to_string()) diff --git a/sandcrate-backend/src/websocket.rs b/sandcrate-backend/src/websocket.rs new file mode 100644 index 0000000..23c0ca2 --- /dev/null +++ b/sandcrate-backend/src/websocket.rs @@ -0,0 +1,181 @@ +use axum::{ + extract::{ws::{Message, WebSocket, WebSocketUpgrade}, State}, + response::IntoResponse, +}; +use serde_json::json; +use std::sync::Arc; +use tokio::sync::broadcast; +use uuid::Uuid; + +use crate::auth::AuthConfig; +use crate::plugin; + +#[derive(Debug, Clone)] +pub struct PluginExecutionSession { + pub id: String, + pub plugin_id: String, + pub status: String, + pub output: String, +} + +pub struct WebSocketManager { + tx: broadcast::Sender, +} + +impl WebSocketManager { + pub fn new() -> Self { + let (tx, _) = broadcast::channel(100); + Self { tx } + } + + pub fn get_sender(&self) -> broadcast::Sender { + self.tx.clone() + } +} + +pub async fn plugin_execution_websocket( + ws: WebSocketUpgrade, + State((state, ws_manager)): State<(Arc, Arc)>, +) -> impl IntoResponse { + ws.on_upgrade(|socket| handle_plugin_execution_socket(socket, state, ws_manager)) +} + +async fn handle_plugin_execution_socket( + mut socket: WebSocket, + _state: Arc, + ws_manager: Arc, +) { + let mut rx = ws_manager.get_sender().subscribe(); + + let connect_msg = json!({ + "type": "connected", + "message": "WebSocket connected successfully" + }); + + if let Err(_) = socket.send(Message::Text(connect_msg.to_string())).await { + return; + } + + loop { + tokio::select! { + msg = socket.recv() => { + match msg { + Some(Ok(Message::Text(text))) => { + if let Ok(data) = serde_json::from_str::(&text) { + if let Some(command) = data.get("command").and_then(|c| c.as_str()) { + match command { + "execute_plugin" => { + if let (Some(plugin_id), parameters, timeout) = ( + data.get("plugin_id").and_then(|p| p.as_str()), + data.get("parameters").cloned(), + data.get("timeout").and_then(|t| t.as_u64()), + ) { + let session_id = Uuid::new_v4().to_string(); + + let initial_status = json!({ + "type": "status", + "session_id": session_id, + "plugin_id": plugin_id, + "status": "starting", + "message": "Plugin execution started" + }); + + if let Err(_) = socket.send(Message::Text(initial_status.to_string())).await { + break; + } + + let ws_tx = ws_manager.get_sender(); + let plugin_id = plugin_id.to_string(); + + tokio::spawn(async move { + let result = plugin::run_plugin_with_realtime_output( + &format!("assets/plugins/{}.wasm", plugin_id), + parameters, + timeout, + ws_tx.clone(), + &session_id, + ).await; + + let final_message = match result { + Ok(output) => json!({ + "type": "result", + "session_id": session_id, + "plugin_id": plugin_id, + "status": "completed", + "output": output, + "success": true + }), + Err(e) => json!({ + "type": "result", + "session_id": session_id, + "plugin_id": plugin_id, + "status": "error", + "error": e.to_string(), + "success": false + }), + }; + + let _ = ws_tx.send(PluginExecutionSession { + id: session_id, + plugin_id, + status: "completed".to_string(), + output: final_message.to_string(), + }); + }); + } + } + "subscribe" => { + if let Some(session_id) = data.get("session_id").and_then(|s| s.as_str()) { + let subscribe_msg = json!({ + "type": "subscribed", + "session_id": session_id, + "message": "Subscribed to session updates" + }); + + if let Err(_) = socket.send(Message::Text(subscribe_msg.to_string())).await { + break; + } + } + } + _ => { + let error_msg = json!({ + "type": "error", + "message": format!("Unknown command: {}", command) + }); + + if let Err(_) = socket.send(Message::Text(error_msg.to_string())).await { + break; + } + } + } + } + } + } + Some(Ok(Message::Close(_))) => break, + Some(Err(_)) => break, + None => break, + _ => continue, + } + } + + session_result = rx.recv() => { + match session_result { + Ok(session) => { + let message = json!({ + "type": "update", + "session_id": session.id, + "plugin_id": session.plugin_id, + "status": session.status, + "output": session.output + }); + + if let Err(_) = socket.send(Message::Text(message.to_string())).await { + break; + } + } + Err(_) => break, + } + } + } + } +} \ No newline at end of file diff --git a/sandcrate-react/src/App.tsx b/sandcrate-react/src/App.tsx index d521a84..fd00d59 100644 --- a/sandcrate-react/src/App.tsx +++ b/sandcrate-react/src/App.tsx @@ -9,7 +9,6 @@ import { Dashboard } from './pages/Dashboard'; import { Plugins } from './pages/Plugins'; import './index.css'; -// Protected Route Component const ProtectedRoute: React.FC<{ children: React.ReactNode }> = ({ children }) => { const { user, loading } = useAuth(); @@ -24,7 +23,6 @@ const ProtectedRoute: React.FC<{ children: React.ReactNode }> = ({ children }) = return <>{children}; }; -// Main Layout Component const MainLayout: React.FC = () => { return (
@@ -40,7 +38,6 @@ const MainLayout: React.FC = () => { ); }; -// Main App Component function App() { return ( diff --git a/sandcrate-react/src/components/RealtimePluginExecutor.tsx b/sandcrate-react/src/components/RealtimePluginExecutor.tsx new file mode 100644 index 0000000..25e4222 --- /dev/null +++ b/sandcrate-react/src/components/RealtimePluginExecutor.tsx @@ -0,0 +1,357 @@ +import React, { useState, useEffect, useRef } from 'react'; +import { + PlayIcon, + StopIcon, + XMarkIcon, + CheckCircleIcon, + ExclamationTriangleIcon, + ArrowPathIcon +} from '@heroicons/react/24/outline'; + +interface Plugin { + id: string; + name: string; + filename: string; + size: number; + created_at: string; +} + +interface RealtimeExecutionProps { + plugin: Plugin | null; + isOpen: boolean; + onClose: () => void; +} + +interface WebSocketMessage { + type: 'connected' | 'status' | 'update' | 'result' | 'error' | 'subscribed'; + session_id?: string; + plugin_id?: string; + status?: string; + message?: string; + output?: string; + error?: string; + success?: boolean; +} + +export const RealtimePluginExecutor: React.FC = ({ + plugin, + isOpen, + onClose +}) => { + const [isConnected, setIsConnected] = useState(false); + const [isExecuting, setIsExecuting] = useState(false); + const [output, setOutput] = useState([]); + const [sessionId, setSessionId] = useState(null); + const [executionStatus, setExecutionStatus] = useState('idle'); + const [parameters, setParameters] = useState(''); + const [error, setError] = useState(null); + + const wsRef = useRef(null); + const outputEndRef = useRef(null); + + useEffect(() => { + outputEndRef.current?.scrollIntoView({ behavior: 'smooth' }); + }, [output]); + + useEffect(() => { + return () => { + if (wsRef.current) { + wsRef.current.close(); + } + }; + }, []); + + const connectWebSocket = () => { + const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const wsUrl = `ws://127.0.0.1:3000/ws/plugins`; + + console.log('Attempting to connect to WebSocket:', wsUrl); + + const ws = new WebSocket(wsUrl); + wsRef.current = ws; + + ws.onopen = () => { + setIsConnected(true); + setError(null); + console.log('✅ WebSocket connected successfully'); + setOutput(prev => [...prev, '🔗 Connecting to WebSocket...']); + }; + + ws.onmessage = (event) => { + console.log('📥 WebSocket message received:', event.data); + try { + const message: WebSocketMessage = JSON.parse(event.data); + handleWebSocketMessage(message); + } catch (err) { + console.error('❌ Failed to parse WebSocket message:', err); + setOutput(prev => [...prev, `❌ Failed to parse message: ${event.data}`]); + } + }; + + ws.onclose = (event) => { + setIsConnected(false); + console.log('🔌 WebSocket disconnected:', event.code, event.reason); + setOutput(prev => [...prev, `🔌 WebSocket disconnected (${event.code})`]); + }; + + ws.onerror = (error) => { + console.error('❌ WebSocket error:', error); + setError('WebSocket connection failed'); + setOutput(prev => [...prev, '❌ WebSocket connection failed']); + }; + }; + + const handleWebSocketMessage = (message: WebSocketMessage) => { + switch (message.type) { + case 'connected': + setOutput(prev => [...prev, `🔗 ${message.message || 'WebSocket connected'}`]); + break; + + case 'status': + if (message.status === 'starting') { + setExecutionStatus('starting'); + setOutput(prev => [...prev, `🚀 ${message.message || 'Plugin execution started'}`]); + } + break; + + case 'update': + if (message.output) { + setOutput(prev => [...prev, message.output]); + } + if (message.status) { + setExecutionStatus(message.status); + } + break; + + case 'result': + if (message.success) { + setExecutionStatus('completed'); + setOutput(prev => [...prev, `✅ Plugin execution completed successfully`]); + if (message.output) { + setOutput(prev => [...prev, `📄 Final output: ${message.output}`]); + } + } else { + setExecutionStatus('error'); + setError(message.error || 'Plugin execution failed'); + setOutput(prev => [...prev, `❌ Plugin execution failed: ${message.error}`]); + } + setIsExecuting(false); + break; + + case 'error': + setError(message.message || 'An error occurred'); + setOutput(prev => [...prev, `❌ Error: ${message.message}`]); + break; + + case 'subscribed': + setOutput(prev => [...prev, `📡 ${message.message || 'Subscribed to session updates'}`]); + break; + } + }; + + const executePlugin = () => { + if (!plugin || !isConnected || !wsRef.current) return; + + // Clear previous output + setOutput([]); + setError(null); + setIsExecuting(true); + setExecutionStatus('starting'); + + // Parse parameters + let parsedParams = {}; + if (parameters.trim()) { + try { + parsedParams = JSON.parse(parameters); + } catch (e) { + setError('Invalid JSON parameters'); + setIsExecuting(false); + return; + } + } + + // Send execution command + const command = { + command: 'execute_plugin', + plugin_id: plugin.id, + parameters: parsedParams, + timeout: 30000 // 30 seconds timeout + }; + + wsRef.current.send(JSON.stringify(command)); + }; + + const stopExecution = () => { + if (wsRef.current) { + wsRef.current.close(); + setIsExecuting(false); + setExecutionStatus('stopped'); + setOutput(prev => [...prev, '🛑 Execution stopped by user']); + } + }; + + const clearOutput = () => { + setOutput([]); + setError(null); + setExecutionStatus('idle'); + }; + + // Connect to WebSocket when modal opens + useEffect(() => { + if (isOpen && !isConnected) { + console.log('Modal opened, attempting WebSocket connection...'); + connectWebSocket(); + } + }, [isOpen, isConnected]); + + useEffect(() => { + if (isOpen && wsRef.current) { + const checkConnection = () => { + if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) { + setIsConnected(true); + } else if (wsRef.current && wsRef.current.readyState === WebSocket.CLOSED) { + setIsConnected(false); + } + }; + + const interval = setInterval(checkConnection, 1000); + return () => clearInterval(interval); + } + }, [isOpen]); + + const handleClose = () => { + if (wsRef.current) { + wsRef.current.close(); + } + setOutput([]); + setError(null); + setExecutionStatus('idle'); + setIsExecuting(false); + setParameters(''); + onClose(); + }; + + if (!isOpen || !plugin) return null; + + return ( +
+
+
+
+

Real-time Plugin Execution

+

{plugin.name}

+
+
+
+
+ {isConnected ? 'Connected' : 'Disconnected'} +
+ +
+
+ +
+
+ +