Claude Code Wiki
首页 深入解析 架构设计

查询引擎:QueryEngine 对话生命周期管理

高级 架构设计

QueryEngine 是 Claude Code 对话系统的核心编排器,负责管理完整的对话生命周期——从用户输入处理到 API 调用、工具执行、状态管理和错误恢复。作为状态机与事件流的结合体,它封装了与 Claude API 交互的所有复杂性,为上层提供清晰的异步生成器接口。

架构定位与核心职责

QueryEngine 位于系统的中间层,向下对接 Anthropic API 和工具执行系统,向上为 REPL 界面和 SDK 消费者提供统一的对话管理接口。每个 QueryEngine 实例对应一个完整的对话会话,通过 submitMessage() 方法启动新的对话轮次(turn),并在内部维护跨轮次的状态一致性。

Sources: QueryEngine.ts

三层架构模型

graph TB
    subgraph "上层消费者"
        REPL[REPL 界面<br/>screens/REPL.tsx]
        SDK[SDK 调用者<br/>headless/SDK]
        Agent[子代理<br/>AgentTool]
    end
    
    subgraph "QueryEngine 层"
        QE[QueryEngine<br/>状态管理器]
        Query[query()<br/>核心循环]
        Config[QueryConfig<br/>配置快照]
    end
    
    subgraph "下层服务"
        API[Anthropic API<br/>claude.ts]
        Tools[工具系统<br/>StreamingToolExecutor]
        Compact[压缩服务<br/>compact/reactive]
        Hooks[Stop Hooks<br/>stopHooks.ts]
    end
    
    REPL --> QE
    SDK --> QE
    Agent --> QE
    
    QE --> Query
    Query --> Config
    Query --> API
    Query --> Tools
    Query --> Compact
    Query --> Hooks
    
    style QE fill:#e1f5ff
    style Query fill:#fff4e6

QueryEngine 的核心职责包括:消息状态管理(维护跨轮次的对话历史、文件缓存、权限拒绝记录)、系统提示构建(整合自定义提示、内存机制、MCP 上下文)、查询循环协调(驱动 query() 函数执行多轮对话)、使用量追踪(累计 token 消耗、API 调用时长、成本计算)、持久化管理(会话存储、文件历史快照)、以及 SDK 消息转换(将内部消息格式转换为标准化的 SDK 事件流)。

Sources: QueryEngine.ts

QueryEngine 类设计

配置与初始化

QueryEngine 通过 QueryEngineConfig 接收完整的运行环境配置,包括工具集、命令列表、MCP 客户端、代理定义、权限检查函数、状态访问器等。构造函数初始化核心状态:消息数组、中止控制器、权限拒绝列表、总使用量统计、文件读取状态缓存。每个配置项都服务于特定的生命周期阶段——toolscommands 定义了可用的能力,canUseTool 实现权限验证钩子,getAppState/setAppState 提供全局状态访问,initialMessages 支持会话恢复。

export class QueryEngine {
  private config: QueryEngineConfig
  private mutableMessages: Message[]
  private abortController: AbortController
  private permissionDenials: SDKPermissionDenial[]
  private totalUsage: NonNullableUsage
  private readFileState: FileStateCache
  private discoveredSkillNames = new Set<string>()
  
  constructor(config: QueryEngineConfig) {
    this.config = config
    this.mutableMessages = config.initialMessages ?? []
    this.abortController = config.abortController ?? createAbortController()
    this.permissionDenials = []
    this.readFileState = config.readFileCache
    this.totalUsage = EMPTY_USAGE
  }
}

Sources: QueryEngine.ts

核心方法:submitMessage 生命周期

submitMessage() 是 QueryEngine 的主要入口点,实现了完整的对话轮次处理流程。该方法是一个异步生成器,逐步 yield SDK 消息事件,允许调用者实时接收进度更新和中间结果。整个流程可分为六个阶段:输入处理与系统提示构建查询循环执行消息流处理与转换预算与限制检查结果提取与最终化清理与持久化

