1. 项目概述:一个统一四大AI编程助手的Deno工具库

如果你和我一样,在日常开发中同时使用多个AI编程助手——比如Claude Code、Cursor、OpenCode和Codex——那你一定遇到过这样的烦恼:每个工具都有自己的命令行接口、调用方式、参数格式和事件流协议。想要在项目中集成它们,就得为每个工具写一套适配代码,调试起来简直是噩梦。

最近我在构建一个自动化工作流引擎时,就深陷这种“适配地狱”。于是,我决定动手解决这个问题,把重复的适配逻辑抽离出来,做成一个独立的工具库。这就是 @korchasa/ai-ide-cli 的由来。它是一个用Deno和TypeScript编写的轻量级包装器,核心目标只有一个: 让Claude Code、OpenCode、Cursor和Codex这四个AI编程助手的CLI调用变得完全统一

简单来说,这个库帮你做了三件事:

  1. 统一调用接口 :无论底层是哪个运行时,你都可以用完全相同的函数签名来调用。
  2. 标准化事件处理 :把各个运行时五花八门的NDJSON事件流,解析成统一的格式。
  3. 抽象核心能力 :会话管理、重试机制、HITL(人在回路)工具连接、能力探测等,全部封装好。

这个库最初是从我的另一个项目 @korchasa/flowai-workflow (一个DAG工作流引擎)中拆分出来的。拆分的原因很简单:很多开发者只需要CLI包装层,而不需要完整的DAG引擎。现在,你可以只依赖这个不到100KB的小包,而不是引入整个工作流引擎。

2. 核心设计思路:为什么选择“适配器模式”?

在设计这个库时,我面临几个关键选择。首先是架构模式的选择。我最终采用了经典的“适配器模式”,而不是更复杂的“桥接模式”或“策略模式”。原因很实际:适配器模式最适合这种“统一多个已有接口”的场景。

2.1 运行时差异的挑战

这四个运行时在底层实现上差异巨大:

  • Claude Code :基于标准的 claude CLI,支持流式JSON输入输出,有完善的权限控制系统。
  • OpenCode :通过 opencode serve 提供HTTP API,使用SSE(Server-Sent Events)传输事件。
  • Cursor :最特殊,它的“会话”实际上是每次调用都新建一个子进程,没有真正的长连接。
  • Codex :使用实验性的 codex app-server 和JSON-RPC协议,支持真正的双向通信。

面对这些差异,我设计了两个核心抽象:

  1. RuntimeAdapter :一次性调用的适配器,提供 invoke() 方法。
  2. RuntimeSession :长会话的适配器,提供 send() endInput() abort() 等方法。

2.2 能力探测机制

一个关键的设计决策是: 不假设所有运行时都支持所有功能 。相反,每个适配器都有一个 capabilities 对象,明确声明支持哪些功能。这样调用者可以在运行时检查,而不是在编译时硬编码。

// 检查某个运行时是否支持会话功能
const adapter = getRuntimeAdapter("cursor");
if (!adapter.capabilities.session) {
  // 回退到一次性调用
  return await adapter.invoke(options);
}

这种设计让库更加健壮。当某个运行时更新了功能,或者新的运行时加入时,只需要更新对应的适配器,不会破坏现有代码。

2.3 输出标准化

所有运行时调用都返回统一的 CliRunOutput 类型:

interface CliRunOutput {
  result?: string;           // 主要的文本输出
  permission_denials?: Array<{ // 权限拒绝(仅Claude)
    permission: string;
    reason: string;
  }>;
  total_cost_usd?: number;   // 本次调用的总成本
  session_id?: string;       // 会话ID,用于恢复
  // ... 其他标准化字段
}

这意味着下游代码可以完全不用关心底层是哪个运行时,只需要处理统一的数据结构。

3. 安装与基础使用:从零开始集成

3.1 环境准备与安装

首先确保你已经安装了Deno 1.40或更高版本。这个库完全基于Deno和TypeScript构建,利用了Deno的现代特性,比如内置的TypeScript支持、安全的权限系统和优秀的包管理。

安装非常简单:

# 使用Deno的包管理器添加依赖
deno add jsr:@korchasa/ai-ide-cli

# 或者直接在代码中导入
import { getRuntimeAdapter } from "jsr:@korchasa/ai-ide-cli/runtime";

注意 :这个库本身不包含任何AI运行时的二进制文件。你需要单独安装你想要使用的运行时:

  • Claude Code: npm install -g @anthropic-ai/claude
  • Cursor: 从官网下载安装
  • OpenCode: npm install -g opencode
  • Codex: npm install -g @cursorai/codex-cli

3.2 基础调用示例

让我们从一个最简单的例子开始。假设你想让AI解释React Hooks:

import { getRuntimeAdapter } from "jsr:@korchasa/ai-ide-cli/runtime";

async function explainReactHooks() {
  // 选择运行时(这里用Claude)
  const adapter = getRuntimeAdapter("claude");
  
  // 统一调用接口
  const { output, error } = await adapter.invoke({
    taskPrompt: "用两句话解释React Hooks的核心概念",
    timeoutSeconds: 30,
    maxRetries: 2,
    retryDelaySeconds: 1,
  });

  if (error) {
    console.error("调用失败:", error);
    return;
  }
  
  console.log("AI的回答:", output?.result);
}

