Skip to content

Latest commit

 

History

History
881 lines (707 loc) · 31.2 KB

File metadata and controls

881 lines (707 loc) · 31.2 KB

Claude Code v2.1.88 - 查询执行引擎分析

Architecture Diagram

Query Execution Flow

1. 架构总览

查询执行引擎是 Claude Code 的核心运行时,负责将用户输入转化为模型调用,协调工具执行, 管理上下文窗口,并处理各种错误恢复场景。引擎由以下关键模块组成:

src/
  query.ts              -- 查询主循环 (1729 行)
  QueryEngine.ts        -- 会话生命周期管理 (高层封装)
  query/
    config.ts           -- 运行时配置快照
    deps.ts             -- 依赖注入
    tokenBudget.ts      -- Token 预算追踪
    stopHooks.ts        -- Stop Hook 执行
  services/
    api/
      claude.ts         -- API 客户端 (流式/非流式)
      withRetry.ts      -- 重试与降级逻辑
      errors.ts         -- 错误分类与处理
    compact/
      autoCompact.ts    -- 自动压缩
      microCompact.ts   -- 微压缩 (工具结果裁剪)
      compact.ts        -- 完整压缩实现
    tools/
      StreamingToolExecutor.ts  -- 流式工具执行器
      toolOrchestration.ts      -- 工具编排 (并行/串行)
      toolExecution.ts          -- 单工具执行

2. 查询执行流程

2.1 入口与总体流程

QueryEngine.submitMessage() 到最终响应,请求经过以下完整路径:

QueryEngine.submitMessage()
  |
  v
query()  --  外层包装,管理 command lifecycle
  |
  v
queryLoop()  --  核心 while(true) 循环
  |
  +-- [每次迭代] -----------------------------------------------+
  |   1. 准备消息: applyToolResultBudget, snip, microcompact     |
  |   2. Context Collapse (可选)                                 |
  |   3. AutoCompact (超阈值时压缩)                               |
  |   4. 调用模型: deps.callModel (流式)                          |
  |   5. 流式接收 + 流式工具预执行 (StreamingToolExecutor)         |
  |   6. 错误恢复: PTL / max_output_tokens / 媒体错误            |
  |   7. Stop Hooks (无 tool_use 时)                             |
  |   8. Token Budget 检查 (无 tool_use 时)                      |
  |   9. 工具执行: runTools / getRemainingResults                 |
  |  10. 附件注入: memory, skill discovery, queued commands       |
  |  11. 刷新工具列表 (MCP 热加载)                                |
  |  12. maxTurns 检查                                           |
  |  13. 更新 state, continue                                    |
  +--------------------------------------------------------------+

2.2 ASCII 时序图

User Input         QueryEngine         query()          API            Tools
    |                   |                  |               |              |
    |--submitMessage--->|                  |               |              |
    |                   |---query()------->|               |              |
    |                   |                  |               |              |
    |                   |          [microcompact]          |              |
    |                   |          [autocompact]           |              |
    |                   |                  |               |              |
    |                   |                  |--callModel--->|              |
    |                   |                  |<--stream------|              |
    |                   |                  |               |              |
    |                   |                  |  [StreamingToolExecutor:     |
    |                   |                  |   addTool() 边流边执行]      |
    |                   |                  |               |              |
    |                   |                  |<--stream end--|              |
    |                   |                  |                              |
    |                   |                  |  needsFollowUp?             |
    |                   |                  |  NO: stopHooks -> return    |
    |                   |                  |  YES:                       |
    |                   |                  |---runTools/remaining-------->|
    |                   |                  |<--tool_results--------------|
    |                   |                  |                              |
    |                   |                  |  [attachments, budget check] |
    |                   |                  |  [state = next; continue]    |
    |                   |                  |                              |
    |<--yield messages--|<--yield---------|               |              |

3. QueryParams 与入口类型

export type QueryParams = {
  messages: Message[]
  systemPrompt: SystemPrompt
  userContext: { [k: string]: string }
  systemContext: { [k: string]: string }
  canUseTool: CanUseToolFn
  toolUseContext: ToolUseContext
  fallbackModel?: string
  querySource: QuerySource
  maxOutputTokensOverride?: number
  maxTurns?: number
  skipCacheWrite?: boolean
  taskBudget?: { total: number }
  deps?: QueryDeps               // 依赖注入,可选
}

