Efficient Design and Development in the AI Era: An Anthropic → OpenAI Protocol Conversion Plugin in Practice
Abstract: This article introduces an APISIX plugin that converts between the Anthropic and OpenAI protocols, with support for streaming, thinking state, tool calls, and multimodality. Built by reusing code from projects such as Higress, the plugin converts bidirectionally between the Claude Messages API and the OpenAI Chat Completions protocol, covering non-streaming/streaming requests, thinking state, tool_use, and multimodal image handling. Core features include request/response format conversion and dynamic field handling (such as stripping the per-request cch field from the billing header so prompt caching stays effective).
An APISIX Plugin for Anthropic ↔ OpenAI Protocol Conversion: Streaming, Thinking, Tool Calls, and Multimodality
1. Reference Existing Implementations
Reuse the code of similar projects such as Higress and LiteLLM, and hand it to the AI as direct reference material.
2. Prepare Test Cases
Collect curl examples and other test scenarios so the AI can verify its own results.
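A minimal non-streaming test case against the gateway might look like the sketch below; the host, port, API key, and model name are all placeholders (the plugin intercepts `/v1/messages`):

```bash
curl -sS http://127.0.0.1:9080/v1/messages \
  -H "x-api-key: sk-xxx" \
  -H "anthropic-version: 2023-06-01" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "claude-sonnet-4",
    "max_tokens": 256,
    "messages": [{"role": "user", "content": "Hello"}]
  }'
```

Adding `"stream": true` to the body (and `-N` to curl) covers the streaming path as well.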
3. Define the Functional Boundary

| Stage | Conversion |
|---|---|
| Request | Anthropic Messages API → OpenAI Chat Completions |
| Response | OpenAI Chat Completions → Anthropic Messages API |
| Supported | non-streaming/streaming, thinking, tool_use, multimodal images |
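Concretely, the request conversion lifts Claude's top-level `system` field into the OpenAI `messages` array and renames `stop_sequences` to `stop`. A minimal sketch of the mapping (payloads abbreviated):

```json
{"model": "m", "system": "Be brief.", "max_tokens": 256, "stop_sequences": ["END"],
 "messages": [{"role": "user", "content": "Hi"}]}
```

becomes

```json
{"model": "m", "max_tokens": 256, "stop": ["END"],
 "messages": [{"role": "system", "content": "Be brief."},
              {"role": "user", "content": "Hi"}]}
```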
4. Vibe Coding

```lua
--
-- APISIX Plugin: claude2openai
-- Reference implementation: Higress ai-proxy provider/claude_to_openai.go
--
-- Features:
-- 1. Request phase: convert Claude Messages API requests into OpenAI Chat Completions requests
-- 2. Response phase: convert OpenAI Chat Completions responses into Claude Messages API responses
-- Supports: non-streaming, streaming, thinking, tool_use, multimodal images
--
local core = require("apisix.core")
local cjson = require("cjson.safe")
local ngx = ngx
local pairs = pairs
local ipairs = ipairs
local type = type
local table_insert = table.insert
local table_concat = table.concat
local string_sub = string.sub
local string_find = string.find
local ngx_now = ngx.now
local plugin_name = "jiankunking-claude2openai"
local schema = {
type = "object",
properties = {
target_model = {
type = "string",
description = "Override the model name sent to OpenAI backend"
},
},
}
local _M = {
version = 0.1,
priority = 2650, -- runs before consumer-restriction (2640) and key-auth (2500)
name = plugin_name,
schema = schema,
}
function _M.check_schema(conf)
return core.schema.check(schema, conf)
end
------------------------------------------------------------------------
-- Utility functions
------------------------------------------------------------------------
-- OpenAI finish_reason -> Claude stop_reason (see Higress openAIFinishReasonToClaude)
local function openai_finish_reason_to_claude(reason)
if not reason or reason == cjson.null then return "end_turn" end
local mapping = {
stop = "end_turn",
length = "max_tokens",
tool_calls = "tool_use",
content_filter = "end_turn",
}
return mapping[reason] or reason
end
-- Strip the dynamic cch field from x-anthropic-billing-header to keep prompt caching usable
-- Reference: Higress stripCchFromBillingHeader
-- The cch value changes on every request; left in place, it would invalidate the cache
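-- Example (hypothetical header value):
--   before: "x-anthropic-billing-header: a=1; cch=abc123; b=2"
--   after : "x-anthropic-billing-header: a=1; b=2"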
local function strip_cch_from_billing_header(text)
if type(text) ~= "string" then return text end
local prefix = "x-anthropic-billing-header:"
if string_sub(text, 1, #prefix) ~= prefix then
return text
end
-- loop to remove every "; cch=xxx" occurrence
local result = text
while true do
local cch_start = string_find(result, "; cch=", 1, true)
if not cch_start then break end
local after = cch_start + 2 -- skip "; "
local semi_pos = string_find(result, ";", after, true)
if not semi_pos then
-- cch is at the end; just truncate
result = string_sub(result, 1, cch_start - 1)
else
-- content follows cch; splice out the "; cch=xxx" segment
result = string_sub(result, 1, cch_start - 1) .. string_sub(result, semi_pos)
end
end
return result
end
-- Extract the plain-text parts from Claude content blocks
local function extract_text_parts(content_array)
local parts = {}
if type(content_array) ~= "table" then return parts end
for _, block in ipairs(content_array) do
if block.type == "text" and block.text then
table_insert(parts, block.text)
end
end
return parts
end
------------------------------------------------------------------------
-- Request conversion: Claude Messages -> OpenAI Chat Completions
-- Reference: Higress ConvertClaudeRequestToOpenAI
------------------------------------------------------------------------
-- Convert a Claude content-blocks array into OpenAI format
-- Returns { text_parts, tool_calls, tool_results, openai_contents }
local function convert_content_array(content_array)
local result = {
text_parts = {},
tool_calls = {},
tool_results = {},
openai_contents = {},
}
if type(content_array) ~= "table" then return result end
for _, block in ipairs(content_array) do
if block.type == "text" and block.text then
local processed_text = strip_cch_from_billing_header(block.text)
table_insert(result.text_parts, processed_text)
local openai_content = {
type = "text",
text = processed_text
}
-- pass cache_control through to support prompt caching
if block.cache_control then
openai_content.cache_control = block.cache_control
end
table_insert(result.openai_contents, openai_content)
elseif (block.type == "image" or block.type == "document") and block.source then
-- Claude: {type:"image"/"document", source:{type:"base64", media_type:"...", data:"..."}}
-- OpenAI: {type:"image_url", image_url:{url:"data:...;base64,..."}}
local img_obj
if block.source.type == "base64" then
img_obj = {
type = "image_url",
image_url = {
url = "data:" .. (block.source.media_type or "image/png")
.. ";base64," .. block.source.data
}
}
elseif block.source.type == "url" then
img_obj = {
type = "image_url",
image_url = { url = block.source.url }
}
end
if img_obj then
if block.cache_control then
img_obj.cache_control = block.cache_control
end
table_insert(result.openai_contents, img_obj)
end
elseif block.type == "thinking" and block.thinking then
-- thinking block in Claude assistant history messages -> OpenAI thinking block
local thinking_obj = {
type = "thinking",
thinking = block.thinking,
signature = block.signature or ""
}
table_insert(result.openai_contents, thinking_obj)
elseif block.type == "redacted_thinking" then
-- Claude redacted_thinking block -> pass through
table_insert(result.openai_contents, {
type = "redacted_thinking",
data = block.data or ""
})
elseif block.type == "tool_use" then
-- Claude tool_use -> OpenAI tool_calls
if block.id and block.name then
table_insert(result.tool_calls, {
id = block.id,
type = "function",
["function"] = {
name = block.name,
arguments = cjson.encode(block.input or {}) or "{}"
}
})
core.log.debug("[Claude->OpenAI] Converted tool_use to tool_call: ", block.name)
end
elseif block.type == "tool_result" then
table_insert(result.tool_results, block)
core.log.debug("[Claude->OpenAI] Found tool_result for tool_use_id: ", block.tool_use_id or "")
end
end
return result
end
local function convert_claude_request_to_openai(claude_body, conf)
core.log.debug("[Claude->OpenAI] Original Claude request body: ", cjson.encode(claude_body))
local openai_body = {
model = conf.target_model or claude_body.model,
max_tokens = claude_body.max_tokens,
temperature = claude_body.temperature,
top_p = claude_body.top_p,
stop = claude_body.stop_sequences,
stream = claude_body.stream,
}
if openai_body.stream then
openai_body.stream_options = { include_usage = true }
end
local messages = {}
-- system: Claude top-level field -> OpenAI system message (inserted at the front of messages)
if claude_body.system then
if type(claude_body.system) == "string" then
table_insert(messages, { role = "system", content = strip_cch_from_billing_header(claude_body.system) })
elseif type(claude_body.system) == "table" then
-- system can also be an array of content blocks
-- Following litellm: filter out blocks that start with x-anthropic-billing-header, and skip empty text blocks
local sys_contents = {}
for _, block in ipairs(claude_body.system) do
if block.type == "text" and block.text then
if block.text == "" then
-- skip empty text (the Anthropic API rejects it)
elseif string_sub(block.text, 1, 27) == "x-anthropic-billing-header:" then
-- skip billing-header metadata ("x-anthropic-billing-header:" is 27 chars)
core.log.debug("[Claude->OpenAI] Skipping billing header system block")
else
local sys_block = {
type = "text",
text = strip_cch_from_billing_header(block.text)
}
if block.cache_control then
sys_block.cache_control = block.cache_control
end
table_insert(sys_contents, sys_block)
end
end
end
if #sys_contents > 0 then
table_insert(messages, { role = "system", content = sys_contents })
end
end
end
-- convert messages
if claude_body.messages then
for _, msg in ipairs(claude_body.messages) do
if type(msg.content) == "string" then
-- plain text
table_insert(messages, { role = msg.role, content = msg.content })
elseif type(msg.content) == "table" then
local conv = convert_content_array(msg.content)
-- has tool_calls (tool_use in an assistant message)
if #conv.tool_calls > 0 then
local openai_msg = {
role = msg.role,
tool_calls = conv.tool_calls,
}
if #conv.text_parts > 0 then
openai_msg.content = table_concat(conv.text_parts, "\n\n")
else
openai_msg.content = cjson.null
end
table_insert(messages, openai_msg)
end
-- has tool_results (tool_result in a user message)
if #conv.tool_results > 0 then
for _, tr in ipairs(conv.tool_results) do
local tool_content
if type(tr.content) == "string" then
tool_content = tr.content
elseif type(tr.content) == "table" then
-- Following litellm: tool_result.content may mix text and image content
local content_parts = {}
local has_non_text = false
for _, c in ipairs(tr.content) do
if type(c) == "string" then
table_insert(content_parts, { type = "text", text = c })
elseif type(c) == "table" then
if c.type == "text" then
table_insert(content_parts, { type = "text", text = c.text or "" })
elseif (c.type == "image" or c.type == "document") and c.source then
has_non_text = true
if c.source.type == "base64" then
table_insert(content_parts, {
type = "image_url",
image_url = {
url = "data:" .. (c.source.media_type or "image/png")
.. ";base64," .. c.source.data
}
})
elseif c.source.type == "url" then
table_insert(content_parts, {
type = "image_url",
image_url = { url = c.source.url }
})
end
end
end
end
-- text only: merge into a single string (better compatibility)
if not has_non_text then
local texts = {}
for _, p in ipairs(content_parts) do
if p.type == "text" then table_insert(texts, p.text) end
end
tool_content = table_concat(texts, "\n")
else
-- mixed content (text + image): keep the array format
tool_content = content_parts
end
else
tool_content = ""
end
table_insert(messages, {
role = "tool",
tool_call_id = tr.tool_use_id,
content = tool_content
})
end
-- text may sit alongside the tool_result
if #conv.text_parts > 0 then
table_insert(messages, {
role = msg.role,
content = table_concat(conv.text_parts, "\n\n")
})
end
end
-- ordinary content (no tool_calls and no tool_results)
if #conv.tool_calls == 0 and #conv.tool_results == 0 then
-- a single text block simplifies to a string
if #conv.openai_contents == 1 and conv.openai_contents[1].type == "text" then
table_insert(messages, {
role = msg.role,
content = conv.openai_contents[1].text
})
else
table_insert(messages, {
role = msg.role,
content = conv.openai_contents
})
end
end
end
end
end
openai_body.messages = messages
-- Convert tools: Claude {name, description, input_schema} -> OpenAI {type, function}
if claude_body.tools then
local openai_tools = {}
for _, tool in ipairs(claude_body.tools) do
table_insert(openai_tools, {
type = "function",
["function"] = {
name = tool.name,
description = tool.description,
parameters = tool.input_schema
}
})
end
openai_body.tools = openai_tools
end
-- Convert tool_choice
if claude_body.tool_choice then
local tc = claude_body.tool_choice
if tc.type == "auto" then
openai_body.tool_choice = "auto"
elseif tc.type == "any" then
openai_body.tool_choice = "required"
elseif tc.type == "tool" and tc.name then
openai_body.tool_choice = {
type = "function",
["function"] = { name = tc.name }
}
end
-- parallel_tool_calls
if tc.disable_parallel_tool_use ~= nil then
openai_body.parallel_tool_calls = not tc.disable_parallel_tool_use
end
end
-- Convert thinking: Claude {type:"enabled", budget_tokens:N} -> OpenAI reasoning parameters
if claude_body.thinking then
core.log.debug("[Claude->OpenAI] Found thinking config: type=", claude_body.thinking.type,
", budget_tokens=", claude_body.thinking.budget_tokens or 0)
if claude_body.thinking.type == "enabled" then
local budget = claude_body.thinking.budget_tokens or 8192
openai_body.reasoning_max_tokens = budget
if budget < 4096 then
openai_body.reasoning_effort = "low"
elseif budget < 16384 then
openai_body.reasoning_effort = "medium"
else
openai_body.reasoning_effort = "high"
end
core.log.debug("[Claude->OpenAI] Converted thinking config: budget_tokens=", budget,
", reasoning_effort=", openai_body.reasoning_effort,
", reasoning_max_tokens=", openai_body.reasoning_max_tokens)
end
end
-- Convert output_format: Claude {type:"json_schema", schema:{...}} -> OpenAI response_format
-- Reference: litellm translate_anthropic_output_format_to_openai
if claude_body.output_format and type(claude_body.output_format) == "table" then
if claude_body.output_format.type == "json_schema" and claude_body.output_format.schema then
openai_body.response_format = {
type = "json_schema",
json_schema = {
name = "structured_output",
schema = claude_body.output_format.schema,
strict = true,
}
}
core.log.debug("[Claude->OpenAI] Converted output_format to response_format: json_schema")
end
end
core.log.debug("[Claude->OpenAI] Converted OpenAI request body: ", cjson.encode(openai_body))
return openai_body
end
------------------------------------------------------------------------
-- Non-streaming response conversion: OpenAI -> Claude
-- Reference: Higress ConvertOpenAIResponseToClaude
------------------------------------------------------------------------
local function convert_openai_response_to_claude(openai_body, conf)
core.log.debug("[OpenAI->Claude] Original OpenAI response body: ", cjson.encode(openai_body))
-- When the upstream returns an error object, pass the real error message through to the client
-- Note: models such as GLM may omit the error field while others return null, so cjson.null must be excluded
if openai_body and openai_body.error and openai_body.error ~= cjson.null then
local err_msg = "Unknown upstream error"
if type(openai_body.error) == "table" then
err_msg = openai_body.error.message or cjson.encode(openai_body.error)
elseif type(openai_body.error) == "string" then
err_msg = openai_body.error
end
core.log.debug("[OpenAI->Claude] Upstream returned error: ", err_msg)
return {
type = "error",
error = {
type = openai_body.error.type or "api_error",
message = err_msg,
}
}
end
if not openai_body or not openai_body.choices
or type(openai_body.choices) ~= "table" or #openai_body.choices == 0 then
return {
type = "error",
error = { type = "api_error", message = "Empty response from upstream" }
}
end
local choice = openai_body.choices[1]
local msg = choice.message or {}
local content = {}
-- reasoning/thinking content (accepts both the reasoning_content and reasoning field names)
-- Note: models such as GLM return "reasoning_content": null; cjson.null is truthy, so filter with type()
local rc = msg.reasoning_content
local r = msg.reasoning
local reasoning = (type(rc) == "string" and rc ~= "" and rc)
or (type(r) == "string" and r ~= "" and r)
or nil
if reasoning then
core.log.debug("[OpenAI->Claude] Added thinking content: ", string_sub(reasoning, 1, 200))
table_insert(content, {
type = "thinking",
thinking = reasoning,
signature = "", -- Claude 协议要求 signature 字段
})
end
-- text content
-- Note: models such as GLM may return "content": null; filter cjson.null with type()
if type(msg.content) == "string" and msg.content ~= "" then
table_insert(content, {
type = "text",
text = msg.content
})
end
-- tool_calls -> tool_use content blocks
if msg.tool_calls and type(msg.tool_calls) == "table" then
for _, tc in ipairs(msg.tool_calls) do
local input = {}
if tc["function"] and tc["function"].arguments then
input = cjson.decode(tc["function"].arguments) or {}
end
table_insert(content, {
type = "tool_use",
id = tc.id,
name = tc["function"] and tc["function"].name or "",
input = input
})
end
end
-- fallback for empty content
if #content == 0 then
table_insert(content, { type = "text", text = "" })
end
-- build the Claude response
local stop_reason = openai_finish_reason_to_claude(choice.finish_reason)
-- build usage (following litellm: input_tokens = prompt_tokens - cached_tokens)
local claude_usage = {
input_tokens = 0,
output_tokens = 0,
}
if type(openai_body.usage) == "table" then
local prompt_tokens = openai_body.usage.prompt_tokens or 0
local cached_tokens = 0
if type(openai_body.usage.prompt_tokens_details) == "table"
and openai_body.usage.prompt_tokens_details.cached_tokens then
cached_tokens = openai_body.usage.prompt_tokens_details.cached_tokens
end
-- Claude protocol: input_tokens counts uncached tokens
claude_usage.input_tokens = prompt_tokens - cached_tokens
claude_usage.output_tokens = openai_body.usage.completion_tokens or 0
-- cache token fields
if cached_tokens > 0 then
claude_usage.cache_read_input_tokens = cached_tokens
end
-- cache_creation_input_tokens (if the upstream returns it)
if type(openai_body.usage.prompt_tokens_details) == "table"
and openai_body.usage.prompt_tokens_details.cache_creation_input_tokens then
claude_usage.cache_creation_input_tokens = openai_body.usage.prompt_tokens_details.cache_creation_input_tokens
end
end
local claude_resp = {
id = openai_body.id or ("msg_" .. ngx_now()),
type = "message",
role = "assistant",
content = content,
model = openai_body.model or "unknown",
stop_reason = stop_reason,
stop_sequence = cjson.null,
usage = claude_usage,
}
core.log.debug("[OpenAI->Claude] Converted Claude response body: ", cjson.encode(claude_resp))
return claude_resp
end
------------------------------------------------------------------------
-- Streaming response conversion: OpenAI SSE -> Claude SSE
-- Reference: Higress ConvertOpenAIStreamResponseToClaude + buildClaudeStreamResponse
--
-- Key design points (aligned with Higress):
-- 1. nextContentIndex assigns content-block indexes by dynamic increment
-- 2. pendingStopReason caches the stop_reason until usage arrives, then sends both together
-- 3. tool_calls use the activeToolIndex + cachedArguments serialization mechanism
-- 4. thinking blocks emit a signature field
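--
-- For a plain text completion, the emitted Claude event sequence is:
--   message_start
--   content_block_start  (index 0, type "text")
--   content_block_delta  (text_delta) x N
--   content_block_stop   (index 0)
--   message_delta        (stop_reason + usage)
--   message_stop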
------------------------------------------------------------------------
-- Create the stream-converter state (mirrors the state fields of Higress ClaudeToOpenAIConverter)
local function new_stream_ctx()
return {
message_start_sent = false,
message_stop_sent = false,
message_id = nil,
-- content-block indexes are assigned dynamically
next_content_index = 0,
-- thinking block
thinking_started = false,
thinking_stopped = false,
thinking_index = -1,
-- text block
text_started = false,
text_stopped = false,
text_index = -1,
-- tool call state tracking
tool_call_states = {}, -- map[openai_index] = {id, name, claude_index, started, stopped, cached_args}
active_tool_index = nil, -- index of the currently active tool call
-- cached stop_reason
pending_stop_reason = nil,
-- usage
output_tokens = 0,
}
end
-- Start a tool-call content block (see Higress startToolCall)
local function start_tool_call(sctx, tool_state, conf)
local events = {}
-- close the thinking block
if sctx.thinking_started and not sctx.thinking_stopped then
sctx.thinking_stopped = true
core.log.debug("[OpenAI->Claude] Closing thinking content block before tool use")
table_insert(events, {
event = "content_block_stop",
data = cjson.encode({ type = "content_block_stop", index = sctx.thinking_index })
})
end
-- close the text block
if sctx.text_started and not sctx.text_stopped then
sctx.text_stopped = true
core.log.debug("[OpenAI->Claude] Closing text content block before tool use")
table_insert(events, {
event = "content_block_stop",
data = cjson.encode({ type = "content_block_stop", index = sctx.text_index })
})
end
-- assign a Claude content index
tool_state.claude_index = sctx.next_content_index
sctx.next_content_index = sctx.next_content_index + 1
tool_state.started = true
core.log.debug("[OpenAI->Claude] Started tool call: Claude index=", tool_state.claude_index,
", id=", tool_state.id, ", name=", tool_state.name)
-- content_block_start
table_insert(events, {
event = "content_block_start",
data = cjson.encode({
type = "content_block_start",
index = tool_state.claude_index,
content_block = {
type = "tool_use",
id = tool_state.id,
name = tool_state.name,
input = {}
}
})
})
-- flush the cached arguments
if tool_state.cached_args and tool_state.cached_args ~= "" then
core.log.debug("[OpenAI->Claude] Outputting cached arguments for tool: ", tool_state.cached_args)
table_insert(events, {
event = "content_block_delta",
data = cjson.encode({
type = "content_block_delta",
index = tool_state.claude_index,
delta = { type = "input_json_delta", partial_json = tool_state.cached_args }
})
})
end
return events
end
-- Process a single OpenAI SSE data chunk and return a list of Claude SSE events
-- Reference: Higress buildClaudeStreamResponse
local function build_claude_stream_events(sctx, openai_chunk, conf)
local events = {}
local choice
if openai_chunk.choices and type(openai_chunk.choices) == "table" and #openai_chunk.choices > 0 then
choice = openai_chunk.choices[1]
else
choice = { index = 0, delta = {} }
end
local delta = choice.delta or {}
-- log what the current chunk contains
local has_role = type(delta.role) == "string" and delta.role ~= ""
local has_content = type(delta.content) == "string" and delta.content ~= ""
local has_finish = choice.finish_reason ~= nil and choice.finish_reason ~= cjson.null
local has_usage = type(openai_chunk.usage) == "table"
local has_reasoning = (type(delta.reasoning_content) == "string" and delta.reasoning_content ~= "") or (type(delta.reasoning) == "string" and delta.reasoning ~= "")
local has_tool_calls = delta.tool_calls ~= nil and delta.tool_calls ~= cjson.null
core.log.debug("[OpenAI->Claude] Processing OpenAI chunk - Role: ", tostring(has_role),
", Content: ", tostring(has_content), ", Reasoning: ", tostring(has_reasoning),
", ToolCalls: ", tostring(has_tool_calls),
", FinishReason: ", tostring(has_finish), ", Usage: ", tostring(has_usage))
-- message_start (sent only once)
-- Note: models such as GLM send role as null (cjson.null) in later chunks; filter with type()
if type(delta.role) == "string" and delta.role ~= "" and not sctx.message_start_sent then
sctx.message_start_sent = true
sctx.message_id = openai_chunk.id
core.log.debug("[OpenAI->Claude] Generated message_start event for id: ", openai_chunk.id)
local msg_start = {
type = "message_start",
message = {
id = openai_chunk.id or ("msg_" .. ngx_now()),
type = "message",
role = "assistant",
content = {},
model = openai_chunk.model or "unknown",
stop_reason = cjson.null,
stop_sequence = cjson.null,
usage = {
input_tokens = 0,
output_tokens = 0,
cache_creation_input_tokens = 0,
cache_read_input_tokens = 0,
}
}
}
-- if the first chunk already carries usage
if type(openai_chunk.usage) == "table" then
msg_start.message.usage.input_tokens = openai_chunk.usage.prompt_tokens or 0
end
table_insert(events, { event = "message_start", data = cjson.encode(msg_start) })
elseif type(delta.role) == "string" and delta.role ~= "" and sctx.message_start_sent then
-- Skip duplicate role messages from OpenRouter
core.log.debug("[OpenAI->Claude] Skipping duplicate role message for id: ", openai_chunk.id)
end
-- reasoning content (thinking)
-- Note: models such as Kimi-K2.5 may interleave reasoning_content and content in the stream:
-- a single chunk can carry both, or reasoning_content can keep arriving after content has appeared.
-- So the thinking block must not be closed as soon as content shows up; closing is deferred to finish_reason.
-- Note: models such as GLM attach "reasoning_content": null to every chunk; cjson.null is truthy and ~= "" holds,
-- so it must be filtered with type() == "string"
local rc = delta.reasoning_content
local r = delta.reasoning
local reasoning = (type(rc) == "string" and rc ~= "" and rc)
or (type(r) == "string" and r ~= "" and r)
or nil
if reasoning then
if not sctx.thinking_started then
sctx.thinking_index = sctx.next_content_index
sctx.next_content_index = sctx.next_content_index + 1
sctx.thinking_started = true
table_insert(events, {
event = "content_block_start",
data = cjson.encode({
type = "content_block_start",
index = sctx.thinking_index,
content_block = { type = "thinking", thinking = "", signature = "" }
})
})
end
table_insert(events, {
event = "content_block_delta",
data = cjson.encode({
type = "content_block_delta",
index = sctx.thinking_index,
delta = { type = "thinking_delta", thinking = reasoning }
})
})
end
-- text content
-- Do not close the thinking block here: later chunks may still carry reasoning_content (Kimi-K2.5 interleaving).
-- The thinking block is closed uniformly in the finish_reason handling.
-- Note: models such as GLM attach "content": null to every chunk; filter cjson.null with type() == "string"
if type(delta.content) == "string" and delta.content ~= "" then
if not sctx.text_started then
sctx.text_index = sctx.next_content_index
sctx.next_content_index = sctx.next_content_index + 1
sctx.text_started = true
table_insert(events, {
event = "content_block_start",
data = cjson.encode({
type = "content_block_start",
index = sctx.text_index,
content_block = { type = "text", text = "" }
})
})
end
table_insert(events, {
event = "content_block_delta",
data = cjson.encode({
type = "content_block_delta",
index = sctx.text_index,
delta = { type = "text_delta", text = delta.content }
})
})
end
-- tool_calls (see the Higress activeToolIndex + cachedArguments mechanism)
if delta.tool_calls and type(delta.tool_calls) == "table" then
-- make sure message_start has been sent
if not sctx.message_start_sent then
sctx.message_start_sent = true
sctx.message_id = openai_chunk.id
core.log.debug("[OpenAI->Claude] Generated message_start event before tool calls for id: ", openai_chunk.id)
table_insert(events, {
event = "message_start",
data = cjson.encode({
type = "message_start",
message = {
id = openai_chunk.id or ("msg_" .. ngx_now()),
type = "message", role = "assistant", content = {},
model = openai_chunk.model or "unknown",
stop_reason = cjson.null, stop_sequence = cjson.null,
usage = { input_tokens = 0, output_tokens = 0 }
}
})
})
end
for _, tc in ipairs(delta.tool_calls) do
local idx = tc.index or 0
-- a new tool call (carries id and name)
if tc.id and tc.id ~= "" and tc["function"] and tc["function"].name and tc["function"].name ~= "" then
core.log.debug("[OpenAI->Claude] Processing tool call delta: index=", idx,
", id=", tc.id, ", name=", tc["function"].name,
", args=", tc["function"].arguments or "")
if not sctx.tool_call_states[idx] then
sctx.tool_call_states[idx] = {
id = tc.id,
name = tc["function"].name,
claude_index = -1,
started = false,
stopped = false,
cached_args = "",
}
end
-- no active tool call yet: start this one immediately
if sctx.active_tool_index == nil then
sctx.active_tool_index = idx
local tc_events = start_tool_call(sctx, sctx.tool_call_states[idx], conf)
for _, e in ipairs(tc_events) do
table_insert(events, e)
end
end
end
-- incremental arguments
if tc["function"] and tc["function"].arguments and tc["function"].arguments ~= "" then
local state = sctx.tool_call_states[idx]
if state then
state.cached_args = (state.cached_args or "") .. tc["function"].arguments
core.log.debug("[OpenAI->Claude] Cached arguments for tool index ", idx,
": ", tc["function"].arguments, " (total: ", state.cached_args, ")")
-- only the active tool call streams deltas in real time
if sctx.active_tool_index == idx and state.started then
core.log.debug("[OpenAI->Claude] Generated input_json_delta event for active tool index ", idx,
": ", tc["function"].arguments)
table_insert(events, {
event = "content_block_delta",
data = cjson.encode({
type = "content_block_delta",
index = state.claude_index,
delta = { type = "input_json_delta", partial_json = tc["function"].arguments }
})
})
end
end
end
end
end
-- Handle finish_reason (v0.8: exclude cjson.null; JSON null is a truthy userdata in Lua)
if choice.finish_reason and choice.finish_reason ~= cjson.null then
local claude_reason = openai_finish_reason_to_claude(choice.finish_reason)
core.log.debug("[OpenAI->Claude] Processing finish_reason: ", choice.finish_reason, " -> ", claude_reason)
-- close all active content blocks
if sctx.thinking_started and not sctx.thinking_stopped then
sctx.thinking_stopped = true
core.log.debug("[OpenAI->Claude] Generated thinking content_block_stop event at index ", sctx.thinking_index)
table_insert(events, {
event = "content_block_stop",
data = cjson.encode({ type = "content_block_stop", index = sctx.thinking_index })
})
end
if sctx.text_started and not sctx.text_stopped then
sctx.text_stopped = true
core.log.debug("[OpenAI->Claude] Generated text content_block_stop event at index ", sctx.text_index)
table_insert(events, {
event = "content_block_stop",
data = cjson.encode({ type = "content_block_stop", index = sctx.text_index })
})
end
-- start any tool calls not yet started, then close all tool calls (sorted by index)
local sorted_indices = {}
for k, _ in pairs(sctx.tool_call_states) do
table_insert(sorted_indices, k)
end
table.sort(sorted_indices)
for _, tidx in ipairs(sorted_indices) do
local ts = sctx.tool_call_states[tidx]
if not ts.started then
core.log.debug("[OpenAI->Claude] Starting remaining tool call at finish: index=", tidx,
", id=", ts.id, ", name=", ts.name)
sctx.active_tool_index = tidx
local tc_events = start_tool_call(sctx, ts, conf)
for _, e in ipairs(tc_events) do table_insert(events, e) end
sctx.active_tool_index = nil
end
end
for _, tidx in ipairs(sorted_indices) do
local ts = sctx.tool_call_states[tidx]
if ts.started and not ts.stopped then
ts.stopped = true
core.log.debug("[OpenAI->Claude] Generated content_block_stop for tool at index ", tidx,
", Claude index ", ts.claude_index)
table_insert(events, {
event = "content_block_stop",
data = cjson.encode({ type = "content_block_stop", index = ts.claude_index })
})
end
end
-- cache the stop_reason and send it together with usage (required by the Claude protocol)
sctx.pending_stop_reason = claude_reason
core.log.debug("[OpenAI->Claude] Cached stop_reason: ", claude_reason, ", waiting for usage")
end
-- Handle usage (may arrive in the same chunk as finish_reason, or in a later one)
if type(openai_chunk.usage) == "table" then
core.log.debug("[OpenAI->Claude] Processing usage info - input: ",
openai_chunk.usage.prompt_tokens or 0, ", output: ",
openai_chunk.usage.completion_tokens or 0)
-- Models such as GLM may send a usage-only chunk without finish_reason (choices is an empty array),
-- leaving content blocks unclosed. Close any open blocks before sending message_delta.
if sctx.thinking_started and not sctx.thinking_stopped then
sctx.thinking_stopped = true
core.log.debug("[OpenAI->Claude] Closing thinking block before usage, index=", sctx.thinking_index)
table_insert(events, {
event = "content_block_stop",
data = cjson.encode({ type = "content_block_stop", index = sctx.thinking_index })
})
end
if sctx.text_started and not sctx.text_stopped then
sctx.text_stopped = true
core.log.debug("[OpenAI->Claude] Closing text block before usage, index=", sctx.text_index)
table_insert(events, {
event = "content_block_stop",
data = cjson.encode({ type = "content_block_stop", index = sctx.text_index })
})
end
-- close any tool-call blocks still open
local sorted_indices = {}
for k, _ in pairs(sctx.tool_call_states) do
table_insert(sorted_indices, k)
end
table.sort(sorted_indices)
for _, tidx in ipairs(sorted_indices) do
local ts = sctx.tool_call_states[tidx]
if ts.started and not ts.stopped then
ts.stopped = true
core.log.debug("[OpenAI->Claude] Closing tool block before usage, index=", ts.claude_index)
table_insert(events, {
event = "content_block_stop",
data = cjson.encode({ type = "content_block_stop", index = ts.claude_index })
})
end
end
-- Following litellm: input_tokens = prompt_tokens - cached_tokens
local prompt_tokens = openai_chunk.usage.prompt_tokens or 0
local cached_tokens = 0
if type(openai_chunk.usage.prompt_tokens_details) == "table"
and openai_chunk.usage.prompt_tokens_details.cached_tokens then
cached_tokens = openai_chunk.usage.prompt_tokens_details.cached_tokens
end
local stream_usage = {
input_tokens = prompt_tokens - cached_tokens,
output_tokens = openai_chunk.usage.completion_tokens or 0,
}
if cached_tokens > 0 then
stream_usage.cache_read_input_tokens = cached_tokens
end
if type(openai_chunk.usage.prompt_tokens_details) == "table"
and openai_chunk.usage.prompt_tokens_details.cache_creation_input_tokens then
stream_usage.cache_creation_input_tokens = openai_chunk.usage.prompt_tokens_details.cache_creation_input_tokens
end
local msg_delta = {
type = "message_delta",
delta = {
stop_sequence = cjson.null,
},
usage = stream_usage,
}
-- merge the cached stop_reason; default to end_turn if none is cached (GLM usage-only chunk case)
if sctx.pending_stop_reason then
msg_delta.delta.stop_reason = sctx.pending_stop_reason
core.log.debug("[OpenAI->Claude] Combining cached stop_reason ", sctx.pending_stop_reason, " with usage")
sctx.pending_stop_reason = nil
else
msg_delta.delta.stop_reason = "end_turn"
core.log.debug("[OpenAI->Claude] No cached stop_reason, defaulting to end_turn")
end
table_insert(events, { event = "message_delta", data = cjson.encode(msg_delta) })
core.log.debug("[OpenAI->Claude] Generated message_delta event with usage and stop_reason")
-- message_stop
if not sctx.message_stop_sent then
sctx.message_stop_sent = true
core.log.debug("[OpenAI->Claude] Generated message_stop event")
table_insert(events, {
event = "message_stop",
data = cjson.encode({ type = "message_stop" })
})
end
end
return events
end
-- Handle the [DONE] signal: close any open blocks, emit the closing events, and reset state
local function build_claude_stream_done(sctx, conf)
core.log.debug("[OpenAI->Claude] Processing [DONE] message, finalizing stream")
local events = {}
-- close all content blocks still open
if sctx.thinking_started and not sctx.thinking_stopped then
sctx.thinking_stopped = true
table_insert(events, {
event = "content_block_stop",
data = cjson.encode({ type = "content_block_stop", index = sctx.thinking_index })
})
end
if sctx.text_started and not sctx.text_stopped then
sctx.text_stopped = true
table_insert(events, {
event = "content_block_stop",
data = cjson.encode({ type = "content_block_stop", index = sctx.text_index })
})
end
for _, ts in pairs(sctx.tool_call_states) do
if ts.started and not ts.stopped then
ts.stopped = true
table_insert(events, {
event = "content_block_stop",
data = cjson.encode({ type = "content_block_stop", index = ts.claude_index })
})
end
end
-- flush a cached stop_reason that was never sent (no usage chunk arrived)
if sctx.pending_stop_reason then
table_insert(events, {
event = "message_delta",
data = cjson.encode({
type = "message_delta",
delta = { stop_reason = sctx.pending_stop_reason, stop_sequence = cjson.null },
usage = { output_tokens = 0 }
})
})
sctx.pending_stop_reason = nil
end
-- message_stop
if sctx.message_start_sent and not sctx.message_stop_sent then
sctx.message_stop_sent = true
table_insert(events, {
event = "message_stop",
data = cjson.encode({ type = "message_stop" })
})
end
-- Reset all state to avoid pollution when the connection is reused (mirrors the Higress post-[DONE] reset)
sctx.message_start_sent = false
sctx.message_stop_sent = false
sctx.message_id = nil
sctx.next_content_index = 0
sctx.thinking_started = false
sctx.thinking_stopped = false
sctx.thinking_index = -1
sctx.text_started = false
sctx.text_stopped = false
sctx.text_index = -1
sctx.tool_call_states = {}
sctx.active_tool_index = nil
sctx.pending_stop_reason = nil
sctx.output_tokens = 0
core.log.debug("[OpenAI->Claude] Reset converter state for next request")
return events
end
------------------------------------------------------------------------
-- APISIX plugin hooks
------------------------------------------------------------------------
-- rewrite phase: do the header conversion before key-auth (2500) runs, so authentication can pass
function _M.rewrite(conf, ctx)
local uri = ngx.var.uri
if uri ~= "/v1/messages" then
core.log.debug("[APISIX] rewrite phase skipped, uri=", uri,
", method=", ngx.req.get_method(),
", x-api-key=", ngx.req.get_headers()["x-api-key"] and "present" or "nil",
", authorization=", ngx.req.get_headers()["authorization"] and "present" or "nil")
return
end
core.log.debug("[APISIX] rewrite phase triggered, uri=", uri,
", method=", ngx.req.get_method())
-- Header conversion: x-api-key -> Authorization: Bearer
-- Must happen in the rewrite phase; otherwise key-auth cannot see Authorization in its own rewrite phase
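-- e.g. "x-api-key: sk-ant-xxx" becomes "Authorization: Bearer sk-ant-xxx" (key value illustrative)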
local api_key = ngx.req.get_headers()["x-api-key"]
if api_key then
core.log.debug("[Claude->OpenAI] Converting x-api-key to Authorization header")
ngx.req.set_header("Authorization", "Bearer " .. api_key)
ngx.req.clear_header("x-api-key")
end
end
-- access phase: rewrite the request body and URI, and clean up remaining headers
function _M.access(conf, ctx)
local uri = ngx.var.uri
if uri ~= "/v1/messages" then
core.log.debug("[APISIX] access phase skipped, uri=", uri)
return
end
core.log.debug("[APISIX] access phase triggered, uri=", uri)
ngx.req.read_body()
local body = ngx.req.get_body_data()
if not body then
-- the body may live in a temp file (large request bodies)
local file_name = ngx.req.get_body_file()
if file_name then
local f = io.open(file_name, "r")
if f then
body = f:read("*a")
f:close()
end
end
if not body then
core.log.error("failed to read request body, body_file=",
file_name or "nil")
return
end
end
local claude_body, decode_err = cjson.decode(body)
if not claude_body then
core.log.error("failed to decode Claude request body, err=",
decode_err or "unknown", ", body_prefix=", string_sub(body, 1, 200))
return
end
-- remember the stream flag
ctx.claude2openai_stream = claude_body.stream or false
-- convert the request body
local openai_body = convert_claude_request_to_openai(claude_body, conf)
local encoded_body, encode_err = cjson.encode(openai_body)
if not encoded_body then
core.log.error("failed to encode OpenAI request body, err=",
encode_err or "unknown")
return
end
ngx.req.set_body_data(encoded_body)
-- rewrite the URI
ngx.req.set_uri("/v1/chat/completions")
-- remove Claude-specific headers
ngx.req.clear_header("anthropic-version")
ngx.req.clear_header("anthropic-beta")
ctx.claude2openai_converted = true
core.log.debug("[c2o v0.8] request converted, stream=", ctx.claude2openai_stream,
", model=", claude_body.model or "nil",
", ctx_id=", tostring(ctx))
end
-- Response header phase
function _M.header_filter(conf, ctx)
if not ctx.claude2openai_converted then return end
core.log.debug("[APISIX] header_filter phase, stream=", ctx.claude2openai_stream,
", upstream_status=", ngx.status,
", upstream_content_type=", ngx.header["Content-Type"] or "nil",
", upstream_content_length=", ngx.header["Content-Length"] or "nil")
-- log upstream non-2xx statuses as errors
if ngx.status >= 400 then
core.log.error("upstream returned error status=", ngx.status)
end
if ctx.claude2openai_stream then
ngx.header["Content-Type"] = "text/event-stream"
-- Critical: buffering must be disabled for SSE streaming responses, otherwise nginx buffers the
-- upstream data, body_filter splits chunks in the middle of SSE data lines, and large amounts of data are lost
ngx.header["X-Accel-Buffering"] = "no"
else
ngx.header["Content-Type"] = "application/json"
end
-- the response body will be rewritten, so drop Content-Length
ngx.header["Content-Length"] = nil
end
-- Response body phase
-- Note: in ai-proxy-multi bypass mode, lua_body_filter has already done the conversion,
-- and its output re-enters the nginx body_filter via ngx_print.
-- That case must be detected and skipped to avoid re-processing already converted Claude SSE data.
function _M.body_filter(conf, ctx)
if not ctx.claude2openai_converted then return end
-- If lua_body_filter is already handling the stream conversion, skip the nginx body_filter
-- to avoid converting the Claude SSE data emitted by lua_body_filter a second time
if ctx.claude2openai_lua_body_filter_active then
return
end
local chunk = ngx.arg[1]
local eof = ngx.arg[2]
core.log.debug("[APISIX] body_filter phase v0.6 (nginx), stream=", ctx.claude2openai_stream,
", chunk_len=", chunk and #chunk or 0, ", eof=", eof,
", status=", ngx.status)
if ctx.claude2openai_stream then
-- streaming response: convert chunk by chunk
if not ctx.claude2openai_sctx then
ctx.claude2openai_sctx = new_stream_ctx()
ctx.claude2openai_line_buf = "" -- SSE line buffer for chunk-boundary splits in body_filter
end
local sctx = ctx.claude2openai_sctx
local output_parts = {}
-- If a streaming request gets a non-streaming error JSON from upstream (e.g. 4xx/5xx), pass the real error through
if not ctx.claude2openai_stream_error_checked then
ctx.claude2openai_stream_error_checked = true
if ngx.status >= 400 and chunk and chunk ~= "" then
core.log.error("upstream error response (stream), status=", ngx.status,
", body=", string_sub(chunk, 1, 4096))
local err_resp, _ = cjson.decode(chunk)
if err_resp and err_resp.error then
local err_msg = "Unknown upstream error"
if type(err_resp.error) == "table" then
err_msg = err_resp.error.message or cjson.encode(err_resp.error)
elseif type(err_resp.error) == "string" then
err_msg = err_resp.error
end
core.log.debug("[OpenAI->Claude] Stream request got upstream error: ", err_msg)
local claude_err = cjson.encode({
type = "error",
error = {
type = err_resp.error.type or "api_error",
message = err_msg,
}
})
ngx.arg[1] = claude_err
return
end
end
end
if chunk and chunk ~= "" then
core.log.debug("[OpenAI->Claude] Original OpenAI streaming chunk len=", #chunk)
-- Line buffering: the nginx body_filter may split a chunk in the middle of an SSE data line,
-- dropping the incomplete JSON line. Prepend the previous incomplete line to the current chunk.
local buf = chunk
if ctx.claude2openai_line_buf and ctx.claude2openai_line_buf ~= "" then
buf = ctx.claude2openai_line_buf .. buf
core.log.debug("[OpenAI->Claude] Prepended buffered line fragment, len=",
#ctx.claude2openai_line_buf)
ctx.claude2openai_line_buf = ""
end
-- if the chunk does not end with a newline, its last line may be incomplete
local last_char = string_sub(buf, -1)
local complete_buf, trailing
if last_char == "\n" or last_char == "\r" then
complete_buf = buf
trailing = nil
else
-- locate the last newline; anything after it is buffered for the next call
local last_nl = nil
for i = #buf, 1, -1 do
local c = string_sub(buf, i, i)
if c == "\n" or c == "\r" then
last_nl = i
break
end
end
if last_nl then
complete_buf = string_sub(buf, 1, last_nl)
trailing = string_sub(buf, last_nl + 1)
else
-- no newline in the whole buf: buffer everything
ctx.claude2openai_line_buf = buf
core.log.debug("[OpenAI->Claude] Entire chunk buffered (no newline), len=", #buf)
complete_buf = nil
trailing = nil
end
end
if trailing and trailing ~= "" then
ctx.claude2openai_line_buf = trailing
core.log.debug("[OpenAI->Claude] Buffered trailing incomplete line, len=", #trailing)
end
if complete_buf and complete_buf ~= "" then
for line in complete_buf:gmatch("[^\r\n]+") do
if line:sub(1, 6) == "data: " then
local data = line:sub(7)
if data == "[DONE]" then
local done_events = build_claude_stream_done(sctx, conf)
for _, evt in ipairs(done_events) do
table_insert(output_parts,
"event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
end
else
local openai_chunk, chunk_err = cjson.decode(data)
if openai_chunk then
-- if the SSE data carries an error object, convert it into Claude error format
if openai_chunk.error then
local err_msg = "Unknown upstream error"
if type(openai_chunk.error) == "table" then
err_msg = openai_chunk.error.message or cjson.encode(openai_chunk.error)
elseif type(openai_chunk.error) == "string" then
err_msg = openai_chunk.error
end
core.log.debug("[OpenAI->Claude] SSE chunk contains error: ", err_msg)
local claude_err = cjson.encode({
type = "error",
error = {
type = openai_chunk.error.type or "api_error",
message = err_msg,
}
})
table_insert(output_parts, "event: error\ndata: " .. claude_err .. "\n\n")
else
local ok, err = pcall(function()
local events = build_claude_stream_events(sctx, openai_chunk, conf)
core.log.debug("[OpenAI->Claude] Generated ", #events, " Claude stream events from OpenAI chunk")
for i, evt in ipairs(events) do
core.log.debug("[OpenAI->Claude] Stream event [", i, "/", #events, "]: ", evt.data)
table_insert(output_parts,
"event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
end
end)
if not ok then
core.log.error("stream event conversion error, err=",
err, ", data_prefix=", string_sub(data, 1, 200))
end
end
else
core.log.error("failed to decode stream chunk, err=",
chunk_err or "unknown", ", data_prefix=", string_sub(data, 1, 200))
end
end
end
end
end -- complete_buf
end
-- at EOF, flush whatever is left in the line buffer
if eof and ctx.claude2openai_line_buf and ctx.claude2openai_line_buf ~= "" then
local remaining = ctx.claude2openai_line_buf
ctx.claude2openai_line_buf = ""
core.log.debug("[OpenAI->Claude] Processing remaining buffered line at EOF, len=", #remaining)
for line in remaining:gmatch("[^\r\n]+") do
if line:sub(1, 6) == "data: " then
local data = line:sub(7)
if data == "[DONE]" then
local done_events = build_claude_stream_done(sctx, conf)
for _, evt in ipairs(done_events) do
table_insert(output_parts,
"event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
end
else
local openai_chunk, chunk_err = cjson.decode(data)
if openai_chunk then
local ok, err = pcall(function()
local events = build_claude_stream_events(sctx, openai_chunk, conf)
for _, evt in ipairs(events) do
table_insert(output_parts,
"event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
end
end)
if not ok then
core.log.error("stream event conversion error at EOF, err=", err)
end
else
core.log.error("failed to decode buffered stream chunk at EOF, err=",
chunk_err or "unknown")
end
end
end
end
end
if eof and not sctx.message_stop_sent then
core.log.debug("[OpenAI->Claude] EOF reached without message_stop, sending final events")
local done_events = build_claude_stream_done(sctx, conf)
for _, evt in ipairs(done_events) do
table_insert(output_parts,
"event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
end
end
ngx.arg[1] = #output_parts > 0 and table_concat(output_parts) or ""
if #output_parts > 0 then
core.log.debug("[OpenAI->Claude] Converted Claude streaming chunk: ", ngx.arg[1])
end
else
-- non-streaming response: collect the full body, then convert
if not ctx.claude2openai_resp_chunks then
ctx.claude2openai_resp_chunks = {}
end
if chunk and chunk ~= "" then
table_insert(ctx.claude2openai_resp_chunks, chunk)
if not eof then
ngx.arg[1] = nil
return
end
end
if eof then
local full_body = table_concat(ctx.claude2openai_resp_chunks)
core.log.debug("[OpenAI->Claude] non-stream response complete, body_len=", #full_body,
", status=", ngx.status)
if ngx.status >= 400 then
core.log.error("upstream error response, status=", ngx.status,
", body=", string_sub(full_body, 1, 4096))
end
local openai_resp, resp_err = cjson.decode(full_body)
if openai_resp then
local claude_resp = convert_openai_response_to_claude(openai_resp, conf)
local encoded, enc_err = cjson.encode(claude_resp)
if encoded then
ngx.arg[1] = encoded
else
core.log.error("failed to encode Claude response, err=",
enc_err or "unknown")
ngx.arg[1] = full_body
end
else
core.log.error("failed to decode upstream response, err=",
resp_err or "unknown", ", body_prefix=", string_sub(full_body, 1, 300))
ngx.arg[1] = full_body
end
end
end
end
------------------------------------------------------------------------
-- lua_body_filter: called by ai-proxy-multi's lua_response_filter
-- Signature: (conf, ctx, headers, body) -> (code, new_body)
-- body is the raw SSE chunk that ai-proxy-multi reads from the upstream body_reader()
-- Returning nil, new_body replaces the output content
--
-- Key point: body_reader() returns data per HTTP chunk, and a chunk boundary can cut an SSE data
-- line in half; incomplete lines must be carried across chunks in a line buffer, or events are lost.
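--
-- Example: one data line split across two HTTP chunks (payload illustrative):
--   chunk 1 ends with:   data: {"choices":[{"delta":{"con
--   chunk 2 starts with: tent":"hi"}}]}
-- The fragment from chunk 1 is buffered and parsed only after its newline arrives in chunk 2.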
------------------------------------------------------------------------
-- Process one complete SSE data line and return the converted Claude SSE event fragments
local function process_sse_line(sctx, line, conf)
local parts = {}
if line:sub(1, 6) ~= "data: " then
return parts
end
local data = line:sub(7)
if data == "[DONE]" then
local done_events = build_claude_stream_done(sctx, conf)
for _, evt in ipairs(done_events) do
table_insert(parts,
"event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
end
else
local openai_chunk, chunk_err = cjson.decode(data)
if openai_chunk then
if openai_chunk.error then
local err_msg = "Unknown upstream error"
if type(openai_chunk.error) == "table" then
err_msg = openai_chunk.error.message or cjson.encode(openai_chunk.error)
elseif type(openai_chunk.error) == "string" then
err_msg = openai_chunk.error
end
local claude_err = cjson.encode({
type = "error",
error = {
type = openai_chunk.error.type or "api_error",
message = err_msg,
}
})
table_insert(parts, "event: error\ndata: " .. claude_err .. "\n\n")
else
local ok, err = pcall(function()
local events = build_claude_stream_events(sctx, openai_chunk, conf)
for _, evt in ipairs(events) do
table_insert(parts,
"event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
end
end)
if not ok then
core.log.warn("[c2o v0.8] convert err: ", err)
end
end
else
core.log.warn("[c2o v0.8] decode err: ", chunk_err,
" data=", string_sub(data, 1, 200))
end
end
return parts
end
function _M.lua_body_filter(conf, ctx, headers, body)
core.log.debug("[c2o v0.8] ENTER converted=",
tostring(ctx.claude2openai_converted),
" stream=", tostring(ctx.claude2openai_stream),
" body_len=", body and #body or 0,
" ctx_id=", tostring(ctx))
if not ctx.claude2openai_converted then return end
if not ctx.claude2openai_stream then
return
end
-- mark lua_body_filter as active so the nginx body_filter skips second-pass processing
ctx.claude2openai_lua_body_filter_active = true
-- streaming response conversion
if not ctx.claude2openai_sctx then
ctx.claude2openai_sctx = new_stream_ctx()
ctx.claude2openai_lua_line_buf = "" -- line buffer for incomplete SSE lines that span chunks
core.log.debug("[c2o v0.8] INIT stream ctx")
end
local sctx = ctx.claude2openai_sctx
if not body or body == "" then return nil, "" end
core.log.debug("[c2o v0.8] IN len=", #body,
" first80=", string_sub(body, 1, 80))
local output_parts = {}
-- prepend the incomplete line left over from last time
local buf = body
if ctx.claude2openai_lua_line_buf and ctx.claude2openai_lua_line_buf ~= "" then
buf = ctx.claude2openai_lua_line_buf .. buf
core.log.debug("[c2o v0.8] prepend buf len=", #ctx.claude2openai_lua_line_buf)
ctx.claude2openai_lua_line_buf = ""
end
-- if buf does not end with a newline, its last line may be incomplete
local last_char = string_sub(buf, -1)
local complete_buf, trailing
if last_char == "\n" or last_char == "\r" then
complete_buf = buf
trailing = nil
else
-- find the last newline
local last_nl = nil
for i = #buf, 1, -1 do
local c = string_sub(buf, i, i)
if c == "\n" or c == "\r" then
last_nl = i
break
end
end
if last_nl then
complete_buf = string_sub(buf, 1, last_nl)
trailing = string_sub(buf, last_nl + 1)
else
-- no newline in the whole buf: buffer everything for next time
ctx.claude2openai_lua_line_buf = buf
core.log.debug("[c2o v0.8] no newline, buffered all len=", #buf)
return nil, ""
end
end
if trailing and trailing ~= "" then
ctx.claude2openai_lua_line_buf = trailing
core.log.debug("[c2o v0.8] trailing buf len=", #trailing,
" content=", string_sub(trailing, 1, 80))
end
-- parse the complete SSE lines one by one
for line in complete_buf:gmatch("[^\r\n]+") do
local parts = process_sse_line(sctx, line, conf)
for _, p in ipairs(parts) do
table_insert(output_parts, p)
end
end
local out = table_concat(output_parts)
core.log.debug("[c2o v0.8] OUT len=", #out,
" events=", #output_parts,
" first100=", string_sub(out, 1, 100))
return nil, out
end
return _M
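```

To try the plugin, the Lua file has to be placed where APISIX can load it (e.g. via `extra_lua_path`) and registered in the plugin list in `conf/config.yaml`. Below is a sketch of binding it to a route through the Admin API; the admin key, route id, upstream node, and `target_model` value are all placeholders:

```bash
curl -X PUT http://127.0.0.1:9180/apisix/admin/routes/1 \
  -H "X-API-KEY: <admin-key>" \
  -d '{
    "uri": "/v1/*",
    "plugins": {
      "jiankunking-claude2openai": { "target_model": "gpt-4o" }
    },
    "upstream": {
      "type": "roundrobin",
      "scheme": "https",
      "nodes": { "api.openai.com:443": 1 }
    }
  }'
```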