await explainReactHooks();

这段代码有几个关键点:

  1. 超时控制 timeoutSeconds: 30 确保调用不会无限期挂起。
  2. 自动重试 maxRetries: 2 在网络波动或临时错误时自动重试。
  3. 统一的错误处理 :所有运行时都返回相同的 { output, error } 结构。

3.3 各运行时特性支持矩阵

在实际使用前,了解每个运行时的特性支持很重要。下面这个表格是我在实际测试中整理的:

特性 Claude Code OpenCode Cursor Codex
权限模式 完整支持 完整支持 --yolo 模式 完整支持
HITL支持 通过权限拒绝 通过MCP服务器 不支持 通过MCP服务器
会话恢复 --resume 参数 --session 参数 --resume 参数 resume <id> 命令
交互式TUI 支持 支持 不支持 支持
工具使用观察 支持 支持 不支持 支持
技能加载 ~/.claude/skills/ .claude/skills/ 不支持 ~/.agents/skills/
模型选择 完整支持 完整支持 部分支持 完整支持

实操心得 :Cursor的特性支持相对较少,特别是缺少真正的HITL和工具使用观察。如果你的工作流重度依赖这些功能,建议优先考虑Claude或Codex。

4. 高级功能详解:会话管理、事件处理与HITL

4.1 流式输入会话(Streaming-Input Sessions)

对于需要多轮对话的场景,一次性调用就不够用了。这时可以使用 openSession 方法创建长会话。所有运行时都实现了相同的会话接口:

import { getRuntimeAdapter } from "jsr:@korchasa/ai-ide-cli/runtime";

async function multiTurnConversation() {
  const adapter = getRuntimeAdapter("claude");
  
  // 检查是否支持会话
  if (!adapter.capabilities.session) {
    throw new Error("该运行时不支持会话功能");
  }

  // 创建会话
  const session = await adapter.openSession!({
    systemPrompt: "你是一个专业的TypeScript开发者,回答要简洁准确。",
  });

  try {
    // 发送第一条消息
    await session.send("请解释TypeScript中的泛型(Generics)");
    
    // 发送第二条消息(不需要等待第一条完成)
    await session.send("再举个例子说明如何在React组件中使用泛型");
    
    // 告知AI没有更多输入了
    await session.endInput();
    
    // 处理事件流
    for await (const event of session.events) {
      switch (event.type) {
        case "thinking":
          console.log("AI正在思考...");
          break;
        case "result":
          console.log("AI回复:", event.raw);
          break;
        case "error":
          console.error("发生错误:", event.raw);
          break;
      }
    }
    
    // 等待会话完全结束
    const status = await session.done;
    console.log("会话结束,退出码:", status.exitCode);
    
  } finally {
    // 确保资源被清理
    await session.abort("用户主动结束");
  }
}

这里有几个重要的设计决策:

  1. 非阻塞发送 send() 方法立即返回,不等待AI回复。这允许你实现“流式对话”,用户可以连续发送多条消息。
  2. 统一的事件接口 :无论底层是Claude的JSON流、OpenCode的SSE还是Codex的JSON-RPC,都转换成相同的事件格式。
  3. 安全的资源管理 :使用 try...finally 确保会话总是被正确清理。

4.2 运行时特定的会话实现

虽然接口统一,但每个运行时的底层实现不同:

  • Claude :真正的流式输入,通过管道(pipe)与 claude 进程通信。
  • OpenCode :基于HTTP和SSE,需要启动 opencode serve 服务。
  • Cursor :模拟会话,每次 send() 都创建一个新的子进程。
  • Codex :使用实验性的 codex app-server 和JSON-RPC。

如果你需要访问运行时特定的信息,可以导入具体的会话类:

import { ClaudeSession } from "jsr:@korchasa/ai-ide-cli/claude/session";

const session = await adapter.openSession!({ /* options */ }) as ClaudeSession;
console.log("Claude进程ID:", session.pid);
console.log("会话ID:", session.sessionId);

4.3 自定义环境与生命周期钩子

这个库提供了丰富的自定义选项。比如,你可以为Claude创建一个干净的配置环境:

import { invokeClaudeCli } from "jsr:@korchasa/ai-ide-cli/claude/process";

// 创建一个临时目录作为干净的配置环境
const tempDir = await Deno.makeTempDir();

const { output } = await invokeClaudeCli({
  taskPrompt: "分析这段代码的复杂度",
  timeoutSeconds: 60,
  
  // 自定义环境变量
  env: {
    CLAUDE_CONFIG_DIR: tempDir,  // 使用临时配置目录
    CLAUDE_API_KEY: "your-api-key-here",
  },
  
  // 原始事件处理(逃生舱口)
  onEvent: (rawEvent) => {
    // 可以在这里记录日志或做自定义处理
    console.log("原始事件:", JSON.stringify(rawEvent));
  },
  
  // 类型化的生命周期钩子
  hooks: {
    onInit: (info) => {
      console.log("会话初始化完成,ID:", info.sessionId);
    },
    onResult: (output) => {
      console.log("本次调用成本: $", output.total_cost_usd?.toFixed(4));
    },
    onError: (error) => {
      console.error("调用失败:", error.message);
    },
  },
  
  // Claude特有的设置源控制
  settingSources: ["user"], // 只使用用户设置,忽略项目设置
});

