Claude API Streaming 怎么实现?Python + Node.js 两种方案实测(2026)
上周三帮朋友的创业团队搞一个客服对话系统,需求很简单——用 Claude Opus 4.7 做流式输出,让用户看到"打字机效果",别干等着一整坨文字刷出来。我心想这不就改个参数的事嘛,结果折腾了大半天,踩了好几个坑。把过程记录下来,省得你们再走弯路。Claude API 的 streaming 走的是 SSE(Server-Sent Events)协议,逐 token 往回吐内容。你可以用 Ant
上周三帮朋友的创业团队搞一个客服对话系统,需求很简单——用 Claude Opus 4.7 做流式输出,让用户看到"打字机效果",别干等着一整坨文字刷出来。我心想这不就改个参数的事嘛,结果折腾了大半天,踩了好几个坑。把过程记录下来,省得你们再走弯路。
Claude API 的 streaming 走的是 SSE(Server-Sent Events)协议,逐 token 往回吐内容。你可以用 Anthropic 原生 SDK,也可以用 OpenAI 兼容协议来调。两种方式我都测了,下面直接上代码和实测数据。
先说结论
| 维度 | Anthropic 原生 SDK | OpenAI 兼容协议 |
|---|---|---|
| 首 token 延迟 | 380ms 左右 | 420ms 左右 |
| 代码改动量 | 需要学 Anthropic 的事件类型 | 改个 base_url 就行 |
| 工具生态 | Anthropic 专用 | Cursor/Cherry Studio 等通用 |
| 错误处理 | 事件粒度更细 | 和 OpenAI 一样的 try-catch |
如果你项目里已经在用 OpenAI SDK,走兼容协议最省事。如果你需要 Anthropic 独有的功能(比如 content_block_delta 级别的事件控制),就用原生 SDK。
环境准备
Python 3.10+,装两个包:
pip install anthropic openai
Node.js 那边:
npm install @anthropic-ai/sdk openai
API Key 的话,你可以直接用 Anthropic 官方的,也可以用聚合平台的。我这边测试用的是 ofox.ai 的 Key,它同时支持 Anthropic 原生协议和 OpenAI 兼容协议,改 base_url 就能切换,OpenRouter 也类似但会额外收 5.5% 手续费。
整体调用链路长这样:
sequenceDiagram
participant Client as 你的代码
participant Gateway as API 网关
participant Claude as Claude Opus 4.7
Client->>Gateway: POST /v1/messages (stream=true)
Gateway->>Claude: 转发请求
Claude-->>Gateway: SSE: content_block_start
Gateway-->>Client: SSE: content_block_start
loop 逐 token 返回
Claude-->>Gateway: SSE: content_block_delta
Gateway-->>Client: SSE: content_block_delta
end
Claude-->>Gateway: SSE: message_stop
Gateway-->>Client: SSE: message_stop
方案一:Anthropic 原生 SDK(Python)
这是最"正统"的写法。Anthropic 的 streaming 事件类型比 OpenAI 多不少,一开始看文档有点懵。
import anthropic
client = anthropic.Anthropic(
api_key="your-api-key",
base_url="https://api.ofox.ai/anthropic" # 也可以用官方 https://api.anthropic.com
)
# 流式调用
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[
{"role": "user", "content": "用 Python 写一个快速排序,加上详细注释"}
]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
print() # 换行
# 拿最终的完整 message 对象
final_message = stream.get_final_message()
print(f"\n总 token: {final_message.usage.input_tokens + final_message.usage.output_tokens}")
跑一下,效果很丝滑。但我一开始犯了个蠢——把 stream=True 当参数传给 messages.create(),结果返回的是原始 SSE 事件流,得自己解析。正确的姿势是用 .stream() 这个上下文管理器,SDK 帮你把事件解析好了。
如果你需要更细粒度的事件控制(比如监听 input_json 用于 tool use streaming),可以用事件回调:
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": "北京今天天气怎么样?"}]
) as stream:
for event in stream:
if event.type == "content_block_delta":
if event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
elif event.type == "message_stop":
print("\n--- 生成结束 ---")
实测 Claude Sonnet 4.6 生成 500 token 的代码片段,首 token 延迟 P50 在 340ms,P95 大概 520ms。Opus 4.7 慢一点,P50 约 580ms——模型大,没办法。
方案二:OpenAI 兼容协议
如果你项目里已经有一堆 OpenAI 的调用代码,不想改太多,直接走兼容协议最省事。
Python 版
from openai import OpenAI
client = OpenAI(
api_key="your-api-key",
base_url="https://api.ofox.ai/v1"
)
stream = client.chat.completions.create(
model="claude-sonnet-4-20250514",
messages=[
{"role": "system", "content": "你是一个资深 Python 开发者"},
{"role": "user", "content": "解释一下 Python 的 GIL 锁,说人话"}
],
stream=True,
max_tokens=1024
)
full_content = ""
for chunk in stream:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
print(content, end="", flush=True)
full_content += content
print(f"\n\n生成了约 {len(full_content)} 个字符")
Node.js 版
import OpenAI from 'openai';
const client = new OpenAI({
apiKey: 'your-api-key',
baseURL: 'https://api.ofox.ai/v1',
});
async function streamChat() {
const stream = await client.chat.completions.create({
model: 'claude-sonnet-4-20250514',
messages: [
{ role: 'user', content: '用 TypeScript 写一个简单的 Express 中间件' }
],
stream: true,
max_tokens: 1024,
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || '';
process.stdout.write(content);
}
console.log('\n--- done ---');
}
streamChat().catch(console.error);
兼容协议的好处是代码几乎和调 GPT-5.5 一模一样,换个 model 名就行。坏处是你拿不到 Anthropic 原生的那些细粒度事件(比如 content_block_start 里的 block index)。
踩坑记录
坑 1:stream=True 和 .stream() 不是一回事
Anthropic SDK 里,messages.create(stream=True) 返回的是 MessageStreamManager,你得自己处理原始事件。而 .stream() 是个高级封装,帮你做了事件聚合。我一开始混用,拿到的对象类型不对,报了个:
AttributeError: 'MessageStreamManager' object has no attribute 'text_stream'
折腾了二十分钟才反应过来。
坑 2:超时设置
默认的 HTTP 超时对 streaming 来说太短了。Claude 生成长文本(2000+ token)可能要跑 30 秒以上,你得显式设长一点:
client = anthropic.Anthropic(
api_key="your-key",
timeout=120.0 # 秒
)
不然生成到一半就断了,报 httpx.ReadTimeout。这个报错信息还算友好,至少知道是超时,不像有些 API 直接返回个空 body 让你猜。
坑 3:并发 streaming 的连接数限制
我们压测的时候开了 20 个并发 stream,结果有几个请求直接返回 429 Too Many Requests:
{"error":{"type":"rate_limit_error","message":"Number of request tokens has exceeded your per-minute rate limit"}}
Anthropic 的 rate limit 是按 token/分钟算的,不只是按请求数。20 个并发 stream 每个都在持续消耗 output token 配额。解决办法是要么升 tier,要么加个队列控制并发数。我们最后控制在 8 个并发,就稳了。
坑 4:Node.js 里 for await 的错误处理
这个比较隐蔽。如果 stream 中途断了(比如网络抖动),for await 会直接抛异常,你的已接收内容就丢了。得包一层:
let received = '';
try {
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || '';
received += content;
process.stdout.write(content);
}
} catch (err) {
console.error('Stream 中断:', err.message);
console.log('已接收内容:', received); // 至少保住已有的
}
小结
Claude 的 streaming 实现起来不复杂,核心就两条路:原生 SDK 用 .stream() 上下文管理器,兼容协议用 stream=True 迭代 chunk。原生 SDK 事件粒度更细,适合需要精确控制的场景(tool use streaming、content block 级别的处理);兼容协议改动最小,适合已有 OpenAI 代码的项目直接迁移。
超时和并发限制是最容易踩的两个坑,建议一开始就把 timeout 设到 120 秒以上,并发控制在 10 以内。首 token 延迟方面,Sonnet 4.6 大概 340-520ms,Opus 4.7 再慢个 200ms 左右,目前没找到更好的优化办法——瓶颈在模型推理本身。
更多推荐

所有评论(0)