上周三帮朋友的创业团队搞一个客服对话系统,需求很简单——用 Claude Opus 4.7 做流式输出,让用户看到"打字机效果",别干等着一整坨文字刷出来。我心想这不就改个参数的事嘛,结果折腾了大半天,踩了好几个坑。把过程记录下来,省得你们再走弯路。

Claude API 的 streaming 走的是 SSE(Server-Sent Events)协议,逐 token 往回吐内容。你可以用 Anthropic 原生 SDK,也可以用 OpenAI 兼容协议来调。两种方式我都测了,下面直接上代码和实测数据。

先说结论

维度 Anthropic 原生 SDK OpenAI 兼容协议
首 token 延迟 380ms 左右 420ms 左右
代码改动量 需要学 Anthropic 的事件类型 改个 base_url 就行
工具生态 Anthropic 专用 Cursor/Cherry Studio 等通用
错误处理 事件粒度更细 和 OpenAI 一样的 try-catch

如果你项目里已经在用 OpenAI SDK,走兼容协议最省事。如果你需要 Anthropic 独有的功能(比如 content_block_delta 级别的事件控制),就用原生 SDK。

环境准备

Python 3.10+,装两个包:

pip install anthropic openai

Node.js 那边:

npm install @anthropic-ai/sdk openai

API Key 的话,你可以直接用 Anthropic 官方的,也可以用聚合平台的。我这边测试用的是 ofox.ai 的 Key,它同时支持 Anthropic 原生协议和 OpenAI 兼容协议,改 base_url 就能切换,OpenRouter 也类似但会额外收 5.5% 手续费。

整体调用链路长这样:

sequenceDiagram
 participant Client as 你的代码
 participant Gateway as API 网关
 participant Claude as Claude Opus 4.7

 Client->>Gateway: POST /v1/messages (stream=true)
 Gateway->>Claude: 转发请求
 Claude-->>Gateway: SSE: content_block_start
 Gateway-->>Client: SSE: content_block_start
 loop 逐 token 返回
 Claude-->>Gateway: SSE: content_block_delta
 Gateway-->>Client: SSE: content_block_delta
 end
 Claude-->>Gateway: SSE: message_stop
 Gateway-->>Client: SSE: message_stop

方案一:Anthropic 原生 SDK(Python)

这是最"正统"的写法。Anthropic 的 streaming 事件类型比 OpenAI 多不少,一开始看文档有点懵。

import anthropic

client = anthropic.Anthropic(
 api_key="your-api-key",
 base_url="https://api.ofox.ai/anthropic" # 也可以用官方 https://api.anthropic.com
)

# 流式调用
with client.messages.stream(
 model="claude-sonnet-4-20250514",
 max_tokens=1024,
 messages=[
 {"role": "user", "content": "用 Python 写一个快速排序,加上详细注释"}
 ]
) as stream:
 for text in stream.text_stream:
 print(text, end="", flush=True)

print() # 换行

# 拿最终的完整 message 对象
final_message = stream.get_final_message()
print(f"\n总 token: {final_message.usage.input_tokens + final_message.usage.output_tokens}")

跑一下,效果很丝滑。但我一开始犯了个蠢——把 stream=True 当参数传给 messages.create(),结果返回的是原始 SSE 事件流,得自己解析。正确的姿势是用 .stream() 这个上下文管理器,SDK 帮你把事件解析好了。

如果你需要更细粒度的事件控制(比如监听 input_json 用于 tool use streaming),可以用事件回调:

with client.messages.stream(
 model="claude-sonnet-4-20250514",
 max_tokens=1024,
 messages=[{"role": "user", "content": "北京今天天气怎么样?"}]
) as stream:
 for event in stream:
 if event.type == "content_block_delta":
 if event.delta.type == "text_delta":
 print(event.delta.text, end="", flush=True)
 elif event.type == "message_stop":
 print("\n--- 生成结束 ---")

实测 Claude Sonnet 4.6 生成 500 token 的代码片段,首 token 延迟 P50 在 340ms,P95 大概 520ms。Opus 4.7 慢一点,P50 约 580ms——模型大,没办法。

方案二:OpenAI 兼容协议

如果你项目里已经有一堆 OpenAI 的调用代码,不想改太多,直接走兼容协议最省事。

