From d7188ecb31d2b7f43866c50b429e1559ca257ad0 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Mon, 29 Dec 2025 06:59:31 +0800 Subject: [PATCH 1/4] feat(acp): Honor MCP servers from clients like Zed (stdio + http) Session-scoped MCP server support allows IDE clients to pass MCP server configurations when creating new sessions. Servers are connected at session start and tools merged into the available tool set. Signed-off-by: Adrian Cole --- .gitignore | 3 + Cargo.lock | 53 +++ cli/Cargo.toml | 2 + cli/src/commands/acp/server.rs | 374 ++++++++++++---- cli/src/commands/agent/run/mcp_init.rs | 5 +- cli/src/config.rs | 14 + cli/tests/acp_integration_test.rs | 403 ++++++++++++++++++ cli/tests/common.rs | 42 ++ cli/tests/test_data/openai_basic_response.txt | 9 + .../test_data/openai_session_description.json | 1 + .../test_data/openai_tool_call_response.txt | 9 + .../test_data/openai_tool_result_response.txt | 9 + libs/mcp/client/src/lib.rs | 44 +- libs/mcp/proxy/src/client/mod.rs | 6 +- libs/mcp/proxy/src/server/mod.rs | 4 +- libs/shared/src/cert_utils.rs | 6 + 16 files changed, 884 insertions(+), 100 deletions(-) create mode 100644 cli/tests/acp_integration_test.rs create mode 100644 cli/tests/common.rs create mode 100644 cli/tests/test_data/openai_basic_response.txt create mode 100644 cli/tests/test_data/openai_session_description.json create mode 100644 cli/tests/test_data/openai_tool_call_response.txt create mode 100644 cli/tests/test_data/openai_tool_result_response.txt diff --git a/.gitignore b/.gitignore index 1f66e6ef..34f5d941 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,6 @@ debug.log # Stakpak local files .stakpak + +# IDE +.idea/ diff --git a/Cargo.lock b/Cargo.lock index 006afeea..58b2924a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -233,6 +233,16 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236" +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "async-broadcast" version = "0.7.2" @@ -1261,6 +1271,24 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" +[[package]] +name = "deadpool" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0be2b1d1d6ec8d846f05e137292d0b89133caf95ef33695424c09568bdd39b1b" +dependencies = [ + "deadpool-runtime", + "lazy_static", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" + [[package]] name = "deflate64" version = "0.1.10" @@ -5373,6 +5401,7 @@ version = "0.3.4-beta.3" dependencies = [ "agent-client-protocol", "async-trait", + "axum 0.8.7", "base64 0.22.1", "chrono", "clap 4.5.51", @@ -5411,6 +5440,7 @@ dependencies = [ "tracing-subscriber", "uuid", "walkdir", + "wiremock", "zip", ] @@ -7210,6 +7240,29 @@ dependencies = [ "winapi", ] +[[package]] +name = "wiremock" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08db1edfb05d9b3c1542e521aea074442088292f00b5f28e435c714a98f85031" +dependencies = [ + "assert-json-diff", + "base64 0.22.1", + "deadpool", + "futures", + "http 1.3.1", + "http-body-util", + "hyper 1.8.1", + "hyper-util", + "log", + "once_cell", + "regex", + "serde", + "serde_json", + "tokio", + "url", +] + [[package]] name = "wit-bindgen" version = "0.46.0" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index eb427812..3b6a3181 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -48,6 +48,8 @@ base64 = "0.22" [dev-dependencies] tempfile = "3" test-case = "3" +wiremock = "0.6" +axum = "0.8" [lints.clippy] unwrap_used = "deny" diff --git a/cli/src/commands/acp/server.rs b/cli/src/commands/acp/server.rs index bc4f3ef1..a0f10318 100644 --- a/cli/src/commands/acp/server.rs +++ b/cli/src/commands/acp/server.rs @@ -10,6 +10,7 @@ use stakpak_api::{ remote::{ClientConfig, RemoteClient}, }; use stakpak_mcp_client::McpClient; +use stakpak_mcp_proxy::client::ServerConfig; use stakpak_shared::models::integrations::mcp::CallToolResultExt; use stakpak_shared::models::integrations::openai::{ AgentModel, ChatCompletionChoice, ChatCompletionResponse, ChatCompletionStreamResponse, @@ -17,13 +18,28 @@ use stakpak_shared::models::integrations::openai::{ ToolCall, ToolCallResultProgress, ToolCallResultStatus, }; use stakpak_shared::models::llm::LLMTokenUsage; -use std::cell::Cell; +use std::cell::{Cell, RefCell}; use std::path::Path; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt as _}; use uuid::Uuid; +struct CurrentSession { + id: Uuid, + /// MCP tools provided by the IDE for this session + mcp_tools: Vec<(Arc, rmcp::model::Tool)>, +} + +impl Drop for CurrentSession { + fn drop(&mut self) { + // Cancel all MCP clients to trigger graceful shutdown + for (client, _) in &self.mcp_tools { + client.cancellation_token().cancel(); + } + } +} + pub struct StakpakAcpAgent { config: AppConfig, client: Arc, @@ -32,7 +48,6 @@ pub struct StakpakAcpAgent { mcp_client: Option>, mcp_tools: Vec, tools: Option>, - current_session_id: Cell>, progress_tx: Option>, // Add persistent message history for conversation context messages: Arc>>, @@ -56,6 +71,64 @@ pub struct StakpakAcpAgent { fs_operation_tx: Option>, // Capabilities advertised by the client during initialization client_capabilities: Arc>, + /// Session-scoped state + current_session: RefCell>, +} + +/// Convert ACP McpServer to proxy ServerConfig +fn mcp_server_to_proxy_config(server: acp::McpServer) -> Result<(String, ServerConfig), String> { + match server { + acp::McpServer::Http(http) => Ok(( + http.name.clone(), + ServerConfig::Http { + url: http.url, + headers: http + .headers + .into_iter() + .map(|h| (h.name, h.value)) + .collect(), + certificate_chain: Arc::new(None), + }, + )), + acp::McpServer::Stdio(stdio) => Ok(( + stdio.name.clone(), + ServerConfig::Stdio { + command: stdio.command.to_string_lossy().to_string(), + args: stdio.args, + env: Some(stdio.env.into_iter().map(|e| (e.name, e.value)).collect()), + }, + )), + acp::McpServer::Sse(sse) => Err(format!( + "SSE transport is deprecated and not supported: {}", + sse.name + )), + _ => Err("Unknown MCP server transport type".to_string()), + } +} + +async fn connect_mcp_server( + config: ServerConfig, + name: &str, + progress_tx: Option>, +) -> Result<(Arc, Vec), acp::Error> { + let client = match config { + ServerConfig::Http { url, headers, .. } => { + stakpak_mcp_client::connect_https(&url, None, headers, progress_tx).await + } + ServerConfig::Stdio { command, args, env } => { + stakpak_mcp_client::connect_stdio(&command, args, env, progress_tx).await + } + } + .map_err(|e| { + acp::Error::internal_error().data(format!("Failed to connect to MCP server {name}: {e}")) + })?; + + let tools = stakpak_mcp_client::get_tools(&client).await.map_err(|e| { + acp::Error::internal_error() + .data(format!("Failed to get tools from MCP server {name}: {e}")) + })?; + + Ok((Arc::new(client), tools)) } impl StakpakAcpAgent { @@ -73,18 +146,18 @@ impl StakpakAcpAgent { api_key: Some("dummy_for_initialization".to_string()), api_endpoint: api_config.api_endpoint.clone(), }) - .map_err(|e| format!("Failed to create client: {}", e))?; + .map_err(|e| format!("Failed to create client: {e}"))?; Arc::new(client) } else { let client = RemoteClient::new(&api_config) - .map_err(|e| format!("Failed to create client: {}", e))?; + .map_err(|e| format!("Failed to create client: {e}"))?; Arc::new(client) } } ProviderType::Local => { let client = LocalClient::new(LocalClientConfig { stakpak_base_url: Some(config.api_endpoint.clone()), - store_path: None, + store_path: config.store_path.clone(), anthropic_config: config.anthropic.clone(), openai_config: config.openai.clone(), gemini_config: config.gemini.clone(), @@ -94,7 +167,7 @@ impl StakpakAcpAgent { hook_registry: None, }) .await - .map_err(|e| format!("Failed to create local client: {}", e))?; + .map_err(|e| format!("Failed to create local client: {e}"))?; Arc::new(client) } }; @@ -107,10 +180,7 @@ impl StakpakAcpAgent { (Some(client), mcp_tools, tool_list) } Err(e) => { - log::warn!( - "Failed to initialize MCP client: {}, continuing without tools", - e - ); + log::warn!("Failed to initialize MCP client: {e}, continuing without tools"); (None, Vec::new(), Vec::new()) } }; @@ -132,7 +202,6 @@ impl StakpakAcpAgent { mcp_client, mcp_tools, tools: if tools.is_empty() { None } else { Some(tools) }, - current_session_id: Cell::new(None), progress_tx: None, messages: Arc::new(tokio::sync::Mutex::new(messages)), permission_request_tx: None, @@ -145,6 +214,7 @@ impl StakpakAcpAgent { client_capabilities: Arc::new(tokio::sync::Mutex::new( acp::ClientCapabilities::default(), )), + current_session: RefCell::new(None), }) } @@ -228,7 +298,7 @@ impl StakpakAcpAgent { tool_call.function.name, tool_title ); - log::info!("Tool Call ID: {}", tool_call_id); + log::info!("Tool Call ID: {tool_call_id}"); // Create permission options as shown in the image let options = vec![ @@ -308,7 +378,7 @@ impl StakpakAcpAgent { tool_names::VIEW => { // Extract path from arguments for view tool if let Some(path) = raw_input.get("path").and_then(|p| p.as_str()) { - format!("Read {}", path) + format!("Read {path}") } else { "Read".to_string() } @@ -316,7 +386,7 @@ impl StakpakAcpAgent { tool_names::RUN_COMMAND => { // Extract command from arguments for run_command tool if let Some(command) = raw_input.get("command").and_then(|c| c.as_str()) { - format!("Run command {}", command) + format!("Run command {command}") } else { "Run command".to_string() } @@ -324,7 +394,7 @@ impl StakpakAcpAgent { tool_names::CREATE | tool_names::CREATE_FILE => { // Extract path from arguments for create tool if let Some(path) = raw_input.get("path").and_then(|p| p.as_str()) { - format!("Creating {}", path) + format!("Creating {path}") } else { "Creating".to_string() } @@ -332,7 +402,7 @@ impl StakpakAcpAgent { tool_names::STR_REPLACE | tool_names::EDIT_FILE => { // Extract path from arguments for edit tool if let Some(path) = raw_input.get("path").and_then(|p| p.as_str()) { - format!("Editing {}", path) + format!("Editing {path}") } else { "Editing".to_string() } @@ -340,7 +410,7 @@ impl StakpakAcpAgent { tool_names::DELETE_FILE => { // Extract path from arguments for delete tool if let Some(path) = raw_input.get("path").and_then(|p| p.as_str()) { - format!("Deleting {}", path) + format!("Deleting {path}") } else { "Deleting".to_string() } @@ -348,7 +418,7 @@ impl StakpakAcpAgent { tool_names::SEARCH_DOCS => { // Extract query from arguments for search tool if let Some(query) = raw_input.get("query").and_then(|q| q.as_str()) { - format!("Search docs: {}", query) + format!("Search docs: {query}") } else { "Search docs".to_string() } @@ -356,7 +426,7 @@ impl StakpakAcpAgent { tool_names::LOCAL_CODE_SEARCH => { // Extract query from arguments for search tool if let Some(query) = raw_input.get("query").and_then(|q| q.as_str()) { - format!("Search local context: {}", query) + format!("Search local context: {query}") } else { "Search local context".to_string() } @@ -366,7 +436,7 @@ impl StakpakAcpAgent { // Default case: format tool name nicely and add path if available let formatted_name = self.format_tool_name(tool_name); if let Some(path) = raw_input.get("path").and_then(|p| p.as_str()) { - format!("{} {}", formatted_name, path) + format!("{formatted_name} {path}") } else { formatted_name } @@ -585,7 +655,7 @@ impl StakpakAcpAgent { .map_err(|_| acp::Error::internal_error())?; rx.await.map_err(|_| acp::Error::internal_error())?; - log::info!("Sent agent plan with {} entries", entries_count); + log::info!("Sent agent plan with {entries_count} entries"); Ok(()) } @@ -895,34 +965,71 @@ impl StakpakAcpAgent { log::error!("ACP filesystem tool execution failed: {e}"); acp::Error::internal_error().data(format!("Tool execution failed: {e}")) })? - } else if let Some(ref mcp_client) = self.mcp_client { - log::info!( - "🔧 DEBUG: Executing tool call: {} with MCP client", - tool_call.function.name - ); + } else { + // Check if tool is from session MCP servers first + let session_match = { + let session = self.current_session.borrow(); + session.as_ref().and_then(|s| { + s.mcp_tools + .iter() + .find(|(_, t)| t.name == tool_call.function.name) + .map(|(c, t)| (c.clone(), t.clone())) + }) + }; - // Create cancellation receiver for this tool call - let tool_cancel_rx = self.tool_cancel_tx.as_ref().map(|tx| tx.subscribe()); + let session_id = self.current_session.borrow().as_ref().map(|s| s.id); - crate::commands::agent::run::tooling::run_tool_call( - mcp_client, - &self.mcp_tools, - &tool_call, - tool_cancel_rx, - self.current_session_id.get(), - ) - .await - .map_err(|e| { - log::error!("MCP tool execution failed: {e}"); - acp::Error::internal_error().data(format!("MCP tool execution failed: {e}")) - })? - } else { - let error_msg = format!( - "No execution method available for tool: {}", - tool_call.function.name - ); - log::error!("{error_msg}"); - return Err(acp::Error::internal_error().data(error_msg)); + if let Some((client, tool)) = session_match { + log::info!( + "🔧 DEBUG: Executing tool call: {} with session MCP client", + tool_call.function.name + ); + + let tool_cancel_rx = self.tool_cancel_tx.as_ref().map(|tx| tx.subscribe()); + let tools_slice = [tool]; + + crate::commands::agent::run::tooling::run_tool_call( + &client, + &tools_slice, + &tool_call, + tool_cancel_rx, + session_id, + ) + .await + .map_err(|e| { + log::error!("Session MCP tool execution failed: {e}"); + acp::Error::internal_error() + .data(format!("Session MCP tool execution failed: {e}")) + })? + } else if let Some(ref mcp_client) = self.mcp_client { + log::info!( + "🔧 DEBUG: Executing tool call: {} with MCP client", + tool_call.function.name + ); + + // Create cancellation receiver for this tool call + let tool_cancel_rx = self.tool_cancel_tx.as_ref().map(|tx| tx.subscribe()); + + crate::commands::agent::run::tooling::run_tool_call( + mcp_client, + &self.mcp_tools, + &tool_call, + tool_cancel_rx, + session_id, + ) + .await + .map_err(|e| { + log::error!("MCP tool execution failed: {e}"); + acp::Error::internal_error().data(format!("MCP tool execution failed: {e}")) + })? + } else { + let error_msg = format!( + "No execution method available for tool: {}", + tool_call.function.name + ); + log::error!("{error_msg}"); + return Err(acp::Error::internal_error().data(error_msg)); + } }; log::info!( @@ -1070,13 +1177,13 @@ impl StakpakAcpAgent { let mcp_client = Arc::new( stakpak_mcp_client::connect(None) // progress_tx will be set later in run_stdio .await - .map_err(|e| format!("Failed to connect to MCP proxy: {}", e))?, + .map_err(|e| format!("Failed to connect to MCP proxy: {e}"))?, ); // Get tools from MCP client let mcp_tools = stakpak_mcp_client::get_tools(&mcp_client) .await - .map_err(|e| format!("Failed to get tools: {}", e))?; + .map_err(|e| format!("Failed to get tools: {e}"))?; let tools = convert_tools_with_filter(&mcp_tools, config.allowed_tools.as_ref()); @@ -1195,7 +1302,7 @@ impl StakpakAcpAgent { if !plan_entries.is_empty() && let Err(e) = self.send_agent_plan(session_id, plan_entries).await { - log::warn!("Failed to send agent plan during streaming: {}", e); + log::warn!("Failed to send agent plan during streaming: {e}"); // Don't fail the streaming if plan sending fails } @@ -1287,7 +1394,7 @@ impl StakpakAcpAgent { } } Err(e) => { - return Err(format!("Stream error: {:?}", e)); + return Err(format!("Stream error: {e:?}")); } } } @@ -1343,7 +1450,7 @@ impl StakpakAcpAgent { // Spawn signal handler task tokio::spawn(async move { if let Err(e) = tokio::signal::ctrl_c().await { - log::error!("Failed to install Ctrl+C handler: {}", e); + log::error!("Failed to install Ctrl+C handler: {e}"); return; } log::info!("Received Ctrl+C, shutting down ACP agent..."); @@ -1369,7 +1476,7 @@ impl StakpakAcpAgent { (Some(client), mcp_tools, tool_list) } Err(e) => { - log::warn!("Failed to reinitialize MCP client with progress channel: {}, continuing without tools", e); + log::warn!("Failed to reinitialize MCP client with progress channel: {e}, continuing without tools"); (None, Vec::new(), Vec::new()) } }; @@ -1389,7 +1496,6 @@ impl StakpakAcpAgent { mcp_client, mcp_tools, tools: if tools.is_empty() { None } else { Some(tools) }, - current_session_id: self.current_session_id.clone(), progress_tx: Some(progress_tx), messages: self.messages.clone(), permission_request_tx: Some(permission_tx), @@ -1400,6 +1506,7 @@ impl StakpakAcpAgent { streaming_buffer: self.streaming_buffer.clone(), fs_operation_tx: Some(fs_operation_tx), client_capabilities: self.client_capabilities.clone(), + current_session: RefCell::new(None), }; // Start up the StakpakAcpAgent connected to stdio. @@ -1418,11 +1525,11 @@ impl StakpakAcpAgent { let conn_for_notifications = conn_arc.clone(); tokio::task::spawn_local(async move { while let Some((session_notification, ack_tx)) = rx.recv().await { - log::info!("Sending session notification: {:?}", session_notification); + log::info!("Sending session notification: {session_notification:?}"); let result = AcpClient::session_notification(&*conn_for_notifications, session_notification).await; if let Err(e) = result { - log::error!("Failed to send session notification: {}", e); + log::error!("Failed to send session notification: {e}"); break; } log::info!("Session notification sent successfully"); @@ -1434,14 +1541,14 @@ impl StakpakAcpAgent { let conn_for_permissions = conn_arc.clone(); tokio::task::spawn_local(async move { while let Some((permission_request, response_tx)) = permission_rx.recv().await { - log::info!("Sending permission request: {:?}", permission_request); + log::info!("Sending permission request: {permission_request:?}"); match conn_for_permissions.request_permission(permission_request).await { Ok(response) => { - log::info!("Permission request response: {:?}", response); + log::info!("Permission request response: {response:?}"); let _ = response_tx.send(response); } Err(e) => { - log::error!("Permission request failed: {}", e); + log::error!("Permission request failed: {e}"); // Send a default rejection response let _ = response_tx.send( acp::RequestPermissionResponse::new( @@ -1485,7 +1592,7 @@ impl StakpakAcpAgent { result = handle_io => { match result { Ok(_) => log::info!("ACP connection closed normally"), - Err(e) => log::error!("ACP connection error: {}", e), + Err(e) => log::error!("ACP connection error: {e}"), } } _ = shutdown_rx.recv() => { @@ -1509,7 +1616,6 @@ impl Clone for StakpakAcpAgent { mcp_client: self.mcp_client.clone(), mcp_tools: self.mcp_tools.clone(), tools: self.tools.clone(), - current_session_id: Cell::new(self.current_session_id.get()), progress_tx: self.progress_tx.clone(), messages: self.messages.clone(), permission_request_tx: self.permission_request_tx.clone(), @@ -1520,6 +1626,7 @@ impl Clone for StakpakAcpAgent { streaming_buffer: self.streaming_buffer.clone(), fs_operation_tx: self.fs_operation_tx.clone(), client_capabilities: self.client_capabilities.clone(), + current_session: RefCell::new(None), } } } @@ -1617,16 +1724,13 @@ impl acp::Agent for StakpakAcpAgent { )); } - let temp_session_id = Uuid::new_v4(); - let session_id = acp::SessionId::new(temp_session_id.to_string()); - - // Track the current session ID - self.current_session_id.set(Some(temp_session_id)); + let session_uuid = Uuid::new_v4(); + let session_id = acp::SessionId::new(session_uuid.to_string()); // Clear message history for new session { let mut messages = self.messages.lock().await; - //copy system message if exists + // Keep system message if exists let system_message = messages .iter() .find(|msg| msg.role == Role::System) @@ -1637,6 +1741,35 @@ impl acp::Agent for StakpakAcpAgent { } } + // Per ACP spec, MCP servers are session-scoped. + // Connect to session MCP servers and collect their tools. + let mut mcp_tools: Vec<(Arc, rmcp::model::Tool)> = Vec::new(); + for mcp_server in args.mcp_servers { + let (name, config) = match mcp_server_to_proxy_config(mcp_server) { + Ok(c) => c, + Err(e) => { + log::error!("Failed to convert MCP server config: {e}"); + return Err(acp::Error::invalid_params().data(e)); + } + }; + + let (client, tools) = + connect_mcp_server(config, &name, self.progress_tx.clone()).await?; + + // Store each tool with its client + mcp_tools.extend(tools.into_iter().map(|t| (client.clone(), t))); + } + + if !mcp_tools.is_empty() { + log::info!("Session MCP tools initialized: {} tools", mcp_tools.len()); + } + + // Store session state + *self.current_session.borrow_mut() = Some(CurrentSession { + id: session_uuid, + mcp_tools, + }); + Ok(acp::NewSessionResponse::new(session_id)) } @@ -1653,10 +1786,12 @@ impl acp::Agent for StakpakAcpAgent { Err(_) => return Err(acp::Error::invalid_params()), }; - // Track the loaded session ID - self.current_session_id.set(Some(session_uuid)); + *self.current_session.borrow_mut() = Some(CurrentSession { + id: session_uuid, + mcp_tools: Vec::new(), + }); - log::info!("Loaded session: {}", session_id_str); + log::info!("Loaded session: {session_id_str}"); Ok(acp::LoadSessionResponse::new()) } @@ -1673,7 +1808,7 @@ impl acp::Agent for StakpakAcpAgent { }) .collect::>() .join(" "); - log::info!("Processed prompt text: {}", prompt_text); + log::info!("Processed prompt text: {prompt_text}"); let user_msg = user_message(prompt_text); // Add user message to conversation history @@ -1682,9 +1817,15 @@ impl acp::Agent for StakpakAcpAgent { messages.push(user_msg.clone()); } - // Use tools if available - let tools = self.tools.clone().unwrap_or_default(); - log::info!("Available tools: {}", tools.len()); + // Use tools if available (combine connection-scoped + session-scoped tools) + let mut tools = self.tools.clone().unwrap_or_default(); + if let Some(ref session) = *self.current_session.borrow() { + let session_tools: Vec<_> = session.mcp_tools.iter().map(|(_, t)| t.clone()).collect(); + let converted = + convert_tools_with_filter(&session_tools, self.config.allowed_tools.as_ref()); + tools.extend(converted); + } + log::info!("Available tools: {} (including session tools)", tools.len()); // Get current conversation history let messages = { @@ -1698,8 +1839,8 @@ impl acp::Agent for StakpakAcpAgent { tools.len(), messages.len() ); - log::info!("User message: {:?}", user_msg); - log::info!("Tools: {:?}", tools); + log::info!("User message: {user_msg:?}"); + log::info!("Tools: {tools:?}"); // Only pass tools if we have any let tools_option = if tools.is_empty() { None } else { Some(tools) }; @@ -1729,7 +1870,7 @@ impl acp::Agent for StakpakAcpAgent { ); } }; - log::info!("Chat completion successful, response: {:?}", response); + log::info!("Chat completion successful, response: {response:?}"); log::info!("Response choices count: {}", response.choices.len()); if !response.choices.is_empty() { log::info!("First choice message: {:?}", response.choices[0].message); @@ -1748,7 +1889,7 @@ impl acp::Agent for StakpakAcpAgent { let content = if let Some(content) = &response.choices[0].message.content { match content { MessageContent::String(s) => { - log::info!("Content from chat completion: '{}'", s); + log::info!("Content from chat completion: '{s}'"); s.clone() } MessageContent::Array(parts) => { @@ -1759,10 +1900,7 @@ impl acp::Agent for StakpakAcpAgent { .filter(|text| !text.starts_with("")) .collect::>() .join("\n"); - log::info!( - "Content from chat completion array: '{}'", - extracted_content - ); + log::info!("Content from chat completion array: '{extracted_content}'"); extracted_content } } @@ -1771,7 +1909,7 @@ impl acp::Agent for StakpakAcpAgent { String::new() }; - log::info!("Final content to send: '{}'", content); + log::info!("Final content to send: '{content}'"); // If content is empty, provide a fallback response if content.is_empty() { @@ -1793,10 +1931,7 @@ impl acp::Agent for StakpakAcpAgent { .map(|tc| !tc.is_empty()) .unwrap_or(false); - log::info!( - "🔧 DEBUG: Initial response has tool calls: {}", - has_tool_calls - ); + log::info!("🔧 DEBUG: Initial response has tool calls: {has_tool_calls}"); if has_tool_calls && let Some(tool_calls) = response.choices[0].message.tool_calls.as_ref() { log::info!("🔧 DEBUG: Initial tool calls count: {}", tool_calls.len()); @@ -1861,7 +1996,7 @@ impl acp::Agent for StakpakAcpAgent { .process_tool_calls_with_cancellation(tool_calls.clone(), &args.session_id) .await .map_err(|e| { - log::error!("Tool call processing failed: {}", e); + log::error!("Tool call processing failed: {e}"); e })?; @@ -1950,7 +2085,7 @@ impl acp::Agent for StakpakAcpAgent { .map(|tc| !tc.is_empty()) .unwrap_or(false); - log::info!("Follow-up response has tool calls: {}", has_tool_calls); + log::info!("Follow-up response has tool calls: {has_tool_calls}"); } else { // No tool calls in the latest message, exit the loop break; @@ -1979,7 +2114,7 @@ impl acp::Agent for StakpakAcpAgent { // Cancel streaming if channel is available if let Some(tx) = &self.stream_cancel_tx { if let Err(e) = tx.send(()) { - log::warn!("Failed to send stream cancellation signal: {}", e); + log::warn!("Failed to send stream cancellation signal: {e}"); } else { log::info!("Stream cancellation signal sent"); } @@ -1988,7 +2123,7 @@ impl acp::Agent for StakpakAcpAgent { // Cancel tool execution if channel is available if let Some(tx) = &self.tool_cancel_tx { if let Err(e) = tx.send(()) { - log::warn!("Failed to send tool cancellation signal: {}", e); + log::warn!("Failed to send tool cancellation signal: {e}"); } else { log::info!("Tool cancellation signal sent"); } @@ -2019,7 +2154,7 @@ impl acp::Agent for StakpakAcpAgent { } if tool_calls_count > 0 { - log::info!("Cancelled {} active tool calls", tool_calls_count); + log::info!("Cancelled {tool_calls_count} active tool calls"); } Ok(()) @@ -2028,9 +2163,64 @@ impl acp::Agent for StakpakAcpAgent { #[cfg(test)] mod tests { + use super::mcp_server_to_proxy_config; use crate::commands::acp::tool_names; + use agent_client_protocol as acp; + use stakpak_mcp_proxy::client::ServerConfig; + use std::collections::HashMap; + use std::sync::Arc; use test_case::test_case; + #[test_case( + acp::McpServer::Stdio( + acp::McpServerStdio::new("github", "/path/to/github-mcp-server") + .args(vec!["stdio".into()]) + .env(vec![acp::EnvVariable::new( + "GITHUB_PERSONAL_ACCESS_TOKEN", + "ghp_xxxxxxxxxxxx", + )]) + ), + Ok(( + "github".to_string(), + ServerConfig::Stdio { + command: "/path/to/github-mcp-server".to_string(), + args: vec!["stdio".to_string()], + env: Some(HashMap::from([ + ("GITHUB_PERSONAL_ACCESS_TOKEN".to_string(), "ghp_xxxxxxxxxxxx".to_string()) + ])), + } + )) + ; "stdio transport" + )] + #[test_case( + acp::McpServer::Http( + acp::McpServerHttp::new("github", "https://api.githubcopilot.com/mcp/") + .headers(vec![acp::HttpHeader::new("Authorization", "Bearer ghp_xxxxxxxxxxxx")]) + ), + Ok(( + "github".to_string(), + ServerConfig::Http { + url: "https://api.githubcopilot.com/mcp/".to_string(), + headers: HashMap::from([ + ("Authorization".to_string(), "Bearer ghp_xxxxxxxxxxxx".to_string()) + ]), + certificate_chain: Arc::new(None), + } + )) + ; "http transport" + )] + #[test_case( + acp::McpServer::Sse(acp::McpServerSse::new("test-sse", "https://example.com/sse")), + Err("SSE transport is deprecated and not supported: test-sse".to_string()) + ; "sse transport rejected" + )] + fn test_mcp_server_to_proxy_config( + input: acp::McpServer, + expected: Result<(String, ServerConfig), String>, + ) { + assert_eq!(mcp_server_to_proxy_config(input), expected); + } + // Per ACP spec, agents MUST check client capabilities before delegating fs operations. // Both readTextFile and writeTextFile default to false - delegation requires explicit opt-in. // diff --git a/cli/src/commands/agent/run/mcp_init.rs b/cli/src/commands/agent/run/mcp_init.rs index 28a1ca78..04b3c7a7 100644 --- a/cli/src/commands/agent/run/mcp_init.rs +++ b/cli/src/commands/agent/run/mcp_init.rs @@ -143,7 +143,7 @@ fn build_proxy_config( "stakpak".to_string(), ServerConfig::Http { url: local_server_url, - headers: None, + headers: std::collections::HashMap::new(), certificate_chain: server_cert_chain, }, ); @@ -153,7 +153,7 @@ fn build_proxy_config( "paks".to_string(), ServerConfig::Http { url: "https://apiv2.stakpak.dev/v1/paks/mcp".to_string(), - headers: None, + headers: std::collections::HashMap::new(), certificate_chain: Arc::new(None), }, ); @@ -211,6 +211,7 @@ async fn connect_to_proxy( match stakpak_mcp_client::connect_https( proxy_url, Some(cert_chain.clone()), + std::collections::HashMap::new(), progress_tx.clone(), ) .await diff --git a/cli/src/config.rs b/cli/src/config.rs index 6467f525..2e3d56a2 100644 --- a/cli/src/config.rs +++ b/cli/src/config.rs @@ -45,6 +45,8 @@ pub struct ProfileConfig { pub api_key: Option, /// Provider type (remote or local) pub provider: Option, + /// Path to the local SQLite database + pub store_path: Option, /// Allowed tools (empty = all tools allowed) pub allowed_tools: Option>, /// Tools that auto-approve without asking @@ -91,6 +93,8 @@ pub struct AppConfig { pub profile_name: String, /// Path to the config file (used for saving) pub config_path: String, + /// Path to the local SQLite database + pub store_path: Option, /// Allowed tools (empty = all tools allowed) pub allowed_tools: Option>, /// Tools that auto-approve without asking @@ -182,6 +186,7 @@ impl From for ProfileConfig { rulebooks: config.rulebooks, warden: config.warden, provider: None, + store_path: config.store_path, openai: config.openai, anthropic: config.anthropic, gemini: config.gemini, @@ -376,6 +381,10 @@ impl ProfileConfig { .provider .clone() .or_else(|| other.and_then(|config| config.provider.clone())), + store_path: self + .store_path + .clone() + .or_else(|| other.and_then(|config| config.store_path.clone())), openai: self .openai .clone() @@ -489,6 +498,7 @@ impl AppConfig { auto_append_gitignore: settings.auto_append_gitignore, profile_name: profile_name.to_string(), config_path: path.display().to_string(), + store_path: profile_config.store_path, allowed_tools: profile_config.allowed_tools, auto_approve: profile_config.auto_approve, rulebooks: profile_config.rulebooks, @@ -671,6 +681,7 @@ auto_append_gitignore = true auto_append_gitignore: Some(false), profile_name: profile_name.into(), config_path: "/tmp/stakpak/config.toml".into(), + store_path: None, allowed_tools: Some(vec!["git".into(), "curl".into()]), auto_approve: Some(vec!["git status".into()]), rulebooks: Some(RulebookConfig { @@ -831,6 +842,7 @@ auto_append_gitignore = true volumes: vec!["/tmp:/tmp:ro".into()], }), provider: None, + store_path: None, openai: None, anthropic: None, gemini: None, @@ -850,6 +862,7 @@ auto_append_gitignore = true rulebooks: None, warden: None, provider: None, + store_path: None, openai: None, anthropic: None, gemini: None, @@ -1037,6 +1050,7 @@ auto_append_gitignore = true auto_append_gitignore: Some(false), profile_name: "dev".into(), config_path: path.to_string_lossy().into_owned(), + store_path: None, allowed_tools: Some(vec!["git".into(), "curl".into()]), auto_approve: Some(vec!["git status".into()]), rulebooks: Some(RulebookConfig { diff --git a/cli/tests/acp_integration_test.rs b/cli/tests/acp_integration_test.rs new file mode 100644 index 00000000..d523161f --- /dev/null +++ b/cli/tests/acp_integration_test.rs @@ -0,0 +1,403 @@ +#![allow(clippy::unwrap_used, clippy::expect_used)] + +mod common; + +use agent_client_protocol::{ + self as acp, Agent, Client, ClientSideConnection, ContentBlock, ContentChunk, + InitializeRequest, NewSessionRequest, PromptRequest, ProtocolVersion, RequestPermissionRequest, + RequestPermissionResponse, SessionNotification, SessionUpdate, StopReason, TextContent, +}; +use rmcp::transport::streamable_http_server::{ + StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager, +}; +use rmcp::{ + ErrorData as McpError, ServerHandler, handler::server::router::tool::ToolRouter, model::*, + tool, tool_handler, tool_router, +}; +use std::collections::VecDeque; +use std::path::Path; +use std::process::Stdio; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use tokio::process::{Child, Command}; +use tokio::task::JoinHandle; +use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; +use wiremock::matchers::{method, path}; +use wiremock::{Mock, MockServer, ResponseTemplate}; + +/// Fake code returned by the MCP server - an LLM couldn't know this from memory +const FAKE_CODE: &str = "test-uuid-12345-67890"; + +#[tokio::test] +async fn test_acp_basic_completion() { + let prompt = "what is 1+1"; + let mock_server = setup_mock_openai(vec![( + prompt.to_string(), // Just match the prompt text + include_str!("./test_data/openai_basic_response.txt"), + )]) + .await; + + run_acp_session( + &mock_server, + vec![], + |conn, session_id, updates| async move { + let response = conn + .prompt(PromptRequest::new( + session_id, + vec![ContentBlock::Text(TextContent::new(prompt))], + )) + .await + .unwrap(); + + assert_eq!(response.stop_reason, StopReason::EndTurn); + wait_for_text(&updates, "2", Duration::from_secs(5)).await; + }, + ) + .await; +} + +#[tokio::test] +async fn test_acp_with_mcp_http_server() { + let prompt = "Use the get_code tool and output only its result."; + let (mcp_url, _handle) = spawn_mcp_http_server().await; + + let mock_server = setup_mock_openai(vec![ + ( + format!(r#"{prompt}\n\n","role":"user""#), + include_str!("./test_data/openai_tool_call_response.txt"), + ), + ( + format!(r#"{FAKE_CODE}\n\n\n","role":"user""#), + include_str!("./test_data/openai_tool_result_response.txt"), + ), + ]) + .await; + + run_acp_session( + &mock_server, + vec![acp::McpServer::Http(acp::McpServerHttp::new( + "lookup", &mcp_url, + ))], + |conn, session_id, updates| async move { + let response = conn + .prompt(PromptRequest::new( + session_id, + vec![ContentBlock::Text(TextContent::new(prompt))], + )) + .await + .unwrap(); + + assert_eq!(response.stop_reason, StopReason::EndTurn); + wait_for_text(&updates, FAKE_CODE, Duration::from_secs(5)).await; + }, + ) + .await; +} + +async fn wait_for_text( + updates: &Arc>>, + expected: &str, + timeout: Duration, +) { + let deadline = tokio::time::Instant::now() + timeout; + loop { + let actual = extract_text(&updates.lock().unwrap()); + if actual.contains(expected) { + return; + } + if tokio::time::Instant::now() > deadline { + panic!("Timeout waiting for text.\nExpected to contain: {expected}\nActual: {actual}"); + } + tokio::task::yield_now().await; + } +} + +const TITLE_GENERATION_RESPONSE: &str = include_str!("./test_data/openai_session_description.json"); + +async fn setup_mock_openai(exchanges: Vec<(String, &'static str)>) -> MockServer { + let mock_server = MockServer::start().await; + let queue: VecDeque<(String, &'static str)> = exchanges.into_iter().collect(); + let queue = Arc::new(Mutex::new(queue)); + + Mock::given(method("POST")) + .and(path("/v1/chat/completions")) + .respond_with({ + let queue = queue.clone(); + move |req: &wiremock::Request| { + let body = String::from_utf8_lossy(&req.body); + let is_streaming = body.contains(r#""stream":true"#); + + if !is_streaming { + return ResponseTemplate::new(200) + .insert_header("content-type", "application/json") + .set_body_string(TITLE_GENERATION_RESPONSE); + } + + let (expected, response) = { + let mut q = queue.lock().unwrap(); + match q.pop_front() { + Some(item) => item, + None => { + return ResponseTemplate::new(500) + .set_body_string(format!("unexpected request: {body}")); + } + } + }; + + if !body.contains(&expected) { + return ResponseTemplate::new(500).set_body_string(format!( + "expected body to contain: {expected}\nactual: {body}" + )); + } + + ResponseTemplate::new(200) + .insert_header("content-type", "text/event-stream") + .set_body_string(response) + } + }) + .mount(&mock_server) + .await; + + mock_server +} + +fn extract_text(updates: &[SessionNotification]) -> String { + updates + .iter() + .filter_map(|n| match &n.update { + SessionUpdate::AgentMessageChunk(ContentChunk { + content: ContentBlock::Text(t), + .. + }) => Some(t.text.clone()), + _ => None, + }) + .collect() +} + +fn create_test_config(mock_server: &MockServer, work_dir: &Path) -> std::path::PathBuf { + let config_path = work_dir.join("config.toml"); + let uri = mock_server.uri(); + let db_path = work_dir.join("local.db").to_string_lossy().to_string(); + let config_content = format!( + r#"[profiles.default] +api_key = "test" +provider = "local" +smart_model = "gpt-4" +eco_model = "gpt-4" +store_path = "{db_path}" +[profiles.default.openai] +api_endpoint = "{uri}/v1/chat/completions" +api_key = "test" +[settings] +"# + ); + std::fs::write(&config_path, config_content).unwrap(); + config_path +} + +async fn spawn_stakpak_acp(config_path: &Path) -> Child { + Command::new(&*common::STAKPAK_BINARY) + .args(["--config", config_path.to_str().unwrap(), "acp"]) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .env( + "RUST_LOG", + std::env::var("RUST_LOG").unwrap_or_else(|_| "info".into()), + ) + .kill_on_drop(true) + .spawn() + .unwrap() +} + +#[derive(Clone)] +struct TestClient { + updates: Arc>>, +} + +#[async_trait::async_trait(?Send)] +impl Client for TestClient { + async fn request_permission( + &self, + request: RequestPermissionRequest, + ) -> acp::Result { + // Auto-approve the first option + let option_id = request.options.first().map(|opt| opt.option_id.clone()); + match option_id { + Some(id) => Ok(RequestPermissionResponse::new( + acp::RequestPermissionOutcome::Selected(acp::SelectedPermissionOutcome::new(id)), + )), + None => Ok(RequestPermissionResponse::new( + acp::RequestPermissionOutcome::Cancelled, + )), + } + } + + async fn write_text_file( + &self, + _args: acp::WriteTextFileRequest, + ) -> acp::Result { + Ok(acp::WriteTextFileResponse::default()) + } + + async fn read_text_file( + &self, + _args: acp::ReadTextFileRequest, + ) -> acp::Result { + Ok(acp::ReadTextFileResponse::new("")) + } + + async fn session_notification(&self, args: SessionNotification) -> acp::Result<()> { + self.updates.lock().unwrap().push(args); + Ok(()) + } + + async fn create_terminal( + &self, + _args: acp::CreateTerminalRequest, + ) -> acp::Result { + unimplemented!() + } + + async fn terminal_output( + &self, + _args: acp::TerminalOutputRequest, + ) -> acp::Result { + unimplemented!() + } + + async fn kill_terminal_command( + &self, + _args: acp::KillTerminalCommandRequest, + ) -> acp::Result { + unimplemented!() + } + + async fn release_terminal( + &self, + _args: acp::ReleaseTerminalRequest, + ) -> acp::Result { + unimplemented!() + } + + async fn wait_for_terminal_exit( + &self, + _args: acp::WaitForTerminalExitRequest, + ) -> acp::Result { + unimplemented!() + } + + async fn ext_method(&self, _args: acp::ExtRequest) -> acp::Result { + Err(acp::Error::method_not_found()) + } + + async fn ext_notification(&self, _args: acp::ExtNotification) -> acp::Result<()> { + Ok(()) + } +} + +async fn run_acp_session( + mock_server: &MockServer, + mcp_servers: Vec, + test_fn: F, +) where + F: FnOnce(ClientSideConnection, acp::SessionId, Arc>>) -> Fut, + Fut: std::future::Future, +{ + let work_dir = tempfile::tempdir().unwrap(); + let config_path = create_test_config(mock_server, work_dir.path()); + let mut child = spawn_stakpak_acp(&config_path).await; + let updates = Arc::new(Mutex::new(Vec::new())); + + let outgoing = child.stdin.take().unwrap().compat_write(); + let incoming = child.stdout.take().unwrap().compat(); + let stderr = child.stderr.take().unwrap(); + + // Drain stderr to prevent subprocess from blocking + let stderr_handle = tokio::spawn(async move { + use tokio::io::{AsyncBufReadExt, BufReader}; + let reader = BufReader::new(stderr); + let mut lines = reader.lines(); + while lines.next_line().await.is_ok_and(|l| l.is_some()) {} + }); + + let local_set = tokio::task::LocalSet::new(); + local_set + .run_until(async move { + let client = TestClient { + updates: updates.clone(), + }; + let (conn, handle_io) = ClientSideConnection::new(client, outgoing, incoming, |fut| { + tokio::task::spawn_local(fut); + }); + tokio::task::spawn_local(async move { + let _ = handle_io.await; + }); + + conn.initialize(InitializeRequest::new(ProtocolVersion::LATEST)) + .await + .unwrap(); + + let session = conn + .new_session(NewSessionRequest::new(work_dir.path()).mcp_servers(mcp_servers)) + .await + .unwrap(); + + test_fn(conn, session.session_id, updates).await; + }) + .await; + + // Kill the child and wait for stderr reader to finish + drop(child); + let _ = stderr_handle.await; +} + +// MCP HTTP server with get_code tool +#[derive(Clone)] +struct Lookup { + tool_router: ToolRouter, +} + +#[tool_router] +impl Lookup { + fn new() -> Self { + Self { + tool_router: Self::tool_router(), + } + } + + #[tool(description = "Get the code")] + fn get_code(&self) -> Result { + Ok(CallToolResult::success(vec![Content::text(FAKE_CODE)])) + } +} + +#[tool_handler] +impl ServerHandler for Lookup { + fn get_info(&self) -> ServerInfo { + ServerInfo { + protocol_version: rmcp::model::ProtocolVersion::V_2025_03_26, + capabilities: ServerCapabilities::builder().enable_tools().build(), + server_info: Implementation::from_build_env(), + instructions: Some("Lookup server with get_code tool.".into()), + } + } +} + +async fn spawn_mcp_http_server() -> (String, JoinHandle<()>) { + let service = StreamableHttpService::new( + || Ok(Lookup::new()), + LocalSessionManager::default().into(), + StreamableHttpServerConfig::default(), + ); + let router = axum::Router::new().nest_service("/mcp", service); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let url = format!("http://{addr}/mcp"); + + let handle = tokio::spawn(async move { + axum::serve(listener, router).await.unwrap(); + }); + + (url, handle) +} diff --git a/cli/tests/common.rs b/cli/tests/common.rs new file mode 100644 index 00000000..08ee0196 --- /dev/null +++ b/cli/tests/common.rs @@ -0,0 +1,42 @@ +#![allow(clippy::unwrap_used, clippy::expect_used)] + +use std::path::PathBuf; +use std::process::Command; +use std::sync::LazyLock; + +/// Build a binary from a package and return its path. +pub fn build_binary(package: &str, bin_name: &str) -> PathBuf { + let output = Command::new("cargo") + .args([ + "build", + "-p", + package, + "--bin", + bin_name, + "--message-format=json", + ]) + .output() + .expect("failed to build binary"); + + if !output.status.success() { + panic!("build failed: {}", String::from_utf8_lossy(&output.stderr)); + } + + String::from_utf8_lossy(&output.stdout) + .lines() + .filter_map(|line| serde_json::from_str::(line).ok()) + .filter(|msg| msg["reason"] == "compiler-artifact") + .filter(|msg| msg["target"]["name"] == bin_name) + .filter(|msg| { + msg["target"]["kind"] + .as_array() + .map(|k| k.iter().any(|v| v == "bin")) + .unwrap_or(false) + }) + .filter_map(|msg| msg["executable"].as_str().map(PathBuf::from)) + .next() + .expect("failed to find binary path in cargo output") +} + +#[allow(dead_code)] +pub static STAKPAK_BINARY: LazyLock = LazyLock::new(|| build_binary("stakpak", "stakpak")); diff --git a/cli/tests/test_data/openai_basic_response.txt b/cli/tests/test_data/openai_basic_response.txt new file mode 100644 index 00000000..4c3d0c69 --- /dev/null +++ b/cli/tests/test_data/openai_basic_response.txt @@ -0,0 +1,9 @@ +data: {"id":"chatcmpl-test","object":"chat.completion.chunk","created":1766229303,"model":"gpt-5-nano","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]} + +data: {"id":"chatcmpl-test","object":"chat.completion.chunk","created":1766229303,"model":"gpt-5-nano","choices":[{"index":0,"delta":{"content":"2"},"finish_reason":null}]} + +data: {"id":"chatcmpl-test","object":"chat.completion.chunk","created":1766229303,"model":"gpt-5-nano","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]} + +data: {"id":"chatcmpl-test","object":"chat.completion.chunk","created":1766229303,"model":"gpt-5-nano","choices":[],"usage":{"prompt_tokens":100,"completion_tokens":10,"total_tokens":110}} + +data: [DONE] diff --git a/cli/tests/test_data/openai_session_description.json b/cli/tests/test_data/openai_session_description.json new file mode 100644 index 00000000..aab91b54 --- /dev/null +++ b/cli/tests/test_data/openai_session_description.json @@ -0,0 +1 @@ +{"id":"chatcmpl-test","object":"chat.completion","created":1766229622,"model":"gpt-5-nano","choices":[{"index":0,"message":{"role":"assistant","content":"Test session"},"finish_reason":"stop"}],"usage":{"prompt_tokens":79,"completion_tokens":10,"total_tokens":89}} diff --git a/cli/tests/test_data/openai_tool_call_response.txt b/cli/tests/test_data/openai_tool_call_response.txt new file mode 100644 index 00000000..527b7ac5 --- /dev/null +++ b/cli/tests/test_data/openai_tool_call_response.txt @@ -0,0 +1,9 @@ +data: {"id":"chatcmpl-test1","object":"chat.completion.chunk","created":1766309803,"model":"gpt-5-nano","choices":[{"index":0,"delta":{"role":"assistant","content":null,"tool_calls":[{"index":0,"id":"call_abc123","type":"function","function":{"name":"get_code","arguments":""}}],"refusal":null},"finish_reason":null}]} + +data: {"id":"chatcmpl-test1","object":"chat.completion.chunk","created":1766309803,"model":"gpt-5-nano","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{}"}}]},"finish_reason":null}]} + +data: {"id":"chatcmpl-test1","object":"chat.completion.chunk","created":1766309803,"model":"gpt-5-nano","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]} + +data: {"id":"chatcmpl-test1","object":"chat.completion.chunk","created":1766309803,"model":"gpt-5-nano","choices":[],"usage":{"prompt_tokens":100,"completion_tokens":50,"total_tokens":150}} + +data: [DONE] diff --git a/cli/tests/test_data/openai_tool_result_response.txt b/cli/tests/test_data/openai_tool_result_response.txt new file mode 100644 index 00000000..2a34ecc8 --- /dev/null +++ b/cli/tests/test_data/openai_tool_result_response.txt @@ -0,0 +1,9 @@ +data: {"id":"chatcmpl-test2","object":"chat.completion.chunk","created":1766309809,"model":"gpt-5-nano","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]} + +data: {"id":"chatcmpl-test2","object":"chat.completion.chunk","created":1766309809,"model":"gpt-5-nano","choices":[{"index":0,"delta":{"content":"test-uuid-12345-67890"},"finish_reason":null}]} + +data: {"id":"chatcmpl-test2","object":"chat.completion.chunk","created":1766309809,"model":"gpt-5-nano","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]} + +data: {"id":"chatcmpl-test2","object":"chat.completion.chunk","created":1766309809,"model":"gpt-5-nano","choices":[],"usage":{"prompt_tokens":200,"completion_tokens":10,"total_tokens":210}} + +data: [DONE] diff --git a/libs/mcp/client/src/lib.rs b/libs/mcp/client/src/lib.rs index d1d95fd9..794c374f 100644 --- a/libs/mcp/client/src/lib.rs +++ b/libs/mcp/client/src/lib.rs @@ -23,10 +23,11 @@ pub async fn connect(progress_tx: Option>) -> Res local::connect(progress_tx).await } -/// Connect to an MCP server via HTTPS with optional mTLS +/// Connect to an MCP server via HTTPS with optional mTLS and custom headers pub async fn connect_https( url: &str, certificate_chain: Option>, + headers: std::collections::HashMap, progress_tx: Option>, ) -> Result { let mut client_builder = reqwest::Client::builder() @@ -40,6 +41,20 @@ pub async fn connect_https( client_builder = client_builder.use_preconfigured_tls(tls_config); } + // Configure custom headers + if !headers.is_empty() { + let mut header_map = reqwest::header::HeaderMap::new(); + for (k, v) in headers { + if let (Ok(name), Ok(value)) = ( + reqwest::header::HeaderName::from_bytes(k.as_bytes()), + reqwest::header::HeaderValue::from_str(&v), + ) { + header_map.insert(name, value); + } + } + client_builder = client_builder.default_headers(header_map); + } + let http_client = client_builder.build()?; let config = StreamableHttpClientTransportConfig::with_uri(url); @@ -52,6 +67,33 @@ pub async fn connect_https( Ok(client) } +/// Connect to an MCP server via stdio (arbitrary command) +pub async fn connect_stdio( + command: &str, + args: Vec, + env: Option>, + progress_tx: Option>, +) -> Result { + use rmcp::transport::TokioChildProcess; + + let mut cmd = tokio::process::Command::new(command); + for arg in args { + cmd.arg(arg); + } + if let Some(env_vars) = env { + cmd.envs(&env_vars); + } + cmd.stdin(std::process::Stdio::piped()); + cmd.stdout(std::process::Stdio::piped()); + cmd.stderr(std::process::Stdio::null()); + + let proc = TokioChildProcess::new(cmd)?; + let client_handler = LocalClientHandler::new(progress_tx); + let client: McpClient = client_handler.serve(proc).await?; + + Ok(client) +} + /// Get all available tools from the MCP client pub async fn get_tools(client: &McpClient) -> Result> { let tools = client.list_tools(Default::default()).await?; diff --git a/libs/mcp/proxy/src/client/mod.rs b/libs/mcp/proxy/src/client/mod.rs index f49d541d..9c4f45fd 100644 --- a/libs/mcp/proxy/src/client/mod.rs +++ b/libs/mcp/proxy/src/client/mod.rs @@ -102,7 +102,7 @@ impl Default for ClientPool { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub enum ServerConfig { Stdio { command: String, @@ -111,7 +111,7 @@ pub enum ServerConfig { }, Http { url: String, - headers: Option>, + headers: HashMap, /// Optional certificate chain for mTLS (used for local server connections) certificate_chain: Arc>, }, @@ -183,7 +183,7 @@ impl ClientPoolConfig { } ServerConfigJson::UrlBased { url, headers } => ServerConfig::Http { url, - headers, + headers: headers.unwrap_or_default(), certificate_chain: Arc::new(None), }, }; diff --git a/libs/mcp/proxy/src/server/mod.rs b/libs/mcp/proxy/src/server/mod.rs index 7f8d6406..50834151 100644 --- a/libs/mcp/proxy/src/server/mod.rs +++ b/libs/mcp/proxy/src/server/mod.rs @@ -128,9 +128,9 @@ impl ProxyServer { } } - if let Some(headers_map) = headers { + if !headers.is_empty() { let mut header_map = reqwest::header::HeaderMap::new(); - for (key, value) in headers_map { + for (key, value) in headers { if let (Ok(header_name), Ok(header_value)) = ( reqwest::header::HeaderName::from_bytes(key.as_bytes()), reqwest::header::HeaderValue::from_str(&value), diff --git a/libs/shared/src/cert_utils.rs b/libs/shared/src/cert_utils.rs index 71741894..2e7d3dcd 100644 --- a/libs/shared/src/cert_utils.rs +++ b/libs/shared/src/cert_utils.rs @@ -23,6 +23,12 @@ impl std::fmt::Debug for CertificateChain { } } +impl PartialEq for CertificateChain { + fn eq(&self, _other: &Self) -> bool { + false + } +} + impl CertificateChain { pub fn generate() -> Result { // Generate CA certificate From d5c1017228ab3bf489461a56fbf680a0cc1988a4 Mon Sep 17 00:00:00 2001 From: George Date: Wed, 31 Dec 2025 18:53:00 -0800 Subject: [PATCH 2/4] feat: integrate paks MCP tools into system prompt - Add Knowledge Sources section documenting rulebooks and paks - Document paks__search_paks and paks__get_pak_content tools - Add knowledge lookup strategy prioritizing rulebooks - Include trust model explaining vetting differences - Add example workflow for combining knowledge sources --- .../system_prompt.txt | 45 ++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/libs/api/src/local/hooks/inline_scratchpad_context/system_prompt.txt b/libs/api/src/local/hooks/inline_scratchpad_context/system_prompt.txt index aa2c1162..97799b7e 100644 --- a/libs/api/src/local/hooks/inline_scratchpad_context/system_prompt.txt +++ b/libs/api/src/local/hooks/inline_scratchpad_context/system_prompt.txt @@ -54,7 +54,50 @@ If the target topic cannot be found: - At the beginning of every session, you'll be provided with a list of Rule Books with more guidelines, procedures, and instructions specific to the user's environment. It is highly recommended to read only the Rule Books relevant to the task at hand and study them to perform your task better - Never treat software version numbers as decimal numbers (v1.15 ≠ 1.15 as decimal), use instead semantic versioning rules: MAJOR.MINOR.PATCH, for example: 1.15.2 > 1.8.0 because minor version 15 > 8 - Build container images for the deployment target architecture (most likely amd64, unless the deployment target is arm-based). This is especially important when running on apple silicon. -- Always use Python to do any math, calculations, or analysis that involves number. Python will produce more accurate and precise results. +- Always use Python to do any math, calculations, or analysis that involves number. Python will produce more accurate and precise results. + +# Knowledge Sources: Rulebooks and Paks + +You have access to two complementary knowledge systems: + +## 1. Rulebooks (User-Specific) +Rulebooks are provided at session start and contain guidelines, procedures, and instructions specific to the user's environment. Always check available rulebooks first for task-relevant guidance. + +## 2. Paks (Community Knowledge) +Paks are community-contributed skill packages from the Stakpak registry. Use paks when: +- Available rulebooks don't cover the topic adequately +- You need additional best practices or procedures +- The task involves common DevOps patterns that may have community solutions + +**Trust Model:** +- Rulebooks are vetted by the Stakpak team and the user's organization - safe to use directly +- Paks are community-driven and unvetted - the paks__get_pak_content tool call requires user approval before execution + +### Paks MCP Tools +- **paks__search_paks**: Search the registry by keywords (e.g., "kubernetes terraform aws") +- **paks__get_pak_content**: Fetch pak content using URI format: `owner/pak_name[@version][/path]` + +### Knowledge Lookup Strategy +1. **Check rulebooks first** - User-specific guidance takes priority +2. **Search paks if needed** - When rulebooks are insufficient or missing for the topic +3. **Combine knowledge** - Use both sources together for comprehensive solutions + +### Example Workflow +``` +User: "Help me design an AWS architecture" + +1. Check rulebooks → Found: aws-architecture-design rulebook +2. Read rulebook for user-specific requirements +3. If rulebook lacks detail → Search paks: "aws architecture design" +4. Fetch relevant pak: paks__get_pak_content("stakpak/aws-architecture-design") +5. Combine both sources for complete guidance +``` + +### When to Search Paks +- Task involves unfamiliar technology not covered by rulebooks +- Need community best practices for common patterns +- Rulebook exists but lacks implementation details +- User asks about topics with no matching rulebook # Identity When asked about what you can support or do always search documentation first From 06747b8d31b9b81011e69688f69d283d010207b9 Mon Sep 17 00:00:00 2001 From: George Date: Wed, 31 Dec 2025 18:54:39 -0800 Subject: [PATCH 3/4] docs: add meta pak reference for publishing paks --- .../local/hooks/inline_scratchpad_context/system_prompt.txt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/libs/api/src/local/hooks/inline_scratchpad_context/system_prompt.txt b/libs/api/src/local/hooks/inline_scratchpad_context/system_prompt.txt index 97799b7e..2c9b2254 100644 --- a/libs/api/src/local/hooks/inline_scratchpad_context/system_prompt.txt +++ b/libs/api/src/local/hooks/inline_scratchpad_context/system_prompt.txt @@ -97,7 +97,10 @@ User: "Help me design an AWS architecture" - Task involves unfamiliar technology not covered by rulebooks - Need community best practices for common patterns - Rulebook exists but lacks implementation details -- User asks about topics with no matching rulebook +- User asks about topics with no matching rulebook + +### Publishing Paks +If a user wants to create and publish their own pak, fetch the meta pak `stakpak/how-to-write-paks` which contains step-by-step guidance for authoring and publishing paks to the registry. # Identity When asked about what you can support or do always search documentation first From 7fe8d1263c0c3c5ea4c0831c439ff0af3ecdeaf5 Mon Sep 17 00:00:00 2001 From: Mostafa Ashraf Date: Sun, 4 Jan 2026 12:05:48 +0200 Subject: [PATCH 4/4] feat: Implement graceful shutdown for the MCP server and proxy. --- cli/src/commands/agent/run/mode_async.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/cli/src/commands/agent/run/mode_async.rs b/cli/src/commands/agent/run/mode_async.rs index e93c0624..df88a870 100644 --- a/cli/src/commands/agent/run/mode_async.rs +++ b/cli/src/commands/agent/run/mode_async.rs @@ -65,8 +65,8 @@ pub async fn run_async(ctx: AppConfig, config: RunAsyncConfig) -> Result<(), Str let mcp_init_result = initialize_mcp_server_and_tools(&ctx, mcp_init_config, None).await?; let mcp_client = mcp_init_result.client; let mcp_tools = mcp_init_result.mcp_tools; - let _server_shutdown_tx = mcp_init_result.server_shutdown_tx; - let _proxy_shutdown_tx = mcp_init_result.proxy_shutdown_tx; + let server_shutdown_tx = mcp_init_result.server_shutdown_tx; + let proxy_shutdown_tx = mcp_init_result.proxy_shutdown_tx; // Filter tools if allowed_tools is specified let tools = if let Some(allowed) = &config.allowed_tools { @@ -405,5 +405,16 @@ pub async fn run_async(ctx: AppConfig, config: RunAsyncConfig) -> Result<(), Str ); } + // Gracefully shutdown MCP server and proxy + print!( + "{}", + renderer.render_info("Shutting down MCP server and proxy...") + ); + let _ = server_shutdown_tx.send(()); + let _ = proxy_shutdown_tx.send(()); + // Give the servers a moment to cleanup + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + print!("{}", renderer.render_success("Shutdown complete")); + Ok(()) }