query() 函数是一个 AsyncGenerator,通过 yield 逐步输出流事件、消息和工具结果:

export async function* query(
  params: QueryParams,
): AsyncGenerator<
  | StreamEvent
  | RequestStartEvent
  | Message
  | TombstoneMessage
  | ToolUseSummaryMessage,
  Terminal                        // 返回值:终止原因
>

4. QueryConfig -- 运行时门控

QueryConfig 在每次 query() 入口快照一次,整个查询循环期间不变:

export type QueryConfig = {
  sessionId: SessionId

  gates: {
    streamingToolExecution: boolean   // Statsig 门控:流式工具执行
    emitToolUseSummaries: boolean     // 环境变量:是否生成工具摘要
    isAnt: boolean                    // 内部用户标识
    fastModeEnabled: boolean          // Fast Mode 是否启用
  }
}

设计决策

  • Statsig 门控通过 CACHED_MAY_BE_STALE 读取,允许一定的过期性,因此在查询入口 快照一次是安全的。
  • feature() 门控故意不纳入 QueryConfig,因为它们是编译时 tree-shaking 边界, 必须保持内联以确保死代码消除。
  • buildQueryConfig() 是纯函数,无副作用,每次调用返回全新对象。

5. QueryDeps -- 依赖注入

export type QueryDeps = {
  callModel: typeof queryModelWithStreaming   // 模型调用
  microcompact: typeof microcompactMessages   // 微压缩
  autocompact: typeof autoCompactIfNeeded     // 自动压缩
  uuid: () => string                          // UUID 生成
}

export function productionDeps(): QueryDeps {
  return {
    callModel: queryModelWithStreaming,
    microcompact: microcompactMessages,
    autocompact: autoCompactIfNeeded,
    uuid: randomUUID,
  }
}

设计意图

引入 QueryDeps 的核心动机是可测试性。源码注释指出,在引入此模式前,callModelautocompact 各在 6-8 个测试文件中通过 spyOn + 模块导入的样板代码进行 mock。 通过在 QueryParams.deps 中传入 fake,测试可以直接注入替代实现。

当前范围故意收窄为 4 个依赖,作为模式验证。后续 PR 可扩展至 runToolshandleStopHookslogEvent 等。

6. 查询循环状态 (State)

type State = {
  messages: Message[]
  toolUseContext: ToolUseContext
  autoCompactTracking: AutoCompactTrackingState | undefined
  maxOutputTokensRecoveryCount: number
  hasAttemptedReactiveCompact: boolean
  maxOutputTokensOverride: number | undefined
  pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined
  stopHookActive: boolean | undefined
  turnCount: number
  transition: Continue | undefined   // 上一次迭代的 continue 原因
}

每次 continue 时,通过赋值全新 State 对象(不可变模式)更新状态,而非逐字段修改。 transition 字段记录了上一次迭代为何 continue,用于测试断言和恢复路径判断。

Continue 原因枚举

根据代码中所有 state = { ... transition: ... } 赋值点,共有以下 continue 原因:

transition.reason 触发场景
next_turn 正常工具执行后进入下一轮
stop_hook_blocking Stop Hook 返回 blocking error
token_budget_continuation Token 预算未用完,注入 nudge 继续
reactive_compact_retry 响应式压缩后重试
collapse_drain_retry Context Collapse 排空后重试
max_output_tokens_escalate 输出 token 上限从 8k 升级到 64k
max_output_tokens_recovery 输出截断恢复(最多 3 次)

Terminal 返回原因

查询循环通过 return { reason: ... } 终止:

reason 含义
completed 正常完成
aborted_streaming 流式阶段被用户中断
aborted_tools 工具执行阶段被用户中断
blocking_limit 触发硬阻塞限制 (auto-compact 关闭时)
prompt_too_long PTL 恢复失败
image_error 图片/媒体错误
model_error 模型调用异常
max_turns 达到最大轮次限制
hook_stopped 工具 Hook 阻止继续
stop_hook_prevented Stop Hook 阻止继续

7. Token 预算管理

7.1 BudgetTracker

export type BudgetTracker = {
  continuationCount: number       // 已 continue 次数
  lastDeltaTokens: number         // 上一次检查的 delta
  lastGlobalTurnTokens: number    // 上一次检查时的总 token
  startedAt: number               // 追踪开始时间
}

7.2 决策逻辑