Python 版

from openai import OpenAI

client = OpenAI(
 api_key="your-api-key",
 base_url="https://api.ofox.ai/v1"
)

stream = client.chat.completions.create(
 model="claude-sonnet-4-20250514",
 messages=[
 {"role": "system", "content": "你是一个资深 Python 开发者"},
 {"role": "user", "content": "解释一下 Python 的 GIL 锁,说人话"}
 ],
 stream=True,
 max_tokens=1024
)

full_content = ""
for chunk in stream:
 if chunk.choices[0].delta.content:
 content = chunk.choices[0].delta.content
 print(content, end="", flush=True)
 full_content += content

print(f"\n\n生成了约 {len(full_content)} 个字符")

Node.js 版

import OpenAI from 'openai';

const client = new OpenAI({
 apiKey: 'your-api-key',
 baseURL: 'https://api.ofox.ai/v1',
});

async function streamChat() {
 const stream = await client.chat.completions.create({
 model: 'claude-sonnet-4-20250514',
 messages: [
 { role: 'user', content: '用 TypeScript 写一个简单的 Express 中间件' }
 ],
 stream: true,
 max_tokens: 1024,
 });

 for await (const chunk of stream) {
 const content = chunk.choices[0]?.delta?.content || '';
 process.stdout.write(content);
 }
 console.log('\n--- done ---');
}

streamChat().catch(console.error);

兼容协议的好处是代码几乎和调 GPT-5.5 一模一样,换个 model 名就行。坏处是你拿不到 Anthropic 原生的那些细粒度事件(比如 content_block_start 里的 block index)。

踩坑记录

坑 1:stream=True.stream() 不是一回事

Anthropic SDK 里,messages.create(stream=True) 返回的是 MessageStreamManager,你得自己处理原始事件。而 .stream() 是个高级封装,帮你做了事件聚合。我一开始混用,拿到的对象类型不对,报了个:

AttributeError: 'MessageStreamManager' object has no attribute 'text_stream'

折腾了二十分钟才反应过来。

坑 2:超时设置

默认的 HTTP 超时对 streaming 来说太短了。Claude 生成长文本(2000+ token)可能要跑 30 秒以上,你得显式设长一点:

client = anthropic.Anthropic(
 api_key="your-key",
 timeout=120.0 # 秒
)

不然生成到一半就断了,报 httpx.ReadTimeout。这个报错信息还算友好,至少知道是超时,不像有些 API 直接返回个空 body 让你猜。

坑 3:并发 streaming 的连接数限制

我们压测的时候开了 20 个并发 stream,结果有几个请求直接返回 429 Too Many Requests

{"error":{"type":"rate_limit_error","message":"Number of request tokens has exceeded your per-minute rate limit"}}

Anthropic 的 rate limit 是按 token/分钟算的,不只是按请求数。20 个并发 stream 每个都在持续消耗 output token 配额。解决办法是要么升 tier,要么加个队列控制并发数。我们最后控制在 8 个并发,就稳了。

坑 4:Node.js 里 for await 的错误处理

这个比较隐蔽。如果 stream 中途断了(比如网络抖动),for await 会直接抛异常,你的已接收内容就丢了。得包一层:

let received = '';
try {
 for await (const chunk of stream) {
 const content = chunk.choices[0]?.delta?.content || '';
 received += content;
 process.stdout.write(content);
 }
} catch (err) {
 console.error('Stream 中断:', err.message);
 console.log('已接收内容:', received); // 至少保住已有的
}

小结

Claude 的 streaming 实现起来不复杂,核心就两条路:原生 SDK 用 .stream() 上下文管理器,兼容协议用 stream=True 迭代 chunk。原生 SDK 事件粒度更细,适合需要精确控制的场景(tool use streaming、content block 级别的处理);兼容协议改动最小,适合已有 OpenAI 代码的项目直接迁移。

超时和并发限制是最容易踩的两个坑,建议一开始就把 timeout 设到 120 秒以上,并发控制在 10 以内。首 token 延迟方面,Sonnet 4.6 大概 340-520ms,Opus 4.7 再慢个 200ms 左右,目前没找到更好的优化办法——瓶颈在模型推理本身。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