• 如何用 AsyncGenerator 实现流式响应
  • 如何管理对话状态和历史
  • 如何实现权限追踪和预算控制

⚠️ 声明:源码基于开源项目 claude-code,部分内部实现可能有所不同。


🏗️ 核心架构:QueryEngine 类

类设计概览

export class QueryEngine {
private config: QueryEngineConfig // 引擎配置
private mutableMessages: Message[] // 对话消息历史
private abortController: AbortController // 中断控制器
private permissionDenials: SDKPermissionDenial[] // 权限拒绝记录
private totalUsage: NonNullableUsage // 累计使用量
private discoveredSkillNames = new Set<string>()
private loadedNestedMemoryPaths = new Set<string>()

async *submitMessage(prompt, options?) {
// 核心处理逻辑
}
}

设计思想:每个 QueryEngine 实例对应一个会话,内部维护完整的对话状态。


🔄 submitMessage:核心处理流程

这是整个系统最核心的方法,采用 AsyncGenerator 模式实现流式处理:

async *submitMessage(
prompt: string | ContentBlockParam[],
options?: { uuid?: string; isMeta?: boolean },
): AsyncGenerator<SDKMessage, void, unknown> {

完整流程图

┌─────────────────────────────────────────────────────────────┐
│ submitMessage 完整流程 │
├─────────────────────────────────────────────────────────────┤
│ 1. 初始化配置 │
│ - 解构工具、命令、模型参数 │
│ - 设置工作目录 │
│ - 包装权限检查器(追踪拒绝) │
├─────────────────────────────────────────────────────────────┤
│ 2. 构建系统提示词 │
│ - fetchSystemPromptParts() │
│ - 加载内存机制提示词 │
│ - 合并自定义提示词 │
├─────────────────────────────────────────────────────────────┤
│ 3. 处理用户输入 │
│ - processUserInput() 处理 slash 命令 │
│ - 返回 shouldQuery 决定是否调用 API │
├─────────────────────────────────────────────────────────────┤
│ 4. 会话持久化 │
│ - 立即写入 transcript(防 kill 丢失) │
│ - fire-and-forget 优化性能 │
├─────────────────────────────────────────────────────────────┤
│ 5. 查询循环 │
│ - for await (message of query()) │
│ - 处理各种消息类型 │
├─────────────────────────────────────────────────────────────┤
│ 6. 预算与错误检查 │
│ - maxTurns / maxBudgetUsd / taskBudget │
├─────────────────────────────────────────────────────────────┤
│ 7. 返回结果 │
│ - 提取文本、累积使用量、返回 result │
└─────────────────────────────────────────────────────────────┘

🌀 第一阶段:初始化与配置

1.1 配置解构

const {
cwd,
commands,
tools,
mcpClients,
thinkingConfig,
maxTurns,
maxBudgetUsd,
taskBudget,
canUseTool,
customSystemPrompt,
appendSystemPrompt,
// ...
} = this.config

1.2 权限包装器

const wrappedCanUseTool: CanUseToolFn = async (
tool, input, toolUseContext, assistantMessage, toolUseID, forceDecision,
) => {
const result = await canUseTool(...)

// 追踪权限拒绝
if (result.behavior !== 'allow') {
this.permissionDenials.push({
tool_name: tool.name,
tool_use_id: toolUseID,
tool_input: input,
})
}

return result
}

设计亮点:在调用原始权限检查器的同时,自动记录所有被拒绝的调用,用于最终报告。

1.3 模型与思考配置

const initialMainLoopModel = userSpecifiedModel
? parseUserSpecifiedModel(userSpecifiedModel)
: getMainLoopModel()

const initialThinkingConfig: ThinkingConfig = thinkingConfig
? thinkingConfig
: shouldEnableThinkingByDefault() !== false
? { type: 'adaptive' }
: { type: 'disabled' }

📝 第二阶段:系统提示词构建

核心调用

const { defaultSystemPrompt, userContext, systemContext } =
await fetchSystemPromptParts({
tools,
mainLoopModel: initialMainLoopModel,
additionalWorkingDirectories: Array.from(
initialAppState.toolPermissionContext.additionalWorkingDirectories.keys()
),
mcpClients,
customSystemPrompt,
})

// 组装最终提示词
const systemPrompt = asSystemPrompt([
...(customPrompt !== undefined ? [customPrompt] : defaultSystemPrompt),
...(memoryMechanicsPrompt ? [memoryMechanicsPrompt] : []),
...(appendSystemPrompt ? [appendSystemPrompt] : []),
])

⚙️ 第三阶段:用户输入处理

processUserInput 的职责

const {
messages: messagesFromUserInput, // 处理后的消息
shouldQuery, // 是否需要调用 LLM
allowedTools, // 允许的工具列表
model: modelFromUserInput, // 可能被 slash 命令修改的模型
resultText, // 本地命令的输出结果
} = await processUserInput({
input: prompt,
mode: 'prompt',
// ...
})

关键设计:shouldQuery

  • true:需要调用 LLM API 获取响应
  • false:本地命令已处理完成,直接返回结果

💡 例如:用户输入 /help,这是本地命令,不需要调用 LLM。


💾 第四阶段:会话持久化(关键设计!)

// 在 API 调用前就先写入 transcript
if (persistSession && messagesFromUserInput.length > 0) {
const transcriptPromise = recordTranscript(messages)

if (isBareMode()) {
// 脚本模式:fire-and-forget
void transcriptPromise
} else {
// 交互模式:等待写入完成
await transcriptPromise
if (isEnvTruthy(process.env.CLAUDE_CODE_EAGER_FLUSH)) {
await flushSessionStorage()
}
}
}

设计意图

即使在 API 响应回来之前进程被 kill(如用户点击 Stop),--resume 也能恢复会话!

这是 Claude Code 能实现「可恢复会话」的核心保障。


🔁 第五阶段:查询循环

调用 query() 生成器

for await (const message of query({
messages,
systemPrompt,
userContext,
systemContext,
canUseTool: wrappedCanUseTool,
toolUseContext: processUserInputContext,
fallbackModel,
maxTurns,
taskBudget,
})) {
// 处理各种消息类型
}

消息类型分发

switch (message.type) {
case 'tombstone':
// 删除消息控制信号
break

case 'assistant':
// AI 响应
this.mutableMessages.push(message)
yield* normalizeMessage(message)
break

case 'progress':
// 进度通知
this.mutableMessages.push(message)
yield* normalizeMessage(message)
break

case 'user':
turnCount++
break

case 'stream_event':
// 流式事件
if (message.event.type === 'message_start') {
// 重置当前消息使用统计
currentMessageUsage = EMPTY_USAGE
}
if (message.event.type === 'message_delta') {
// 更新使用量 + stop_reason
currentMessageUsage = updateUsage(currentMessageUsage, message.event.usage)
}
if (message.event.type === 'message_stop') {
// 累积到总使用量
this.totalUsage = accumulateUsage(this.totalUsage, currentMessageUsage)
}
break

case 'attachment':
// 附件(structured_output, max_turns_reached)
break

case 'system':
// 系统消息(compact_boundary, api_error)
break

case 'tool_use_summary':
// 工具调用摘要
yield { type: 'tool_use_summary', ... }
break
}

🛡️ 第六阶段:预算与错误检查

三重保护机制

// 1. USD 预算检查
if (maxBudgetUsd !== undefined && getTotalCost() >= maxBudgetUsd) {
yield { type: 'result', subtype: 'error_max_budget_usd', ... }
return
}

// 2. 最大轮次检查
if (message.attachment?.type === 'max_turns_reached') {
yield { type: 'result', subtype: 'error_max_turns', ... }
return
}

// 3. 结构化输出重试次数检查
if (jsonSchema && callsThisQuery >= maxRetries) {
yield { type: 'result', subtype: 'error_max_structured_output_retries', ... }
return
}

🎯 第七阶段:返回结果

// 提取文本结果
let textResult = ''
if (result.type === 'assistant') {
const lastContent = last(result.message.content)
if (lastContent?.type === 'text') {
textResult = lastContent.text
}
}

// 返回成功结果
yield {
type: 'result',
subtype: 'success',
result: textResult,
duration_ms: Date.now() - startTime,
duration_api_ms: getTotalAPIDuration(),
num_turns: turnCount,
total_cost_usd: getTotalCost(),
usage: this.totalUsage,
permission_denials: this.permissionDenials,
// ...
}

🔑 核心设计亮点

1. AsyncGenerator 模式

async *submitMessage(...) {
// 初始化
// ...
// 边处理边返回
for await (const message of query(...)) {
yield* normalizeMessage(message)
}
// 最终结果
yield { type: 'result', ... }
}

优势

  • 流式响应,用户体验更好
  • 内存效率高,不需要等完整响应
  • 天然支持中断

2. 权限追踪

// 包装 canUseTool,自动记录拒绝
const wrappedCanUseTool = async (...) => {
const result = await canUseTool(...)
if (result.behavior !== 'allow') {
this.permissionDenials.push(...)
}
return result
}

最终返回给调用者,包含完整的权限拒绝记录。

3. 会话预持久化

// 在 API 调用前就写入!
if (persistSession) {
await recordTranscript(messages) // 用户消息已到达
}
// ... 然后才开始 API 调用
for await (const message of query(...)) {
// 处理响应
}

这确保了即使 API 从未返回,会话也可恢复。

4. 历史压缩边界

if (message.subtype === 'compact_boundary') {
// 释放压缩前的消息,节省内存
const boundaryIdx = this.mutableMessages.length - 1
if (boundaryIdx > 0) {
this.mutableMessages.splice(0, boundaryIdx)
}
}

通过 compact_boundary 消息触发历史压缩,防止内存无限增长。


📊 数据流总结

用户输入
↓
processUserInput() → shouldQuery?
↓
┌──────────────────────────────────────────┐
│ YES → query() 调用 LLM │
│ NO → 本地命令执行,直接返回结果 │
└──────────────────────────────────────────┘
↓
for await message of query():
↓
├→ assistant → normalizeMessage → yield
├→ progress → normalizeMessage → yield
├→ stream_event → 更新使用量
├→ system (compact_boundary) → 压缩历史
└→ ...其他类型
↓
预算/轮次检查
↓
返回 result

📝 总结

QueryEngine 展示了构建生产级 AI 对话系统的完整范式:

特性 实现方式 价值
流式处理 AsyncGenerator 实时响应、内存高效
权限追踪 包装 canUseTool 透明记录、可审计
会话恢复 预持久化 transcript 可中断、可恢复
预算控制 多重检查机制 成本可控
历史压缩 compact_boundary 长会话内存不爆炸
错误处理 分类重试、结构化输出 鲁棒性
Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