export function checkTokenBudget(
  tracker: BudgetTracker,
  agentId: string | undefined,
  budget: number | null,
  globalTurnTokens: number,
): TokenBudgetDecision

决策流程:

1. agentId 存在 / budget 为 null / budget <= 0 → stop (不适用)
2. 计算 pct = turnTokens / budget * 100
3. 计算 deltaSinceLastCheck
4. 检测 diminishing returns:
   - continuationCount >= 3
   - 连续两次 delta < 500 tokens
5. 如果非 diminishing 且 turnTokens < budget * 90% → continue
   - 注入 nudge message: "N% 已用, X/Y tokens"
6. 否则 → stop

关键阈值:

  • COMPLETION_THRESHOLD = 0.9 (90%):超过此比例停止
  • DIMINISHING_THRESHOLD = 500:连续两次增量小于 500 则视为收益递减

7.3 与 taskBudget 的区别

tokenBudget 是客户端的自动 continue 机制(TOKEN_BUDGET feature flag 控制), 用于在模型自然停止但 budget 未用完时自动注入 nudge 让模型继续。

taskBudget 是 API 层面的 output_config.task_budget,由服务端管理。客户端只在 压缩发生时计算 remaining(因为压缩后服务端看不到被摘要掉的历史)。

8. 消息压缩策略

8.1 压缩执行顺序

在每次循环迭代中,压缩按以下严格顺序执行:

applyToolResultBudget     -- 按 budget 裁剪工具结果大小
        |
        v
snipCompact               -- 历史裁剪 (HISTORY_SNIP feature)
        |
        v
microcompactMessages      -- 微压缩 (旧工具结果清理)
        |
        v
applyCollapsesIfNeeded    -- Context Collapse (CONTEXT_COLLAPSE feature)
        |
        v
autoCompactIfNeeded       -- 自动压缩 (全量摘要)

8.2 MicroCompact (微压缩)

文件: services/compact/microCompact.ts

微压缩针对特定工具的输出结果进行裁剪,不触发完整的模型摘要调用。

可压缩工具列表:

  • Read, Bash, Grep, Glob, WebSearch, WebFetch, Edit, Write

工作方式:

  1. 扫描消息历史中的 tool_result
  2. 对于上述工具类型的旧结果,按 token 预算裁剪或替换为 [Old tool result content cleared]
  3. 图片结果限制为 2000 tokens

Cached MicroCompact (CACHED_MICROCOMPACT feature):通过 API 的 cache editing 机制,直接在缓存层删除旧工具结果,避免重新发送。使用 cache_deleted_input_tokens 指标追踪实际删除量。

8.3 AutoCompact (自动压缩)

文件: services/compact/autoCompact.ts

当上下文 token 数超过阈值时,触发完整的对话摘要压缩。

阈值计算:

function getAutoCompactThreshold(model: string): number {
  const effectiveContextWindow = getEffectiveContextWindowSize(model)
  return effectiveContextWindow - AUTOCOMPACT_BUFFER_TOKENS  // 13,000
}

function getEffectiveContextWindowSize(model: string): number {
  const contextWindow = getContextWindowForModel(model)
  return contextWindow - reservedTokensForSummary  // 20,000
}

Token 告警阶梯:

级别 Buffer 含义
WARNING 20,000 tokens 接近上限,UI 显示警告
ERROR 20,000 tokens 临界状态
AUTOCOMPACT 13,000 tokens 触发自动压缩
BLOCKING 3,000 tokens 硬阻塞,禁止继续查询

失败熔断: 连续失败 3 次后停止重试 (MAX_CONSECUTIVE_AUTOCOMPACT_FAILURES = 3)。 此机制基于生产数据:曾有 1,279 个会话出现 50+ 次连续失败(最高 3,272 次), 每天浪费约 25 万次 API 调用。

压缩后状态更新:

tracking = {
  compacted: true,
  turnId: deps.uuid(),    // 新的 turn ID
  turnCounter: 0,         // 重置计数器
  consecutiveFailures: 0, // 重置失败计数
}
messagesForQuery = buildPostCompactMessages(compactionResult)

8.4 Reactive Compact (响应式压缩)

通过 feature('REACTIVE_COMPACT') 门控,动态导入 reactiveCompact.ts

与 AutoCompact 的区别:AutoCompact 是预防性的(在请求前检查阈值), Reactive Compact 是恢复性的(在 API 返回 prompt-too-long 错误后触发)。

触发路径:

