diff --git a/.gitignore b/.gitignore index 256f54b7..ffceb842 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ debug.log # Stakpak local files .stakpak .DS_Store +debug_acp.sh +PR_Description.md \ No newline at end of file diff --git a/cli/src/commands/acp/fs_handler.rs b/cli/src/commands/acp/fs_handler.rs index 39b9de19..42e19418 100644 --- a/cli/src/commands/acp/fs_handler.rs +++ b/cli/src/commands/acp/fs_handler.rs @@ -87,7 +87,8 @@ pub async fn execute_acp_fs_tool( .map_err(|e| format!("Failed to parse tool arguments: {}", e))?; use super::tool_names; - match tool_call.function.name.as_str() { + let stripped_name = super::utils::strip_tool_name(&tool_call.function.name); + match stripped_name { tool_names::VIEW => { let path = args .get("path") diff --git a/cli/src/commands/acp/server.rs b/cli/src/commands/acp/server.rs index 2d42e101..bd920c8e 100644 --- a/cli/src/commands/acp/server.rs +++ b/cli/src/commands/acp/server.rs @@ -1,5 +1,6 @@ use crate::commands::agent::run::helpers::{system_message, user_message}; -use crate::{commands::agent::run::helpers::convert_tools_with_filter, config::AppConfig}; +use crate::commands::agent::run::stream::ToolCallAccumulator; +use crate::config::AppConfig; use agent_client_protocol::{self as acp, Client as AcpClient, SessionNotification}; use futures_util::StreamExt; use stakpak_api::models::ApiStreamError; @@ -8,8 +9,8 @@ use stakpak_mcp_client::McpClient; use stakpak_shared::models::integrations::mcp::CallToolResultExt; use stakpak_shared::models::integrations::openai::{ AgentModel, ChatCompletionChoice, ChatCompletionResponse, ChatCompletionStreamResponse, - ChatMessage, FinishReason, FunctionCall, FunctionCallDelta, MessageContent, Role, Tool, - ToolCall, ToolCallResultProgress, ToolCallResultStatus, + ChatMessage, FinishReason, MessageContent, Role, Tool, ToolCall, ToolCallResultProgress, + ToolCallResultStatus, }; use stakpak_shared::models::llm::LLMTokenUsage; use std::cell::Cell; @@ -20,8 +21,8 @@ use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt use uuid::Uuid; pub struct StakpakAcpAgent { - config: AppConfig, - client: Arc, + config: Arc>, + client: Arc>>, session_update_tx: mpsc::UnboundedSender<(acp::SessionNotification, oneshot::Sender<()>)>, next_session_id: Cell, mcp_client: Option>, @@ -89,9 +90,14 @@ impl StakpakAcpAgent { // Initialize MCP client and tools (optional for ACP) let (mcp_client, mcp_tools, tools) = match Self::initialize_mcp_server_and_tools(&config).await { - Ok((client, mcp_tools, tool_list)) => { + Ok(result) => { log::info!("MCP client initialized successfully"); - (Some(client), mcp_tools, tool_list) + // Hold shutdown handles to keep servers alive + // They'll be dropped when this agent is dropped, which is fine + // since new() is only called once at startup and run_stdio reinitializes + let _server_shutdown = result.server_shutdown_tx; + let _proxy_shutdown = result.proxy_shutdown_tx; + (Some(result.client), result.mcp_tools, result.tools) } Err(e) => { log::warn!( @@ -112,8 +118,8 @@ impl StakpakAcpAgent { }; Ok(Self { - config, - client, + config: Arc::new(tokio::sync::RwLock::new(config)), + client: Arc::new(tokio::sync::RwLock::new(client)), session_update_tx, next_session_id: Cell::new(0), mcp_client, @@ -681,18 +687,7 @@ impl StakpakAcpAgent { tool_calls: Vec, session_id: &acp::SessionId, ) -> Result, acp::Error> { - log::info!( - "🔧 DEBUG: Starting tool call processing with {} tool calls", - tool_calls.len() - ); - for (i, tool_call) in tool_calls.iter().enumerate() { - log::info!( - "🔧 DEBUG: Tool call {}: {} (id: {})", - i, - tool_call.function.name, - tool_call.id - ); - } + log::info!("Processing {} tool calls", tool_calls.len()); let mut tool_calls_queue = tool_calls; let mut results = Vec::new(); @@ -734,8 +729,10 @@ impl StakpakAcpAgent { } let raw_input = serde_json::from_str(&tool_call.function.arguments) .unwrap_or(serde_json::Value::Null); - let tool_title = self.generate_tool_title(&tool_call.function.name, &raw_input); - let tool_kind = self.get_tool_kind(&tool_call.function.name); + let stripped_name = + crate::commands::acp::utils::strip_tool_name(&tool_call.function.name); + let tool_title = self.generate_tool_title(stripped_name, &raw_input); + let tool_kind = self.get_tool_kind(stripped_name); // Prepare content and locations for diff tools let file_path = raw_input @@ -761,8 +758,8 @@ impl StakpakAcpAgent { .map(|p| p.to_string()) .unwrap_or_else(|| file_path.to_string_lossy().to_string()); - let (content, locations) = if self.should_use_diff_content(&tool_call.function.name) { - if self.is_file_creation_tool(&tool_call.function.name) { + let (content, locations) = if self.should_use_diff_content(stripped_name) { + if self.is_file_creation_tool(stripped_name) { // For file creation: old_text = None, new_text = result_content let diff_content = vec![acp::ToolCallContent::Diff(acp::Diff::new( file_path.clone(), @@ -799,7 +796,7 @@ impl StakpakAcpAgent { .await?; // Check permissions - let permission_granted = if self.is_auto_approved_tool(&tool_call.function.name) { + let permission_granted = if self.is_auto_approved_tool(stripped_name) { true } else { self.send_permission_request( @@ -848,15 +845,15 @@ impl StakpakAcpAgent { // Check if this is a filesystem tool that should use native ACP // Decide if this should be handled by native ACP FS. Avoid read_text_file for directories. - let is_view_directory = if tool_call.function.name == super::tool_names::VIEW { + let is_view_directory = if stripped_name == super::tool_names::VIEW { Path::new(&abs_path).is_dir() } else { false }; - let tool_name = tool_call.function.name.as_str(); - let is_read_tool = super::tool_names::is_fs_file_read(tool_name) && !is_view_directory; - let is_write_tool = super::tool_names::is_fs_file_write(tool_name); + let is_read_tool = + super::tool_names::is_fs_file_read(stripped_name) && !is_view_directory; + let is_write_tool = super::tool_names::is_fs_file_write(stripped_name); // Delegate fs operations to the client so it can access unsaved editor // state and track modifications. Per ACP spec, both read and write @@ -884,7 +881,7 @@ impl StakpakAcpAgent { })? } else if let Some(ref mcp_client) = self.mcp_client { log::info!( - "🔧 DEBUG: Executing tool call: {} with MCP client", + "Executing tool call: {} with MCP client", tool_call.function.name ); @@ -912,11 +909,6 @@ impl StakpakAcpAgent { return Err(acp::Error::internal_error().data(error_msg)); }; - log::info!( - "🔧 DEBUG: Tool call execution completed for: {}", - tool_call.function.name - ); - if let Some(tool_result) = result { // Check if the tool call was cancelled if CallToolResultExt::get_status(&tool_result) == ToolCallResultStatus::Cancelled { @@ -965,7 +957,7 @@ impl StakpakAcpAgent { .join("\n"); // Send completion notification - let completion_content = if self.should_use_diff_content(&tool_call.function.name) { + let completion_content = if self.should_use_diff_content(stripped_name) { // For diff tools, we already sent the diff in the initial notification // Just send a simple completion without additional content None @@ -1052,22 +1044,14 @@ impl StakpakAcpAgent { pub async fn initialize_mcp_server_and_tools( config: &AppConfig, - ) -> Result<(Arc, Vec, Vec), String> { - // Initialize MCP client via stdio proxy - let mcp_client = Arc::new( - stakpak_mcp_client::connect(None) // progress_tx will be set later in run_stdio - .await - .map_err(|e| format!("Failed to connect to MCP proxy: {}", e))?, - ); - - // Get tools from MCP client - let mcp_tools = stakpak_mcp_client::get_tools(&mcp_client) - .await - .map_err(|e| format!("Failed to get tools: {}", e))?; + ) -> Result { + use crate::commands::agent::run::mcp_init::{ + McpInitConfig, initialize_mcp_server_and_tools, + }; - let tools = convert_tools_with_filter(&mcp_tools, config.allowed_tools.as_ref()); + let mcp_config = McpInitConfig::default(); - Ok((mcp_client, mcp_tools, tools)) + initialize_mcp_server_and_tools(config, mcp_config, None).await } async fn process_acp_streaming_response_with_cancellation( @@ -1103,6 +1087,8 @@ impl StakpakAcpAgent { ..Default::default() }; + let mut tool_call_accumulator = ToolCallAccumulator::new(); + // Compile regex once outside the loop let checkpoint_regex = regex::Regex::new(r".*?").ok(); @@ -1216,61 +1202,7 @@ impl StakpakAcpAgent { // Handle tool calls streaming if let Some(tool_calls) = &delta.tool_calls { for delta_tool_call in tool_calls { - if chat_message.tool_calls.is_none() { - chat_message.tool_calls = Some(vec![]); - } - - let tool_calls_vec = chat_message.tool_calls.as_mut(); - if let Some(tool_calls_vec) = tool_calls_vec { - match tool_calls_vec.get_mut(delta_tool_call.index) { - Some(tool_call) => { - let delta_func = delta_tool_call - .function - .as_ref() - .unwrap_or(&FunctionCallDelta { - name: None, - arguments: None, - }); - tool_call.function.arguments = - tool_call.function.arguments.clone() - + delta_func.arguments.as_deref().unwrap_or(""); - } - None => { - // push empty tool calls until the index is reached - tool_calls_vec.extend( - (tool_calls_vec.len()..delta_tool_call.index).map( - |_| ToolCall { - id: "".to_string(), - r#type: "function".to_string(), - function: FunctionCall { - name: "".to_string(), - arguments: "".to_string(), - }, - }, - ), - ); - - tool_calls_vec.push(ToolCall { - id: delta_tool_call.id.clone().unwrap_or_default(), - r#type: "function".to_string(), - function: FunctionCall { - name: delta_tool_call - .function - .as_ref() - .unwrap_or(&FunctionCallDelta { - name: None, - arguments: None, - }) - .name - .as_deref() - .unwrap_or("") - .to_string(), - arguments: "".to_string(), - }, - }); - } - } - } + tool_call_accumulator.process_delta(delta_tool_call); } } } @@ -1299,17 +1231,13 @@ impl StakpakAcpAgent { rx.await.map_err(|_| "Failed to await flushed content")?; } - // filter out empty tool calls - chat_message.tool_calls = Some( - chat_message - .tool_calls - .as_ref() - .unwrap_or(&vec![]) - .iter() - .filter(|tool_call| !tool_call.id.is_empty()) - .cloned() - .collect::>(), - ); + // Get accumulated tool calls (already filtered for empty IDs) + let final_tool_calls = tool_call_accumulator.into_tool_calls(); + chat_message.tool_calls = if final_tool_calls.is_empty() { + None + } else { + Some(final_tool_calls) + }; chat_completion_response.choices.push(ChatCompletionChoice { index: 0, @@ -1348,25 +1276,41 @@ impl StakpakAcpAgent { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); // Set up progress channel for streaming tool results - let (progress_tx, mut progress_rx) = tokio::sync::mpsc::channel::(100); - - // Reinitialize MCP client with progress channel - let (mcp_client, mcp_tools, tools) = match Self::initialize_mcp_server_and_tools(&self.config).await { - Ok((client, mcp_tools, tool_list)) => { - log::info!("MCP client reinitialized with progress channel"); - (Some(client), mcp_tools, tool_list) - } - Err(e) => { - log::warn!("Failed to reinitialize MCP client with progress channel: {}, continuing without tools", e); - (None, Vec::new(), Vec::new()) - } - }; + let (progress_tx, mut progress_rx) = + tokio::sync::mpsc::channel::(100); + + // Reinitialize MCP client with progress channel (in-process server + proxy) + let config_snapshot = self.config.read().await.clone(); + let (mcp_client, mcp_tools, tools, _mcp_server_shutdown, _mcp_proxy_shutdown) = + match Self::initialize_mcp_server_and_tools(&config_snapshot).await { + Ok(result) => { + log::info!("MCP client reinitialized in run_stdio"); + ( + Some(result.client), + result.mcp_tools, + result.tools, + Some(result.server_shutdown_tx), + Some(result.proxy_shutdown_tx), + ) + } + Err(e) => { + log::warn!( + "Failed to reinitialize MCP client: {}, continuing without tools", + e + ); + (None, Vec::new(), Vec::new(), None, None) + } + }; // Create permission request channel - let (permission_tx, mut permission_rx) = mpsc::unbounded_channel::<(acp::RequestPermissionRequest, oneshot::Sender)>(); + let (permission_tx, mut permission_rx) = mpsc::unbounded_channel::<( + acp::RequestPermissionRequest, + oneshot::Sender, + )>(); // Create filesystem operation channel for native ACP filesystem operations - let (fs_operation_tx, fs_operation_rx) = mpsc::unbounded_channel::(); + let (fs_operation_tx, fs_operation_rx) = + mpsc::unbounded_channel::(); // Create a new agent with the proper channel let agent = StakpakAcpAgent { @@ -1400,15 +1344,21 @@ impl StakpakAcpAgent { let conn_arc = Arc::new(conn); // Spawn filesystem handler for native ACP filesystem operations - crate::commands::acp::fs_handler::spawn_fs_handler(conn_arc.clone(), fs_operation_rx); + crate::commands::acp::fs_handler::spawn_fs_handler( + conn_arc.clone(), + fs_operation_rx, + ); // Start a background task to send session notifications to the client let conn_for_notifications = conn_arc.clone(); tokio::task::spawn_local(async move { while let Some((session_notification, ack_tx)) = rx.recv().await { log::info!("Sending session notification: {:?}", session_notification); - let result = - AcpClient::session_notification(&*conn_for_notifications, session_notification).await; + let result = AcpClient::session_notification( + &*conn_for_notifications, + session_notification, + ) + .await; if let Err(e) = result { log::error!("Failed to send session notification: {}", e); break; @@ -1423,7 +1373,10 @@ impl StakpakAcpAgent { tokio::task::spawn_local(async move { while let Some((permission_request, response_tx)) = permission_rx.recv().await { log::info!("Sending permission request: {:?}", permission_request); - match conn_for_permissions.request_permission(permission_request).await { + match conn_for_permissions + .request_permission(permission_request) + .await + { Ok(response) => { log::info!("Permission request response: {:?}", response); let _ = response_tx.send(response); @@ -1431,11 +1384,9 @@ impl StakpakAcpAgent { Err(e) => { log::error!("Permission request failed: {}", e); // Send a default rejection response - let _ = response_tx.send( - acp::RequestPermissionResponse::new( - acp::RequestPermissionOutcome::Cancelled, - ), - ); + let _ = response_tx.send(acp::RequestPermissionResponse::new( + acp::RequestPermissionOutcome::Cancelled, + )); } } } @@ -1528,7 +1479,7 @@ impl acp::Agent for StakpakAcpAgent { // If no API key, provide an auth method for browser-based authentication // This implements ACP Agent Auth - the agent handles the OAuth-like flow internally - let auth_methods = if self.config.api_key.is_none() { + let auth_methods = if self.config.read().await.api_key.is_none() { vec![acp::AuthMethod::new( acp::AuthMethodId::new("stakpak"), "Login to Stakpak", @@ -1583,15 +1534,44 @@ impl acp::Agent for StakpakAcpAgent { return Err(acp::Error::auth_required().data("Invalid API key format".to_string())); } - // Save the API key to config - let mut config = self.config.clone(); - config.api_key = Some(api_key.clone()); - config.save().map_err(|e| { - log::error!("Failed to save API key to config: {}", e); - acp::Error::internal_error().data(format!("Failed to save config: {}", e)) - })?; + // Save the API key to config (both disk and in-memory) + { + let mut config = self.config.write().await; + config.api_key = Some(api_key.clone()); + config.save().map_err(|e| { + log::error!("Failed to save API key to config: {}", e); + acp::Error::internal_error().data(format!("Failed to save config: {}", e)) + })?; + } - log::info!("Authentication successful, API key saved to config"); + // Rebuild the AgentClient with the new API key so subsequent + // requests use the authenticated Stakpak provider + { + let config = self.config.read().await; + let stakpak = Some(StakpakConfig { + api_key: api_key.clone(), + api_endpoint: config.api_endpoint.clone(), + }); + let new_client = AgentClient::new(AgentClientConfig { + stakpak, + providers: config.get_llm_provider_config(), + eco_model: config.eco_model.clone(), + recovery_model: config.recovery_model.clone(), + smart_model: config.smart_model.clone(), + store_path: None, + hook_registry: None, + }) + .await + .map_err(|e| { + log::error!("Failed to rebuild agent client after auth: {}", e); + acp::Error::internal_error().data(format!("Failed to rebuild client: {}", e)) + })?; + + let mut client = self.client.write().await; + *client = Arc::new(new_client); + } + + log::info!("Authentication successful, API key saved and client rebuilt"); return Ok(acp::AuthenticateResponse::new()); } @@ -1626,23 +1606,23 @@ impl acp::Agent for StakpakAcpAgent { log::info!("Received new session request {args:?}"); // Check if we have a valid API key - // First check in-memory config, then reload from disk (in case authenticate() saved a new key), - // finally check environment variable - let has_api_key = - self.config.api_key.is_some() || std::env::var("STAKPAK_API_KEY").is_ok() || { - // Try to reload config from disk to pick up newly saved API key from authenticate() - match crate::config::AppConfig::load(&self.config.profile_name, None::<&str>) { - Ok(fresh_config) => { - if fresh_config.api_key.is_some() { - log::info!("Found API key in refreshed config from disk"); - true - } else { - false - } + // In-memory config is now always up-to-date since authenticate() updates it directly + let config = self.config.read().await; + let has_api_key = config.api_key.is_some() || std::env::var("STAKPAK_API_KEY").is_ok() || { + // Try to reload config from disk as a fallback (e.g., key set externally) + match crate::config::AppConfig::load(&config.profile_name, None::<&str>) { + Ok(fresh_config) => { + if fresh_config.api_key.is_some() { + log::info!("Found API key in refreshed config from disk"); + true + } else { + false } - Err(_) => false, } - }; + Err(_) => false, + } + }; + drop(config); if !has_api_key { log::error!("API key is missing - authentication required"); @@ -1738,8 +1718,8 @@ impl acp::Agent for StakpakAcpAgent { // Only pass tools if we have any let tools_option = if tools.is_empty() { None } else { Some(tools) }; - let (stream, _request_id) = self - .client + let client = self.client.read().await.clone(); + let (stream, _request_id) = client .chat_completion_stream( AgentModel::Smart, messages, @@ -1769,8 +1749,10 @@ impl acp::Agent for StakpakAcpAgent { ); } }; - log::info!("Chat completion successful, response: {:?}", response); - log::info!("Response choices count: {}", response.choices.len()); + log::info!( + "Chat completion successful, response choices: {}", + response.choices.len() + ); if !response.choices.is_empty() { log::info!("First choice message: {:?}", response.choices[0].message); log::info!( @@ -1833,29 +1815,12 @@ impl acp::Agent for StakpakAcpAgent { .map(|tc| !tc.is_empty()) .unwrap_or(false); - log::info!( - "🔧 DEBUG: Initial response has tool calls: {}", - has_tool_calls - ); - if has_tool_calls && let Some(tool_calls) = response.choices[0].message.tool_calls.as_ref() - { - log::info!("🔧 DEBUG: Initial tool calls count: {}", tool_calls.len()); - for (i, tool_call) in tool_calls.iter().enumerate() { - log::info!( - "🔧 DEBUG: Initial tool call {}: {} (id: {})", - i, - tool_call.function.name, - tool_call.id - ); - } - } + log::info!("Initial response has tool calls: {}", has_tool_calls); // Create cancellation receiver for tool call processing let mut tool_cancel_rx = self.tool_cancel_tx.as_ref().map(|tx| tx.subscribe()); while has_tool_calls { - log::info!("🔧 DEBUG: Starting tool call processing loop iteration"); - if let Some(ref mut cancel_rx) = tool_cancel_rx && cancel_rx.try_recv().is_ok() { @@ -1939,8 +1904,8 @@ impl acp::Agent for StakpakAcpAgent { messages.clone() }; - let (follow_up_stream, _request_id) = self - .client + let client = self.client.read().await.clone(); + let (follow_up_stream, _request_id) = client .chat_completion_stream( AgentModel::Smart, current_messages.clone(), diff --git a/cli/src/commands/acp/utils.rs b/cli/src/commands/acp/utils.rs index 3b3be077..00baca3a 100644 --- a/cli/src/commands/acp/utils.rs +++ b/cli/src/commands/acp/utils.rs @@ -1,5 +1,27 @@ use regex::Regex; +/// Strip the MCP server prefix and any trailing "()" from a tool name. +/// Example: "stakpak__run_command" -> "run_command" +/// Example: "run_command" -> "run_command" +/// Example: "str_replace()" -> "str_replace" +pub fn strip_tool_name(name: &str) -> &str { + let mut result = name; + + // Strip the MCP server prefix (e.g., "stakpak__") + if let Some(pos) = result.find("__") + && pos + 2 < result.len() + { + result = &result[pos + 2..]; + } + + // Strip trailing "()" if present + if result.ends_with("()") { + result = &result[..result.len() - 2]; + } + + result +} + /// Convert XML tags to markdown headers using pattern matching /// Handles the 4 specific tags: scratchpad, todo, local_context, rulebooks pub fn convert_xml_tags_to_markdown(text: &str) -> String { @@ -61,6 +83,18 @@ pub fn process_all_xml_patterns(text: &str) -> String { mod tests { use super::*; + #[test] + fn test_strip_tool_name() { + assert_eq!(strip_tool_name("stakpak__run_command"), "run_command"); + assert_eq!(strip_tool_name("run_command"), "run_command"); + assert_eq!(strip_tool_name("other__server__tool"), "server__tool"); + assert_eq!(strip_tool_name("prefix__"), "prefix__"); + assert_eq!(strip_tool_name("__tool"), "tool"); + assert_eq!(strip_tool_name("str_replace()"), "str_replace"); + assert_eq!(strip_tool_name("create()"), "create"); + assert_eq!(strip_tool_name("stakpak__str_replace()"), "str_replace"); + } + #[test] fn test_convert_xml_tags_to_markdown() { let input = "\n\n- Task 1\n- Task 2\n\n"; diff --git a/cli/src/commands/agent/run/stream.rs b/cli/src/commands/agent/run/stream.rs index 5966d538..c6768698 100644 --- a/cli/src/commands/agent/run/stream.rs +++ b/cli/src/commands/agent/run/stream.rs @@ -19,19 +19,19 @@ use uuid::Uuid; /// /// This distinction is important because some providers (like Anthropic via StakAI adapter) /// send multiple tool calls with the same index but different IDs. -struct ToolCallAccumulator { +pub struct ToolCallAccumulator { tool_calls: Vec, } impl ToolCallAccumulator { - fn new() -> Self { + pub fn new() -> Self { Self { tool_calls: Vec::new(), } } /// Process a tool call delta and accumulate it into the appropriate tool call. - fn process_delta(&mut self, delta: &ToolCallDelta) { + pub fn process_delta(&mut self, delta: &ToolCallDelta) { let delta_id = delta.id.as_deref().filter(|id| !id.is_empty()); let delta_func = delta.function.as_ref(); @@ -93,7 +93,7 @@ impl ToolCallAccumulator { } /// Get the accumulated tool calls, filtering out empty placeholders. - fn into_tool_calls(self) -> Vec { + pub fn into_tool_calls(self) -> Vec { self.tool_calls .into_iter() .filter(|tc| !tc.id.is_empty())