From 3d4764d576cf12a244032004cfb70fd81ca280b9 Mon Sep 17 00:00:00 2001 From: Hackall <36754621+hackall360@users.noreply.github.com> Date: Sun, 28 Sep 2025 22:30:30 -0700 Subject: [PATCH] Add internal MCP server exposing Codex tooling --- codex-rs/Cargo.lock | 1 + codex-rs/mcp-server/Cargo.toml | 5 + .../mcp-server/src/bin/codex-mcp-internal.rs | 16 + codex-rs/mcp-server/src/internal_tools.rs | 367 ++++++++++++++++++ codex-rs/mcp-server/src/lib.rs | 4 + codex-rs/mcp-server/src/main.rs | 8 +- codex-rs/mcp-server/src/message_processor.rs | 134 +++++-- 7 files changed, 503 insertions(+), 32 deletions(-) create mode 100644 codex-rs/mcp-server/src/bin/codex-mcp-internal.rs create mode 100644 codex-rs/mcp-server/src/internal_tools.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index f1893d65b55..dcd8d286538 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -926,6 +926,7 @@ dependencies = [ "anyhow", "assert_cmd", "base64", + "codex-apply-patch", "codex-arg0", "codex-common", "codex-core", diff --git a/codex-rs/mcp-server/Cargo.toml b/codex-rs/mcp-server/Cargo.toml index e40dd15df71..a02e8efa525 100644 --- a/codex-rs/mcp-server/Cargo.toml +++ b/codex-rs/mcp-server/Cargo.toml @@ -7,6 +7,10 @@ version = { workspace = true } name = "codex-mcp-server" path = "src/main.rs" +[[bin]] +name = "codex-mcp-internal" +path = "src/bin/codex-mcp-internal.rs" + [lib] name = "codex_mcp_server" path = "src/lib.rs" @@ -21,6 +25,7 @@ codex-common = { workspace = true, features = ["cli"] } codex-core = { workspace = true } codex-login = { workspace = true } codex-protocol = { workspace = true } +codex-apply-patch = { workspace = true } mcp-types = { workspace = true } schemars = { workspace = true } serde = { workspace = true, features = ["derive"] } diff --git a/codex-rs/mcp-server/src/bin/codex-mcp-internal.rs b/codex-rs/mcp-server/src/bin/codex-mcp-internal.rs new file mode 100644 index 00000000000..96d566e19b2 --- /dev/null +++ b/codex-rs/mcp-server/src/bin/codex-mcp-internal.rs @@ -0,0 +1,16 @@ +use codex_arg0::arg0_dispatch_or_else; +use codex_common::CliConfigOverrides; +use codex_mcp_server::ServerMode; +use codex_mcp_server::run_main; + +fn main() -> anyhow::Result<()> { + arg0_dispatch_or_else(|codex_linux_sandbox_exe| async move { + run_main( + codex_linux_sandbox_exe, + CliConfigOverrides::default(), + ServerMode::InternalTools, + ) + .await?; + Ok(()) + }) +} diff --git a/codex-rs/mcp-server/src/internal_tools.rs b/codex-rs/mcp-server/src/internal_tools.rs new file mode 100644 index 00000000000..913b135f227 --- /dev/null +++ b/codex-rs/mcp-server/src/internal_tools.rs @@ -0,0 +1,367 @@ +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; + +use codex_apply_patch::apply_patch; +use codex_core::config::Config; +use codex_core::exec::ExecParams; +use codex_core::exec::ExecToolCallOutput; +use codex_core::exec::SandboxType; +use codex_core::exec::process_exec_tool_call; +use codex_core::exec_env::create_env; +use codex_core::get_platform_sandbox; +use codex_core::plan_tool::UpdatePlanArgs; +use codex_protocol::models::ShellToolCallParams; +use mcp_types::CallToolResult; +use mcp_types::ContentBlock; +use mcp_types::TextContent; +use mcp_types::Tool; +use mcp_types::ToolInputSchema; +use serde::Deserialize; +use serde::de::DeserializeOwned; +use serde_json::json; +use tokio::sync::Mutex; + +pub struct InternalToolRegistry { + config: Arc, + plan_state: Mutex>, +} + +impl InternalToolRegistry { + pub fn new(config: Arc) -> Self { + Self { + config, + plan_state: Mutex::new(None), + } + } + + pub fn list_tools(&self) -> Vec { + vec![ + shell_tool(), + apply_patch_tool(), + update_plan_tool(), + view_image_tool(), + ] + } + + pub async fn try_call_tool( + &self, + name: &str, + arguments: Option, + ) -> Option { + match name { + "shell" | "container.exec" => Some(self.call_shell(arguments).await), + "apply_patch" => Some(self.call_apply_patch(arguments).await), + "update_plan" => Some(self.call_update_plan(arguments).await), + "view_image" => Some(self.call_view_image(arguments).await), + _ => None, + } + } + + async fn call_shell(&self, arguments: Option) -> CallToolResult { + let params: ShellToolCallParams = match parse_arguments(arguments) { + Ok(value) => value, + Err(err) => return error_result(err), + }; + if params.command.is_empty() { + return error_result("Shell tool requires a command to execute"); + } + let cwd = resolve_path(&self.config.cwd, params.workdir.as_deref()); + let env = create_env(&self.config.shell_environment_policy); + let exec_params = ExecParams { + command: params.command, + cwd, + timeout_ms: params.timeout_ms, + env, + with_escalated_permissions: params.with_escalated_permissions, + justification: params.justification, + }; + + let sandbox_type = match get_platform_sandbox() { + Some(SandboxType::LinuxSeccomp) if self.config.codex_linux_sandbox_exe.is_some() => { + SandboxType::LinuxSeccomp + } + Some(SandboxType::MacosSeatbelt) => SandboxType::MacosSeatbelt, + _ => SandboxType::None, + }; + + match process_exec_tool_call( + exec_params, + sandbox_type, + &self.config.sandbox_policy, + &self.config.cwd, + &self.config.codex_linux_sandbox_exe, + None, + ) + .await + { + Ok(output) => success_result(format_exec_output(&output)), + Err(err) => error_result(format!("Failed to execute command: {err}")), + } + } + + async fn call_apply_patch(&self, arguments: Option) -> CallToolResult { + #[derive(Deserialize)] + #[serde(untagged)] + enum ApplyPatchArgs { + Structured { input: String }, + Legacy { patch: String }, + } + + let value: ApplyPatchArgs = match parse_arguments(arguments) { + Ok(v) => v, + Err(err) => return error_result(err), + }; + let patch = match value { + ApplyPatchArgs::Structured { input } => input, + ApplyPatchArgs::Legacy { patch } => patch, + }; + + let mut stdout = Vec::new(); + let mut stderr = Vec::new(); + match apply_patch(&patch, &mut stdout, &mut stderr) { + Ok(()) => { + let mut output = String::new(); + if !stdout.is_empty() { + output.push_str(&String::from_utf8_lossy(&stdout)); + } + if !stderr.is_empty() { + if !output.is_empty() { + output.push('\n'); + } + output.push_str(&String::from_utf8_lossy(&stderr)); + } + if output.trim().is_empty() { + output = "Patch applied".to_string(); + } + success_result(output) + } + Err(err) => error_result(format!("Failed to apply patch: {err}")), + } + } + + async fn call_update_plan(&self, arguments: Option) -> CallToolResult { + let args: UpdatePlanArgs = match parse_arguments(arguments) { + Ok(value) => value, + Err(err) => return error_result(err), + }; + *self.plan_state.lock().await = Some(args.clone()); + match serde_json::to_string(&args) { + Ok(serialized) => success_result(format!("Plan updated: {serialized}")), + Err(_) => success_result("Plan updated".to_string()), + } + } + + async fn call_view_image(&self, arguments: Option) -> CallToolResult { + #[derive(Deserialize)] + struct ViewImageArgs { + path: String, + } + let args: ViewImageArgs = match parse_arguments(arguments) { + Ok(value) => value, + Err(err) => return error_result(err), + }; + let resolved = resolve_path(&self.config.cwd, Some(args.path.as_str())); + match std::fs::metadata(&resolved) { + Ok(meta) if meta.is_file() => { + success_result(format!("Image available at {}", resolved.display())) + } + Ok(_) => error_result(format!("Path {} is not a file", resolved.display())), + Err(err) => error_result(format!("Failed to access {}: {err}", resolved.display())), + } + } +} + +fn parse_arguments(arguments: Option) -> Result +where + T: DeserializeOwned, +{ + let value = arguments.ok_or_else(|| "Missing arguments".to_string())?; + serde_json::from_value(value).map_err(|err| format!("Failed to parse arguments: {err}")) +} + +fn resolve_path(base: &Path, path: Option<&str>) -> PathBuf { + match path { + Some(p) => { + let candidate = Path::new(p); + if candidate.is_absolute() { + candidate.to_path_buf() + } else { + base.join(candidate) + } + } + None => base.to_path_buf(), + } +} + +fn shell_tool() -> Tool { + let properties = json!({ + "command": { + "type": "array", + "items": { "type": "string" }, + "description": "The command to execute" + }, + "workdir": { + "type": "string", + "description": "Working directory to run the command in" + }, + "timeout_ms": { + "type": "number", + "description": "Timeout for the command in milliseconds" + } + }); + Tool { + name: "shell".to_string(), + title: Some("Shell".to_string()), + description: Some("Run a shell command and return its output.".to_string()), + input_schema: ToolInputSchema { + r#type: "object".to_string(), + properties: Some(properties), + required: Some(vec!["command".to_string()]), + }, + output_schema: None, + annotations: None, + } +} + +fn apply_patch_tool() -> Tool { + let properties = json!({ + "input": { + "type": "string", + "description": "Unified diff describing the file changes" + } + }); + Tool { + name: "apply_patch".to_string(), + title: Some("Apply Patch".to_string()), + description: Some("Apply a unified diff to the local workspace.".to_string()), + input_schema: ToolInputSchema { + r#type: "object".to_string(), + properties: Some(properties), + required: Some(vec!["input".to_string()]), + }, + output_schema: None, + annotations: None, + } +} + +fn update_plan_tool() -> Tool { + let plan_item_schema = json!({ + "type": "object", + "properties": { + "step": { "type": "string" }, + "status": { + "type": "string", + "description": "One of: pending, in_progress, completed" + } + }, + "required": ["step", "status"], + "additionalProperties": false + }); + let properties = json!({ + "plan": { + "type": "array", + "description": "List of plan items", + "items": plan_item_schema + }, + "explanation": { + "type": "string", + "description": "Optional explanation of the plan" + } + }); + Tool { + name: "update_plan".to_string(), + title: Some("Update Plan".to_string()), + description: Some( + "Record the current task plan with step statuses for external observers.".to_string(), + ), + input_schema: ToolInputSchema { + r#type: "object".to_string(), + properties: Some(properties), + required: Some(vec!["plan".to_string()]), + }, + output_schema: None, + annotations: None, + } +} + +fn view_image_tool() -> Tool { + let properties = json!({ + "path": { + "type": "string", + "description": "Filesystem path to an image file" + } + }); + Tool { + name: "view_image".to_string(), + title: Some("View Image".to_string()), + description: Some( + "Attach a local image path so other agents can reference the file.".to_string(), + ), + input_schema: ToolInputSchema { + r#type: "object".to_string(), + properties: Some(properties), + required: Some(vec!["path".to_string()]), + }, + output_schema: None, + annotations: None, + } +} + +fn success_result(message: impl Into) -> CallToolResult { + CallToolResult { + content: vec![ContentBlock::TextContent(TextContent { + r#type: "text".to_string(), + text: message.into(), + annotations: None, + })], + is_error: Some(false), + structured_content: None, + } +} + +fn error_result(message: impl Into) -> CallToolResult { + CallToolResult { + content: vec![ContentBlock::TextContent(TextContent { + r#type: "text".to_string(), + text: message.into(), + annotations: None, + })], + is_error: Some(true), + structured_content: None, + } +} + +fn format_exec_output(output: &ExecToolCallOutput) -> String { + let mut sections: Vec = Vec::new(); + sections.push(format!( + "Exit code: {}\nDuration: {:.3} seconds", + output.exit_code, + output.duration.as_secs_f32() + )); + if output.timed_out { + sections.push("Command timed out".to_string()); + } + if !output.stdout.text.trim().is_empty() { + sections.push(format!("Stdout:\n{}", output.stdout.text)); + } + if let Some(lines) = output.stdout.truncated_after_lines { + sections.push(format!("Stdout truncated after {lines} lines")); + } + if !output.stderr.text.trim().is_empty() { + sections.push(format!("Stderr:\n{}", output.stderr.text)); + } + if let Some(lines) = output.stderr.truncated_after_lines { + sections.push(format!("Stderr truncated after {lines} lines")); + } + if !output.aggregated_output.text.trim().is_empty() { + sections.push(format!( + "Combined output:\n{}", + output.aggregated_output.text + )); + } + if let Some(lines) = output.aggregated_output.truncated_after_lines { + sections.push(format!("Combined output truncated after {lines} lines")); + } + sections.join("\n\n") +} diff --git a/codex-rs/mcp-server/src/lib.rs b/codex-rs/mcp-server/src/lib.rs index b1397846cac..08021ccd481 100644 --- a/codex-rs/mcp-server/src/lib.rs +++ b/codex-rs/mcp-server/src/lib.rs @@ -25,12 +25,14 @@ mod codex_tool_config; mod codex_tool_runner; mod error_code; mod exec_approval; +mod internal_tools; mod json_to_toml; pub(crate) mod message_processor; mod outgoing_message; mod patch_approval; use crate::message_processor::MessageProcessor; +pub use crate::message_processor::ServerMode; use crate::outgoing_message::OutgoingMessage; use crate::outgoing_message::OutgoingMessageSender; @@ -49,6 +51,7 @@ const CHANNEL_CAPACITY: usize = 128; pub async fn run_main( codex_linux_sandbox_exe: Option, cli_config_overrides: CliConfigOverrides, + server_mode: ServerMode, ) -> IoResult<()> { // Install a simple subscriber so `tracing` output is visible. Users can // control the log level with `RUST_LOG`. @@ -104,6 +107,7 @@ pub async fn run_main( outgoing_message_sender, codex_linux_sandbox_exe, std::sync::Arc::new(config), + server_mode, ); async move { while let Some(msg) = incoming_rx.recv().await { diff --git a/codex-rs/mcp-server/src/main.rs b/codex-rs/mcp-server/src/main.rs index 314944fab57..0857fd4ffb0 100644 --- a/codex-rs/mcp-server/src/main.rs +++ b/codex-rs/mcp-server/src/main.rs @@ -1,10 +1,16 @@ use codex_arg0::arg0_dispatch_or_else; use codex_common::CliConfigOverrides; +use codex_mcp_server::ServerMode; use codex_mcp_server::run_main; fn main() -> anyhow::Result<()> { arg0_dispatch_or_else(|codex_linux_sandbox_exe| async move { - run_main(codex_linux_sandbox_exe, CliConfigOverrides::default()).await?; + run_main( + codex_linux_sandbox_exe, + CliConfigOverrides::default(), + ServerMode::CodexGateway, + ) + .await?; Ok(()) }) } diff --git a/codex-rs/mcp-server/src/message_processor.rs b/codex-rs/mcp-server/src/message_processor.rs index 5868d60fc09..2b099da48e9 100644 --- a/codex-rs/mcp-server/src/message_processor.rs +++ b/codex-rs/mcp-server/src/message_processor.rs @@ -7,6 +7,7 @@ use crate::codex_tool_config::CodexToolCallReplyParam; use crate::codex_tool_config::create_tool_for_codex_tool_call_param; use crate::codex_tool_config::create_tool_for_codex_tool_call_reply_param; use crate::error_code::INVALID_REQUEST_ERROR_CODE; +use crate::internal_tools::InternalToolRegistry; use crate::outgoing_message::OutgoingMessageSender; use codex_protocol::mcp_protocol::ClientRequest; use codex_protocol::mcp_protocol::ConversationId; @@ -37,13 +38,30 @@ use std::sync::Arc; use tokio::sync::Mutex; use tokio::task; +#[derive(Clone, Copy, PartialEq, Eq)] +pub enum ServerMode { + CodexGateway, + InternalTools, +} + +impl ServerMode { + pub fn server_name(self) -> &'static str { + match self { + ServerMode::CodexGateway => "codex-mcp-server", + ServerMode::InternalTools => "codex-mcp-internal", + } + } +} + pub(crate) struct MessageProcessor { - codex_message_processor: CodexMessageProcessor, + codex_message_processor: Option, outgoing: Arc, initialized: bool, codex_linux_sandbox_exe: Option, - conversation_manager: Arc, + conversation_manager: Option>, running_requests_id_to_codex_uuid: Arc>>, + server_mode: ServerMode, + internal_tools: Option>, } impl MessageProcessor { @@ -53,17 +71,29 @@ impl MessageProcessor { outgoing: OutgoingMessageSender, codex_linux_sandbox_exe: Option, config: Arc, + server_mode: ServerMode, ) -> Self { let outgoing = Arc::new(outgoing); - let auth_manager = AuthManager::shared(config.codex_home.clone()); - let conversation_manager = Arc::new(ConversationManager::new(auth_manager.clone())); - let codex_message_processor = CodexMessageProcessor::new( - auth_manager, - conversation_manager.clone(), - outgoing.clone(), - codex_linux_sandbox_exe.clone(), - config, - ); + let internal_tools = match server_mode { + ServerMode::InternalTools => Some(Arc::new(InternalToolRegistry::new(config.clone()))), + ServerMode::CodexGateway => None, + }; + + let (codex_message_processor, conversation_manager) = match server_mode { + ServerMode::CodexGateway => { + let auth_manager = AuthManager::shared(config.codex_home.clone()); + let conversation_manager = Arc::new(ConversationManager::new(auth_manager.clone())); + let processor = CodexMessageProcessor::new( + auth_manager, + conversation_manager.clone(), + outgoing.clone(), + codex_linux_sandbox_exe.clone(), + config, + ); + (Some(processor), Some(conversation_manager)) + } + ServerMode::InternalTools => (None, None), + }; Self { codex_message_processor, outgoing, @@ -71,18 +101,19 @@ impl MessageProcessor { codex_linux_sandbox_exe, conversation_manager, running_requests_id_to_codex_uuid: Arc::new(Mutex::new(HashMap::new())), + server_mode, + internal_tools, } } pub(crate) async fn process_request(&mut self, request: JSONRPCRequest) { - if let Ok(request_json) = serde_json::to_value(request.clone()) + if let Some(codex_processor) = &mut self.codex_message_processor + && let Ok(request_json) = serde_json::to_value(request.clone()) && let Ok(codex_request) = serde_json::from_value::(request_json) { // If the request is a Codex request, handle it with the Codex // message processor. - self.codex_message_processor - .process_request(codex_request) - .await; + codex_processor.process_request(codex_request).await; return; } @@ -233,7 +264,7 @@ impl MessageProcessor { instructions: None, protocol_version: params.protocol_version.clone(), server_info: mcp_types::Implementation { - name: "codex-mcp-server".to_string(), + name: self.server_mode.server_name().to_string(), version: env!("CARGO_PKG_VERSION").to_string(), title: Some("Codex".to_string()), user_agent: Some(get_codex_user_agent()), @@ -318,11 +349,16 @@ impl MessageProcessor { params: ::Params, ) { tracing::trace!("tools/list -> {params:?}"); + let mut tools = Vec::new(); + if let Some(registry) = &self.internal_tools { + tools.extend(registry.list_tools()); + } + if self.codex_message_processor.is_some() { + tools.push(create_tool_for_codex_tool_call_param()); + tools.push(create_tool_for_codex_tool_call_reply_param()); + } let result = ListToolsResult { - tools: vec![ - create_tool_for_codex_tool_call_param(), - create_tool_for_codex_tool_call_reply_param(), - ], + tools, next_cursor: None, }; @@ -338,6 +374,16 @@ impl MessageProcessor { tracing::info!("tools/call -> params: {:?}", params); let CallToolRequestParams { name, arguments } = params; + if let Some(registry) = &self.internal_tools + && let Some(result) = registry + .try_call_tool(name.as_str(), arguments.clone()) + .await + { + self.send_response::(id, result) + .await; + return; + } + match name.as_str() { "codex" => self.handle_tool_call_codex(id, arguments).await, "codex-reply" => { @@ -360,6 +406,20 @@ impl MessageProcessor { } } async fn handle_tool_call_codex(&self, id: RequestId, arguments: Option) { + let Some(conversation_manager) = &self.conversation_manager else { + let result = CallToolResult { + content: vec![ContentBlock::TextContent(TextContent { + r#type: "text".to_owned(), + text: "Codex conversations are not available on this server".to_owned(), + annotations: None, + })], + is_error: Some(true), + structured_content: None, + }; + self.send_response::(id, result) + .await; + return; + }; let (initial_prompt, config): (String, Config) = match arguments { Some(json_val) => match serde_json::from_value::(json_val) { Ok(tool_cfg) => match tool_cfg.into_config(self.codex_linux_sandbox_exe.clone()) { @@ -416,7 +476,7 @@ impl MessageProcessor { // Clone outgoing and server to move into async task. let outgoing = self.outgoing.clone(); - let conversation_manager = self.conversation_manager.clone(); + let conversation_manager = conversation_manager.clone(); let running_requests_id_to_codex_uuid = self.running_requests_id_to_codex_uuid.clone(); // Spawn an async task to handle the Codex session so that we do not @@ -506,11 +566,22 @@ impl MessageProcessor { let outgoing = self.outgoing.clone(); let running_requests_id_to_codex_uuid = self.running_requests_id_to_codex_uuid.clone(); - let codex = match self - .conversation_manager - .get_conversation(conversation_id) - .await - { + let Some(conversation_manager) = &self.conversation_manager else { + let result = CallToolResult { + content: vec![ContentBlock::TextContent(TextContent { + r#type: "text".to_owned(), + text: "Codex conversations are not available on this server".to_owned(), + annotations: None, + })], + is_error: Some(true), + structured_content: None, + }; + self.send_response::(request_id, result) + .await; + return; + }; + + let codex = match conversation_manager.get_conversation(conversation_id).await { Ok(c) => c, Err(_) => { tracing::warn!("Session not found for conversation_id: {conversation_id}"); @@ -591,11 +662,12 @@ impl MessageProcessor { tracing::info!("conversation_id: {conversation_id}"); // Obtain the Codex conversation from the server. - let codex_arc = match self - .conversation_manager - .get_conversation(conversation_id) - .await - { + let Some(conversation_manager) = &self.conversation_manager else { + tracing::warn!("No Codex conversation manager available for cancel request"); + return; + }; + + let codex_arc = match conversation_manager.get_conversation(conversation_id).await { Ok(c) => c, Err(_) => { tracing::warn!("Session not found for conversation_id: {conversation_id}");