API 返回 prompt-too-long
    |
    v
isWithheldPromptTooLong() == true  (错误被暂扣,不立即 yield)
    |
    v
!needsFollowUp (模型未请求工具)
    |
    v
先尝试 Context Collapse drain (如果启用)
    |
    v
tryReactiveCompact()
    |
    +-- 成功: buildPostCompactMessages, state.transition = reactive_compact_retry
    +-- 失败: yield 原错误消息, return prompt_too_long

重要约束: hasAttemptedReactiveCompact 标志确保每轮只尝试一次响应式压缩, 防止无限循环。此标志在 stop_hook_blocking 的 continue 路径中被保留 (不重置为 false),否则会导致:compact -> still too long -> error -> stop hook blocking -> compact -> ... 的死循环。

9. 工具编排

9.1 两种执行模式

传统模式 (runTools)

文件: services/tools/toolOrchestration.ts

export async function* runTools(
  toolUseMessages: ToolUseBlock[],
  assistantMessages: AssistantMessage[],
  canUseTool: CanUseToolFn,
  toolUseContext: ToolUseContext,
): AsyncGenerator<MessageUpdate, void>

通过 partitionToolCalls 将工具调用分为两类批次:

type Batch = { isConcurrencySafe: boolean; blocks: ToolUseBlock[] }

分区规则:

  1. 并发安全的连续工具 → 合并为一个批次,并行执行
  2. 非并发安全的工具 → 单独成批,串行执行
[Read, Read, Grep, Bash, Read, Read]
  |            |      |       |
  v            v      v       v
 Batch 1     Batch 2  Batch 3
 (parallel)  (serial) (parallel)

并发上限: CLAUDE_CODE_MAX_TOOL_USE_CONCURRENCY 环境变量,默认 10。

并发安全判定: 调用 tool.isConcurrencySafe(parsedInput) 方法。如果输入 解析失败或方法抛异常,保守地视为不安全。

流式执行模式 (StreamingToolExecutor)

文件: services/tools/StreamingToolExecutor.ts

streamingToolExecution Statsig 门控控制。核心区别:在模型流式响应期间, tool_use 块一出现就立即开始执行,不等待完整响应。

export class StreamingToolExecutor {
  private tools: TrackedTool[] = []
  private siblingAbortController: AbortController

  addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void
  getCompletedResults(): MessageUpdate[]      // 已完成结果(流中拉取)
  getRemainingResults(): AsyncGenerator<...>  // 流结束后获取剩余结果
  discard(): void                             // 降级时丢弃所有待执行任务
}

工具状态机:

queued --> executing --> completed --> yielded
                |
                +--> (error: sibling_error | user_interrupted | streaming_fallback)

并发控制:

private canExecuteTool(isConcurrencySafe: boolean): boolean {
  const executingTools = this.tools.filter(t => t.status === 'executing')
  return (
    executingTools.length === 0 ||
    (isConcurrencySafe && executingTools.every(t => t.isConcurrencySafe))
  )
}

即:如果当前没有工具在执行,或者当前所有执行中工具和待执行工具都是并发安全的, 则允许执行。非并发安全工具必须独占执行。

Sibling Abort: 当一个 Bash 工具出错时,通过 siblingAbortController 取消同批次的并行工具,而取消父级 abortController(不结束整个 turn)。

Fallback 处理: 当流式降级发生时,调用 discard() 丢弃所有已排队和进行中的 工具结果,创建新的 StreamingToolExecutor 实例,避免旧 tool_use_id 的结果泄漏。

9.2 两阶段结果收集

在流式执行模式下,工具结果分两阶段收集:

阶段 1 -- 流中收集 (streaming loop 内):

for (const result of streamingToolExecutor.getCompletedResults()) {
  if (result.message) {
    yield result.message
    toolResults.push(...)
  }
}

阶段 2 -- 流后收集 (streaming loop 后):

const toolUpdates = streamingToolExecutor
  ? streamingToolExecutor.getRemainingResults()
  : runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext)

for await (const update of toolUpdates) { ... }

10. API 客户端

10.1 queryModelWithStreaming

文件: services/api/claude.ts

export async function* queryModelWithStreaming({
  messages, systemPrompt, thinkingConfig,
  tools, signal, options,
}: {...}): AsyncGenerator<
  StreamEvent | AssistantMessage | SystemAPIErrorMessage,
  void
>

