diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs
index c611eb20a51..4ea8d0d2ca9 100644
--- a/codex-rs/core/src/chat_completions.rs
+++ b/codex-rs/core/src/chat_completions.rs
@@ -5,6 +5,7 @@ use eventsource_stream::Eventsource;
 use futures::Stream;
 use futures::StreamExt;
 use futures::TryStreamExt;
+use regex_lite::Regex;
 use reqwest::StatusCode;
 use serde_json::json;
 use std::pin::Pin;
@@ -36,6 +37,7 @@ pub(crate) async fn stream_chat_completions(
     model_family: &ModelFamily,
     client: &reqwest::Client,
     provider: &ModelProviderInfo,
+    parallel_tool_calls: bool,
 ) -> Result<ResponseStream> {
     // Build messages array
     let mut messages = Vec::<serde_json::Value>::new();
@@ -277,6 +279,13 @@ pub(crate) async fn stream_chat_completions(
         "tools": tools_json,
     });

+    if parallel_tool_calls && let Some(obj) = payload.as_object_mut() {
+        obj.insert(
+            "parallel_tool_calls".to_string(),
+            serde_json::Value::Bool(true),
+        );
+    }
+
     if let Some(schema) = &prompt.output_schema
         && let Some(obj) = payload.as_object_mut()
     {
@@ -602,9 +611,52 @@ async fn process_chat_sse(
                     let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
                 }
                 "stop" => {
+                    let mut reasoning_emitted = false;
+
+                    if !fn_call_state.active && !assistant_text.is_empty() {
+                        let (cleaned_text, parsed_calls) =
+                            extract_embedded_tool_calls(&assistant_text);
+
+                        if !parsed_calls.is_empty() {
+                            assistant_text = cleaned_text;
+
+                            if !reasoning_text.is_empty() {
+                                let item = ResponseItem::Reasoning {
+                                    id: String::new(),
+                                    summary: Vec::new(),
+                                    content: Some(vec![ReasoningItemContent::ReasoningText {
+                                        text: std::mem::take(&mut reasoning_text),
+                                    }]),
+                                    encrypted_content: None,
+                                };
+                                let _ = tx_event
+                                    .send(Ok(ResponseEvent::OutputItemDone(item)))
+                                    .await;
+                                reasoning_emitted = true;
+                            }
+
+                            for call in parsed_calls {
+                                let call_id = call
+                                    .call_id
+                                    .unwrap_or_else(|| format!("tool_call_{}", Uuid::new_v4()));
+                                let item = ResponseItem::FunctionCall {
+                                    id: None,
+                                    name: call.name,
+                                    arguments: call.arguments,
+                                    call_id,
+                                };
+                                let _ = tx_event
+                                    .send(Ok(ResponseEvent::OutputItemDone(item)))
+                                    .await;
+                            }
+                        }
+                    }
+
                     // Regular turn without tool-call. Emit the final assistant message
                     // as a single OutputItemDone so non-delta consumers see the result.
-                    if !assistant_text.is_empty() {
+                    let has_message_content =
+                        assistant_text.chars().any(|c| !c.is_whitespace());
+                    if has_message_content {
                         let item = ResponseItem::Message {
                             role: "assistant".to_string(),
                             content: vec![ContentItem::OutputText {
@@ -613,9 +665,12 @@ async fn process_chat_sse(
                             id: None,
                         };
                         let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
+                    } else {
+                        assistant_text.clear();
                     }
+
                     // Also emit a terminal Reasoning item so UIs can finalize raw reasoning.
-                    if !reasoning_text.is_empty() {
+                    if !reasoning_text.is_empty() && !reasoning_emitted {
                         let item = ResponseItem::Reasoning {
                             id: String::new(),
                             summary: Vec::new(),
@@ -647,6 +702,62 @@ async fn process_chat_sse(
     }
 }

+#[derive(Debug)]
+struct EmbeddedToolCall {
+    name: String,
+    arguments: String,
+    call_id: Option<String>,
+}
+
+fn extract_embedded_tool_calls(text: &str) -> (String, Vec<EmbeddedToolCall>) {
+    let regex = match Regex::new(r"<tool_call>\s*([\s\S]*?)\s*</tool_call>") {
+        Ok(regex) => regex,
+        Err(_) => return (text.to_string(), Vec::new()),
+    };
+    let mut cleaned = String::with_capacity(text.len());
+    let mut tool_calls = Vec::new();
+    let mut last_index = 0;
+
+    for capture in regex.captures_iter(text) {
+        if let Some(m) = capture.get(0) {
+            cleaned.push_str(&text[last_index..m.start()]);
+
+            let inner = capture.get(1).map(|c| c.as_str().trim()).unwrap_or("");
+            match serde_json::from_str::<serde_json::Value>(inner) {
+                Ok(obj) => {
+                    let name = obj.get("name").and_then(|v| v.as_str());
+                    let arguments_value = obj.get("arguments");
+                    if let (Some(name), Some(arguments_value)) = (name, arguments_value) {
+                        let arguments = if let Some(s) = arguments_value.as_str() {
+                            s.to_string()
+                        } else {
+                            serde_json::to_string(arguments_value).unwrap_or_default()
+                        };
+                        let call_id = obj
+                            .get("id")
+                            .and_then(|v| v.as_str())
+                            .map(std::string::ToString::to_string);
+                        tool_calls.push(EmbeddedToolCall {
+                            name: name.to_string(),
+                            arguments,
+                            call_id,
+                        });
+                    } else {
+                        cleaned.push_str(m.as_str());
+                    }
+                }
+                Err(_) => cleaned.push_str(m.as_str()),
+            }
+
+            last_index = m.end();
+        }
+    }
+
+    cleaned.push_str(&text[last_index..]);
+
+    (cleaned, tool_calls)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -693,6 +804,51 @@ mod tests {
             "unexpected fallback call_id prefix: {call_id}"
         );
     }
+
+    #[tokio::test]
+    async fn converts_embedded_tool_call_tags_into_function_calls() {
+        let chunks = vec![
+            Ok::<_, CodexErr>(Bytes::from_static(
+                b"data: {\"choices\":[{\"delta\":{\"content\":\"<tool_call>{\\\"name\\\":\\\"run\\\",\\\"arguments\\\":{\\\"cmd\\\":\\\"echo\\\"}}<\\/tool_call>\"}}]}\n\n",
+            )),
+            Ok::<_, CodexErr>(Bytes::from_static(
+                b"data: {\"choices\":[{\"finish_reason\":\"stop\"}]}\n\n",
+            )),
+        ];
+
+        let stream = stream::iter(chunks);
+        let (tx, mut rx) = mpsc::channel(8);
+        let handle = tokio::spawn(async move {
+            process_chat_sse(stream, tx, Duration::from_secs(5)).await;
+        });
+
+        let mut observed = Vec::new();
+        while let Some(event) = rx.recv().await {
+            match event.expect("stream event") {
+                ResponseEvent::OutputItemDone(ResponseItem::FunctionCall {
+                    name,
+                    arguments,
+                    call_id,
+                    ..
+                }) => {
+                    observed.push((name, arguments, call_id));
+                }
+                ResponseEvent::OutputItemDone(ResponseItem::Message { .. }) => {
+                    panic!("unexpected assistant message emitted instead of tool call");
+                }
+                ResponseEvent::Completed { .. } => break,
+                _ => {}
+            }
+        }
+
+        handle.await.expect("process_chat_sse task");
+
+        assert_eq!(observed.len(), 1);
+        let (name, arguments, call_id) = observed.into_iter().next().unwrap();
+        assert_eq!(name, "run");
+        assert_eq!(arguments, "{\"cmd\":\"echo\"}");
+        assert!(call_id.starts_with("tool_call_"));
+    }
 }

 /// Optional client-side aggregation helper
diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs
index e8aca68fe8b..ed0dff11701 100644
--- a/codex-rs/core/src/client.rs
+++ b/codex-rs/core/src/client.rs
@@ -127,6 +127,7 @@ impl ModelClient {
             &self.config.model_family,
             &self.client,
             &self.provider,
+            self.config.parallel_tool_calls,
         )
         .await?;

@@ -216,7 +217,7 @@ impl ModelClient {
             input: &input_with_instructions,
             tools: &tools_json,
             tool_choice: "auto",
-            parallel_tool_calls: false,
+            parallel_tool_calls: self.config.parallel_tool_calls,
             reasoning,
             store: azure_workaround,
             stream: true,
diff --git a/codex-rs/core/src/client_common.rs b/codex-rs/core/src/client_common.rs
index b695581deb2..740be76af93 100644
--- a/codex-rs/core/src/client_common.rs
+++ b/codex-rs/core/src/client_common.rs
@@ -21,6 +21,8 @@ use tokio::sync::mpsc;
 /// Review thread system prompt. Edit `core/src/review_prompt.md` to customize.
 pub const REVIEW_PROMPT: &str = include_str!("../review_prompt.md");

+const QWEN3_CODER_SYSTEM_INSTRUCTIONS: &str = "When tools are available, call them directly using the `tool_calls` format without extra explanation. Do not output `<tool_call>` blocks.";
+
 /// API request payload for a single model turn
 #[derive(Default, Debug, Clone)]
 pub struct Prompt {
@@ -53,14 +55,36 @@ impl Prompt {
             OpenAiTool::Freeform(f) => f.name == "apply_patch",
             _ => false,
         });
-        if self.base_instructions_override.is_none()
+        let mut instructions = if self.base_instructions_override.is_none()
             && model.needs_special_apply_patch_instructions
             && !is_apply_patch_tool_present
         {
             Cow::Owned(format!("{base}\n{APPLY_PATCH_TOOL_INSTRUCTIONS}"))
         } else {
             Cow::Borrowed(base)
+        };
+
+        if let Some(extra) = extra_instructions_for_model(model) {
+            instructions = match instructions {
+                Cow::Borrowed(base) => {
+                    let mut owned = base.to_string();
+                    if !owned.ends_with('\n') {
+                        owned.push('\n');
+                    }
+                    owned.push_str(extra);
+                    Cow::Owned(owned)
+                }
+                Cow::Owned(mut owned) => {
+                    if !owned.ends_with('\n') {
+                        owned.push('\n');
+                    }
+                    owned.push_str(extra);
+                    Cow::Owned(owned)
+                }
+            };
         }
+
+        instructions
     }

     pub(crate) fn get_formatted_input(&self) -> Vec<ResponseItem> {
@@ -68,6 +92,13 @@ impl Prompt {
     }
 }

+fn extra_instructions_for_model(model: &ModelFamily) -> Option<&'static str> {
+    match model.slug.as_str() {
+        "qwen/qwen3-coder-30b" => Some(QWEN3_CODER_SYSTEM_INSTRUCTIONS),
+        _ => None,
+    }
+}
+
 #[derive(Debug)]
 pub enum ResponseEvent {
     Created,
@@ -368,4 +399,18 @@ mod tests {
         let v = serde_json::to_value(&req).expect("json");
         assert!(v.get("text").is_none());
     }
+
+    #[test]
+    fn adds_qwen3_coder_system_instructions() {
+        let prompt = Prompt::default();
+        let model_family = find_family_for_model("qwen/qwen3-coder-30b").expect("known model");
+
+        let instructions = prompt.get_full_instructions(&model_family);
+        let instructions = instructions.as_ref();
+
+        assert!(
+            instructions.ends_with(QWEN3_CODER_SYSTEM_INSTRUCTIONS),
+            "expected qwen-specific guidance to be appended"
+        );
+    }
 }
diff --git a/codex-rs/core/src/config.rs b/codex-rs/core/src/config.rs
index 292b9f7b51a..8e0032d1a69 100644
--- a/codex-rs/core/src/config.rs
+++ b/codex-rs/core/src/config.rs
@@ -178,6 +178,9 @@ pub struct Config {
     /// model family's default preference.
     pub include_apply_patch_tool: bool,

+    /// Enable the parallel tool call mode when supported by the provider.
+    pub parallel_tool_calls: bool,
+
     pub tools_web_search_request: bool,

     pub use_experimental_streamable_shell_tool: bool,
@@ -694,6 +697,9 @@ pub struct ConfigToml {
     /// Optional verbosity control for GPT-5 models (Responses API `text.verbosity`).
     pub model_verbosity: Option<Verbosity>,

+    /// Allow the model to request multiple tools within a single turn.
+    pub parallel_tool_calls: Option<bool>,
+
     /// Override to force-enable reasoning summaries for the configured model.
     pub model_supports_reasoning_summaries: Option<bool>,

@@ -857,6 +863,7 @@ pub struct ConfigOverrides {
     pub include_plan_tool: Option<bool>,
     pub include_apply_patch_tool: Option<bool>,
     pub include_view_image_tool: Option<bool>,
+    pub parallel_tool_calls: Option<bool>,
     pub show_raw_agent_reasoning: Option<bool>,
     pub tools_web_search_request: Option<bool>,
 }
@@ -885,6 +892,7 @@ impl Config {
             include_plan_tool,
             include_apply_patch_tool,
             include_view_image_tool,
+            parallel_tool_calls,
             show_raw_agent_reasoning,
             tools_web_search_request: override_tools_web_search_request,
         } = overrides;
@@ -960,6 +968,10 @@ impl Config {
             .or(cfg.tools.as_ref().and_then(|t| t.view_image))
             .unwrap_or(true);

+        let parallel_tool_calls = parallel_tool_calls
+            .or(cfg.parallel_tool_calls)
+            .unwrap_or(false);
+
         let model = model
             .or(config_profile.model)
             .or(cfg.model)
@@ -1052,6 +1064,7 @@ impl Config {
                 .unwrap_or("https://chatgpt.com/backend-api/".to_string()),
             include_plan_tool: include_plan_tool.unwrap_or(false),
             include_apply_patch_tool: include_apply_patch_tool.unwrap_or(false),
+            parallel_tool_calls,
             tools_web_search_request,
             use_experimental_streamable_shell_tool: cfg
                 .experimental_use_exec_command_tool
@@ -1801,6 +1814,7 @@ model_verbosity = "high"
             base_instructions: None,
             include_plan_tool: false,
             include_apply_patch_tool: false,
+            parallel_tool_calls: false,
             tools_web_search_request: false,
             use_experimental_streamable_shell_tool: false,
             use_experimental_unified_exec_tool: false,
@@ -1860,6 +1874,7 @@ model_verbosity = "high"
             base_instructions: None,
             include_plan_tool: false,
             include_apply_patch_tool: false,
+            parallel_tool_calls: false,
             tools_web_search_request: false,
             use_experimental_streamable_shell_tool: false,
             use_experimental_unified_exec_tool: false,
@@ -1934,6 +1949,7 @@ model_verbosity = "high"
             base_instructions: None,
             include_plan_tool: false,
             include_apply_patch_tool: false,
+            parallel_tool_calls: false,
             tools_web_search_request: false,
             use_experimental_streamable_shell_tool: false,
             use_experimental_unified_exec_tool: false,
@@ -1994,6 +2010,7 @@ model_verbosity = "high"
             base_instructions: None,
             include_plan_tool: false,
             include_apply_patch_tool: false,
+            parallel_tool_calls: false,
             tools_web_search_request: false,
             use_experimental_streamable_shell_tool: false,
             use_experimental_unified_exec_tool: false,
diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs
index cf15e02d29f..c9219f3f5da 100644
--- a/codex-rs/core/tests/suite/client.rs
+++ b/codex-rs/core/tests/suite/client.rs
@@ -983,7 +983,9 @@ async fn usage_limit_error_emits_rate_limit_event() -> anyhow::Result<()> {
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 async fn azure_overrides_assign_properties_used_for_responses_url() {
     skip_if_no_network!();
-    let existing_env_var_with_random_value = if cfg!(windows) { "USERNAME" } else { "USER" };
+    let (env_key, env_value) = std::env::vars()
+        .find(|(key, _)| key != "CODEX_SANDBOX_NETWORK_DISABLED")
+        .expect("tests require at least one environment variable");

     // Mock server
     let server = MockServer::start().await;
@@ -1000,11 +1002,7 @@ async fn azure_overrides_assign_properties_used_for_responses_url() {
         .and(header_regex("Custom-Header", "Value"))
         .and(header_regex(
             "Authorization",
-            format!(
-                "Bearer {}",
-                std::env::var(existing_env_var_with_random_value).unwrap()
-            )
-            .as_str(),
+            format!("Bearer {env_value}").as_str(),
         ))
         .respond_with(first)
         .expect(1)
@@ -1014,8 +1012,8 @@ async fn azure_overrides_assign_properties_used_for_responses_url() {
         .mount(&server)
         .await;

     let provider = ModelProviderInfo {
         name: "custom".to_string(),
         base_url: Some(format!("{}/openai", server.uri())),
-        // Reuse the existing environment variable to avoid using unsafe code
-        env_key: Some(existing_env_var_with_random_value.to_string()),
+        // Reuse an existing environment variable to avoid unsafe overrides
+        env_key: Some(env_key.clone()),
         query_params: Some(std::collections::HashMap::from([(
             "api-version".to_string(),
             "2025-04-01-preview".to_string(),
@@ -1060,7 +1058,9 @@ async fn azure_overrides_assign_properties_used_for_responses_url() {
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 async fn env_var_overrides_loaded_auth() {
     skip_if_no_network!();
-    let existing_env_var_with_random_value = if cfg!(windows) { "USERNAME" } else { "USER" };
+    let (env_key, env_value) = std::env::vars()
+        .find(|(key, _)| key != "CODEX_SANDBOX_NETWORK_DISABLED")
+        .expect("tests require at least one environment variable");

     // Mock server
     let server = MockServer::start().await;
@@ -1077,11 +1077,7 @@ async fn env_var_overrides_loaded_auth() {
         .and(header_regex("Custom-Header", "Value"))
         .and(header_regex(
             "Authorization",
-            format!(
-                "Bearer {}",
-                std::env::var(existing_env_var_with_random_value).unwrap()
-            )
-            .as_str(),
+            format!("Bearer {env_value}").as_str(),
         ))
         .respond_with(first)
         .expect(1)
@@ -1091,8 +1087,8 @@ async fn env_var_overrides_loaded_auth() {
         .mount(&server)
         .await;

     let provider = ModelProviderInfo {
         name: "custom".to_string(),
         base_url: Some(format!("{}/openai", server.uri())),
-        // Reuse the existing environment variable to avoid using unsafe code
-        env_key: Some(existing_env_var_with_random_value.to_string()),
+        // Reuse an existing environment variable to avoid unsafe overrides
+        env_key: Some(env_key.clone()),
         query_params: Some(std::collections::HashMap::from([(
             "api-version".to_string(),
             "2025-04-01-preview".to_string(),
diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs
index 033020b7e69..34baebda1c7 100644
--- a/codex-rs/exec/src/lib.rs
+++ b/codex-rs/exec/src/lib.rs
@@ -189,6 +189,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> {
         include_plan_tool: Some(include_plan_tool),
         include_apply_patch_tool: None,
         include_view_image_tool: None,
+        parallel_tool_calls: None,
         show_raw_agent_reasoning: using_oss.then_some(true),
         tools_web_search_request: None,
     };
diff --git a/codex-rs/mcp-server/src/codex_message_processor.rs b/codex-rs/mcp-server/src/codex_message_processor.rs
index eec3a62a2c8..1ee2d914d11 100644
--- a/codex-rs/mcp-server/src/codex_message_processor.rs
+++ b/codex-rs/mcp-server/src/codex_message_processor.rs
@@ -1271,6 +1271,7 @@ fn derive_config_from_params(
         include_plan_tool,
         include_apply_patch_tool,
         include_view_image_tool: None,
+        parallel_tool_calls: None,
         show_raw_agent_reasoning: None,
         tools_web_search_request: None,
     };
diff --git a/codex-rs/mcp-server/src/codex_tool_config.rs b/codex-rs/mcp-server/src/codex_tool_config.rs
index d90924aa942..70845085251 100644
--- a/codex-rs/mcp-server/src/codex_tool_config.rs
+++ b/codex-rs/mcp-server/src/codex_tool_config.rs
@@ -163,6 +163,7 @@ impl CodexToolCallParam {
             include_plan_tool,
             include_apply_patch_tool: None,
             include_view_image_tool: None,
+            parallel_tool_calls: None,
             show_raw_agent_reasoning: None,
             tools_web_search_request: None,
         };
diff --git a/codex-rs/tui/src/cli.rs b/codex-rs/tui/src/cli.rs
index b72e91c5cc7..be2c4a473ac 100644
--- a/codex-rs/tui/src/cli.rs
+++ b/codex-rs/tui/src/cli.rs
@@ -78,6 +78,10 @@ pub struct Cli {
     #[arg(long = "search", default_value_t = false)]
     pub web_search: bool,

+    /// Allow the model to request multiple tools within a single turn when supported.
+    #[arg(long = "parallel-tool-calls", default_value_t = false)]
+    pub parallel_tool_calls: bool,
+
     #[clap(skip)]
     pub config_overrides: CliConfigOverrides,
 }
diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs
index 7ade187916a..d47801b82ba 100644
--- a/codex-rs/tui/src/lib.rs
+++ b/codex-rs/tui/src/lib.rs
@@ -135,7 +135,10 @@ pub async fn run_main(
     let resolved = match codex_lmstudio::resolve_model_identifier(model.as_deref()) {
         Ok(model) => model,
         Err(err) => {
-            eprintln!("{err}");
+            #[allow(clippy::print_stderr)]
+            {
+                eprintln!("{err}");
+            }
             std::process::exit(1);
         }
     };
@@ -160,6 +163,7 @@ pub async fn run_main(
         include_plan_tool: Some(true),
         include_apply_patch_tool: None,
         include_view_image_tool: None,
+        parallel_tool_calls: cli.parallel_tool_calls.then_some(true),
         show_raw_agent_reasoning: using_oss.then_some(true),
         tools_web_search_request: cli.web_search.then_some(true),
     };
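
Note: a minimal sketch of what the new extraction does, assuming the `extract_embedded_tool_calls` helper and `EmbeddedToolCall` struct added in chat_completions.rs above are in scope (the sample text and the `demo` wrapper are hypothetical, not part of the patch):

    fn demo() {
        // Assistant output that inlines a tool call between <tool_call> tags.
        let text = "Checking.\n<tool_call>{\"name\":\"run\",\"arguments\":{\"cmd\":\"echo\"}}</tool_call>";
        let (cleaned, calls) = extract_embedded_tool_calls(text);
        // The tag block is stripped from the visible assistant message...
        assert_eq!(cleaned, "Checking.\n");
        // ...and surfaces as a structured call; object arguments are re-serialized to a JSON string.
        assert_eq!(calls[0].name, "run");
        assert_eq!(calls[0].arguments, "{\"cmd\":\"echo\"}");
    }

With `parallel_tool_calls = true` in config.toml (or `--parallel-tool-calls` in the TUI), the Chat Completions payload additionally gains `"parallel_tool_calls": true`, and the Responses API request mirrors the configured value instead of hardcoding `false`.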