diff --git a/docs/plans/2026-02-21-tool-validation-loop-and-streaming-corruption-fix.md b/docs/plans/2026-02-21-tool-validation-loop-and-streaming-corruption-fix.md
new file mode 100644
index 00000000..ee923cf2
--- /dev/null
+++ b/docs/plans/2026-02-21-tool-validation-loop-and-streaming-corruption-fix.md
@@ -0,0 +1,119 @@
+# Fix Tool Validation Error Loops and Anthropic Streaming Corruption
+
+**Date:** 2026-02-21
+**Status:** Proposed
+**Trigger:** Lab-bench pipeline run `01KJ1T17CWJEXEPGZNAGCA2REA` failed at `consolidate_dod` due to a tool schema validation loop, and `dod_b` failed due to Anthropic API context corruption.
+
+## Problem Statement
+
+Two distinct failure modes caused a lab-bench pipeline run to fail and waste tokens/time:
+
+1. **Anthropic streaming codec corruption** — The `dod_b` branch failed with: `tool_use ids were found without tool_result blocks immediately after`. The conversation history became malformed (a tool call existed without a corresponding tool result), and the Anthropic API rejected further requests with a 400 error.
+
+2. **Tool schema validation loop** — The `consolidate_dod` agent repeatedly called the `shell` tool without the required `command` parameter. Each call failed schema validation and the error was reported back to the model, but the model kept making the same malformed call. The run was eventually killed by signal termination.
+
+## Root Cause Analysis
+
+### Failure 1: Anthropic streaming adapter concatenation bug
+
+**Location:** `internal/llm/providers/anthropic/adapter.go` (tool_use streaming, ~lines 392-450)
+**Related investigation:** `docs/plans/2026-02-10-kimi-tool-call-investigation-notes.md`
+
+When the Anthropic provider emits both `content_block_start.tool_use.input` and `content_block_delta.input_json_delta.partial_json`, the adapter concatenates both values, producing invalid JSON. This causes:
+
+1. Tool result validation fails on the malformed JSON
+2. Error reported back to model as truncated tool result
+3. Model retries with further malformed args
+4. Context accumulates orphaned `tool_use` blocks without `tool_result` blocks
+5. Anthropic API rejects the entire conversation with HTTP 400
+
+**Why it doesn't recover:** The retry policy in `internal/llm/retry_util.go` (2 retries, 1–60s exponential backoff) classifies HTTP 400 as a non-retryable `InvalidRequestError`. There is no special handling for this specific error pattern.
+
+### Failure 2: No tool-level circuit breaker
+
+**Locations:**
+- Tool validation: `internal/agent/tool_registry.go` (lines 97–126)
+- Shell tool schema: `internal/agent/profile.go` (lines 291–306)
+- Stall watchdog: `internal/attractor/engine/engine.go` (lines 1602–1635)
+
+The `shell` tool requires a `command` field. When the model omits it, `ExecuteCall()` validates against the JSON schema, fails, and returns the error message to the model with `IsError=true`. However:
+
+- **No loop detection** — Each validation failure is handled independently. There is no tracking of consecutive identical failures for the same tool.
+- **No circuit breaker** — The model can loop on the same malformed call indefinitely, burning tokens.
+- **Stall watchdog is the only safety net** — If configured, the engine's stall watchdog detects no progress and cancels the run with SIGTERM. But `StallTimeout` may not be set in all run configs.
+
+### Parallel branch failure propagation
+
+**Location:** `internal/attractor/engine/parallel_handlers.go` (lines 490–627)
+
+When `dod_b` failed, the `FanInHandler` still received its result (with `StatusFail`). The consolidation step proceeded with the successful branches (`dod_a`, `dod_c`), but the consolidator itself then hit the schema validation loop described above. The branch failure did not directly cause the consolidator failure — these are independent issues that compounded.
+
+## Proposed Fixes
+
+### Fix 1: Anthropic streaming adapter — implement Option A (source precedence)
+
+**File:** `internal/llm/providers/anthropic/adapter.go`
+
+This bug was fully reproduced and localized in `docs/plans/2026-02-10-kimi-tool-call-investigation-notes.md`, which includes regression tests and two proposed fix options. Implement **Option A (source precedence)**: track per-tool-call arg source (`start_input` vs `delta_stream`) separately, and at finalize prefer the delta buffer if any deltas were received, else fall back to start-input.
+
+The Kimi investigation already provides the failing test fixtures (`internal/llm/providers/anthropic/testdata/kimi_tool_call_sequences.ndjson`) and regression tests that should pass once the fix is applied.
+
+### Fix 2: Tool validation circuit breaker
+
+**File:** `internal/agent/tool_registry.go`
+
+Add a per-tool consecutive failure counter to `ExecuteCall()`. After N consecutive schema validation failures for the same tool (suggest N=3), escalate the error:
+
+- Inject a stronger error message: `"Tool '{name}' has failed schema validation {N} times consecutively. The required fields are: {required_fields}. Do NOT call this tool again without providing all required fields."`
+- Optionally: fail the stage deterministically with `failure_class=deterministic` to trigger the graph's retry/postmortem routing instead of looping further.
+
+The counter resets on any successful tool call.
+
+### Fix 3: Ensure StallTimeout is configured
+
+**File:** Run config YAML files and/or engine defaults
+
+The stall watchdog in `engine.go` (lines 1602–1635) is the last line of defense against infinite loops, but it requires `StallTimeout` to be set. Either:
+
+- Set a default `StallTimeout` of 5–10 minutes in the engine if none is configured
+- Add `stall_timeout_ms: 300000` to the standard run config templates
+
+### Fix 4: Classify tool_use/tool_result mismatch as recoverable
+
+**File:** `internal/llm/retry_util.go` or `internal/llm/providers/anthropic/adapter.go`
+
+Detect the specific Anthropic 400 error pattern (`tool_use ids were found without tool_result blocks`) and handle it specially:
+
+- Option A: Retry by reconstructing the conversation — drop the orphaned `tool_use` block from the message history and re-send
+- Option B: Classify as `transient_infra` so the stage-level retry policy can restart the stage cleanly
+
+This prevents a single streaming glitch from permanently failing a branch.
+
+## Priority
+
+| Fix | Impact | Effort | Priority |
+|-----|--------|--------|----------|
+| Fix 1: Streaming adapter | Prevents context corruption entirely | Low | **P0** |
+| Fix 2: Circuit breaker | Prevents token waste on validation loops | Medium | **P1** |
+| Fix 3: StallTimeout default | Ensures hung runs are killed | Low | **P1** |
+| Fix 4: Recoverable mismatch | Allows branches to self-heal | Medium | **P2** |
+
+## Verification
+
+1. Run the lab-bench pipeline end-to-end and confirm all 3 DoD branches complete without API corruption errors
+2. Simulate a schema validation loop (model calling `shell` without `command`) and confirm the circuit breaker fires after 3 attempts
+3. Confirm `StallTimeout` is set in run configs and that a stalled stage is killed within the configured window
+4. Simulate an orphaned `tool_use` block and confirm the adapter either prevents it (Fix 1) or recovers from it (Fix 4)
+
+## Key File References
+
+| Component | File | Lines |
+|-----------|------|-------|
+| Tool validation | `internal/agent/tool_registry.go` | 97–142 |
+| Shell tool def | `internal/agent/profile.go` | 291–306 |
+| Anthropic adapter | `internal/llm/providers/anthropic/adapter.go` | ~392–450 |
+| Retry policy | `internal/llm/retry_util.go` | 27–40, 50–87 |
+| Stall watchdog | `internal/attractor/engine/engine.go` | 1602–1635 |
+| Parallel fan-in | `internal/attractor/engine/parallel_handlers.go` | 490–627 |
+| Process termination | `internal/agent/env_local_unix.go` | 14–26 |
+| Prior investigation | `docs/plans/2026-02-10-kimi-tool-call-investigation-notes.md` | — |
diff --git a/internal/agent/tool_registry.go b/internal/agent/tool_registry.go
index ad875cec..d8b8d5d1 100644
--- a/internal/agent/tool_registry.go
+++ b/internal/agent/tool_registry.go
@@ -50,12 +50,16 @@ type RegisteredTool struct {
 }
 
 type ToolRegistry struct {
-	mu    sync.RWMutex
-	tools map[string]RegisteredTool
+	mu                 sync.RWMutex
+	tools              map[string]RegisteredTool
+	validationFailures map[string]int // consecutive validation failures per tool
 }
 
 func NewToolRegistry() *ToolRegistry {
-	return &ToolRegistry{tools: map[string]RegisteredTool{}}
+	return &ToolRegistry{
+		tools:              map[string]RegisteredTool{},
+		validationFailures: map[string]int{},
+	}
 }
 
 func (r *ToolRegistry) Register(t RegisteredTool) error {
@@ -113,6 +117,7 @@ func (r *ToolRegistry) ExecuteCall(ctx context.Context, env ExecutionEnvironment
 	if len(call.Arguments) > 0 {
 		if err := json.Unmarshal(call.Arguments, &args); err != nil {
 			msg := fmt.Sprintf("invalid tool arguments JSON: %v", err)
+			msg = r.recordValidationFailure(name, msg, t)
 			return truncateResult(name, callID, msg, true, t.Limit)
 		}
 	}
@@ -122,9 +127,12 @@ func (r *ToolRegistry) ExecuteCall(ctx context.Context, env ExecutionEnvironment
 
 	if err := t.Schema.Validate(args); err != nil {
 		msg := fmt.Sprintf("tool args schema validation failed: %v", err)
+		msg = r.recordValidationFailure(name, msg, t)
 		return truncateResult(name, callID, msg, true, t.Limit)
 	}
 
+	r.resetValidationFailures(name)
+
 	v, err := t.Exec(ctx, env, args)
 	if err != nil {
 		full := ""
@@ -141,6 +149,57 @@ func (r *ToolRegistry) ExecuteCall(ctx context.Context, env ExecutionEnvironment
 	return truncateResult(name, callID, full, false, t.Limit)
 }
 
+const circuitBreakerThreshold = 3
+
+// recordValidationFailure increments the consecutive validation failure counter
+// for the given tool and returns the error message, potentially with an escalation
+// suffix if the threshold has been reached.
+func (r *ToolRegistry) recordValidationFailure(name, msg string, t RegisteredTool) string {
+	r.mu.Lock()
+	if r.validationFailures == nil {
+		r.validationFailures = map[string]int{}
+	}
+	r.validationFailures[name]++
+	count := r.validationFailures[name]
+	r.mu.Unlock()
+
+	if count >= circuitBreakerThreshold {
+		fields := requiredFields(t.Definition.Parameters)
+		msg += fmt.Sprintf("\nCIRCUIT BREAKER: Tool '%s' has failed validation %d times consecutively. Required fields: %s. Do NOT call this tool again without providing all required fields.", name, count, strings.Join(fields, ", "))
+	}
+	return msg
+}
+
+func (r *ToolRegistry) resetValidationFailures(name string) {
+	r.mu.Lock()
+	delete(r.validationFailures, name)
+	r.mu.Unlock()
+}
+
+// requiredFields extracts the "required" array from a tool's parameter schema.
+func requiredFields(params map[string]any) []string {
+	if params == nil {
+		return nil
+	}
+	req, ok := params["required"]
+	if !ok {
+		return nil
+	}
+	switch v := req.(type) {
+	case []string:
+		return v
+	case []any:
+		out := make([]string, 0, len(v))
+		for _, item := range v {
+			if s, ok := item.(string); ok {
+				out = append(out, s)
+			}
+		}
+		return out
+	}
+	return nil
+}
+
 func truncateResult(toolName, callID, full string, isErr bool, lim ToolOutputLimit) ToolExecResult {
 	out := full
 	out = truncateChars(out, lim.MaxChars, lim.Strategy)
diff --git a/internal/agent/tool_registry_test.go b/internal/agent/tool_registry_test.go
index 063a6d3a..c0cd09bf 100644
--- a/internal/agent/tool_registry_test.go
+++ b/internal/agent/tool_registry_test.go
@@ -306,6 +306,83 @@ func TestDefaultToolLimit_MatchesSpecTable(t *testing.T) {
 	}
 }
 
+func TestToolRegistry_CircuitBreaker_EscalatesAfterConsecutiveFailures(t *testing.T) {
+	r := NewToolRegistry()
+	if err := r.Register(RegisteredTool{
+		Definition: llm.ToolDefinition{
+			Name: "t",
+			Parameters: map[string]any{
+				"type": "object",
+				"properties": map[string]any{
+					"path":    map[string]any{"type": "string"},
+					"content": map[string]any{"type": "string"},
+				},
+				"required": []any{"path", "content"},
+			},
+		},
+		Exec: func(ctx context.Context, env ExecutionEnvironment, args map[string]any) (any, error) {
+			return "ok", nil
+		},
+	}); err != nil {
+		t.Fatalf("Register: %v", err)
+	}
+	env := NewLocalExecutionEnvironment(t.TempDir())
+
+	// First two failures: no circuit breaker message.
+	for i := 0; i < 2; i++ {
+		res := r.ExecuteCall(context.Background(), env, llm.ToolCallData{
+			ID:        fmt.Sprintf("c%d", i),
+			Name:      "t",
+			Arguments: json.RawMessage(`{}`),
+		})
+		if !res.IsError {
+			t.Fatalf("call %d: expected error", i)
+		}
+		if strings.Contains(res.Output, "CIRCUIT BREAKER") {
+			t.Fatalf("call %d: unexpected circuit breaker message", i)
+		}
+	}
+
+	// Third failure: should trigger circuit breaker.
+	res := r.ExecuteCall(context.Background(), env, llm.ToolCallData{
+		ID:        "c2",
+		Name:      "t",
+		Arguments: json.RawMessage(`{}`),
+	})
+	if !res.IsError {
+		t.Fatalf("call 2: expected error")
+	}
+	if !strings.Contains(res.Output, "CIRCUIT BREAKER") {
+		t.Fatalf("expected CIRCUIT BREAKER in output, got: %q", res.Output)
+	}
+	if !strings.Contains(res.Output, "path") || !strings.Contains(res.Output, "content") {
+		t.Fatalf("expected required fields in output, got: %q", res.Output)
+	}
+
+	// Successful call resets the counter.
+	res = r.ExecuteCall(context.Background(), env, llm.ToolCallData{
+		ID:        "c3",
+		Name:      "t",
+		Arguments: json.RawMessage(`{"path":"a","content":"b"}`),
+	})
+	if res.IsError {
+		t.Fatalf("expected success, got error: %q", res.Output)
+	}
+
+	// Next failure should NOT trigger circuit breaker (counter was reset).
+	res = r.ExecuteCall(context.Background(), env, llm.ToolCallData{
+		ID:        "c4",
+		Name:      "t",
+		Arguments: json.RawMessage(`{}`),
+	})
+	if !res.IsError {
+		t.Fatalf("expected error")
+	}
+	if strings.Contains(res.Output, "CIRCUIT BREAKER") {
+		t.Fatalf("unexpected CIRCUIT BREAKER after reset, got: %q", res.Output)
+	}
+}
+
 func TestCompileSchema_DoesNotDependOnGetwd(t *testing.T) {
 	temp := t.TempDir()
 	oldWD, err := os.Getwd()
diff --git a/internal/llm/errors.go b/internal/llm/errors.go
index b9e58541..e2d4c65c 100644
--- a/internal/llm/errors.go
+++ b/internal/llm/errors.go
@@ -122,6 +122,9 @@ func classifyByMessage(base httpErrorBase) error {
 		return &NotFoundError{base}
 	case strings.Contains(lower, "unauthorized") || strings.Contains(lower, "invalid key"):
 		return &AuthenticationError{base}
+	case strings.Contains(lower, "tool_use ids were found without tool_result"):
+		base.retryable = true
+		return &ServerError{base}
 	}
 	return nil
 }
diff --git a/internal/llm/errors_test.go b/internal/llm/errors_test.go
index 0d8ffbae..4f0c61bc 100644
--- a/internal/llm/errors_test.go
+++ b/internal/llm/errors_test.go
@@ -119,6 +119,20 @@ func TestQuotaExceededError_ImplementsErrorInterface(t *testing.T) {
 	}
 }
 
+func TestErrorFromHTTPStatus_ToolUseMismatch_IsRetryable(t *testing.T) {
+	err := ErrorFromHTTPStatus("anthropic", 400, "tool_use ids were found without tool_result blocks immediately after", nil, nil)
+	e, ok := err.(Error)
+	if !ok {
+		t.Fatalf("not an llm.Error: %T", err)
+	}
+	if !e.Retryable() {
+		t.Fatalf("expected retryable, got non-retryable")
+	}
+	if _, ok := err.(*ServerError); !ok {
+		t.Fatalf("expected *ServerError, got %T", err)
+	}
+}
+
 func TestErrorFromHTTPStatus_MessageBasedClassification(t *testing.T) {
 	cases := []struct {
 		name string