// 清理临时目录
await Deno.remove(tempDir, { recursive: true });

注意事项 settingSources 是Claude特有的选项。其他运行时忽略这个参数。这种设计体现了“渐进增强”的思想:在通用接口上提供运行时特定的扩展点。

4.4 HITL(人在回路)MCP自启动契约

HITL是高级工作流中的关键功能。这个库为OpenCode和Codex提供了MCP(Model Context Protocol)服务器集成。但这里有一个重要的设计: 库本身不包含MCP服务器二进制文件 ,而是要求调用者提供如何启动服务器的指令。

// 在你的CLI入口文件中
import { runOpenCodeHitlMcpServer } from "jsr:@korchasa/ai-ide-cli/opencode/hitl-mcp";

// 处理内部标志
if (Deno.args.includes("--internal-opencode-hitl-mcp")) {
  await runOpenCodeHitlMcpServer();
  Deno.exit(0);
}

// 在调用OpenCode时
import { invokeOpenCodeCli } from "jsr:@korchasa/ai-ide-cli/opencode/process";

await invokeOpenCodeCli({
  taskPrompt: "重构这个函数",
  hitlConfig: {
    ask_script: "./scripts/ask-user.sh",  // 询问用户的脚本
    check_script: "./scripts/check-auth.sh", // 检查权限的脚本
    poll_interval: 30,  // 每30秒检查一次
    timeout: 3600,      // 超时1小时
  },
  hitlMcpCommandBuilder: () => [
    Deno.execPath(),  // 当前Deno可执行文件
    "run",
    "-A",  // 所有权限(生产环境应该更严格)
    import.meta.url,  // 当前模块
    "--internal-opencode-hitl-mcp",
  ],
});

这种设计有几个好处:

  1. 安全性 :调用者控制MCP服务器的启动方式和权限。
  2. 灵活性 :可以在不同的环境中使用不同的启动参数。
  3. 可调试性 :MCP服务器作为独立进程运行,便于监控和调试。

踩坑记录 :最初我尝试在库内部直接启动MCP服务器,但遇到了权限管理和进程生命周期的问题。现在的设计虽然让调用代码稍微复杂一点,但更加健壮和灵活。

5. 能力探测与运行时发现

5.1 动态能力探测

在实际的自动化工作流中,我们经常需要知道当前环境中有哪些AI运行时可用,以及它们支持哪些功能。这个库提供了 fetchCapabilitiesSlow 方法:

async function discoverAvailableRuntimes() {
  const runtimes = ["claude", "opencode", "cursor", "codex"] as const;
  const results = [];
  
  for (const runtime of runtimes) {
    try {
      const adapter = getRuntimeAdapter(runtime);
      
      // 检查是否支持能力探测
      if (!adapter.capabilities.capabilityInventory) {
        console.log(`${runtime}: 不支持能力探测`);
        continue;
      }
      
      // 探测能力(注意:这是耗时操作)
      const inventory = await adapter.fetchCapabilitiesSlow!({
        cwd: Deno.cwd(),  // 在当前目录探测
      });
      
      results.push({
        runtime,
        skills: inventory.skills.length,
        commands: inventory.commands.length,
        hasSession: adapter.capabilities.session,
        hasHitl: adapter.capabilities.hitl,
      });
      
    } catch (error) {
      console.log(`${runtime}: 不可用 (${error.message})`);
    }
  }
  
  return results;
}

const available = await discoverAvailableRuntimes();
console.table(available);

这个方法之所以叫 Slow ,是因为它实际上会启动一个完整的AI会话来探测能力。在Claude或Codex上,这可能需要几秒钟,并且会产生API调用成本。

5.2 技能和斜杠命令枚举

能力探测返回的 inventory 对象包含两个重要数组:

interface CapabilityInventory {
  skills: Array<{
    name: string;      // 技能名称,如 "refactor"
    plugin?: string;   // 来自哪个插件
    description?: string; // 技能描述
  }>;
  commands: Array<{
    name: string;      // 命令名称,如 "/explain"
    plugin?: string;   // 来自哪个插件
    usage?: string;    // 使用说明
  }>;
}

这个信息对于构建智能IDE插件或工作流编排器非常有用。比如,你可以基于可用的技能动态生成上下文菜单:

// 在IDE插件中动态生成重构选项
const inventory = await adapter.fetchCapabilitiesSlow!({ cwd: currentFileDir });

const refactorSkills = inventory.skills.filter(skill => 
  skill.name.includes("refactor") || 
  skill.description?.includes("重构")
);

if (refactorSkills.length > 0) {
  // 显示重构菜单
  showContextMenu("选择重构方式", refactorSkills.map(skill => ({
    label: skill.description || skill.name,
    action: () => invokeRefactor(skill.name),
  })));
}

5.3 缓存策略建议

由于能力探测成本较高,在实际应用中必须实现缓存:

class RuntimeCapabilityCache {
  private cache = new Map<string, { inventory: CapabilityInventory, timestamp: number }>();
  private readonly TTL = 5 * 60 * 1000; // 5分钟
  
  async getInventory(
    runtime: string, 
    cwd: string
  ): Promise<CapabilityInventory> {
    const key = `${runtime}:${cwd}`;
    const cached = this.cache.get(key);
    
    // 检查缓存是否有效
    if (cached && Date.now() - cached.timestamp < this.TTL) {
      return cached.inventory;
    }
    
    // 重新探测
    const adapter = getRuntimeAdapter(runtime as any);
    if (!adapter.capabilities.capabilityInventory) {
      throw new Error(`${runtime}不支持能力探测`);
    }
    
    const inventory = await adapter.fetchCapabilitiesSlow!({ cwd });
    
    // 更新缓存
    this.cache.set(key, { inventory, timestamp: Date.now() });
    
    return inventory;
  }
  
  // 当检测到配置变化时清除缓存
  invalidate(runtime: string, cwd: string) {
    const key = `${runtime}:${cwd}`;
    this.cache.delete(key);
  }
}

性能优化提示 :在实际部署中,可以考虑将缓存持久化到磁盘或共享存储中,这样多个进程可以共享探测结果,避免重复的昂贵调用。

6. 错误处理与调试技巧

6.1 统一的错误处理模式

所有运行时调用都遵循相同的错误处理模式:

async function safeInvoke(adapter: RuntimeAdapter, options: InvokeOptions) {
  try {
    const { output, error } = await adapter.invoke(options);
    
    if (error) {
      // 处理调用错误
      return await handleInvocationError(error, options);
    }
    
    if (!output) {
      // 理论上不会发生,但保持防御性编程
      throw new Error("调用成功但没有输出");
    }
    
    // 处理成功输出
    return processOutput(output);
    
  } catch (error) {
    // 处理未预期的错误
    return await handleUnexpectedError(error, options);
  }
}

错误分为几个层次:

  1. 调用错误 :AI运行时返回的错误,如权限拒绝、超时等。
  2. 适配器错误 :库本身的错误,如参数验证失败。
  3. 系统错误 :如文件系统错误、网络错误等。

6.2 详细的调试日志

在开发阶段,启用详细的日志记录非常重要:

import { getRuntimeAdapter } from "jsr:@korchasa/ai-ide-cli/runtime";

const adapter = getRuntimeAdapter("claude");

// 启用调试模式
const { output, error } = await adapter.invoke({
  taskPrompt: "调试任务",
  timeoutSeconds: 60,
  
  // 原始事件处理器,用于调试
  onEvent: (event) => {
    console.debug(`[${event.runtime}] ${event.type}:`, 
      typeof event.raw === 'string' ? event.raw : JSON.stringify(event.raw));
  },
  
  // 额外的运行时参数(透传给底层CLI)
  extraArgs: ["--verbose", "--log-level=debug"],
});

// 检查详细的错误信息
if (error) {
  console.error("错误类型:", error.type);
  console.error("错误消息:", error.message);
  console.error("原始错误:", error.originalError);
  
  if (error.type === "permission_denied") {
    const denials = (error as any).permissionDenials;
    console.error("被拒绝的权限:", denials);
  }
}

6.3 常见问题排查表

在实际使用中,我遇到了各种问题。下面这个排查表总结了最常见的问题和解决方案:

问题现象 可能原因 解决方案
Claude调用返回权限错误 1. 缺少API密钥
2. 配置目录权限问题
3. 网络代理问题
1. 检查 CLAUDE_API_KEY 环境变量
2. 检查 CLAUDE_CONFIG_DIR 目录权限
3. 使用 env: { HTTP_PROXY: "..." } 设置代理
OpenCode会话无法建立 1. opencode serve 未运行
2. 端口冲突
3. 防火墙阻止
1. 手动运行 opencode serve 测试
2. 检查默认端口3000是否被占用
3. 临时关闭防火墙测试
Cursor调用超时 1. Cursor未安装或路径错误
2. 许可证问题
3. 子进程创建失败
1. 确认Cursor已安装且在PATH中
2. 检查Cursor许可证状态
3. 检查系统资源限制
Codex JSON-RPC错误 1. codex app-server 版本不兼容
2. JSON-RPC协议错误
3. 传输层问题
1. 升级到 codex-cli >= 0.121.0
2. 检查JSON序列化/反序列化
3. 使用 extraArgs: ["--debug-transport"]
所有运行时都失败 1. Deno权限不足
2. 系统资源耗尽
3. 依赖冲突
1. 使用 --allow-all 运行(仅开发)
2. 检查内存和文件描述符限制
3. 清理Deno缓存 deno cache --reload

6.4 超时与重试策略

在网络不稳定的环境中,合理的超时和重试策略至关重要:

interface RetryStrategy {
  maxRetries: number;
  retryDelaySeconds: number;
  retryableErrors: string[];
  backoffMultiplier: number;
}

const defaultStrategy: RetryStrategy = {
  maxRetries: 3,
  retryDelaySeconds: 1,
  retryableErrors: [
    "timeout",
    "network_error", 
    "rate_limit",
    "server_error"
  ],
  backoffMultiplier: 2, // 指数退避
};