sequenceDiagram
    participant User as 用户/SDK
    participant QE as QueryEngine
    participant PUI as processUserInput
    participant Query as query()
    participant API as Anthropic API
    participant Tools as 工具执行器
    
    User->>QE: submitMessage(prompt)
    QE->>PUI: 处理斜杠命令/附件
    PUI-->>QE: messagesFromUserInput
    QE->>QE: 构建系统提示
    QE->>QE: yield system_init 消息
    
    loop 查询循环 (每轮)
        QE->>Query: query(messages, config)
        Query->>Query: 压缩检查
        Query->>API: 流式 API 调用
        API-->>Query: 流式响应
        
        alt 有工具调用
            Query->>Tools: 执行工具
            Tools-->>Query: 工具结果
            Query->>Query: 准备下一轮
        else 无工具调用
            Query-->>QE: 终止状态
        end
        
        Query-->>QE: yield 消息流
        QE->>QE: 追踪使用量
        QE->>User: yield SDK 消息
    end
    
    QE->>QE: 提取文本结果
    QE->>QE: 持久化会话
    QE->>User: yield result 消息

Sources: QueryEngine.ts

query() 核心循环机制

query() 函数是 QueryEngine 的引擎核心,实现了无限循环状态机模式。循环通过 while (true) 持续运行,直到遇到明确的终止条件(错误、中止、最大轮次、完成)。每次迭代代表一个完整的对话轮次:压缩检查 → API 调用 → 工具执行 → 状态更新 → 继续判断。

状态管理与转换

query 循环维护一个 State 对象,封装了跨迭代的可变状态。通过在循环顶部解构状态、在继续点(continue site)重新赋值的方式,实现了状态的不可变更新模式。这种设计避免了分散的状态修改,使状态转换路径清晰可追踪。

type State = {
  messages: Message[]
  toolUseContext: ToolUseContext
  autoCompactTracking: AutoCompactTrackingState | undefined
  maxOutputTokensRecoveryCount: number
  hasAttemptedReactiveCompact: boolean
  maxOutputTokensOverride: number | undefined
  pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined
  stopHookActive: boolean | undefined
  turnCount: number
  transition: Continue | undefined
}

// 循环内部的状态管理模式
let state: State = { /* 初始状态 */ }
while (true) {
  const { messages, toolUseContext, ... } = state  // 解构
  // ... 执行逻辑 ...
  state = { ...state, messages: newMessages }  // 更新并继续
  continue
}

Sources: query.ts

压缩策略层次

query 循环在 API 调用前依次应用四层压缩策略,每层针对不同的优化目标:

  1. Tool Result Budget:限制单个消息中的工具结果总大小,通过内容替换减少 token 消耗
  2. Snip Compact:移除僵尸消息和过期标记,释放内存并防止重复触发
  3. Microcompact:针对缓存工具调用进行优化,删除已失效的缓存条目
  4. Context Collapse:提供细粒度的上下文视图投影,支持渐进式压缩
  5. Auto Compact:当上下文接近模型限制时,自动生成对话摘要
graph LR
    A[原始消息] --> B[Tool Result Budget]
    B --> C[Snip Compact]
    C --> D[Microcompact]
    D --> E[Context Collapse]
    E --> F[Auto Compact]
    F --> G[压缩后消息]
    
    style A fill:#ffebee
    style G fill:#e8f5e9

每层压缩都是可选的,通过 feature flag 控制。压缩顺序精心设计:前序压缩的结果作为后续压缩的输入,避免重复工作;每层压缩后都会检查是否已经满足上下文限制,提前退出避免不必要的计算。

Sources: query.ts

API 调用与流式处理

压缩完成后,query 调用 deps.callModel()(即 queryModelWithStreaming)发起 Anthropic API 请求。这是一个异步生成器,逐步 yield 流式响应事件。核心特性包括:

  • Streaming Tool Execution:在模型输出 tool_use 块时立即开始执行工具,而非等待完整响应
  • Fallback Model:当主模型过载时自动切换到备用模型,通过 FallbackTriggeredError 异常触发
  • Withhold Recoverable Errors:对于可恢复的错误(prompt-too-long、max-output-tokens),先暂不 yield,尝试恢复路径
  • Tool Input Backfill:在 yield 前为工具输入补充可观察字段(如文件路径展开),确保 SDK 消费者看到完整数据
for await (const message of deps.callModel({
  messages: prependUserContext(messagesForQuery, userContext),
  systemPrompt: fullSystemPrompt,
  tools: toolUseContext.options.tools,
  options: { model, fastMode, taskBudget, ... }
})) {
  // 处理流式消息
  if (message.type === 'assistant') {
    // 检查是否需要 backfill
    // 检查是否需要 withhold
    if (!withheld) yield message
  }
}