这是 QueryDeps.callModel 的生产实现。通过 withStreamingVCR 包装, 支持 VCR(请求录制/回放)功能。内部调用 queryModel 执行实际 API 请求。

10.2 重试逻辑 (withRetry)

文件: services/api/withRetry.ts

export async function* withRetry<T>(
  getClient: () => Promise<Anthropic>,
  operation: (client: Anthropic, attempt: number, context: RetryContext) => Promise<T>,
  options: RetryOptions,
): AsyncGenerator<SystemAPIErrorMessage, T>

关键常量:

  • DEFAULT_MAX_RETRIES = 10
  • BASE_DELAY_MS = 500
  • MAX_529_RETRIES = 3

重试策略:

错误类型 行为
529 (过载) 最多重试 3 次,仅前台查询源
429 (限流) 标准重试
401 (认证) 刷新 OAuth token 后重试
403 (token revoked) 刷新 token 后重试
ECONNRESET/EPIPE 禁用 keep-alive 后重试
prompt-too-long 不重试,返回合成错误消息

529 前台查询源白名单:

const FOREGROUND_529_RETRY_SOURCES = new Set<QuerySource>([
  'repl_main_thread', 'sdk', 'agent:custom', 'agent:default',
  'agent:builtin', 'compact', 'hook_agent', 'hook_prompt',
  'verification_agent', 'side_question', 'auto_mode', ...
])

非前台任务(摘要、标题生成、建议等)遇到 529 立即失败,避免在容量级联时 产生 3-10 倍的网关放大。

持久重试模式 (CLAUDE_CODE_UNATTENDED_RETRY):用于无人值守会话, 对 429/529 无限重试,最大退避 5 分钟,每 30 秒发送心跳,总上限 6 小时。

10.3 FallbackTriggeredError

export class FallbackTriggeredError extends Error {
  constructor(
    public readonly originalModel: string,
    public readonly fallbackModel: string,
  ) { ... }
}

当模型降级触发时,queryLoop 捕获此错误:

  1. 为所有孤立的 assistant messages 生成 tombstone
  2. 清空 assistantMessages、toolResults、toolUseBlocks
  3. 丢弃 StreamingToolExecutor 并重建
  4. 剥离 thinking signatures(Ant 内部模型特有)
  5. 使用 fallback model 重试

10.4 错误恢复矩阵

+------------------------+---------------------------+-------------------+
| 错误类型                | 恢复策略                    | 最大尝试次数       |
+------------------------+---------------------------+-------------------+
| prompt_too_long        | 1. Context Collapse drain  | 各 1 次           |
|                        | 2. Reactive Compact        |                   |
+------------------------+---------------------------+-------------------+
| max_output_tokens      | 1. Escalate 8k -> 64k     | 1 次              |
|                        | 2. 注入恢复消息重试          | 3 次              |
+------------------------+---------------------------+-------------------+
| media_size_error       | Reactive Compact (裁图)    | 1 次              |
+------------------------+---------------------------+-------------------+
| 529 过载               | 指数退避重试                | 3 次 (前台)       |
+------------------------+---------------------------+-------------------+
| model_fallback         | 切换到 fallback model      | 1 次              |
+------------------------+---------------------------+-------------------+

11. Stop Hooks

文件: query/stopHooks.ts

11.1 触发时机

Stop Hooks 在以下条件全部满足时执行:

  • needsFollowUp === false (模型未请求工具调用)
  • 最后一条消息不是 API 错误
  • 所有 PTL/max_output 恢复尝试已结束

11.2 函数签名

export async function* handleStopHooks(
  messagesForQuery: Message[],
  assistantMessages: AssistantMessage[],
  systemPrompt: SystemPrompt,
  userContext: { [k: string]: string },
  systemContext: { [k: string]: string },
  toolUseContext: ToolUseContext,
  querySource: QuerySource,
  stopHookActive?: boolean,
): AsyncGenerator<..., StopHookResult>

返回类型:

type StopHookResult = {
  blockingErrors: Message[]       // 需要模型处理的阻塞错误
  preventContinuation: boolean    // 是否阻止继续
}

11.3 执行内容

Stop Hooks 阶段执行大量后台任务:

同步等待的任务:

  1. executeStopHooks() -- 用户配置的 Stop hooks(shell 命令)
  2. Job 分类器 (classifyAndWriteState) -- 模板作业的状态分类,超时 60s