async function invokeWithRetry(
  adapter: RuntimeAdapter,
  options: InvokeOptions,
  strategy: RetryStrategy = defaultStrategy
) {
  let lastError: Error | null = null;
  
  for (let attempt = 0; attempt <= strategy.maxRetries; attempt++) {
    try {
      const { output, error } = await adapter.invoke({
        ...options,
        timeoutSeconds: options.timeoutSeconds || 30,
      });
      
      if (error) {
        // 检查是否可重试的错误
        if (strategy.retryableErrors.some(pattern => 
          error.message.includes(pattern))) {
          lastError = new Error(`Attempt ${attempt + 1} failed: ${error.message}`);
          
          // 指数退避
          const delay = strategy.retryDelaySeconds * 
            Math.pow(strategy.backoffMultiplier, attempt);
          
          if (attempt < strategy.maxRetries) {
            console.log(`重试中... (${delay}秒后)`);
            await new Promise(resolve => setTimeout(resolve, delay * 1000));
            continue;
          }
        }
        
        // 不可重试的错误或重试次数用尽
        throw error;
      }
      
      return output!;
      
    } catch (error) {
      lastError = error as Error;
      
      if (attempt === strategy.maxRetries) {
        break;
      }
    }
  }
  
  throw new Error(`所有重试尝试均失败: ${lastError?.message}`);
}

经验分享 :对于生产环境,我建议实现更智能的重试策略,比如基于错误类型的差异化重试、熔断器模式(circuit breaker)避免雪崩,以及将重试日志记录到监控系统。

7. 实际应用案例与集成模式

7.1 构建自动化代码审查机器人

让我们看一个实际的应用案例:构建一个自动化的代码审查机器人。这个机器人会监听GitHub的webhook,当有新的PR时,自动用AI运行时代码审查。

// 代码审查机器人核心逻辑
import { getRuntimeAdapter } from "jsr:@korchasa/ai-ide-cli/runtime";

class CodeReviewBot {
  private adapter: RuntimeAdapter;
  
  constructor(runtime: "claude" | "opencode" | "cursor" | "codex" = "claude") {
    this.adapter = getRuntimeAdapter(runtime);
  }
  
  async reviewPullRequest(prInfo: {
    title: string;
    description: string;
    diffUrl: string;
    files: Array<{ path: string; changes: string }>;
  }) {
    // 构建审查提示
    const prompt = this.buildReviewPrompt(prInfo);
    
    // 调用AI进行审查
    const { output, error } = await this.adapter.invoke({
      taskPrompt: prompt,
      timeoutSeconds: 120, // 代码审查可能需要更长时间
      maxRetries: 2,
      env: {
        // 设置GitHub token用于获取代码
        GITHUB_TOKEN: Deno.env.get("GITHUB_TOKEN") || "",
      },
      hooks: {
        onInit: (info) => {
          console.log(`开始审查PR: ${prInfo.title} [会话: ${info.sessionId}]`);
        },
        onResult: (output) => {
          console.log(`审查完成,成本: $${output.total_cost_usd?.toFixed(4)}`);
        },
      },
    });
    
    if (error) {
      throw new Error(`代码审查失败: ${error.message}`);
    }
    
    // 解析审查结果
    return this.parseReviewResult(output!.result || "");
  }
  
  private buildReviewPrompt(prInfo: any): string {
    return `
请审查以下Pull Request:

标题: ${prInfo.title}
描述: ${prInfo.description}

修改的文件:
${prInfo.files.map(f => `- ${f.path}:\n${f.changes}`).join('\n')}

请从以下角度进行审查:
1. 代码质量(可读性、可维护性)
2. 潜在bug或安全问题
3. 性能影响
4. 测试覆盖
5. 是否符合项目编码规范

请用Markdown格式回复,包含具体的代码示例和改进建议。
`;
  }
  
  private parseReviewResult(result: string): ReviewResult {
    // 解析AI返回的Markdown格式审查结果
    // 这里可以添加更复杂的解析逻辑
    return {
      summary: this.extractSummary(result),
      issues: this.extractIssues(result),
      suggestions: this.extractSuggestions(result),
      confidence: this.calculateConfidence(result),
    };
  }
}

7.2 与现有工作流引擎集成

这个库最初就是从工作流引擎中拆分出来的,所以与工作流引擎的集成非常自然。下面是一个简单的集成示例:

// 自定义工作流节点:AI代码生成
import { Node, WorkflowContext } from "your-workflow-engine";
import { getRuntimeAdapter } from "jsr:@korchasa/ai-ide-cli/runtime";

export class AICodeGenerationNode extends Node {
  private adapter: RuntimeAdapter;
  
  constructor(config: {
    runtime: string;
    model?: string;
    temperature?: number;
  }) {
    super("ai-code-generation");
    this.adapter = getRuntimeAdapter(config.runtime as any);
  }
  