Sources: query.ts

工具执行编排

当 API 响应包含 tool_use 块时,query 进入工具执行阶段。根据配置选择两种执行模式:

  • StreamingToolExecutor:并行执行多个工具,在工具完成时立即 yield 结果,减少端到端延迟
  • runTools:顺序执行工具,等待所有工具完成后再继续

工具执行结果被收集到 toolResults 数组,并用于构建下一轮查询的消息列表。工具执行期间如果发生中止(用户点击停止),会为所有未完成的工具生成合成工具结果,确保消息链完整性。

const toolUpdates = streamingToolExecutor
  ? streamingToolExecutor.getRemainingResults()
  : runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext)

for await (const update of toolUpdates) {
  if (update.message) {
    yield update.message
    toolResults.push(normalizeMessageForAPI(update.message))
  }
  if (update.newContext) {
    updatedToolUseContext = { ...update.newContext, queryTracking }
  }
}

Sources: query.ts

终止条件与循环继续

query 循环通过明确的终止条件退出,每个条件对应不同的对话结束状态:

终止原因触发条件后续行为
completed无工具调用,模型返回文本响应执行 stop hooks,返回最终结果
aborted用户中止或超时生成中断消息,清理资源
max_turns达到配置的最大轮次限制yield max_turns_reached 附件
stop_hook_preventedStop hook 阻止继续立即返回,不进入下一轮
prompt_too_long上下文超限且无法恢复yield 错误消息,退出循环
model_errorAPI 调用失败yield 错误消息,记录诊断信息

如果工具执行后未触发任何终止条件,循环通过更新 state 对象并 continue 进入下一轮。新的状态包含更新后的消息列表(原始消息 + assistant 消息 + 工具结果)、递增的轮次计数、重置的恢复标志。

Sources: query.ts

错误恢复与降级策略

QueryEngine 实现了多层错误恢复机制,确保对话在遇到异常时能够优雅降级而非直接失败。

Model Fallback 机制

当主模型(如 claude-sonnet-4)过载时,API 返回特定的过载错误,触发 FallbackTriggeredError。query 循环捕获此异常后,清空当前轮次的 assistant 消息和工具结果,切换到备用模型(如 claude-opus),并使用 stripSignatureBlocks() 移除思考签名块(因为签名与模型绑定,跨模型重放会导致 API 错误)。

try {
  for await (const message of deps.callModel(...)) {
    // 处理流式响应
  }
} catch (innerError) {
  if (innerError instanceof FallbackTriggeredError && fallbackModel) {
    currentModel = fallbackModel
    yield* yieldMissingToolResultBlocks(assistantMessages, 'Model fallback triggered')
    assistantMessages.length = 0
    messagesForQuery = stripSignatureBlocks(messagesForQuery)
    yield createSystemMessage(`Switched to ${fallbackModel}...`, 'warning')
    continue  // 使用备用模型重试
  }
  throw innerError
}

Sources: query.ts

Reactive Compact 恢复

当遇到 prompt_too_long 错误时,如果启用了 reactive compact,系统会尝试动态压缩对话历史。与主动压缩(在 API 调用前执行)不同,reactive compact 在错误发生后触发,使用更激进的压缩策略。压缩成功后,使用压缩后的消息列表重新进入循环;如果压缩失败,yield 被暂存的错误消息并退出。

if (isWithheld413 && reactiveCompact) {
  const compacted = await reactiveCompact.tryReactiveCompact({
    hasAttempted: hasAttemptedReactiveCompact,
    messages: messagesForQuery,
    cacheSafeParams: { systemPrompt, userContext, ... }
  })
  
  if (compacted) {
    const postCompactMessages = buildPostCompactMessages(compacted)
    yield* postCompactMessages
    state = { messages: postCompactMessages, hasAttemptedReactiveCompact: true, ... }
    continue
  }
  
  yield lastMessage  // 恢复失败,yield 错误
  return { reason: 'prompt_too_long' }
}

Sources: query.ts

Max Output Tokens 恢复

当模型输出达到 max_output_tokens 限制时,query 循环采用升级重试策略:首先尝试使用更大的输出 token 限制(64k)重新调用 API,如果仍然超限,则注入恢复消息(“Output token limit hit. Resume directly…”)并继续对话,让模型从中断点继续。恢复尝试限制为 3 次,避免无限循环。