Fire-and-forget 任务:

  1. executePromptSuggestion() -- 提示建议生成
  2. executeExtractMemories() -- 记忆提取 (EXTRACT_MEMORIES feature)
  3. executeAutoDream() -- 自动梦境分析
  4. cleanupComputerUseAfterTurn() -- Computer Use 清理 (CHICAGO_MCP feature)

Teammate 扩展 hooks (isTeammate() 时):

  1. executeTaskCompletedHooks() -- 对每个 in_progress 状态的任务
  2. executeTeammateIdleHooks() -- 队友空闲通知

11.4 Stop Hook 与循环交互

stopHookResult.preventContinuation == true
  --> return { reason: 'stop_hook_prevented' }

stopHookResult.blockingErrors.length > 0
  --> state.transition = stop_hook_blocking
  --> state.stopHookActive = true  // 标记 hook 已激活
  --> continue  // 回到循环,让模型处理 blocking errors

死循环防护: Stop hooks 产生 blocking error 时,下一轮的 hasAttemptedReactiveCompact 被保留(不重置),防止 compact -> PTL -> stop hook blocking -> compact -> ... 循环。

11.5 跳过 Stop Hooks 的场景

  • --bare 模式:跳过 prompt suggestion、memory extraction、auto dream
  • API 错误消息:调用 executeStopFailureHooks 替代
  • 用户中断 (abort):直接返回

12. 查询循环详细流程

12.1 迭代前准备

1. Destructure state (messages, toolUseContext, tracking, ...)
2. Skill discovery prefetch (异步启动)
3. yield { type: 'stream_request_start' }
4. Initialize/increment queryTracking (chainId, depth)
5. messagesForQuery = getMessagesAfterCompactBoundary(messages)

12.2 消息预处理管线

messagesForQuery
  |
  v  applyToolResultBudget()        -- 裁剪过大的工具结果
  |
  v  snipCompactIfNeeded()          -- HISTORY_SNIP: 裁剪旧历史
  |
  v  deps.microcompact()            -- 微压缩旧工具输出
  |
  v  applyCollapsesIfNeeded()       -- CONTEXT_COLLAPSE: 折叠上下文
  |
  v  deps.autocompact()             -- 自动压缩 (如果超阈值)
  |
  v  messagesForQuery (最终版)

12.3 模型调用

for await (const message of deps.callModel({
  messages: prependUserContext(messagesForQuery, userContext),
  systemPrompt: fullSystemPrompt,
  thinkingConfig: toolUseContext.options.thinkingConfig,
  tools: toolUseContext.options.tools,
  signal: toolUseContext.abortController.signal,
  options: {
    model: currentModel,
    fastMode: appState.fastMode,
    fallbackModel,
    querySource,
    maxOutputTokensOverride,
    taskBudget: { total, remaining },
    effortValue: appState.effortValue,
    // ... 更多选项
  },
})) {
  // 处理每条流消息
}

12.4 流消息处理

对每条流消息执行:

  1. Fallback 检测streamingFallbackOccured 标志触发时,清空所有已收集数据
  2. backfillObservableInput:为 tool_use 块补充可观察字段(如展开文件路径)
  3. 错误暂扣:PTL、max_output_tokens、媒体错误消息暂不 yield
  4. 工具块收集tool_use 类型块加入 toolUseBlocks,设置 needsFollowUp = true
  5. 流式工具启动streamingToolExecutor.addTool() 立即开始执行
  6. 流中结果收集getCompletedResults() 拉取已完成的工具结果

12.5 流后分支

needsFollowUp?
  |
  +-- false (无工具调用) --------------------------+
  |   |                                           |
  |   +-- isWithheld413?                          |
  |   |   +-- Context Collapse drain -> continue  |
  |   |   +-- Reactive Compact -> continue        |
  |   |   +-- 无法恢复 -> yield error, return     |
  |   |                                           |
  |   +-- isWithheldMaxOutputTokens?              |
  |   |   +-- Escalate 8k->64k -> continue        |
  |   |   +-- Recovery message -> continue (<=3x) |
  |   |   +-- 恢复耗尽 -> yield error             |
  |   |                                           |
  |   +-- isApiErrorMessage? -> return completed  |
  |   |                                           |
  |   +-- handleStopHooks() -> 见 Stop Hooks 节   |
  |   |                                           |
  |   +-- checkTokenBudget() -> continue / stop   |
  |   |                                           |
  |   +-- return { reason: 'completed' }          |
  |                                               |
  +-- true (有工具调用) --------------------------+
      |
      +-- 执行剩余工具 (getRemainingResults / runTools)
      +-- 生成 tool use summary (异步)
      +-- 检查 abort
      +-- 检查 shouldPreventContinuation
      +-- 注入 attachments (memory, skills, commands)
      +-- 刷新工具列表
      +-- 检查 maxTurns
      +-- state = { ...next }; continue

