diff --git a/crates/g3-cli/src/coach_feedback.rs b/crates/g3-cli/src/coach_feedback.rs index 34a1a6d..630235a 100644 --- a/crates/g3-cli/src/coach_feedback.rs +++ b/crates/g3-cli/src/coach_feedback.rs @@ -3,9 +3,7 @@ //! Extracts feedback from the coach agent's session logs for the coach-player loop. use anyhow::Result; -use std::path::Path; - -use g3_core::Agent; +use g3_core::{Agent, FeedbackExtractionConfig}; use crate::simple_output::SimpleOutput; use crate::ui_writer_impl::ConsoleUiWriter; @@ -18,107 +16,25 @@ pub fn extract_from_logs( coach_agent: &Agent, output: &SimpleOutput, ) -> Result { - let session_id = coach_agent - .get_session_id() - .ok_or_else(|| anyhow::anyhow!("Coach agent has no session ID"))?; - - let log_file_path = resolve_log_path(&session_id); - - // Try to extract from session log - if let Some(feedback) = try_extract_from_log(&log_file_path) { - output.print(&format!("✅ Extracted coach feedback from session: {}", session_id)); - return Ok(feedback); + let extracted = g3_core::extract_coach_feedback( + coach_result, + coach_agent, + &FeedbackExtractionConfig::default(), + ); + + if extracted.is_fallback() { + return Err(anyhow::anyhow!( + "Could not extract coach feedback. Coach result response length: {} chars", + coach_result.response.len() + )); } - // Fallback: use the TaskResult's extract_summary method - let fallback = coach_result.extract_summary(); - if !fallback.is_empty() { + if let Some(session_id) = coach_agent.get_session_id() { output.print(&format!( - "✅ Extracted coach feedback from response: {} chars", - fallback.len() + "✅ Extracted coach feedback from session: {} ({:?})", + session_id, extracted.source )); - return Ok(fallback); - } - - Err(anyhow::anyhow!( - "Could not extract coach feedback from session: {}\n\ - Log file path: {:?}\n\ - Log file exists: {}\n\ - Coach result response length: {} chars", - session_id, - log_file_path, - log_file_path.exists(), - coach_result.response.len() - )) -} - -/// Resolve the log file path, trying new path first then falling back to old. -fn resolve_log_path(session_id: &str) -> std::path::PathBuf { - g3_core::get_session_file(session_id) -} - -/// Extract feedback from a session log file. -/// -/// Searches backwards for the last assistant message with substantial text content. -fn try_extract_from_log(log_file_path: &Path) -> Option { - if !log_file_path.exists() { - return None; } - let log_content = std::fs::read_to_string(log_file_path).ok()?; - let log_json: serde_json::Value = serde_json::from_str(&log_content).ok()?; - - let messages = log_json - .get("context_window")? - .get("conversation_history")? - .as_array()?; - - // Search backwards for the last assistant message with text content - for msg in messages.iter().rev() { - if let Some(feedback) = extract_assistant_text(msg) { - return Some(feedback); - } - } - - None -} - -/// Extract text content from an assistant message. -fn extract_assistant_text(msg: &serde_json::Value) -> Option { - let role = msg.get("role").and_then(|v| v.as_str())?; - if !role.eq_ignore_ascii_case("assistant") { - return None; - } - - let content = msg.get("content")?; - - // Handle string content - if let Some(content_str) = content.as_str() { - return filter_substantial_text(content_str); - } - - // Handle array content (native tool calling format) - if let Some(content_array) = content.as_array() { - for block in content_array { - if block.get("type").and_then(|v| v.as_str()) == Some("text") { - if let Some(text) = block.get("text").and_then(|v| v.as_str()) { - if let Some(result) = filter_substantial_text(text) { - return Some(result); - } - } - } - } - } - - None -} - -/// Filter out empty or very short responses (likely just tool calls). -fn filter_substantial_text(text: &str) -> Option { - let trimmed = text.trim(); - if !trimmed.is_empty() && trimmed.len() > 10 { - Some(trimmed.to_string()) - } else { - None - } + Ok(extracted.content) } diff --git a/crates/g3-core/src/feedback_extraction.rs b/crates/g3-core/src/feedback_extraction.rs index c7822b7..aedb4b5 100644 --- a/crates/g3-core/src/feedback_extraction.rs +++ b/crates/g3-core/src/feedback_extraction.rs @@ -94,17 +94,9 @@ pub fn extract_coach_feedback( config: &FeedbackExtractionConfig, ) -> ExtractedFeedback where - W: UiWriter + Clone + Send + Sync + 'static, + W: UiWriter + Send + Sync + 'static, { - // Try session log first - now looks for last assistant message (primary method) - if let Some(session_id) = agent.get_session_id() { - if let Some(feedback) = try_extract_last_assistant_message(&session_id, config) { - debug!("Extracted coach feedback from last assistant message: {} chars", feedback.len()); - return ExtractedFeedback::new(feedback, FeedbackSource::ConversationHistory); - } - } - - // Fallback: Try session log with final_output pattern (backwards compatibility) + // Try session log with verified final_output pattern first. if let Some(session_id) = agent.get_session_id() { if let Some(feedback) = try_extract_from_session_log(&session_id, config) { debug!("Extracted coach feedback from session log (final_output): {} chars", feedback.len()); @@ -128,11 +120,22 @@ where // Fallback: Try TaskResult parsing (extracts last text block) let extracted = coach_result.extract_final_output(); - if !extracted.is_empty() { + if !extracted.is_empty() && !is_meta_coach_text(&extracted) { debug!("Extracted coach feedback from task result: {} chars", extracted.len()); return ExtractedFeedback::new(extracted, FeedbackSource::TaskResultResponse); } + // Last resort: use the last assistant message, but skip obvious meta-review chatter. + if let Some(session_id) = agent.get_session_id() { + if let Some(feedback) = try_extract_last_assistant_message(&session_id, config) { + debug!( + "Extracted coach feedback from last assistant message: {} chars", + feedback.len() + ); + return ExtractedFeedback::new(feedback, FeedbackSource::ConversationHistory); + } + } + // Fallback to default warn!("Could not extract coach feedback, using default"); ExtractedFeedback::new(config.default_feedback.clone(), FeedbackSource::DefaultFallback) @@ -163,41 +166,76 @@ fn try_extract_last_assistant_message( .get("conversation_history")? .as_array()?; - // Search backwards for the last assistant message with text content + extract_last_meaningful_assistant_message(messages) +} + +fn extract_last_meaningful_assistant_message(messages: &[Value]) -> Option { for msg in messages.iter().rev() { let role = msg.get("role").and_then(|v| v.as_str())?; - + if role.eq_ignore_ascii_case("assistant") { - if let Some(content) = msg.get("content") { - // Handle string content - if let Some(content_str) = content.as_str() { - let trimmed = content_str.trim(); - // Skip empty or very short responses (likely just tool calls) + if let Some(text) = extract_assistant_text(msg) { + if !is_meta_coach_text(&text) { + return Some(text); + } + } + } + } + + None +} + +fn extract_assistant_text(msg: &Value) -> Option { + let content = msg.get("content")?; + + if let Some(content_str) = content.as_str() { + let trimmed = content_str.trim(); + if !trimmed.is_empty() && trimmed.len() > 10 { + return Some(trimmed.to_string()); + } + } + + if let Some(content_array) = content.as_array() { + for block in content_array { + if block.get("type").and_then(|v| v.as_str()) == Some("text") { + if let Some(text) = block.get("text").and_then(|v| v.as_str()) { + let trimmed = text.trim(); if !trimmed.is_empty() && trimmed.len() > 10 { return Some(trimmed.to_string()); } } - // Handle array content (native tool calling format) - // Look for text blocks in the array - if let Some(content_array) = content.as_array() { - for block in content_array { - if block.get("type").and_then(|v| v.as_str()) == Some("text") { - if let Some(text) = block.get("text").and_then(|v| v.as_str()) { - let trimmed = text.trim(); - if !trimmed.is_empty() && trimmed.len() > 10 { - return Some(trimmed.to_string()); - } - } - } - } - } } } } - + None } +fn is_meta_coach_text(text: &str) -> bool { + let normalized = text.trim().to_ascii_lowercase(); + let prefixes = [ + "let me ", + "now let me ", + "i'll ", + "i will ", + "first, let me ", + ]; + let meta_markers = [ + "run the acceptance commands", + "check the remaining", + "review the requirements more carefully", + "read the spec more carefully", + "check several important details", + "check the current working directory", + "find the actual workspace directory", + "deeper review of the implementation quality", + "more carefully", + ]; + + prefixes.iter().any(|p| normalized.starts_with(p)) + && meta_markers.iter().any(|m| normalized.contains(m)) +} + /// Try to extract feedback from session log file fn try_extract_from_session_log( session_id: &str, @@ -629,4 +667,56 @@ mod tests { assert!(!config.verbose); assert!(config.default_feedback.contains("review")); } + + #[test] + fn test_is_meta_coach_text_detects_review_chatter() { + assert!(is_meta_coach_text( + "Now let me check the attach_to_order method more carefully — specifically whether attaching to an order moves the state to linked." + )); + assert!(!is_meta_coach_text( + "Fix reopened history persistence and make attach_to_order persist linked state." + )); + } + + #[test] + fn test_extract_last_meaningful_assistant_message_skips_meta_review_chatter() { + let messages = vec![ + serde_json::json!({ + "role": "assistant", + "content": "Let me run the acceptance commands from the correct directory." + }), + serde_json::json!({ + "role": "assistant", + "content": "Now let me check the attach_to_order method more carefully — specifically whether attaching to an order moves the state to linked." + }), + serde_json::json!({ + "role": "assistant", + "content": "The implementation is missing reopened history preservation and attach_to_order does not persist linked state." + }), + ]; + + let extracted = extract_last_meaningful_assistant_message(&messages); + assert_eq!( + extracted.as_deref(), + Some( + "The implementation is missing reopened history preservation and attach_to_order does not persist linked state." + ) + ); + } + + #[test] + fn test_extract_last_meaningful_assistant_message_returns_none_for_only_meta_chatter() { + let messages = vec![ + serde_json::json!({ + "role": "assistant", + "content": "Let me check the current working directory and run the tests." + }), + serde_json::json!({ + "role": "assistant", + "content": "Now let me check the remaining acceptance commands individually and review the requirements more carefully." + }), + ]; + + assert_eq!(extract_last_meaningful_assistant_message(&messages), None); + } } diff --git a/crates/g3-core/src/lib.rs b/crates/g3-core/src/lib.rs index f3f28ae..3623cb1 100644 --- a/crates/g3-core/src/lib.rs +++ b/crates/g3-core/src/lib.rs @@ -6,13 +6,14 @@ pub mod context_window; pub mod error_handling; pub mod feedback_extraction; pub mod paths; -pub mod project; pub mod pending_research; +pub mod project; pub mod provider_config; pub mod provider_registration; pub mod retry; pub mod session; pub mod session_continuation; +pub mod skills; pub mod stats; pub mod streaming; pub mod streaming_parser; @@ -20,11 +21,10 @@ pub mod task_result; pub mod tool_definitions; pub mod tool_dispatch; pub mod tools; +pub mod toolsets; pub mod ui_writer; pub mod utils; pub mod webdriver_session; -pub mod skills; -pub mod toolsets; pub use feedback_extraction::{ extract_coach_feedback, ExtractedFeedback, FeedbackExtractionConfig, FeedbackSource, @@ -47,7 +47,7 @@ pub use prompts::{ }; // Export skills module -pub use skills::{Skill, discover_skills, generate_skills_prompt}; +pub use skills::{discover_skills, generate_skills_prompt, Skill}; #[cfg(test)] mod task_result_comprehensive_tests; @@ -66,6 +66,7 @@ use g3_config::Config; use g3_providers::{CacheControl, CompletionRequest, Message, MessageRole, ProviderRegistry}; use prompts::{get_system_prompt_for_native, get_system_prompt_for_non_native}; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use std::time::{Duration, Instant}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, warn}; @@ -150,6 +151,12 @@ pub struct Agent { tool_call_count: usize, /// Tool calls made in the current turn (reset after each turn) tool_calls_this_turn: Vec, + /// Tool call fingerprints already executed in the current turn. + executed_tool_fingerprints_this_turn: HashSet, + /// Turn-scoped diagnostic counters for duplicate suppression. + dropped_duplicate_tool_calls_turn: u32, + dropped_duplicate_text_chunks_turn: u32, + dropped_duplicate_text_chars_turn: usize, requirements_sha: Option, /// Working directory for tool execution (set by --codebase-fast-start) working_dir: Option, @@ -213,6 +220,10 @@ impl Agent { webdriver_process: std::sync::Arc::new(tokio::sync::RwLock::new(None)), tool_call_count: 0, tool_calls_this_turn: Vec::new(), + executed_tool_fingerprints_this_turn: HashSet::new(), + dropped_duplicate_tool_calls_turn: 0, + dropped_duplicate_text_chunks_turn: 0, + dropped_duplicate_text_chars_turn: 0, requirements_sha: None, working_dir: None, background_process_manager: std::sync::Arc::new( @@ -246,7 +257,15 @@ impl Agent { project_context: Option, quiet: bool, ) -> Result { - Self::new_with_mode_and_project_context(config, ui_writer, false, project_context, quiet, None).await + Self::new_with_mode_and_project_context( + config, + ui_writer, + false, + project_context, + quiet, + None, + ) + .await } pub async fn new_autonomous_with_project_context_and_quiet( @@ -255,7 +274,15 @@ impl Agent { project_context: Option, quiet: bool, ) -> Result { - Self::new_with_mode_and_project_context(config, ui_writer, true, project_context, quiet, None).await + Self::new_with_mode_and_project_context( + config, + ui_writer, + true, + project_context, + quiet, + None, + ) + .await } /// Create a new agent with a custom system prompt (for agent mode) @@ -279,7 +306,7 @@ impl Agent { /// Create a new agent with a custom provider registry (for testing). /// This allows tests to inject mock providers without needing real API credentials. - /// + /// /// **Note**: This method is intended for testing only. Do not use in production code. #[doc(hidden)] pub async fn new_for_test( @@ -336,7 +363,8 @@ impl Agent { is_autonomous: bool, quiet: bool, ) -> Result { - Self::new_with_mode_and_project_context(config, ui_writer, is_autonomous, None, quiet, None).await + Self::new_with_mode_and_project_context(config, ui_writer, is_autonomous, None, quiet, None) + .await } async fn new_with_mode_and_project_context( @@ -626,9 +654,12 @@ impl Agent { } /// Get tool definitions including any dynamically loaded toolsets. - fn get_tool_definitions_with_loaded_toolsets(&self, tool_config: tool_definitions::ToolConfig) -> Vec { + fn get_tool_definitions_with_loaded_toolsets( + &self, + tool_config: tool_definitions::ToolConfig, + ) -> Vec { let mut tools = tool_definitions::create_tool_definitions(tool_config); - + // Add tools from loaded toolsets for toolset_name in &self.loaded_toolsets { if let Ok(toolset) = toolsets::get_toolset(toolset_name) { @@ -641,7 +672,7 @@ impl Agent { } } } - + tools } @@ -811,11 +842,11 @@ impl Agent { /// Returns the number of research results injected. pub fn inject_completed_research(&mut self) -> usize { let completed = self.pending_research_manager.take_completed(); - + if completed.is_empty() { return 0; } - + for task in &completed { let message_content = match task.status { pending_research::ResearchStatus::Complete => { @@ -836,19 +867,23 @@ impl Agent { } pending_research::ResearchStatus::Pending => continue, // Skip pending tasks }; - + // Inject as a user message so the agent sees and responds to it - let message = g3_providers::Message::new(g3_providers::MessageRole::User, message_content); + let message = + g3_providers::Message::new(g3_providers::MessageRole::User, message_content); self.context_window.add_message(message); } - + completed.len() } /// Subscribe to research completion notifications. /// /// Returns None if notifications are not enabled. - pub fn subscribe_research_notifications(&self) -> Option> { + pub fn subscribe_research_notifications( + &self, + ) -> Option> + { self.pending_research_manager.subscribe() } @@ -857,7 +892,9 @@ impl Agent { /// This replaces the internal research manager with one that sends notifications. /// Call this once during setup (e.g., in interactive mode) before any research tasks. /// Returns a receiver that will receive notifications when research tasks complete. - pub fn enable_research_notifications(&mut self) -> tokio::sync::broadcast::Receiver { + pub fn enable_research_notifications( + &mut self, + ) -> tokio::sync::broadcast::Receiver { let (manager, rx) = pending_research::PendingResearchManager::with_notifications(); self.pending_research_manager = manager; rx @@ -964,6 +1001,11 @@ impl Agent { // Reset the JSON tool call filter state at the start of each new task // This prevents the filter from staying in suppression mode between user interactions self.ui_writer.reset_json_filter(); + // Reset turn-scoped duplicate suppression state/counters. + self.executed_tool_fingerprints_this_turn.clear(); + self.dropped_duplicate_tool_calls_turn = 0; + self.dropped_duplicate_text_chunks_turn = 0; + self.dropped_duplicate_text_chars_turn = 0; // Validate that the system prompt is the first message (critical invariant) self.validate_system_prompt_is_first(); @@ -1046,9 +1088,8 @@ impl Agent { let _has_native_tool_calling = provider.has_native_tool_calling(); let _supports_cache_control = provider.supports_cache_control(); let tools = if provider.has_native_tool_calling() { - let tool_config = tool_definitions::ToolConfig::new( - self.config.computer_control.enabled, - ); + let tool_config = + tool_definitions::ToolConfig::new(self.config.computer_control.enabled); Some(self.get_tool_definitions_with_loaded_toolsets(tool_config)) } else { None @@ -1132,9 +1173,12 @@ impl Agent { self.ui_writer.print_g3_progress("compacting session"); match self.force_compact().await { Ok(true) => self.ui_writer.print_g3_status("compacting session", "done"), - Ok(false) => self.ui_writer.print_g3_status("compacting session", "failed"), + Ok(false) => self + .ui_writer + .print_g3_status("compacting session", "failed"), Err(e) => { - self.ui_writer.print_g3_status("compacting session", &format!("error: {}", e)); + self.ui_writer + .print_g3_status("compacting session", &format!("error: {}", e)); } } self.pending_90_compaction = false; @@ -1337,9 +1381,10 @@ impl Agent { use crate::compaction::{perform_compaction, CompactionConfig}; self.ui_writer.println(""); - self.ui_writer.print_g3_progress( - &format!("compacting session ({}%)", self.context_window.percentage_used() as u32) - ); + self.ui_writer.print_g3_progress(&format!( + "compacting session ({}%)", + self.context_window.percentage_used() as u32 + )); let provider_name = self.providers.get(None)?.name().to_string(); let latest_user_msg = request @@ -1371,7 +1416,8 @@ impl Agent { return Ok(true); } - self.ui_writer.print_g3_status("compacting session", "failed"); + self.ui_writer + .print_g3_status("compacting session", "failed"); Err(anyhow::anyhow!( "Context window at capacity and compaction failed. Please start a new session." )) @@ -1379,12 +1425,19 @@ impl Agent { /// Check if a tool call is a duplicate of the last tool call in the previous assistant message. /// Returns Some("DUP IN MSG") if it's a duplicate, None otherwise. - fn check_duplicate_in_previous_message(&self, tool_call: &ToolCall, history_cutoff: usize) -> Option { + fn check_duplicate_in_previous_message( + &self, + tool_call: &ToolCall, + history_cutoff: usize, + ) -> Option { // Find the most recent assistant message, but only look at messages that // existed before the current streaming iteration started. This prevents // tool calls within the same response from being marked as DUP IN MSG // against messages added during the current iteration's tool execution. - for msg in self.context_window.conversation_history[..history_cutoff].iter().rev() { + for msg in self.context_window.conversation_history[..history_cutoff] + .iter() + .rev() + { if !matches!(msg.role, MessageRole::Assistant) { continue; } @@ -1550,7 +1603,9 @@ impl Agent { /// Check if there is currently project content loaded. pub fn has_project_content(&self) -> bool { - self.context_window.conversation_history.get(1) + self.context_window + .conversation_history + .get(1) .map(|m| m.content.contains("=== ACTIVE PROJECT:")) .unwrap_or(false) } @@ -1701,7 +1756,10 @@ impl Agent { let Some(session_id) = &self.session_id else { return false; }; - read_plan(session_id).ok().flatten().map_or(false, |plan| plan.is_complete()) + read_plan(session_id) + .ok() + .flatten() + .map_or(false, |plan| plan.is_complete()) } // ========================================================================= @@ -1724,6 +1782,12 @@ impl Agent { first_token_time: Option, turn_accumulated_usage: &Option, ) -> TaskResult { + debug!( + "Turn dedup diagnostics: duplicate_tool_calls_dropped={}, duplicate_text_chunks_dropped={}, duplicate_text_chars_dropped={}", + self.dropped_duplicate_tool_calls_turn, + self.dropped_duplicate_text_chunks_turn, + self.dropped_duplicate_text_chars_turn + ); self.ui_writer.finish_streaming_markdown(); self.save_context_window("completed"); @@ -1908,7 +1972,8 @@ impl Agent { // Without the trailing newline, tool call JSON like `{"tool": "remember", ...}` would // appear on the same line as "Memory checkpoint:" and leak through to the UI. // See test: test_tool_call_not_at_line_start_passes_through in filter_json.rs - self.ui_writer.print_context_status("\nMemory checkpoint:\n"); + self.ui_writer + .print_context_status("\nMemory checkpoint:\n"); // Reset JSON filter state so it starts fresh for this response self.ui_writer.reset_json_filter(); @@ -1956,9 +2021,8 @@ Skip if nothing new. Be brief."#; let provider = self.providers.get(None)?; let provider_name = provider.name().to_string(); let tools = if provider.has_native_tool_calling() { - let tool_config = tool_definitions::ToolConfig::new( - self.config.computer_control.enabled, - ); + let tool_config = + tool_definitions::ToolConfig::new(self.config.computer_control.enabled); Some(self.get_tool_definitions_with_loaded_toolsets(tool_config)) } else { None @@ -2231,8 +2295,14 @@ Skip if nothing new. Be brief."#; // Inject any completed research results into the context let injected_count = self.inject_completed_research(); if injected_count > 0 { - debug!("Injected {} completed research result(s) into context", injected_count); - self.ui_writer.println(&format!("📋 {} research result(s) ready and injected into context", injected_count)); + debug!( + "Injected {} completed research result(s) into context", + injected_count + ); + self.ui_writer.println(&format!( + "📋 {} research result(s) ready and injected into context", + injected_count + )); } // Get provider info for logging, then drop it to avoid borrow issues @@ -2323,7 +2393,7 @@ Skip if nothing new. Be brief."#; if let Some(ref usage) = chunk.usage { iter.accumulated_usage = Some(usage.clone()); state.turn_accumulated_usage = Some(usage.clone()); - + // Update cumulative cache statistics self.cache_stats.total_calls += 1; self.cache_stats.total_input_tokens += usage.prompt_tokens as u64; @@ -2361,7 +2431,8 @@ Skip if nothing new. Be brief."#; chunk.tool_calls )); } else if iter.raw_chunks.len() == 20 { - iter.raw_chunks.push("... (chunks 21+ omitted for brevity) ...".to_string()); + iter.raw_chunks + .push("... (chunks 21+ omitted for brevity) ...".to_string()); } // Record time to first token @@ -2401,7 +2472,10 @@ Skip if nothing new. Be brief."#; } } // Then check against messages from before this iteration - self.check_duplicate_in_previous_message(tc, history_len_before_iteration) + self.check_duplicate_in_previous_message( + tc, + history_len_before_iteration, + ) }); // Process each tool call @@ -2419,6 +2493,21 @@ Skip if nothing new. Be brief."#; ); continue; } + if !streaming::is_turn_dedup_exempt_tool(&tool_call.tool) { + let turn_fingerprint = + streaming::tool_fingerprint(&tool_call.tool, &tool_call.args); + if !self + .executed_tool_fingerprints_this_turn + .insert(turn_fingerprint.clone()) + { + self.dropped_duplicate_tool_calls_turn += 1; + debug!( + "Skipping duplicate tool call (DUP IN TURN): {} with fingerprint {}", + tool_call.tool, turn_fingerprint + ); + continue; + } + } // Flag for post-turn compaction if at 90% capacity if self.auto_compact && self.context_window.percentage_used() >= 90.0 { @@ -2438,7 +2527,9 @@ Skip if nothing new. Be brief."#; // Use only the text before tool calls for the log message. // This prevents duplicate tool call JSON from being stored // in the assistant message when the LLM stutters. - let raw_content_for_log = streaming::clean_llm_tokens(iter.parser.get_text_before_tool_calls()); + let raw_content_for_log = streaming::clean_llm_tokens( + iter.parser.get_text_before_tool_calls(), + ); let filtered_content = self.ui_writer.filter_json_tool_calls(&clean_content); let final_display_content = filtered_content.trim(); @@ -2461,9 +2552,21 @@ Skip if nothing new. Be brief."#; self.ui_writer.print_agent_prompt(); state.response_started = true; } - self.ui_writer.print_agent_response(&new_content); - self.ui_writer.flush(); - iter.current_response.push_str(&new_content); + let deduped_text = streaming::dedup_consecutive_text( + &iter.current_response, + &new_content, + ); + if deduped_text.dropped_events > 0 { + self.dropped_duplicate_text_chunks_turn += + deduped_text.dropped_events; + self.dropped_duplicate_text_chars_turn += + deduped_text.dropped_chars; + } + if !deduped_text.content.trim().is_empty() { + self.ui_writer.print_agent_response(&deduped_text.content); + self.ui_writer.flush(); + iter.current_response.push_str(&deduped_text.content); + } } self.ui_writer.finish_streaming_markdown(); @@ -2510,8 +2613,12 @@ Skip if nothing new. Be brief."#; { Ok(result) => result?, Err(_) => { - let timeout_mins = if tool_call.tool == "research" { 20 } else { 8 }; - warn!("Tool call {} timed out after {} minutes", tool_call.tool, timeout_mins); + let timeout_mins = + if tool_call.tool == "research" { 20 } else { 8 }; + warn!( + "Tool call {} timed out after {} minutes", + tool_call.tool, timeout_mins + ); format!( "❌ Tool execution timed out after {} minutes", timeout_mins @@ -2545,25 +2652,25 @@ Skip if nothing new. Be brief."#; streaming::ToolOutputFormat::SelfHandled => None, streaming::ToolOutputFormat::Compact(summary) => Some(summary), streaming::ToolOutputFormat::Regular => { - // Regular tools: show truncated output lines - let max_lines_to_show = - if wants_full { output_len } else { MAX_LINES }; - for (idx, line) in output_lines.iter().enumerate() { - if !wants_full && idx >= max_lines_to_show { - break; + // Regular tools: show truncated output lines + let max_lines_to_show = + if wants_full { output_len } else { MAX_LINES }; + for (idx, line) in output_lines.iter().enumerate() { + if !wants_full && idx >= max_lines_to_show { + break; + } + self.ui_writer.update_tool_output_line( + &streaming::truncate_line( + line, + MAX_LINE_WIDTH, + !wants_full, + ), + ); } - self.ui_writer.update_tool_output_line( - &streaming::truncate_line( - line, - MAX_LINE_WIDTH, - !wants_full, - ), - ); - } - if !wants_full && output_len > MAX_LINES { - self.ui_writer.print_tool_output_summary(output_len); - } - None + if !wants_full && output_len > MAX_LINES { + self.ui_writer.print_tool_output_summary(output_len); + } + None } } }; @@ -2572,10 +2679,7 @@ Skip if nothing new. Be brief."#; // This ensures the log file contains the true raw content including JSON tool calls let tool_message = { let text_content = raw_content_for_log.trim().to_string(); - let mut msg = Message::new( - MessageRole::Assistant, - text_content, - ); + let mut msg = Message::new(MessageRole::Assistant, text_content); // Store the tool call structurally so that providers can // emit proper tool_use blocks (e.g. Anthropic API) instead // of inline JSON text that confuses the model. @@ -2585,7 +2689,10 @@ Skip if nothing new. Be brief."#; // Anthropic API requires tool_use IDs matching ^[a-zA-Z0-9_-]+$ use std::sync::atomic::{AtomicU64, Ordering}; static FALLBACK_COUNTER: AtomicU64 = AtomicU64::new(0); - format!("tool_{}", FALLBACK_COUNTER.fetch_add(1, Ordering::SeqCst)) + format!( + "tool_{}", + FALLBACK_COUNTER.fetch_add(1, Ordering::SeqCst) + ) } else { tool_call.id.clone() }, @@ -2671,8 +2778,9 @@ Skip if nothing new. Be brief."#; let tool_config = tool_definitions::ToolConfig::new( self.config.computer_control.enabled, ); - request.tools = - Some(self.get_tool_definitions_with_loaded_toolsets(tool_config)); + request.tools = Some( + self.get_tool_definitions_with_loaded_toolsets(tool_config), + ); } // DO NOT add final_display_content to full_response here! @@ -2728,10 +2836,21 @@ Skip if nothing new. Be brief."#; self.ui_writer.print_agent_prompt(); state.response_started = true; } - - self.ui_writer.print_agent_response(&filtered_content); - self.ui_writer.flush(); - iter.current_response.push_str(&filtered_content); + let deduped_text = streaming::dedup_consecutive_text( + &iter.current_response, + &filtered_content, + ); + if deduped_text.dropped_events > 0 { + self.dropped_duplicate_text_chunks_turn += + deduped_text.dropped_events; + self.dropped_duplicate_text_chars_turn += + deduped_text.dropped_chars; + } + if !deduped_text.content.is_empty() { + self.ui_writer.print_agent_response(&deduped_text.content); + self.ui_writer.flush(); + iter.current_response.push_str(&deduped_text.content); + } } } } @@ -2759,7 +2878,9 @@ Skip if nothing new. Be brief."#; // Don't re-add text from parser buffer if we already displayed it // The parser buffer contains ALL accumulated text, but current_response // already has what was displayed during streaming - if iter.current_response.is_empty() && !text_content.trim().is_empty() { + if iter.current_response.is_empty() + && !text_content.trim().is_empty() + { // Only use parser text if we truly have no response // This should be rare - only if streaming failed to display anything debug!("Warning: Using parser buffer text as fallback - this may duplicate output"); @@ -2771,7 +2892,8 @@ Skip if nothing new. Be brief."#; self.ui_writer.filter_json_tool_calls(&clean_text); // Only use this if we truly have nothing else - if !filtered_text.trim().is_empty() && state.full_response.is_empty() + if !filtered_text.trim().is_empty() + && state.full_response.is_empty() { debug!( "Using filtered parser text as last resort: {} chars", @@ -2809,7 +2931,9 @@ Skip if nothing new. Be brief."#; debug!("Tools were executed in previous iterations, breaking to finalize"); // IMPORTANT: Save any text response to context window before breaking // This ensures text displayed after tool execution is not lost - if !iter.current_response.trim().is_empty() && !state.assistant_message_added { + if !iter.current_response.trim().is_empty() + && !state.assistant_message_added + { debug!("Saving current_response ({} chars) to context before finalization", iter.current_response.len()); let assistant_msg = Message::new( MessageRole::Assistant, @@ -2829,7 +2953,9 @@ Skip if nothing new. Be brief."#; // Save assistant message before returning (no tools were executed) // This ensures text-only responses are saved to context - if !iter.current_response.trim().is_empty() && !state.assistant_message_added { + if !iter.current_response.trim().is_empty() + && !state.assistant_message_added + { debug!("Saving current_response ({} chars) to context before early return", iter.current_response.len()); let assistant_msg = Message::new( MessageRole::Assistant, @@ -2922,7 +3048,8 @@ Skip if nothing new. Be brief."#; state.full_response.len() ); - let has_response = !iter.current_response.is_empty() || !state.full_response.is_empty(); + let has_response = + !iter.current_response.is_empty() || !state.full_response.is_empty(); // Check if the response is essentially empty (just whitespace or timing lines) // Check if there's an incomplete tool call in the buffer (for debugging) @@ -3024,12 +3151,12 @@ Skip if nothing new. Be brief."#; // Check plan approval gate after tool execution (only in plan mode) if self.in_plan_mode { if let Some(session_id) = &self.session_id { - if let ApprovalGateResult::Blocked { message } = - check_plan_approval_gate(session_id, working_dir, &self.baseline_dirty_files) - { - // Return the blocking message instead of the tool result - return Ok(message); - } + if let ApprovalGateResult::Blocked { message } = + check_plan_approval_gate(session_id, working_dir, &self.baseline_dirty_files) + { + // Return the blocking message instead of the tool result + return Ok(message); + } } } @@ -3183,7 +3310,12 @@ mod tool_timeout_tests { fn test_webdriver_tools_have_8_minute_timeout() { for tool in ["webdriver_start", "webdriver_navigate", "webdriver_click"] { let timeout = get_tool_timeout(tool); - assert_eq!(timeout, Duration::from_secs(8 * 60), "Tool {} should have 8 min timeout", tool); + assert_eq!( + timeout, + Duration::from_secs(8 * 60), + "Tool {} should have 8 min timeout", + tool + ); } } @@ -3191,11 +3323,25 @@ mod tool_timeout_tests { fn test_only_research_has_extended_timeout() { // List of all tools that should have 8-minute timeout let standard_tools = [ - "shell", "read_file", "write_file", "str_replace", "read_image", - "screenshot", "code_search", "todo_read", "todo_write", "remember", - "rehydrate", "coverage", "background_process", - "webdriver_start", "webdriver_navigate", "webdriver_click", - "webdriver_send_keys", "webdriver_find_element", "webdriver_quit", + "shell", + "read_file", + "write_file", + "str_replace", + "read_image", + "screenshot", + "code_search", + "todo_read", + "todo_write", + "remember", + "rehydrate", + "coverage", + "background_process", + "webdriver_start", + "webdriver_navigate", + "webdriver_click", + "webdriver_send_keys", + "webdriver_find_element", + "webdriver_quit", ]; for tool in standard_tools { diff --git a/crates/g3-core/src/streaming.rs b/crates/g3-core/src/streaming.rs index 76d8527..5e41692 100644 --- a/crates/g3-core/src/streaming.rs +++ b/crates/g3-core/src/streaming.rs @@ -7,6 +7,7 @@ use crate::context_window::ContextWindow; use crate::streaming_parser::StreamingToolParser; use crate::ToolCall; use g3_providers::{CompletionRequest, MessageRole}; +use serde_json::{Map, Value}; use std::time::{Duration, Instant}; use tracing::{debug, error}; @@ -47,7 +48,8 @@ impl StreamingState { } pub fn get_ttft(&self) -> Duration { - self.first_token_time.unwrap_or_else(|| self.stream_start.elapsed()) + self.first_token_time + .unwrap_or_else(|| self.stream_start.elapsed()) } } @@ -109,10 +111,9 @@ pub fn is_compact_tool(tool_name: &str) -> bool { } pub fn is_self_handled_tool(tool_name: &str) -> bool { - matches!(tool_name, - "todo_read" | "todo_write" | - "plan_read" | "plan_write" | - "write_envelope" + matches!( + tool_name, + "todo_read" | "todo_write" | "plan_read" | "plan_write" | "write_envelope" ) } @@ -210,7 +211,8 @@ impl IterationState { chunk.tool_calls )); } else if self.raw_chunks.len() == 20 { - self.raw_chunks.push("... (chunks 21+ omitted for brevity) ...".to_string()); + self.raw_chunks + .push("... (chunks 21+ omitted for brevity) ...".to_string()); } self.chunks_received += 1; } @@ -261,10 +263,7 @@ pub fn format_timing_footer( // Add token usage info if available (dimmed) if let Some(tokens) = turn_tokens { - format!( - "{} {} ◉ | {:.0}%", - timing, tokens, context_percentage - ) + format!("{} {} ◉ | {:.0}%", timing, tokens, context_percentage) } else { format!("{} {:.0}%", timing, context_percentage) } @@ -286,15 +285,21 @@ pub fn log_stream_error( error!("Iteration: {}/{}", iteration_count, MAX_ITERATIONS); error!("Provider: {} (model: {})", provider_name, provider_model); error!("Chunks received: {}", chunks_received); - + error!("Parser state:"); error!(" - Text buffer length: {}", parser.text_buffer_len()); error!(" - Text buffer content: {:?}", parser.get_text_content()); - error!(" - Has incomplete tool call: {}", parser.has_incomplete_tool_call()); + error!( + " - Has incomplete tool call: {}", + parser.has_incomplete_tool_call() + ); error!(" - Message stopped: {}", parser.is_message_stopped()); error!(" - In JSON tool call: {}", parser.is_in_json_tool_call()); - error!(" - JSON tool start: {:?}", parser.json_tool_start_position()); - + error!( + " - JSON tool start: {:?}", + parser.json_tool_start_position() + ); + error!("Request details:"); error!(" - Messages count: {}", request.messages.len()); error!(" - Has tools: {}", request.tools.is_some()); @@ -335,7 +340,10 @@ pub fn log_stream_error( " - Used tokens: {}/{}", context_window.used_tokens, context_window.total_tokens ); - error!(" - Percentage used: {:.1}%", context_window.percentage_used()); + error!( + " - Percentage used: {:.1}%", + context_window.percentage_used() + ); error!( " - Conversation history length: {}", context_window.conversation_history.len() @@ -395,10 +403,15 @@ pub fn format_tool_arg_value(tool_name: &str, key: &str, value: &serde_json::Val } /// Format tool output lines for display, respecting machine vs human mode. -pub fn format_tool_output_summary(output: &str, max_lines: usize, max_width: usize, wants_full: bool) -> Vec { +pub fn format_tool_output_summary( + output: &str, + max_lines: usize, + max_width: usize, + wants_full: bool, +) -> Vec { let lines: Vec<&str> = output.lines().collect(); let limit = if wants_full { lines.len() } else { max_lines }; - + lines .iter() .take(limit) @@ -515,8 +528,14 @@ pub fn format_code_search_summary(result: &str) -> String { } else if let Some(json_start) = result.find('{') { // Try to parse the JSON to extract summary stats if let Ok(parsed) = serde_json::from_str::(&result[json_start..]) { - let matches = parsed.get("total_matches").and_then(|v| v.as_u64()).unwrap_or(0); - let files = parsed.get("total_files_searched").and_then(|v| v.as_u64()).unwrap_or(0); + let matches = parsed + .get("total_matches") + .and_then(|v| v.as_u64()) + .unwrap_or(0); + let files = parsed + .get("total_files_searched") + .and_then(|v| v.as_u64()) + .unwrap_or(0); return format!("🔍 {} matches in {} files", matches, files); } "🔍 search complete".to_string() @@ -609,6 +628,105 @@ where result } +/// Build a deterministic fingerprint for a tool call using tool name and normalized args. +/// Object keys are sorted recursively so equivalent JSON payloads hash identically. +pub fn tool_fingerprint(tool_name: &str, args: &Value) -> String { + fn normalize(value: &Value) -> Value { + match value { + Value::Object(obj) => { + let mut keys: Vec<&String> = obj.keys().collect(); + keys.sort(); + let mut normalized = Map::new(); + for key in keys { + if let Some(val) = obj.get(key) { + normalized.insert(key.clone(), normalize(val)); + } + } + Value::Object(normalized) + } + Value::Array(arr) => Value::Array(arr.iter().map(normalize).collect()), + _ => value.clone(), + } + } + + let normalized_args = normalize(args); + let normalized_json = + serde_json::to_string(&normalized_args).unwrap_or_else(|_| String::from("{}")); + format!("{tool_name}|{normalized_json}") +} + +/// Tools that are expected to be called repeatedly with identical args as a polling pattern. +/// These should not be blocked by turn-scoped duplicate suppression. +pub fn is_turn_dedup_exempt_tool(tool_name: &str) -> bool { + matches!(tool_name, "research_status") +} + +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct DedupTextResult { + pub content: String, + pub dropped_chars: usize, + pub dropped_events: u32, +} + +/// Conservative dedup for streaming text: +/// - drops exact consecutive duplicates already present at the tail of `previous`, +/// - collapses exact in-chunk repetition (e.g. "abcabc" -> "abc"), +/// - does not attempt overlap merge for partial matches. +pub fn dedup_consecutive_text(previous: &str, incoming: &str) -> DedupTextResult { + if incoming.is_empty() { + return DedupTextResult::default(); + } + + let mut dropped_chars = 0usize; + let mut dropped_events = 0u32; + + let (collapsed, internal_dropped_chars) = collapse_exact_repetition(incoming); + if internal_dropped_chars > 0 { + dropped_chars += internal_dropped_chars; + dropped_events += 1; + } + + if !collapsed.is_empty() && previous.ends_with(&collapsed) { + dropped_chars += collapsed.len(); + dropped_events += 1; + return DedupTextResult { + content: String::new(), + dropped_chars, + dropped_events, + }; + } + + DedupTextResult { + content: collapsed, + dropped_chars, + dropped_events, + } +} + +fn collapse_exact_repetition(incoming: &str) -> (String, usize) { + if incoming.is_empty() { + return (String::new(), 0); + } + + let len = incoming.len(); + let mut unit_boundaries: Vec = incoming.char_indices().map(|(i, _)| i).collect(); + unit_boundaries.push(len); + + for &unit_len in unit_boundaries.iter().skip(1) { + if unit_len == len || len % unit_len != 0 { + continue; + } + + let unit = &incoming[..unit_len]; + let repeats = len / unit_len; + if repeats > 1 && unit.repeat(repeats) == incoming { + return (unit.to_string(), len - unit_len); + } + } + + (incoming.to_string(), 0) +} + // ============================================================================= // Auto-Continue Decision Logic // ============================================================================= @@ -691,8 +809,14 @@ mod tests { assert_eq!(truncate_for_display("short", 10), "short"); assert_eq!(truncate_for_display("this is long", 5), "this ..."); // Multi-line input should only use first line - assert_eq!(truncate_for_display("first line\nsecond line", 20), "first line"); - assert_eq!(truncate_for_display("❌ Error\nDetails here", 10), "❌ Error"); + assert_eq!( + truncate_for_display("first line\nsecond line", 20), + "first line" + ); + assert_eq!( + truncate_for_display("❌ Error\nDetails here", 10), + "❌ Error" + ); } #[test] @@ -713,10 +837,16 @@ mod tests { #[test] fn test_format_tool_arg_value_shell_command() { let val = serde_json::json!("echo hello\necho world"); - assert_eq!(format_tool_arg_value("shell", "command", &val), "echo hello..."); - + assert_eq!( + format_tool_arg_value("shell", "command", &val), + "echo hello..." + ); + let single_line = serde_json::json!("ls -la"); - assert_eq!(format_tool_arg_value("shell", "command", &single_line), "ls -la"); + assert_eq!( + format_tool_arg_value("shell", "command", &single_line), + "ls -la" + ); } #[test] @@ -731,7 +861,10 @@ mod tests { #[test] fn test_format_read_file_summary() { assert_eq!(format_read_file_summary(42, 500), "42 lines (500 chars)"); - assert_eq!(format_read_file_summary(100, 1500), "100 lines (1.5k chars)"); + assert_eq!( + format_read_file_summary(100, 1500), + "100 lines (1.5k chars)" + ); } #[test] @@ -745,12 +878,20 @@ mod tests { #[test] fn test_deduplicate_tool_calls_no_duplicates() { let tools = vec![ - ToolCall { tool: "shell".to_string(), args: serde_json::json!({"command": "ls"}), id: String::new() }, - ToolCall { tool: "read_file".to_string(), args: serde_json::json!({"path": "foo.rs"}), id: String::new() }, + ToolCall { + tool: "shell".to_string(), + args: serde_json::json!({"command": "ls"}), + id: String::new(), + }, + ToolCall { + tool: "read_file".to_string(), + args: serde_json::json!({"path": "foo.rs"}), + id: String::new(), + }, ]; - + let result = deduplicate_tool_calls(tools, |_| None); - + assert_eq!(result.len(), 2); assert!(result[0].1.is_none()); assert!(result[1].1.is_none()); @@ -759,12 +900,20 @@ mod tests { #[test] fn test_deduplicate_tool_calls_sequential_duplicate() { let tools = vec![ - ToolCall { tool: "shell".to_string(), args: serde_json::json!({"command": "ls"}), id: String::new() }, - ToolCall { tool: "shell".to_string(), args: serde_json::json!({"command": "ls"}), id: String::new() }, + ToolCall { + tool: "shell".to_string(), + args: serde_json::json!({"command": "ls"}), + id: String::new(), + }, + ToolCall { + tool: "shell".to_string(), + args: serde_json::json!({"command": "ls"}), + id: String::new(), + }, ]; - + let result = deduplicate_tool_calls(tools, |_| None); - + assert_eq!(result.len(), 2); assert!(result[0].1.is_none(), "First should not be duplicate"); assert_eq!(result[1].1, Some("DUP IN CHUNK".to_string())); @@ -772,17 +921,76 @@ mod tests { #[test] fn test_deduplicate_tool_calls_previous_message_duplicate() { - let tools = vec![ - ToolCall { tool: "shell".to_string(), args: serde_json::json!({"command": "ls"}), id: String::new() }, - ]; - + let tools = vec![ToolCall { + tool: "shell".to_string(), + args: serde_json::json!({"command": "ls"}), + id: String::new(), + }]; + // Simulate finding a duplicate in previous message let result = deduplicate_tool_calls(tools, |_| Some("DUP IN MSG".to_string())); - + assert_eq!(result.len(), 1); assert_eq!(result[0].1, Some("DUP IN MSG".to_string())); } + #[test] + fn test_tool_fingerprint_normalizes_object_key_order() { + let left = serde_json::json!({ + "z": 1, + "a": {"b": 2, "a": 1}, + "arr": [{"y": 2, "x": 1}] + }); + let right = serde_json::json!({ + "arr": [{"x": 1, "y": 2}], + "a": {"a": 1, "b": 2}, + "z": 1 + }); + + assert_eq!( + tool_fingerprint("shell", &left), + tool_fingerprint("shell", &right) + ); + } + + #[test] + fn test_turn_dedup_exempt_tool() { + assert!(is_turn_dedup_exempt_tool("research_status")); + assert!(!is_turn_dedup_exempt_tool("shell")); + } + + #[test] + fn test_dedup_consecutive_text_drops_tail_duplicate() { + let result = dedup_consecutive_text("hello world", "world"); + assert_eq!(result.content, ""); + assert_eq!(result.dropped_events, 1); + assert_eq!(result.dropped_chars, "world".len()); + } + + #[test] + fn test_dedup_consecutive_text_collapses_internal_repeat() { + let result = dedup_consecutive_text("", "Yes. Yes. Yes. "); + assert_eq!(result.content, "Yes. "); + assert_eq!(result.dropped_events, 1); + assert_eq!( + result.dropped_chars, + "Yes. Yes. Yes. ".len() - "Yes. ".len() + ); + } + + #[test] + fn test_dedup_consecutive_text_keeps_non_duplicate_content() { + let result = dedup_consecutive_text("abc", "bcx"); + assert_eq!( + result, + DedupTextResult { + content: "bcx".to_string(), + dropped_chars: 0, + dropped_events: 0, + } + ); + } + #[test] fn test_should_auto_continue_not_autonomous() { // Never auto-continue in interactive mode @@ -793,19 +1001,31 @@ mod tests { #[test] fn test_should_auto_continue_autonomous() { use AutoContinueReason::*; - + // Tools executed - assert_eq!(should_auto_continue(true, true, false, false, false), Some(ToolsExecuted)); - + assert_eq!( + should_auto_continue(true, true, false, false, false), + Some(ToolsExecuted) + ); + // Incomplete tool call - assert_eq!(should_auto_continue(true, false, true, false, false), Some(IncompleteToolCall)); - + assert_eq!( + should_auto_continue(true, false, true, false, false), + Some(IncompleteToolCall) + ); + // Unexecuted tool call - assert_eq!(should_auto_continue(true, false, false, true, false), Some(UnexecutedToolCall)); - + assert_eq!( + should_auto_continue(true, false, false, true, false), + Some(UnexecutedToolCall) + ); + // Max tokens truncation - assert_eq!(should_auto_continue(true, false, false, false, true), Some(MaxTokensTruncation)); - + assert_eq!( + should_auto_continue(true, false, false, false, true), + Some(MaxTokensTruncation) + ); + // Nothing special - no auto-continue assert_eq!(should_auto_continue(true, false, false, false, false), None); }