if (isWithheldMaxOutputTokens(lastMessage)) {
  // 策略1:升级到 64k 输出限制
  if (maxOutputTokensOverride === undefined) {
    state = { ...state, maxOutputTokensOverride: ESCALATED_MAX_TOKENS }
    continue
  }
  
  // 策略2:注入恢复消息
  if (maxOutputTokensRecoveryCount < MAX_OUTPUT_TOKENS_RECOVERY_LIMIT) {
    const recoveryMessage = createUserMessage({
      content: "Output token limit hit. Resume directly...",
      isMeta: true
    })
    state = {
      ...state,
      messages: [...messagesForQuery, ...assistantMessages, recoveryMessage],
      maxOutputTokensRecoveryCount: maxOutputTokensRecoveryCount + 1
    }
    continue
  }
  
  yield lastMessage  // 恢复失败
}

Sources: query.ts

Context Collapse Drain

当启用 Context Collapse 特性时,如果遇到 413 错误,系统会尝试排空已暂存的压缩操作。这些压缩操作在之前的迭代中被识别但未提交(为了避免过早丢失上下文)。排空后重新进入循环,使用更精简的上下文重试 API 调用。

Sources: query.ts

Stop Hooks 与后处理

每轮对话结束后(模型返回响应且无工具调用),query 执行 stop hooks 进行后处理和决策。Stop hooks 是可扩展的钩子系统,用于实现各种跨切面关注点:内存提取、任务列表更新、提示建议生成、模板分类等。

Stop Hook 执行流程

graph TB
    A[对话轮次结束] --> B{有 assistant 消息?}
    B -->|是| C[执行 stop hooks]
    B -->|否| D[跳过 hooks]
    
    C --> E{Hook 返回结果}
    E -->|preventContinuation| F[返回 stop_hook_prevented]
    E -->|blockingErrors| G[注入错误消息<br/>继续下一轮]
    E -->|正常| H[检查 token budget]
    
    H --> I{Budget 检查}
    I -->|需要继续| J[注入 nudge 消息<br/>继续下一轮]
    I -->|正常| K[返回 completed]
    
    style C fill:#fff9c4
    style F fill:#ffcdd2
    style G fill:#ffccbc
    style J fill:#c8e6c9

Stop hooks 可以返回三种结果:preventContinuation(完全阻止对话继续,用于强制终止场景)、blockingErrors(注入错误消息并继续,用于提示模型修正行为)、正常完成(允许对话自然结束或根据 token budget 决定是否继续)。

Sources: stopHooks.ts

Token Budget 管理

Token budget 是一个实验性特性,用于在长对话中自动管理 token 消耗。当单轮对话消耗的 token 达到预算的某个百分比时,系统会注入一个 nudge 消息,提示模型总结当前进展并准备结束,避免无限制地消耗 token。

if (feature('TOKEN_BUDGET')) {
  const decision = checkTokenBudget(
    budgetTracker,
    agentId,
    getCurrentTurnTokenBudget(),
    getTurnOutputTokens()
  )
  
  if (decision.action === 'continue') {
    state = {
      ...state,
      messages: [...messages, createUserMessage({
        content: decision.nudgeMessage,
        isMeta: true
      })]
    }
    continue
  }
}

Sources: query.ts

依赖注入与可测试性

query 函数通过 QueryDeps 接口接收外部依赖,实现生产代码与测试代码的解耦。这种设计允许测试用例注入模拟的 API 调用器、压缩函数、UUID 生成器,从而隔离测试 query 循环的逻辑而不依赖真实的外部服务。

export type QueryDeps = {
  callModel: typeof queryModelWithStreaming
  microcompact: typeof microcompactMessages
  autocompact: typeof autoCompactIfNeeded
  uuid: () => string
}

export function productionDeps(): QueryDeps {
  return {
    callModel: queryModelWithStreaming,
    microcompact: microcompactMessages,
    autocompact: autoCompactIfNeeded,
    uuid: randomUUID
  }
}

// 测试时注入模拟依赖
const testDeps = {
  callModel: mockCallModel,
  microcompact: jest.fn(),
  autocompact: jest.fn(),
  uuid: () => 'test-uuid'
}

const generator = query({ ...params, deps: testDeps })

Sources: deps.ts

性能优化与监控

检查点追踪