  async execute(context: WorkflowContext): Promise<any> {
    const { prompt, context: codeContext } = context.input;
    
    // 构建完整的代码生成提示
    const fullPrompt = this.buildCodePrompt(prompt, codeContext);
    
    // 执行AI调用
    const { output, error } = await this.adapter.invoke({
      taskPrompt: fullPrompt,
      timeoutSeconds: context.config.timeout || 60,
      maxRetries: context.config.maxRetries || 1,
      
      // 传递工作流上下文
      extraArgs: [
        "--context", JSON.stringify({
          workflowId: context.workflowId,
          nodeId: this.id,
          timestamp: new Date().toISOString(),
        }),
      ],
      
      // 事件处理
      onEvent: (event) => {
        // 将事件发送到工作流事件总线
        context.emit("ai-event", {
          nodeId: this.id,
          eventType: event.type,
          data: event.raw,
          timestamp: Date.now(),
        });
      },
    });
    
    if (error) {
      throw new Error(`AI代码生成失败: ${error.message}`);
    }
    
    // 提取生成的代码
    const generatedCode = this.extractCode(output!.result || "");
    
    // 验证代码(可选)
    const validationResult = await this.validateCode(generatedCode);
    
    return {
      generatedCode,
      validationResult,
      cost: output!.total_cost_usd,
      sessionId: output!.session_id,
    };
  }
}

7.3 多运行时负载均衡

在生产环境中,你可能需要同时使用多个AI运行时,实现负载均衡和故障转移:

class AIRuntimeLoadBalancer {
  private adapters: Map<string, RuntimeAdapter>;
  private metrics: Map<string, RuntimeMetrics>;
  
  constructor() {
    this.adapters = new Map();
    this.metrics = new Map();
    
    // 初始化所有可用的运行时
    const availableRuntimes = this.detectAvailableRuntimes();
    for (const runtime of availableRuntimes) {
      this.adapters.set(runtime, getRuntimeAdapter(runtime));
      this.metrics.set(runtime, {
        totalRequests: 0,
        successfulRequests: 0,
        totalCost: 0,
        averageLatency: 0,
        lastError: null,
        lastSuccess: null,
      });
    }
  }
  
  async invokeWithLoadBalancing(
    options: InvokeOptions,
    strategy: "round-robin" | "least-cost" | "fastest" = "round-robin"
  ): Promise<CliRunOutput> {
    // 根据策略选择运行时
    const selectedRuntime = this.selectRuntime(strategy);
    
    if (!selectedRuntime) {
      throw new Error("没有可用的AI运行时");
    }
    
    const adapter = this.adapters.get(selectedRuntime)!;
    const startTime = Date.now();
    
    try {
      const { output, error } = await adapter.invoke(options);
      const endTime = Date.now();
      const latency = endTime - startTime;
      
      // 更新指标
      this.updateMetrics(selectedRuntime, {
        success: !error,
        latency,
        cost: output?.total_cost_usd || 0,
      });
      
      if (error) {
        throw error;
      }
      
      return output!;
      
    } catch (error) {
      // 记录错误
      this.recordError(selectedRuntime, error as Error);
      
      // 根据策略决定是否重试其他运行时
      if (this.shouldRetryWithOtherRuntime(error as Error)) {
        console.log(`运行时 ${selectedRuntime} 失败,尝试其他运行时`);
        return this.invokeWithLoadBalancing(options, strategy);
      }
      
      throw error;
    }
  }
  
  private selectRuntime(strategy: string): string | null {
    const available = Array.from(this.adapters.keys());
    
    if (available.length === 0) return null;
    
    switch (strategy) {
      case "round-robin":
        return this.roundRobinSelect(available);
      case "least-cost":
        return this.leastCostSelect(available);
      case "fastest":
        return this.fastestSelect(available);
      default:
        return available[0];
    }
  }
  
  private roundRobinSelect(runtimes: string[]): string {
    // 简单的轮询选择
    const lastUsed = this.metrics.get("_last_used")?.lastRuntime || 0;
    const nextIndex = (lastUsed + 1) % runtimes.length;
    
    this.metrics.set("_last_used", { lastRuntime: nextIndex } as any);
    return runtimes[nextIndex];
  }
  
  private leastCostSelect(runtimes: string[]): string {
    // 选择历史平均成本最低的
    return runtimes.reduce((cheapest, current) => {
      const cheapestMetrics = this.metrics.get(cheapest);
      const currentMetrics = this.metrics.get(current);
      
      if (!cheapestMetrics || !currentMetrics) return cheapest;
      
      return currentMetrics.totalCost / currentMetrics.totalRequests <
             cheapestMetrics.totalCost / cheapestMetrics.totalRequests
        ? current : cheapest;
    });
  }
}

这种负载均衡器特别适合需要高可用性的生产环境。你可以根据成本、延迟或成功率来智能选择运行时。

8. 性能优化与最佳实践

8.1 连接池与会话复用

对于高频调用的场景,创建和销毁进程的开销可能成为瓶颈。虽然这个库本身不提供连接池,但你可以基于它构建自己的池化机制:

class SessionPool {
  private pool: Map<string, RuntimeSession[]> = new Map();
  private maxPoolSize: number;
  private creationQueue: Map<string, Promise<RuntimeSession>> = new Map();
  
  constructor(maxPoolSize: number = 5) {
    this.maxPoolSize = maxPoolSize;
  }
  