12.6 Thinking 规则

源码中有一段"巫师注释"记录了 thinking blocks 的三条规则:

  1. 包含 thinking/redacted_thinking 的消息必须属于 max_thinking_length > 0 的查询
  2. thinking block 不能是消息中的最后一个块
  3. thinking blocks 必须在整个 assistant trajectory 期间保留 (单个 turn,或 tool_use -> tool_result -> 后续 assistant 的完整链)

违反这些规则会导致 API 400 错误("thinking blocks cannot be modified")。 Fallback 时通过 stripSignatureBlocks 清除以避免跨模型签名不兼容。

13. 工具使用摘要

当以下条件满足时,在工具执行完成后异步生成摘要:

  • emitToolUseSummaries 门控开启
  • 有工具调用 (toolUseBlocks.length > 0)
  • 未中断
  • 非子代理

摘要使用 Haiku 模型(约 1 秒),在下一次模型流式响应(5-30 秒)期间并行解析。 在下一次迭代开头,通过 pendingToolUseSummary await 并 yield。

nextPendingToolUseSummary = generateToolUseSummary({
  tools: toolInfoForSummary,
  signal: toolUseContext.abortController.signal,
  lastAssistantText,
}).then(summary => createToolUseSummaryMessage(summary, toolUseIds))
  .catch(() => null)

14. 附件注入

在工具执行之后、进入下一轮之前,引擎注入以下附件:

  1. 排队命令:通过 getCommandsByMaxPriority 获取待处理的通知

    • sleep 工具运行后拉取优先级 later 的命令
    • 否则只拉取 next 优先级
    • 斜杠命令被排除(必须通过 processSlashCommand 处理)
    • 子代理只拉取自己的 task-notification
  2. 记忆附件pendingMemoryPrefetch 在首次迭代启动,每次迭代尝试消费

    • 零等待:如果 prefetch 未完成则跳过,下次迭代再试
    • filterDuplicateMemoryAttachments 过滤已 Read/Write/Edit 的记忆文件
  3. 技能发现collectSkillDiscoveryPrefetch 消费预取结果

  4. 文件变更附件:编辑过的文件的差异信息

15. taskBudget 跨压缩追踪

let taskBudgetRemaining: number | undefined = undefined

taskBudget.remaining压缩发生时total 中扣除压缩前的上下文 token 数。 未压缩时为 undefined(服务端可以看到完整历史自行计算)。

逻辑:

压缩前: remaining = total (或上次计算的 remaining)
压缩后: remaining = max(0, remaining - preCompactContextTokens)

此值通过 configureTaskBudgetParams 传入 API 请求参数。

16. 性能优化要点

  1. 流式工具预执行:tool_use 块出现即开始执行,与模型流并行
  2. 摘要异步生成:Haiku 摘要在主模型流期间完成
  3. Memory prefetch:相关记忆在查询开始时即发起预取
  4. Skill prefetch:技能发现在每次迭代开始时启动
  5. DumpPromptsFetch 复用:每查询创建一次,避免内存积累(约 700KB vs 500MB)
  6. MCP 工具热刷新:通过 refreshTools() 在 turn 间更新
  7. 微压缩 + 自动压缩分层:轻量裁剪优先于重量级摘要

17. 小结

查询执行引擎的核心是一个 while(true) 循环,其复杂性来源于:

  • 多层压缩策略的精确协调(snip -> micro -> collapse -> auto -> reactive)
  • 流式执行与传统执行的统一抽象
  • 错误恢复的多级梯度(escalation -> recovery -> reactive compact -> bail)
  • 不可变状态管理:每次 continue 通过全新 State 对象传递
  • 依赖注入QueryDeps 使核心逻辑可测试

该引擎是典型的"状态机 + 生成器"模式:AsyncGeneratoryield 提供了 天然的暂停/恢复点,State 类型编码了所有跨迭代状态,transition 字段 记录了状态转移的原因,使得调试和测试都能精确追踪执行路径。