query 循环在关键路径上插入 queryCheckpoint 调用,用于性能分析和延迟追踪。检查点包括:循环入口、压缩开始/结束、API 调用开始/结束、工具执行开始/结束、递归调用等。这些检查点数据被收集到性能分析器中,帮助识别瓶颈和优化机会。

Sources: query.ts

预取优化

为减少用户感知的延迟,query 循环在等待 API 响应和工具执行的同时,并行启动内存预取技能发现预取。这些预取操作利用空闲时间提前加载可能需要的资源,当真正需要时直接从缓存读取,避免阻塞主路径。

// 在循环开始时启动预取
using pendingMemoryPrefetch = startRelevantMemoryPrefetch(messages, toolUseContext)
const pendingSkillPrefetch = skillPrefetch?.startSkillDiscoveryPrefetch(messages, toolUseContext)

// ... API 调用和工具执行 ...

// 在轮次结束前消费预取结果
if (pendingMemoryPrefetch.settledAt !== null) {
  const memoryAttachments = await pendingMemoryPrefetch.promise
  // 注入到工具结果中
}

Sources: query.ts

使用量累计与成本追踪

QueryEngine 在整个会话生命周期中持续累计 API 使用量(input_tokens、output_tokens、cache_read_tokens、cache_creation_tokens)。每次 API 响应的 message_delta 事件更新当前消息的使用量,message_stop 事件将其累计到会话总量。最终在 result 消息中返回完整的成本统计。

if (message.event.type === 'message_delta') {
  currentMessageUsage = updateUsage(currentMessageUsage, message.event.usage)
}
if (message.event.type === 'message_stop') {
  this.totalUsage = accumulateUsage(this.totalUsage, currentMessageUsage)
}

Sources: QueryEngine.ts

持久化与会话恢复

QueryEngine 在关键节点执行会话持久化,确保对话在进程崩溃或用户中止后能够恢复。持久化策略根据运行模式调整:bare 模式(脚本化调用)使用 fire-and-forget 异步写入,避免阻塞;交互模式等待写入完成后再继续,确保数据一致性。

if (persistSession && messagesFromUserInput.length > 0) {
  const transcriptPromise = recordTranscript(messages)
  if (isBareMode()) {
    void transcriptPromise  // 不等待
  } else {
    await transcriptPromise
    if (isEnvTruthy(process.env.CLAUDE_CODE_EAGER_FLUSH)) {
      await flushSessionStorage()
    }
  }
}

压缩边界消息触发增量持久化:在写入压缩边界前,先 flush 边界之前的所有消息,确保 resume 时能够正确重建保留段(preserved segment)。

Sources: QueryEngine.ts

与其他组件的交互

QueryEngine 作为对话管理的核心,与多个子系统紧密协作:

  • Tool 系统:通过 canUseTool 权限检查函数和 StreamingToolExecutor 执行器协调工具调用
  • Compact 服务:在上下文增长时自动触发压缩,管理压缩边界和摘要生成
  • MCP 服务:集成 MCP 服务器的工具和资源,在系统提示中注入 MCP 上下文
  • Bridge 系统:通过状态访问器与 IDE 集成,同步文件变更和权限决策
  • Analytics 服务:在关键事件点记录遥测数据,用于产品改进和问题诊断
  • Session Storage:管理会话持久化,支持 —resume 功能

Sources: QueryEngine.ts

关键设计模式总结

QueryEngine 的设计体现了多个高级架构模式:

  1. 异步生成器模式:通过 AsyncGenerator 实现流式处理,调用者可以实时消费中间结果
  2. 状态机模式:query 循环通过显式状态转换管理复杂的多轮对话流程
  3. 依赖注入模式:通过 QueryDeps 解耦外部依赖,提升可测试性
  4. 策略模式:多层压缩策略、多种错误恢复策略可插拔组合
  5. 观察者模式:通过 yield 机制向多个消费者广播消息事件
  6. 模板方法模式:submitMessage 定义了对话处理的骨架,具体步骤由子组件实现

这些模式的组合使 QueryEngine 在保持代码可维护性的同时,具备了处理复杂异步流程、优雅错误恢复、灵活扩展的能力。理解 QueryEngine 的架构是掌握 Claude Code 整体设计的关键一步,它为工具系统、命令系统、界面层提供了坚实的对话管理基础。

建议继续阅读 工具架构:Tool 接口与权限模型 了解 QueryEngine 如何协调工具执行,以及 压缩服务:上下文压缩与内存优化策略 深入理解多层压缩机制的实现细节。