  async acquire(
    runtime: string,
    options: SessionOptions
  ): Promise<{ session: RuntimeSession; release: () => void }> {
    const key = this.getPoolKey(runtime, options);
    
    // 从池中获取可用会话
    const session = this.getFromPool(key);
    if (session) {
      return { session, release: () => this.release(key, session) };
    }
    
    // 创建新会话(避免重复创建)
    const newSession = await this.createSession(runtime, options, key);
    return { session: newSession, release: () => this.release(key, newSession) };
  }
  
  private getFromPool(key: string): RuntimeSession | null {
    const pool = this.pool.get(key) || [];
    
    // 查找可用的会话
    for (let i = 0; i < pool.length; i++) {
      const session = pool[i];
      if (this.isSessionUsable(session)) {
        // 从池中移除
        pool.splice(i, 1);
        this.pool.set(key, pool);
        return session;
      }
    }
    
    return null;
  }
  
  private async createSession(
    runtime: string,
    options: SessionOptions,
    key: string
  ): Promise<RuntimeSession> {
    // 检查是否已经在创建中
    if (this.creationQueue.has(key)) {
      return this.creationQueue.get(key)!;
    }
    
    const creationPromise = (async () => {
      try {
        const adapter = getRuntimeAdapter(runtime as any);
        if (!adapter.capabilities.session) {
          throw new Error(`${runtime}不支持会话`);
        }
        
        const session = await adapter.openSession!(options);
        
        // 监听会话结束,自动清理
        session.done.then(() => {
          this.removeFromPool(key, session);
        }).catch(() => {
          this.removeFromPool(key, session);
        });
        
        return session;
      } finally {
        this.creationQueue.delete(key);
      }
    })();
    
    this.creationQueue.set(key, creationPromise);
    return creationPromise;
  }
  
  private release(key: string, session: RuntimeSession) {
    const pool = this.pool.get(key) || [];
    
    // 检查会话是否仍然可用
    if (!this.isSessionUsable(session)) {
      // 会话已失效,直接销毁
      session.abort("会话池释放").catch(() => {});
      return;
    }
    
    // 检查池是否已满
    if (pool.length >= this.maxPoolSize) {
      // 池已满,销毁最旧的会话
      const oldest = pool.shift();
      oldest?.abort("池已满").catch(() => {});
    }
    
    // 放回池中
    pool.push(session);
    this.pool.set(key, pool);
  }
  
  private isSessionUsable(session: RuntimeSession): boolean {
    // 检查会话是否仍然活跃
    // 这里需要根据具体的运行时实现来检查
    return true; // 简化实现
  }
}

8.2 批量处理与并发控制

当需要处理大量任务时,合理的并发控制很重要:

class BatchAITaskProcessor {
  private concurrencyLimit: number;
  private activeTasks: Set<Promise<any>> = new Set();
  
  constructor(concurrencyLimit: number = 3) {
    this.concurrencyLimit = concurrencyLimit;
  }
  
  async processBatch(
    tasks: Array<{ id: string; prompt: string }>,
    runtime: string = "claude"
  ): Promise<Map<string, string>> {
    const results = new Map<string, string>();
    const adapter = getRuntimeAdapter(runtime);
    
    // 使用信号量控制并发
    const semaphore = new Semaphore(this.concurrencyLimit);
    
    const promises = tasks.map(async (task) => {
      await semaphore.acquire();
      
      try {
        console.log(`开始处理任务: ${task.id}`);
        const startTime = Date.now();
        
        const { output, error } = await adapter.invoke({
          taskPrompt: task.prompt,
          timeoutSeconds: 30,
        });
        
        const duration = Date.now() - startTime;
        console.log(`任务 ${task.id} 完成,耗时: ${duration}ms`);
        
        if (error) {
          results.set(task.id, `错误: ${error.message}`);
        } else {
          results.set(task.id, output?.result || "");
        }
        
      } catch (error) {
        results.set(task.id, `异常: ${(error as Error).message}`);
      } finally {
        semaphore.release();
      }
    });
    
    await Promise.all(promises);
    return results;
  }
}

// 简单的信号量实现
class Semaphore {
  private permits: number;
  private queue: Array<() => void> = [];
  
  constructor(permits: number) {
    this.permits = permits;
  }
  
  acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return Promise.resolve();
    }
    
    return new Promise(resolve => {
      this.queue.push(resolve);
    });
  }
  
  release(): void {
    this.permits++;
    const nextResolver = this.queue.shift();
    if (nextResolver) {
      nextResolver();
    }
  }
}

8.3 监控与指标收集

在生产环境中,监控AI运行时的使用情况非常重要:

interface AIMetrics {
  invocationCount: number;
  successCount: number;
  failureCount: number;
  totalCost: number;
  totalLatency: number;
  byRuntime: Map<string, RuntimeMetrics>;
  byHour: Map<string, HourlyMetrics>;
}

class AIMonitor {
  private metrics: AIMetrics;
  private exporters: MetricsExporter[] = [];
  
  constructor() {
    this.metrics = {
      invocationCount: 0,
      successCount: 0,
      failureCount: 0,
      totalCost: 0,
      totalLatency: 0,
      byRuntime: new Map(),
      byHour: new Map(),
    };
  }
  
  recordInvocationStart(runtime: string, options: InvokeOptions) {
    const invocationId = crypto.randomUUID();
    const startTime = Date.now();
    
    // 记录开始时间
    const hourKey = this.getHourKey();
    this.ensureHourlyMetrics(hourKey);
    
    return {
      invocationId,
      onComplete: (result: { output?: CliRunOutput; error?: any }) => {
        const endTime = Date.now();
        const latency = endTime - startTime;
        const cost = result.output?.total_cost_usd || 0;
        const success = !result.error;
        
        this.updateMetrics(runtime, hourKey, {
          success,
          latency,
          cost,
        });
        
        // 导出到监控系统
        this.exportMetrics({
          invocationId,
          runtime,
          success,
          latency,
          cost,
          timestamp: new Date().toISOString(),
          taskLength: options.taskPrompt.length,
        });
      },
    };
  }
  
  private updateMetrics(
    runtime: string,
    hourKey: string,
    data: { success: boolean; latency: number; cost: number }
  ) {
    // 更新全局指标
    this.metrics.invocationCount++;
    if (data.success) {
      this.metrics.successCount++;
    } else {
      this.metrics.failureCount++;
    }
    this.metrics.totalCost += data.cost;
    this.metrics.totalLatency += data.latency;
    
    // 更新运行时特定指标
    const runtimeMetrics = this.metrics.byRuntime.get(runtime) || {
      invocationCount: 0,
      successCount: 0,
      failureCount: 0,
      totalCost: 0,
      totalLatency: 0,
    };
    
    runtimeMetrics.invocationCount++;
    if (data.success) {
      runtimeMetrics.successCount++;
    } else {
      runtimeMetrics.failureCount++;
    }
    runtimeMetrics.totalCost += data.cost;
    runtimeMetrics.totalLatency += data.latency;
    
    this.metrics.byRuntime.set(runtime, runtimeMetrics);
    
    // 更新小时级指标
    const hourlyMetrics = this.metrics.byHour.get(hourKey)!;
    hourlyMetrics.invocationCount++;
    hourlyMetrics.totalCost += data.cost;
    hourlyMetrics.totalLatency += data.latency;
  }
  
  getMetrics(): AIMetrics {
    return {
      ...this.metrics,
      // 计算衍生指标
      successRate: this.metrics.successCount / this.metrics.invocationCount,
      averageLatency: this.metrics.totalLatency / this.metrics.invocationCount,
      averageCost: this.metrics.totalCost / this.metrics.invocationCount,
    };
  }
}

8.4 配置管理与环境隔离

在实际部署中,不同的环境(开发、测试、生产)可能需要不同的配置:

interface RuntimeConfig {
  defaultRuntime: string;
  timeouts: {
    development: number;
    testing: number;
    production: number;
  };
  retryStrategies: {
    [runtime: string]: RetryStrategy;
  };
  costLimits: {
    daily: number;
    perInvocation: number;
  };
}

class ConfigManager {
  private config: RuntimeConfig;
  private environment: string;
  
  constructor(environment: string = Deno.env.get("NODE_ENV") || "development") {
    this.environment = environment;
    this.config = this.loadConfig();
  }
  
  private loadConfig(): RuntimeConfig {
    // 从多个来源加载配置
    const defaults: RuntimeConfig = {
      defaultRuntime: "claude",
      timeouts: {
        development: 30,
        testing: 60,
        production: 120,
      },
      retryStrategies: {
        claude: { maxRetries: 3, retryDelaySeconds: 1 },
        opencode: { maxRetries: 2, retryDelaySeconds: 2 },
        cursor: { maxRetries: 1, retryDelaySeconds: 5 },
        codex: { maxRetries: 3, retryDelaySeconds: 1 },
      },
      costLimits: {
        daily: 100, // 每天最多100美元
        perInvocation: 10, // 每次调用最多10美元
      },
    };
    
    // 从环境变量覆盖
    const envConfig = this.loadFromEnv();
    
    // 从配置文件覆盖
    const fileConfig = this.loadFromFile();
    
    return this.mergeConfigs(defaults, envConfig, fileConfig);
  }
  
  getInvocationOptions(baseOptions: InvokeOptions): InvokeOptions {
    const timeout = this.config.timeouts[this.environment as keyof typeof this.config.timeouts];
    const retryStrategy = this.config.retryStrategies[baseOptions.runtime || this.config.defaultRuntime];
    
    return {
      ...baseOptions,
      timeoutSeconds: baseOptions.timeoutSeconds || timeout,
      maxRetries: baseOptions.maxRetries || retryStrategy.maxRetries,
      retryDelaySeconds: baseOptions.retryDelaySeconds || retryStrategy.retryDelaySeconds,
      
      // 添加环境标签
      extraArgs: [
        ...(baseOptions.extraArgs || []),
        `--environment=${this.environment}`,
      ],
    };
  }
  
  checkCostLimit(cost: number): boolean {
    const dailyCost = this.getDailyCost();
    return dailyCost + cost <= this.config.costLimits.daily && 
           cost <= this.config.costLimits.perInvocation;
  }
}

这些最佳实践来自我在实际项目中的经验总结。特别是在处理高并发和成本控制时,这些模式被证明非常有效。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