APISIX Anthropic 与 OpenAI 协议互转插件:支持流式、思考、工具调用与多模态

AI时代高效设计开发:Anthropic → OpenAI 协议转换插件实战

一、参考已有实现

复用 Higress、LiteLLM 等相似项目的代码实现,让 AI 直接参考。

二、准备测试用例

整理 curl 请求示例等测试场景,让 AI 依据这些用例自行校验转换结果。

三、明确功能边界

| 阶段 | 转换内容 |
| --- | --- |
| 请求 | Anthropic Messages API → OpenAI Chat Completions |
| 响应 | OpenAI Chat Completions → Anthropic Messages API |
| 支持 | 非流式/流式、thinking、tool_use、多模态图片 |

四、Vibe Coding

--
-- APISIX Plugin: claude2openai
-- 参考实现: Higress ai-proxy provider/claude_to_openai.go
--
-- 功能:
--   1. 请求阶段:将 Claude Messages API 请求转换为 OpenAI Chat Completions 请求
--   2. 响应阶段:将 OpenAI Chat Completions 响应转换为 Claude Messages API 响应
--   支持:非流式、流式、thinking、tool_use、多模态图片
--

local core = require("apisix.core")
local cjson = require("cjson.safe")
local ngx = ngx
local pairs = pairs
local ipairs = ipairs
local type = type
local table_insert = table.insert
local table_concat = table.concat
local string_sub = string.sub
local string_find = string.find
local ngx_now = ngx.now

-- Name under which this plugin is exposed (stored in `_M.name`).
local plugin_name = "jiankunking-claude2openai"

-- Configuration schema: a single optional string field, `target_model`.
local schema = {
    type = "object",
    properties = {},
}
schema.properties.target_model = {
    type = "string",
    description = "Override the model name sent to OpenAI backend",
}

-- Module table returned to the plugin loader.
local _M = {}
_M.version = 0.1
-- Original author's note: intended to run before consumer-restriction (2640)
-- and key-auth (2500). NOTE(review): confirm those priority values against
-- the deployed APISIX version.
_M.priority = 2650
_M.name = plugin_name
_M.schema = schema

--- Validate a plugin configuration against the module-level `schema`.
-- @param conf plugin configuration to validate
-- @return the result(s) of `core.schema.check(schema, conf)`, forwarded unchanged
function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end

------------------------------------------------------------------------
-- 工具函数
------------------------------------------------------------------------

--- Map an OpenAI `finish_reason` to a Claude `stop_reason`.
-- Modelled on Higress `openAIFinishReasonToClaude`.
-- @param reason OpenAI finish_reason (may be nil, false or cjson.null)
-- @return "end_turn" for a missing reason, "stop" and "content_filter";
--         "max_tokens" for "length"; "tool_use" for "tool_calls";
--         any other value is returned unchanged
local function openai_finish_reason_to_claude(reason)
    local missing = reason == nil or reason == false or reason == cjson.null
    if missing then
        return "end_turn"
    end

    if reason == "stop" or reason == "content_filter" then
        return "end_turn"
    elseif reason == "length" then
        return "max_tokens"
    elseif reason == "tool_calls" then
        return "tool_use"
    end

    -- Unknown reasons are passed through as-is.
    return reason
end

--- Remove every "; cch=<value>" segment from a billing-header text.
-- Modelled on Higress `stripCchFromBillingHeader`. Per the original author's
-- note, the cch value changes on every request and would otherwise defeat
-- prompt caching.
-- @param text candidate text; non-strings and strings that do not start with
--        "x-anthropic-billing-header:" are returned untouched
-- @return the text with all "; cch=..." segments removed
local function strip_cch_from_billing_header(text)
    if type(text) ~= "string" then
        return text
    end

    local marker = "x-anthropic-billing-header:"
    local is_billing_header = string_sub(text, 1, #marker) == marker
    if not is_billing_header then
        return text
    end

    -- A segment runs from "; cch=" up to (not including) the next ";" or the
    -- end of the string. Assigning to a single local drops gsub's match count.
    local stripped = text:gsub("; cch=[^;]*", "")
    return stripped
end

--- Collect the text of every `text` block in a Claude content array.
-- @param content_array array of content blocks; any non-table yields {}
-- @return array of the `text` values, in input order
local function extract_text_parts(content_array)
    local texts = {}
    if type(content_array) == "table" then
        for _, item in ipairs(content_array) do
            local is_text_block = item.type == "text"
            if is_text_block and item.text then
                table_insert(texts, item.text)
            end
        end
    end
    return texts
end

------------------------------------------------------------------------
-- 请求转换: Claude Messages -> OpenAI Chat Completions
-- 参考 Higress ConvertClaudeRequestToOpenAI
------------------------------------------------------------------------

--- Build an OpenAI `image_url` content part from a Claude image/document source.
-- Claude:  {type = "base64", media_type = "...", data = "..."} or {type = "url", url = "..."}
-- OpenAI:  {type = "image_url", image_url = {url = "data:...;base64,..." | "<url>"}}
-- @param source the block's `source` field
-- @return the OpenAI part, or nil when the source is not a table, has an
--         unsupported type, or lacks the string payload (`data` / `url`)
local function media_source_to_image_url(source)
    if type(source) ~= "table" then
        return nil
    end

    local url
    if source.type == "base64" then
        -- A missing or null `data` would raise on concatenation; drop the block instead.
        if type(source.data) ~= "string" then
            return nil
        end
        local media_type = source.media_type
        if type(media_type) ~= "string" then
            media_type = "image/png"
        end
        url = "data:" .. media_type .. ";base64," .. source.data
    elseif source.type == "url" then
        if type(source.url) ~= "string" then
            return nil
        end
        url = source.url
    else
        return nil
    end

    return { type = "image_url", image_url = { url = url } }
end

--- Convert an array of Claude content blocks into OpenAI-oriented pieces.
-- @param content_array array of Claude content blocks (non-table yields empty lists)
-- @return table with four arrays:
--   text_parts      - text of every text block (cch stripped from billing headers)
--   tool_calls      - OpenAI tool_calls built from `tool_use` blocks
--   tool_results    - the original `tool_result` blocks, untouched
--   openai_contents - OpenAI content parts (text, image_url, thinking, redacted_thinking)
local function convert_content_array(content_array)
    local result = {
        text_parts = {},
        tool_calls = {},
        tool_results = {},
        openai_contents = {},
    }
    if type(content_array) ~= "table" then return result end

    for _, block in ipairs(content_array) do
        if type(block) ~= "table" then
            -- Non-table entries (e.g. JSON null decoded as cjson.null) cannot be
            -- indexed safely; ignore them.

        elseif block.type == "text" and type(block.text) == "string" then
            local processed_text = strip_cch_from_billing_header(block.text)
            table_insert(result.text_parts, processed_text)
            local openai_content = {
                type = "text",
                text = processed_text
            }
            -- Pass cache_control through to support prompt caching.
            if block.cache_control then
                openai_content.cache_control = block.cache_control
            end
            table_insert(result.openai_contents, openai_content)

        elseif (block.type == "image" or block.type == "document") and block.source then
            local img_obj = media_source_to_image_url(block.source)
            if img_obj then
                if block.cache_control then
                    img_obj.cache_control = block.cache_control
                end
                table_insert(result.openai_contents, img_obj)
            end

        elseif block.type == "thinking" and type(block.thinking) == "string" then
            -- Thinking block from assistant history is forwarded as a thinking part.
            local signature = block.signature
            if type(signature) ~= "string" then
                signature = ""
            end
            table_insert(result.openai_contents, {
                type = "thinking",
                thinking = block.thinking,
                signature = signature
            })

        elseif block.type == "redacted_thinking" then
            -- Redacted thinking is passed through unchanged.
            local data = block.data
            if type(data) ~= "string" then
                data = ""
            end
            table_insert(result.openai_contents, {
                type = "redacted_thinking",
                data = data
            })

        elseif block.type == "tool_use" then
            -- Claude tool_use -> OpenAI tool_calls; blocks without id/name are dropped.
            if block.id and block.name then
                local input = block.input
                if type(input) ~= "table" then
                    input = {}
                end
                table_insert(result.tool_calls, {
                    id = block.id,
                    type = "function",
                    ["function"] = {
                        name = block.name,
                        -- cjson.safe returns nil on failure; fall back to an empty object.
                        arguments = cjson.encode(input) or "{}"
                    }
                })
                core.log.debug("[Claude->OpenAI] Converted tool_use to tool_call: ", block.name)
            end

        elseif block.type == "tool_result" then
            table_insert(result.tool_results, block)
            core.log.debug("[Claude->OpenAI] Found tool_result for tool_use_id: ", block.tool_use_id or "")
        end
    end

    return result
end

-- Prefix that marks a system text block as billing metadata.
local BILLING_HEADER_PREFIX = "x-anthropic-billing-header:"

--- Build an OpenAI `image_url` part from a Claude image/document source found
-- inside a tool_result.
-- @param source the nested block's `source` field
-- @return the OpenAI part, or nil when the source has an unsupported type or
--         lacks a string `data` / `url` (concatenating nil would raise)
local function tool_result_media_part(source)
    if type(source) ~= "table" then
        return nil
    end
    if source.type == "base64" and type(source.data) == "string" then
        local media_type = source.media_type
        if type(media_type) ~= "string" then
            media_type = "image/png"
        end
        return {
            type = "image_url",
            image_url = { url = "data:" .. media_type .. ";base64," .. source.data }
        }
    elseif source.type == "url" and type(source.url) == "string" then
        return { type = "image_url", image_url = { url = source.url } }
    end
    return nil
end

--- Convert the `content` of a Claude tool_result into OpenAI tool message content.
-- Following litellm, the content may mix text and image/document blocks.
-- @param content string, array of strings/blocks, or anything else
-- @return the string itself; a "\n"-joined string when the array holds only
--         text; an array of parts when an image/document block was present;
--         "" for any other input
local function tool_result_content_to_openai(content)
    if type(content) == "string" then
        return content
    end
    if type(content) ~= "table" then
        return ""
    end

    local content_parts = {}
    local has_non_text = false
    for _, c in ipairs(content) do
        if type(c) == "string" then
            table_insert(content_parts, { type = "text", text = c })
        elseif type(c) == "table" then
            if c.type == "text" then
                local text = c.text
                -- cjson.null is truthy and would break table_concat below.
                if text == nil or text == cjson.null then
                    text = ""
                end
                table_insert(content_parts, { type = "text", text = text })
            elseif (c.type == "image" or c.type == "document") and c.source then
                has_non_text = true
                local part = tool_result_media_part(c.source)
                if part then
                    table_insert(content_parts, part)
                end
            end
        end
    end

    if has_non_text then
        -- Mixed content (text + image): keep the array form.
        return content_parts
    end

    -- Text only: merge into one string for better compatibility.
    local texts = {}
    for _, p in ipairs(content_parts) do
        if p.type == "text" then table_insert(texts, p.text) end
    end
    return table_concat(texts, "\n")
end

--- Convert Claude's top-level `system` field into an OpenAI system message.
-- @param system string or array of content blocks
-- @return an OpenAI message table, or nil when nothing should be emitted
local function system_to_openai_message(system)
    if type(system) == "string" then
        return { role = "system", content = strip_cch_from_billing_header(system) }
    end
    if type(system) ~= "table" then
        return nil
    end

    -- Following litellm: drop billing-header blocks and empty text blocks.
    local sys_contents = {}
    for _, block in ipairs(system) do
        if type(block) == "table" and block.type == "text" and type(block.text) == "string" then
            if block.text == "" then
                -- Empty text is skipped (original note: the Anthropic API rejects it).
            elseif string_sub(block.text, 1, #BILLING_HEADER_PREFIX) == BILLING_HEADER_PREFIX then
                -- Compare against the real prefix length; a hard-coded, too-long
                -- slice would never match and the block would leak through.
                core.log.debug("[Claude->OpenAI] Skipping billing header system block")
            else
                local sys_block = { type = "text", text = block.text }
                if block.cache_control then
                    sys_block.cache_control = block.cache_control
                end
                table_insert(sys_contents, sys_block)
            end
        end
    end

    if #sys_contents == 0 then
        return nil
    end
    return { role = "system", content = sys_contents }
end

--- Append the OpenAI message(s) equivalent to one Claude message.
-- @param messages output array, appended to in place
-- @param msg Claude message ({role, content}); content is a string or block array
local function append_openai_messages(messages, msg)
    if type(msg) ~= "table" then
        return
    end
    if type(msg.content) == "string" then
        -- Plain text message.
        table_insert(messages, { role = msg.role, content = msg.content })
        return
    end
    if type(msg.content) ~= "table" then
        return
    end

    local conv = convert_content_array(msg.content)
    local has_tool_calls = #conv.tool_calls > 0
    local has_tool_results = #conv.tool_results > 0

    -- tool_use blocks (assistant side) become one message carrying tool_calls.
    if has_tool_calls then
        local openai_msg = {
            role = msg.role,
            tool_calls = conv.tool_calls,
        }
        if #conv.text_parts > 0 then
            openai_msg.content = table_concat(conv.text_parts, "\n\n")
        else
            openai_msg.content = cjson.null
        end
        table_insert(messages, openai_msg)
    end

    -- tool_result blocks (user side) become one `tool` message each.
    if has_tool_results then
        for _, tr in ipairs(conv.tool_results) do
            table_insert(messages, {
                role = "tool",
                tool_call_id = tr.tool_use_id,
                content = tool_result_content_to_openai(tr.content)
            })
        end
        -- Text that accompanied the tool results follows as its own message.
        if #conv.text_parts > 0 then
            table_insert(messages, {
                role = msg.role,
                content = table_concat(conv.text_parts, "\n\n")
            })
        end
    end

    -- Ordinary content: neither tool calls nor tool results.
    if not has_tool_calls and not has_tool_results then
        local only = conv.openai_contents[1]
        if #conv.openai_contents == 1 and only.type == "text" then
            -- A single text part is simplified to a plain string.
            table_insert(messages, { role = msg.role, content = only.text })
        else
            table_insert(messages, { role = msg.role, content = conv.openai_contents })
        end
    end
end

--- Convert a decoded Claude Messages API request into an OpenAI Chat
-- Completions request body.
-- @param claude_body decoded Claude request (table)
-- @param conf plugin configuration; `conf.target_model`, when set, overrides the model
-- @return the OpenAI request body (table)
local function convert_claude_request_to_openai(claude_body, conf)
    -- delay_encode defers JSON encoding until the log line is actually written,
    -- so large bodies are not serialized when debug logging is off.
    core.log.debug("[Claude->OpenAI] Original Claude request body: ",
            core.json.delay_encode(claude_body))

    local openai_body = {
        model = conf.target_model or claude_body.model,
        max_tokens = claude_body.max_tokens,
        temperature = claude_body.temperature,
        top_p = claude_body.top_p,
        stop = claude_body.stop_sequences,
        stream = claude_body.stream,
    }

    if openai_body.stream then
        openai_body.stream_options = { include_usage = true }
    end

    local messages = {}

    -- system: Claude top-level field -> OpenAI system message placed first.
    local system_msg = system_to_openai_message(claude_body.system)
    if system_msg then
        table_insert(messages, system_msg)
    end

    if type(claude_body.messages) == "table" then
        for _, msg in ipairs(claude_body.messages) do
            append_openai_messages(messages, msg)
        end
    end

    openai_body.messages = messages

    -- tools: Claude {name, description, input_schema} -> OpenAI {type, function}.
    if type(claude_body.tools) == "table" then
        local openai_tools = {}
        for _, tool in ipairs(claude_body.tools) do
            table_insert(openai_tools, {
                type = "function",
                ["function"] = {
                    name = tool.name,
                    description = tool.description,
                    parameters = tool.input_schema
                }
            })
        end
        openai_body.tools = openai_tools
    end

    -- tool_choice: auto -> "auto", any -> "required", none -> "none",
    -- tool -> a specific function.
    local tc = claude_body.tool_choice
    if type(tc) == "table" then
        if tc.type == "auto" then
            openai_body.tool_choice = "auto"
        elseif tc.type == "any" then
            openai_body.tool_choice = "required"
        elseif tc.type == "none" then
            openai_body.tool_choice = "none"
        elseif tc.type == "tool" and tc.name then
            openai_body.tool_choice = {
                type = "function",
                ["function"] = { name = tc.name }
            }
        end
        if type(tc.disable_parallel_tool_use) == "boolean" then
            openai_body.parallel_tool_calls = not tc.disable_parallel_tool_use
        end
    end

    -- thinking: Claude {type = "enabled", budget_tokens = N} -> reasoning parameters.
    local thinking = claude_body.thinking
    if type(thinking) == "table" then
        core.log.debug("[Claude->OpenAI] Found thinking config: type=", thinking.type,
                ", budget_tokens=", thinking.budget_tokens or 0)
        if thinking.type == "enabled" then
            local budget = thinking.budget_tokens
            if type(budget) ~= "number" then
                budget = 8192
            end
            openai_body.reasoning_max_tokens = budget
            if budget < 4096 then
                openai_body.reasoning_effort = "low"
            elseif budget < 16384 then
                openai_body.reasoning_effort = "medium"
            else
                openai_body.reasoning_effort = "high"
            end
            core.log.debug("[Claude->OpenAI] Converted thinking config: budget_tokens=", budget,
                    ", reasoning_effort=", openai_body.reasoning_effort,
                    ", reasoning_max_tokens=", openai_body.reasoning_max_tokens)
        end
    end

    -- output_format: Claude {type = "json_schema", schema = {...}} -> response_format
    -- (modelled on litellm translate_anthropic_output_format_to_openai).
    local output_format = claude_body.output_format
    if type(output_format) == "table"
            and output_format.type == "json_schema" and output_format.schema then
        openai_body.response_format = {
            type = "json_schema",
            json_schema = {
                name = "structured_output",
                schema = output_format.schema,
                strict = true,
            }
        }
        core.log.debug("[Claude->OpenAI] Converted output_format to response_format: json_schema")
    end

    core.log.debug("[Claude->OpenAI] Converted OpenAI request body: ",
            core.json.delay_encode(openai_body))
    return openai_body
end

------------------------------------------------------------------------
-- 非流式响应转换: OpenAI -> Claude
-- 参考 Higress ConvertOpenAIResponseToClaude
------------------------------------------------------------------------

--- Return `v` when it is a number, otherwise 0.
-- Needed because JSON null decodes to cjson.null, which is truthy and would
-- survive an `or 0` default and then break arithmetic and comparisons.
local function usage_number(v)
    if type(v) == "number" then
        return v
    end
    return 0
end

--- Convert a decoded (non-streaming) OpenAI Chat Completions response into a
-- Claude Messages API response. Modelled on Higress ConvertOpenAIResponseToClaude.
-- @param openai_body decoded upstream response (may be nil or not a table)
-- @param conf plugin configuration (not used here)
-- @return a Claude `message` table, or a Claude `error` table when the
--         upstream returned an error object or no choices
local function convert_openai_response_to_claude(openai_body, conf)
    -- delay_encode defers JSON encoding until the log line is actually written.
    core.log.debug("[OpenAI->Claude] Original OpenAI response body: ",
            core.json.delay_encode(openai_body))

    local empty_response = {
        type = "error",
        error = { type = "api_error", message = "Empty response from upstream" }
    }
    if type(openai_body) ~= "table" then
        return empty_response
    end

    -- Upstream error object: surface the real message to the client.
    -- Some upstreams send "error": null, so cjson.null must be excluded.
    local upstream_err = openai_body.error
    if upstream_err and upstream_err ~= cjson.null then
        local err_type = "api_error"
        local err_msg = "Unknown upstream error"
        if type(upstream_err) == "table" then
            if type(upstream_err.message) == "string" then
                err_msg = upstream_err.message
            else
                err_msg = cjson.encode(upstream_err) or err_msg
            end
            if type(upstream_err.type) == "string" then
                err_type = upstream_err.type
            end
        elseif type(upstream_err) == "string" then
            err_msg = upstream_err
        end
        core.log.debug("[OpenAI->Claude] Upstream returned error: ", err_msg)
        return {
            type = "error",
            error = { type = err_type, message = err_msg }
        }
    end

    if type(openai_body.choices) ~= "table" or #openai_body.choices == 0 then
        return empty_response
    end

    local choice = openai_body.choices[1]
    if type(choice) ~= "table" then
        choice = {}
    end
    local msg = choice.message
    if type(msg) ~= "table" then
        msg = {}
    end
    local content = {}

    -- Reasoning text arrives as `reasoning_content` or `reasoning`. Some
    -- models send null for it, so only non-empty strings count.
    local rc = msg.reasoning_content
    local r = msg.reasoning
    local reasoning = (type(rc) == "string" and rc ~= "" and rc)
            or (type(r) == "string" and r ~= "" and r)
            or nil
    if reasoning then
        core.log.debug("[OpenAI->Claude] Added thinking content: ", string_sub(reasoning, 1, 200))
        table_insert(content, {
            type = "thinking",
            thinking = reasoning,
            signature = "",  -- emitted empty; the upstream supplies no signature
        })
    end

    -- Text content; "content": null is filtered by the type check.
    if type(msg.content) == "string" and msg.content ~= "" then
        table_insert(content, {
            type = "text",
            text = msg.content
        })
    end

    -- tool_calls -> tool_use content blocks.
    if type(msg.tool_calls) == "table" then
        for _, tc in ipairs(msg.tool_calls) do
            if type(tc) == "table" then
                local fn = tc["function"]
                if type(fn) ~= "table" then
                    fn = nil
                end
                local input = {}
                if fn and type(fn.arguments) == "string" then
                    -- Only a decoded JSON object/array is usable as tool input.
                    local decoded = cjson.decode(fn.arguments)
                    if type(decoded) == "table" then
                        input = decoded
                    end
                end
                table_insert(content, {
                    type = "tool_use",
                    id = tc.id,
                    name = fn and fn.name or "",
                    input = input
                })
            end
        end
    end

    -- Always return at least one content block.
    if #content == 0 then
        table_insert(content, { type = "text", text = "" })
    end

    local stop_reason = openai_finish_reason_to_claude(choice.finish_reason)

    -- usage, following litellm: input_tokens = prompt_tokens - cached_tokens.
    local claude_usage = {
        input_tokens = 0,
        output_tokens = 0,
    }
    local usage = openai_body.usage
    if type(usage) == "table" then
        local details = usage.prompt_tokens_details
        if type(details) ~= "table" then
            details = {}
        end
        local prompt_tokens = usage_number(usage.prompt_tokens)
        local cached_tokens = usage_number(details.cached_tokens)

        claude_usage.input_tokens = prompt_tokens - cached_tokens
        claude_usage.output_tokens = usage_number(usage.completion_tokens)
        if cached_tokens > 0 then
            claude_usage.cache_read_input_tokens = cached_tokens
        end
        -- Forward cache_creation_input_tokens only when the upstream reports a number.
        if type(details.cache_creation_input_tokens) == "number" then
            claude_usage.cache_creation_input_tokens = details.cache_creation_input_tokens
        end
    end

    local id = openai_body.id
    if type(id) ~= "string" then
        id = "msg_" .. ngx_now()
    end
    local model = openai_body.model
    if type(model) ~= "string" then
        model = "unknown"
    end

    local claude_resp = {
        id = id,
        type = "message",
        role = "assistant",
        content = content,
        model = model,
        stop_reason = stop_reason,
        stop_sequence = cjson.null,
        usage = claude_usage,
    }

    core.log.debug("[OpenAI->Claude] Converted Claude response body: ",
            core.json.delay_encode(claude_resp))
    return claude_resp
end

------------------------------------------------------------------------
-- 流式响应转换: OpenAI SSE -> Claude SSE
-- 参考 Higress ConvertOpenAIStreamResponseToClaude + buildClaudeStreamResponse
--
-- 关键设计(对齐 Higress):
-- 1. nextContentIndex 动态递增分配 content block index
-- 2. pendingStopReason 缓存 stop_reason 直到收到 usage 后一起发送
-- 3. tool_calls 支持 activeToolIndex + cachedArguments 序列化机制
-- 4. thinking block 输出 signature 字段
------------------------------------------------------------------------

--- Create the per-stream conversion state.
-- The original author's note says this mirrors the state fields of Higress
-- ClaudeToOpenAIConverter.
-- @return a fresh state table with the fields set below
local function new_stream_ctx()
    local state = {}

    -- message lifecycle
    state.message_start_sent = false
    state.message_stop_sent = false
    state.message_id = nil

    -- next content block index to hand out
    state.next_content_index = 0

    -- thinking block (-1 means no index assigned yet)
    state.thinking_started = false
    state.thinking_stopped = false
    state.thinking_index = -1

    -- text block (-1 means no index assigned yet)
    state.text_started = false
    state.text_stopped = false
    state.text_index = -1

    -- tool calls: map[openai_index] = {id, name, claude_index, started, stopped, cached_args}
    state.tool_call_states = {}
    -- OpenAI index of the currently active tool call, if any
    state.active_tool_index = nil

    -- stop_reason held back until it can be sent
    state.pending_stop_reason = nil

    -- usage
    state.output_tokens = 0

    return state
end

--- Open a tool_use content block (modelled on Higress startToolCall).
-- Closes any open thinking/text block first, assigns the next content index
-- to the tool call, and emits its start event plus any buffered arguments.
-- @param sctx stream state; its stop flags and next_content_index are updated
-- @param tool_state tool call state; claude_index and started are set here
-- @param conf plugin configuration (not used here)
-- @return array of {event = <name>, data = <json string>} in emission order
local function start_tool_call(sctx, tool_state, conf)
    local out = {}

    -- Append one SSE event whose payload is JSON-encoded.
    local function emit(event_name, payload)
        out[#out + 1] = { event = event_name, data = cjson.encode(payload) }
    end

    -- An open thinking block must end before the tool block starts.
    local thinking_open = sctx.thinking_started and not sctx.thinking_stopped
    if thinking_open then
        sctx.thinking_stopped = true
        core.log.debug("[OpenAI->Claude] Closing thinking content block before tool use")
        emit("content_block_stop", { type = "content_block_stop", index = sctx.thinking_index })
    end

    -- Likewise for an open text block.
    local text_open = sctx.text_started and not sctx.text_stopped
    if text_open then
        sctx.text_stopped = true
        core.log.debug("[OpenAI->Claude] Closing text content block before tool use")
        emit("content_block_stop", { type = "content_block_stop", index = sctx.text_index })
    end

    -- Assign the Claude content index for this tool call.
    local claude_index = sctx.next_content_index
    tool_state.claude_index = claude_index
    sctx.next_content_index = claude_index + 1
    tool_state.started = true

    core.log.debug("[OpenAI->Claude] Started tool call: Claude index=", tool_state.claude_index,
            ", id=", tool_state.id, ", name=", tool_state.name)

    emit("content_block_start", {
        type = "content_block_start",
        index = tool_state.claude_index,
        content_block = {
            type = "tool_use",
            id = tool_state.id,
            name = tool_state.name,
            input = {}
        }
    })

    -- Flush arguments buffered before the block was opened.
    local buffered = tool_state.cached_args
    if buffered and buffered ~= "" then
        core.log.debug("[OpenAI->Claude] Outputting cached arguments for tool: ", tool_state.cached_args)
        emit("content_block_delta", {
            type = "content_block_delta",
            index = tool_state.claude_index,
            delta = { type = "input_json_delta", partial_json = tool_state.cached_args }
        })
    end

    return out
end

-- 处理单个 OpenAI SSE data chunk,返回 Claude SSE events 列表
-- 参考 Higress buildClaudeStreamResponse
local function build_claude_stream_events(sctx, openai_chunk, conf)
    local events = {}

    local choice
    if openai_chunk.choices and type(openai_chunk.choices) == "table" and #openai_chunk.choices > 0 then
        choice = openai_chunk.choices[1]
    else
        choice = { index = 0, delta = {} }
    end
    local delta = choice.delta or {}

    -- 日志:分析当前 chunk 包含的内容
    local has_role = type(delta.role) == "string" and delta.role ~= ""
    local has_content = type(delta.content) == "string" and delta.content ~= ""
    local has_finish = choice.finish_reason ~= nil and choice.finish_reason ~= cjson.null
    local has_usage = type(openai_chunk.usage) == "table"
    local has_reasoning = (type(delta.reasoning_content) == "string" and delta.reasoning_content ~= "") or (type(delta.reasoning) == "string" and delta.reasoning ~= "")
    local has_tool_calls = delta.tool_calls ~= nil and delta.tool_calls ~= cjson.null
    core.log.debug("[OpenAI->Claude] Processing OpenAI chunk - Role: ", tostring(has_role),
            ", Content: ", tostring(has_content), ", Reasoning: ", tostring(has_reasoning),
            ", ToolCalls: ", tostring(has_tool_calls),
            ", FinishReason: ", tostring(has_finish), ", Usage: ", tostring(has_usage))

    -- message_start(只发一次)
    -- 注意:GLM 等模型后续 chunk 中 role 为 null(cjson.null),必须用 type() 过滤
    if type(delta.role) == "string" and delta.role ~= "" and not sctx.message_start_sent then
        sctx.message_start_sent = true
        sctx.message_id = openai_chunk.id

        core.log.debug("[OpenAI->Claude] Generated message_start event for id: ", openai_chunk.id)

        local msg_start = {
            type = "message_start",
            message = {
                id = openai_chunk.id or ("msg_" .. ngx_now()),
                type = "message",
                role = "assistant",
                content = {},
                model = openai_chunk.model or "unknown",
                stop_reason = cjson.null,
                stop_sequence = cjson.null,
                usage = {
                    input_tokens = 0,
                    output_tokens = 0,
                    cache_creation_input_tokens = 0,
                    cache_read_input_tokens = 0,
                }
            }
        }
        -- 如果首个 chunk 就带 usage
        if type(openai_chunk.usage) == "table" then
            msg_start.message.usage.input_tokens = openai_chunk.usage.prompt_tokens or 0
        end
        table_insert(events, { event = "message_start", data = cjson.encode(msg_start) })
    elseif type(delta.role) == "string" and delta.role ~= "" and sctx.message_start_sent then
        -- Skip duplicate role messages from OpenRouter
        core.log.debug("[OpenAI->Claude] Skipping duplicate role message for id: ", openai_chunk.id)
    end

    -- reasoning content(thinking)
    -- 注意:Kimi-K2.5 等模型的流式输出可能交错发送 reasoning_content 和 content
    -- 即某个 chunk 同时包含两者,或者 content 出现后仍有后续 reasoning_content。
    -- 因此不能在收到 content 时立即关闭 thinking block,而是延迟到 finish_reason 时关闭。
    -- 注意:GLM 等模型每个 chunk 都带 "reasoning_content": null,cjson.null 是 truthy 且 ~= "" 为 true,
    -- 必须用 type() == "string" 过滤
    local rc = delta.reasoning_content
    local r = delta.reasoning
    local reasoning = (type(rc) == "string" and rc ~= "" and rc)
            or (type(r) == "string" and r ~= "" and r)
            or nil
    if reasoning then
        if not sctx.thinking_started then
            sctx.thinking_index = sctx.next_content_index
            sctx.next_content_index = sctx.next_content_index + 1
            sctx.thinking_started = true
            table_insert(events, {
                event = "content_block_start",
                data = cjson.encode({
                    type = "content_block_start",
                    index = sctx.thinking_index,
                    content_block = { type = "thinking", thinking = "", signature = "" }
                })
            })
        end
        table_insert(events, {
            event = "content_block_delta",
            data = cjson.encode({
                type = "content_block_delta",
                index = sctx.thinking_index,
                delta = { type = "thinking_delta", thinking = reasoning }
            })
        })
    end

    -- text content
    -- 不在此处关闭 thinking block,因为后续 chunk 可能还有 reasoning_content(Kimi-K2.5 交错行为)。
    -- thinking block 的关闭统一在 finish_reason 处理中完成
    -- 注意:GLM 等模型每个 chunk 都带 "content": null,必须用 type() == "string" 过滤 cjson.null
    if type(delta.content) == "string" and delta.content ~= "" then
        if not sctx.text_started then
            sctx.text_index = sctx.next_content_index
            sctx.next_content_index = sctx.next_content_index + 1
            sctx.text_started = true
            table_insert(events, {
                event = "content_block_start",
                data = cjson.encode({
                    type = "content_block_start",
                    index = sctx.text_index,
                    content_block = { type = "text", text = "" }
                })
            })
        end
        table_insert(events, {
            event = "content_block_delta",
            data = cjson.encode({
                type = "content_block_delta",
                index = sctx.text_index,
                delta = { type = "text_delta", text = delta.content }
            })
        })
    end

    -- tool_calls(参考 Higress 的 activeToolIndex + cachedArguments 机制)
    if delta.tool_calls and type(delta.tool_calls) == "table" then
        -- 确保 message_start 已发送
        if not sctx.message_start_sent then
            sctx.message_start_sent = true
            sctx.message_id = openai_chunk.id
            core.log.debug("[OpenAI->Claude] Generated message_start event before tool calls for id: ", openai_chunk.id)
            table_insert(events, {
                event = "message_start",
                data = cjson.encode({
                    type = "message_start",
                    message = {
                        id = openai_chunk.id or ("msg_" .. ngx_now()),
                        type = "message", role = "assistant", content = {},
                        model = openai_chunk.model or "unknown",
                        stop_reason = cjson.null, stop_sequence = cjson.null,
                        usage = { input_tokens = 0, output_tokens = 0 }
                    }
                })
            })
        end

        for _, tc in ipairs(delta.tool_calls) do
            local idx = tc.index or 0

            -- 新 tool call(有 id 和 name)
            if tc.id and tc.id ~= "" and tc["function"] and tc["function"].name and tc["function"].name ~= "" then
                core.log.debug("[OpenAI->Claude] Processing tool call delta: index=", idx,
                        ", id=", tc.id, ", name=", tc["function"].name,
                        ", args=", tc["function"].arguments or "")
                if not sctx.tool_call_states[idx] then
                    sctx.tool_call_states[idx] = {
                        id = tc.id,
                        name = tc["function"].name,
                        claude_index = -1,
                        started = false,
                        stopped = false,
                        cached_args = "",
                    }
                end

                -- 如果没有活跃的 tool call,立即启动
                if sctx.active_tool_index == nil then
                    sctx.active_tool_index = idx
                    local tc_events = start_tool_call(sctx, sctx.tool_call_states[idx], conf)
                    for _, e in ipairs(tc_events) do
                        table_insert(events, e)
                    end
                end
            end

            -- arguments 增量
            if tc["function"] and tc["function"].arguments and tc["function"].arguments ~= "" then
                local state = sctx.tool_call_states[idx]
                if state then
                    state.cached_args = (state.cached_args or "") .. tc["function"].arguments
                    core.log.debug("[OpenAI->Claude] Cached arguments for tool index ", idx,
                            ": ", tc["function"].arguments, " (total: ", state.cached_args, ")")
                    -- 只有活跃的 tool call 才实时输出 delta
                    if sctx.active_tool_index == idx and state.started then
                        core.log.debug("[OpenAI->Claude] Generated input_json_delta event for active tool index ", idx,
                                ": ", tc["function"].arguments)
                        table_insert(events, {
                            event = "content_block_delta",
                            data = cjson.encode({
                                type = "content_block_delta",
                                index = state.claude_index,
                                delta = { type = "input_json_delta", partial_json = tc["function"].arguments }
                            })
                        })
                    end
                end
            end
        end
    end

    -- finish_reason 处理 (v0.8: 排除 cjson.null,JSON null 在 Lua 中是 truthy userdata)
    if choice.finish_reason and choice.finish_reason ~= cjson.null then
        local claude_reason = openai_finish_reason_to_claude(choice.finish_reason)
        core.log.debug("[OpenAI->Claude] Processing finish_reason: ", choice.finish_reason, " -> ", claude_reason)

        -- 关闭所有活跃的 content blocks
        if sctx.thinking_started and not sctx.thinking_stopped then
            sctx.thinking_stopped = true
            core.log.debug("[OpenAI->Claude] Generated thinking content_block_stop event at index ", sctx.thinking_index)
            table_insert(events, {
                event = "content_block_stop",
                data = cjson.encode({ type = "content_block_stop", index = sctx.thinking_index })
            })
        end
        if sctx.text_started and not sctx.text_stopped then
            sctx.text_stopped = true
            core.log.debug("[OpenAI->Claude] Generated text content_block_stop event at index ", sctx.text_index)
            table_insert(events, {
                event = "content_block_stop",
                data = cjson.encode({ type = "content_block_stop", index = sctx.text_index })
            })
        end

        -- 启动所有未启动的 tool calls,然后关闭所有 tool calls(按 index 排序)
        local sorted_indices = {}
        for k, _ in pairs(sctx.tool_call_states) do
            table_insert(sorted_indices, k)
        end
        table.sort(sorted_indices)

        for _, tidx in ipairs(sorted_indices) do
            local ts = sctx.tool_call_states[tidx]
            if not ts.started then
                core.log.debug("[OpenAI->Claude] Starting remaining tool call at finish: index=", tidx,
                        ", id=", ts.id, ", name=", ts.name)
                sctx.active_tool_index = tidx
                local tc_events = start_tool_call(sctx, ts, conf)
                for _, e in ipairs(tc_events) do table_insert(events, e) end
                sctx.active_tool_index = nil
            end
        end
        for _, tidx in ipairs(sorted_indices) do
            local ts = sctx.tool_call_states[tidx]
            if ts.started and not ts.stopped then
                ts.stopped = true
                core.log.debug("[OpenAI->Claude] Generated content_block_stop for tool at index ", tidx,
                        ", Claude index ", ts.claude_index)
                table_insert(events, {
                    event = "content_block_stop",
                    data = cjson.encode({ type = "content_block_stop", index = ts.claude_index })
                })
            end
        end

        -- 缓存 stop_reason,等 usage 到了一起发(Claude 协议要求)
        sctx.pending_stop_reason = claude_reason
        core.log.debug("[OpenAI->Claude] Cached stop_reason: ", claude_reason, ", waiting for usage")
    end

    -- usage 处理(可能和 finish_reason 在同一个 chunk,也可能在后续 chunk)?
    if type(openai_chunk.usage) == "table" then
        core.log.debug("[OpenAI->Claude] Processing usage info - input: ",
                openai_chunk.usage.prompt_tokens or 0, ", output: ",
                openai_chunk.usage.completion_tokens or 0)

        -- GLM 等模型可能在 usage-only chunk 中不带 finish_reason(choices 为空数组),
        -- 导致 content blocks 未被关闭。在发送 message_delta 之前补关所有未关闭的 blocks。
        if sctx.thinking_started and not sctx.thinking_stopped then
            sctx.thinking_stopped = true
            core.log.debug("[OpenAI->Claude] Closing thinking block before usage, index=", sctx.thinking_index)
            table_insert(events, {
                event = "content_block_stop",
                data = cjson.encode({ type = "content_block_stop", index = sctx.thinking_index })
            })
        end
        if sctx.text_started and not sctx.text_stopped then
            sctx.text_stopped = true
            core.log.debug("[OpenAI->Claude] Closing text block before usage, index=", sctx.text_index)
            table_insert(events, {
                event = "content_block_stop",
                data = cjson.encode({ type = "content_block_stop", index = sctx.text_index })
            })
        end
        -- 关闭所有未关闭的 tool call blocks
        local sorted_indices = {}
        for k, _ in pairs(sctx.tool_call_states) do
            table_insert(sorted_indices, k)
        end
        table.sort(sorted_indices)
        for _, tidx in ipairs(sorted_indices) do
            local ts = sctx.tool_call_states[tidx]
            if ts.started and not ts.stopped then
                ts.stopped = true
                core.log.debug("[OpenAI->Claude] Closing tool block before usage, index=", ts.claude_index)
                table_insert(events, {
                    event = "content_block_stop",
                    data = cjson.encode({ type = "content_block_stop", index = ts.claude_index })
                })
            end
        end

        -- 参考 litellm: input_tokens = prompt_tokens - cached_tokens
        local prompt_tokens = openai_chunk.usage.prompt_tokens or 0
        local cached_tokens = 0
        if type(openai_chunk.usage.prompt_tokens_details) == "table"
                and openai_chunk.usage.prompt_tokens_details.cached_tokens then
            cached_tokens = openai_chunk.usage.prompt_tokens_details.cached_tokens
        end
        local stream_usage = {
            input_tokens = prompt_tokens - cached_tokens,
            output_tokens = openai_chunk.usage.completion_tokens or 0,
        }
        if cached_tokens > 0 then
            stream_usage.cache_read_input_tokens = cached_tokens
        end
        if type(openai_chunk.usage.prompt_tokens_details) == "table"
                and openai_chunk.usage.prompt_tokens_details.cache_creation_input_tokens then
            stream_usage.cache_creation_input_tokens = openai_chunk.usage.prompt_tokens_details.cache_creation_input_tokens
        end
        local msg_delta = {
            type = "message_delta",
            delta = {
                stop_sequence = cjson.null,
            },
            usage = stream_usage,
        }
        -- 合并缓存的 stop_reason;如果没有缓存(GLM usage-only chunk 场景),默认 end_turn
        if sctx.pending_stop_reason then
            msg_delta.delta.stop_reason = sctx.pending_stop_reason
            core.log.debug("[OpenAI->Claude] Combining cached stop_reason ", sctx.pending_stop_reason, " with usage")
            sctx.pending_stop_reason = nil
        else
            msg_delta.delta.stop_reason = "end_turn"
            core.log.debug("[OpenAI->Claude] No cached stop_reason, defaulting to end_turn")
        end
        table_insert(events, { event = "message_delta", data = cjson.encode(msg_delta) })
        core.log.debug("[OpenAI->Claude] Generated message_delta event with usage and stop_reason")

        -- message_stop
        if not sctx.message_stop_sent then
            sctx.message_stop_sent = true
            core.log.debug("[OpenAI->Claude] Generated message_stop event")
            table_insert(events, {
                event = "message_stop",
                data = cjson.encode({ type = "message_stop" })
            })
        end
    end

    return events
end

--- Finalize the Claude event stream for a `[DONE]` signal and reset the
-- converter state.
--
-- Emits, in this order:
--   1. `content_block_stop` for every block that was started but not stopped:
--      thinking, then text, then tool calls in ascending `claude_index`;
--   2. a `message_delta` carrying a stop_reason that was cached while waiting
--      for a usage chunk that never arrived;
--   3. `message_stop`, if `message_start` was sent and `message_stop` was not.
--
-- @param sctx table  stream conversion state; its flags are updated and then
--                    every field listed at the bottom is reset
-- @param conf table  plugin configuration (not read by this function)
-- @return table  array of `{ event = <string>, data = <JSON string> }`
local function build_claude_stream_done(sctx, conf)
    core.log.debug("[OpenAI->Claude] Processing [DONE] message, finalizing stream")
    local events = {}

    local function push_block_stop(claude_index)
        table_insert(events, {
            event = "content_block_stop",
            data = cjson.encode({ type = "content_block_stop", index = claude_index })
        })
    end

    -- Close every content block that is still open.
    if sctx.thinking_started and not sctx.thinking_stopped then
        sctx.thinking_stopped = true
        push_block_stop(sctx.thinking_index)
    end
    if sctx.text_started and not sctx.text_stopped then
        sctx.text_stopped = true
        push_block_stop(sctx.text_index)
    end

    -- pairs() visits keys in an unspecified order, so collect the open tool
    -- blocks first and close them in a deterministic order.
    local open_tools = {}
    for _, ts in pairs(sctx.tool_call_states) do
        if ts.started and not ts.stopped then
            table_insert(open_tools, ts)
        end
    end
    table.sort(open_tools, function(a, b)
        return (a.claude_index or -1) < (b.claude_index or -1)
    end)
    for _, ts in ipairs(open_tools) do
        ts.stopped = true
        push_block_stop(ts.claude_index)
    end

    -- A stop_reason is still cached when no usage chunk was received.
    if sctx.pending_stop_reason then
        table_insert(events, {
            event = "message_delta",
            data = cjson.encode({
                type = "message_delta",
                delta = { stop_reason = sctx.pending_stop_reason, stop_sequence = cjson.null },
                usage = { output_tokens = 0 }
            })
        })
        sctx.pending_stop_reason = nil
    end

    -- message_stop
    if sctx.message_start_sent and not sctx.message_stop_sent then
        sctx.message_stop_sent = true
        table_insert(events, {
            event = "message_stop",
            data = cjson.encode({ type = "message_stop" })
        })
    end

    -- Reset all state so a reused connection cannot see stale values
    -- (mirrors the reset Higress performs after [DONE]).
    sctx.message_start_sent = false
    sctx.message_stop_sent = false
    sctx.message_id = nil
    sctx.next_content_index = 0
    sctx.thinking_started = false
    sctx.thinking_stopped = false
    sctx.thinking_index = -1
    sctx.text_started = false
    sctx.text_stopped = false
    sctx.text_index = -1
    sctx.tool_call_states = {}
    sctx.active_tool_index = nil
    sctx.pending_stop_reason = nil
    sctx.output_tokens = 0

    core.log.debug("[OpenAI->Claude] Reset converter state for next request")
    return events
end

------------------------------------------------------------------------
-- APISIX 插件钩子
------------------------------------------------------------------------

--- rewrite phase: convert the Claude `x-api-key` header into
-- `Authorization: Bearer <key>`.
--
-- This has to happen in the rewrite phase; per the plugin priority comment it
-- runs ahead of key-auth (2500), which would otherwise not find an
-- Authorization header.
--
-- Only requests whose URI is exactly `/v1/messages` are touched.
--
-- @param conf table  plugin configuration (not read here)
-- @param ctx  table  APISIX request context (not read here)
function _M.rewrite(conf, ctx)
    local uri = ngx.var.uri
    -- Fetch the header table once instead of once per lookup.
    local headers = ngx.req.get_headers()

    if uri ~= "/v1/messages" then
        core.log.debug("[APISIX] rewrite phase skipped, uri=", uri,
                ", method=", ngx.req.get_method(),
                ", x-api-key=", headers["x-api-key"] and "present" or "nil",
                ", authorization=", headers["authorization"] and "present" or "nil")
        return
    end

    core.log.debug("[APISIX] rewrite phase triggered, uri=", uri,
            ", method=", ngx.req.get_method())

    local api_key = headers["x-api-key"]
    -- A header sent more than once is returned as a table of values;
    -- concatenating that table would raise, so use the first value.
    if type(api_key) == "table" then
        api_key = api_key[1]
    end
    if api_key then
        core.log.debug("[Claude->OpenAI] Converting x-api-key to Authorization header")
        ngx.req.set_header("Authorization", "Bearer " .. api_key)
        ngx.req.clear_header("x-api-key")
    end
end

--- access phase: rewrite the request body and URI and drop Claude-only headers.
--
-- For requests to `/v1/messages` this reads the body (from memory or from the
-- temp file nginx spools large bodies to), decodes it, converts it with
-- `convert_claude_request_to_openai`, installs the result as the new body and
-- rewrites the URI to `/v1/chat/completions`.
--
-- On any read/decode/encode failure the error is logged and the request is
-- left unconverted (`ctx.claude2openai_converted` stays unset, so the response
-- hooks do nothing).
--
-- Sets on ctx:
--   claude2openai_stream    boolean, true only when the body has `stream: true`
--   claude2openai_converted true once the request has been rewritten
--
-- @param conf table  plugin configuration, passed to the request converter
-- @param ctx  table  APISIX request context
function _M.access(conf, ctx)
    local uri = ngx.var.uri
    if uri ~= "/v1/messages" then
        core.log.debug("[APISIX] access phase skipped, uri=", uri)
        return
    end

    core.log.debug("[APISIX] access phase triggered, uri=", uri)

    ngx.req.read_body()
    local body = ngx.req.get_body_data()
    if not body then
        -- Large request bodies may have been spooled to a temporary file.
        local file_name = ngx.req.get_body_file()
        if file_name then
            -- Binary mode: the bytes must reach the JSON decoder untouched.
            local f, open_err = io.open(file_name, "rb")
            if f then
                body = f:read("*a")
                f:close()
            else
                core.log.error("failed to open request body file, err=",
                        open_err or "unknown")
            end
        end
        if not body then
            core.log.error("failed to read request body, body_file=",
                    file_name or "nil")
            return
        end
    end

    local claude_body, decode_err = cjson.decode(body)
    -- A successful decode can still yield a non-table (number, string,
    -- cjson.null); indexing such a value below would raise.
    if type(claude_body) ~= "table" then
        core.log.error("failed to decode Claude request body, err=",
                decode_err or "body is not a JSON object",
                ", body_prefix=", string_sub(body, 1, 200))
        return
    end

    -- Remember whether the client asked for streaming. Compare against true:
    -- a JSON null decodes to cjson.null, which is truthy in Lua.
    ctx.claude2openai_stream = claude_body.stream == true

    -- Convert the request body.
    local openai_body = convert_claude_request_to_openai(claude_body, conf)
    local encoded_body, encode_err = cjson.encode(openai_body)
    if not encoded_body then
        core.log.error("failed to encode OpenAI request body, err=",
                encode_err or "unknown")
        return
    end
    ngx.req.set_body_data(encoded_body)

    -- Rewrite the URI.
    ngx.req.set_uri("/v1/chat/completions")

    -- Remove Claude-specific headers.
    ngx.req.clear_header("anthropic-version")
    ngx.req.clear_header("anthropic-beta")

    ctx.claude2openai_converted = true
    core.log.debug("[c2o v0.8] request converted, stream=", ctx.claude2openai_stream,
            ", model=", claude_body.model or "nil",
            ", ctx_id=", tostring(ctx))
end

--- header_filter phase: adjust response headers for a converted request.
--
-- Does nothing unless the access phase marked the request as converted.
-- Because the body filter rewrites the body, Content-Length is always removed.
--
-- Content-Type is `text/event-stream` only for a streaming request whose
-- status is below 400. For error statuses the body filter replaces the
-- upstream body with a plain Claude JSON error object, so the response is
-- labelled `application/json` instead.
--
-- @param conf table  plugin configuration (not read here)
-- @param ctx  table  APISIX request context
function _M.header_filter(conf, ctx)
    if not ctx.claude2openai_converted then return end

    local status = ngx.status

    core.log.debug("[APISIX] header_filter phase, stream=", ctx.claude2openai_stream,
            ", upstream_status=", status,
            ", upstream_content_type=", ngx.header["Content-Type"] or "nil",
            ", upstream_content_length=", ngx.header["Content-Length"] or "nil")

    -- Log every upstream response with an error status (>= 400).
    if status >= 400 then
        core.log.error("upstream returned error status=", status)
    end

    if ctx.claude2openai_stream and status < 400 then
        ngx.header["Content-Type"] = "text/event-stream"
        -- SSE responses must not be buffered: with buffering on, nginx hands
        -- body_filter chunks that are split in the middle of SSE data lines.
        ngx.header["X-Accel-Buffering"] = "no"
    else
        ngx.header["Content-Type"] = "application/json"
    end
    -- The body is rewritten, so the upstream Content-Length no longer applies.
    ngx.header["Content-Length"] = nil
end

-- Build the Claude-format error JSON for an OpenAI-style `error` value.
-- Accepts a table (`message` / `type` fields) or a plain string; anything
-- else yields the generic message. JSON nulls (cjson.null) are treated as
-- absent. Returns the encoded JSON (nil if encoding fails) and the message.
local function body_filter_claude_error(upstream_error)
    local err_msg = "Unknown upstream error"
    local err_type = "api_error"
    if type(upstream_error) == "table" then
        local message = upstream_error.message
        if message and message ~= cjson.null then
            err_msg = message
        else
            err_msg = cjson.encode(upstream_error) or err_msg
        end
        local declared_type = upstream_error.type
        if declared_type and declared_type ~= cjson.null then
            err_type = declared_type
        end
    elseif type(upstream_error) == "string" then
        err_msg = upstream_error
    end
    local encoded = cjson.encode({
        type = "error",
        error = { type = err_type, message = err_msg },
    })
    return encoded, err_msg
end

-- Split `buf` at its last CR/LF. Returns the part up to and including that
-- line break (nil when `buf` has no line break) and the unterminated rest.
local function body_filter_split_lines(buf)
    for i = #buf, 1, -1 do
        local byte = string.byte(buf, i)
        if byte == 10 or byte == 13 then
            return string_sub(buf, 1, i), string_sub(buf, i + 1)
        end
    end
    return nil, buf
end

-- Convert one SSE line from the upstream and append the resulting Claude SSE
-- frames to `output_parts`. Lines that do not start with "data: " are ignored.
-- `at_eof` only selects which log messages are used.
local function body_filter_convert_line(sctx, line, conf, output_parts, at_eof)
    if string_sub(line, 1, 6) ~= "data: " then
        return
    end
    local data = string_sub(line, 7)

    if data == "[DONE]" then
        for _, evt in ipairs(build_claude_stream_done(sctx, conf)) do
            table_insert(output_parts,
                    "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
        end
        return
    end

    local openai_chunk, chunk_err = cjson.decode(data)
    -- Reject decode failures and payloads that are not JSON objects; indexing
    -- a number or cjson.null below would raise.
    if type(openai_chunk) ~= "table" then
        if at_eof then
            core.log.error("failed to decode buffered stream chunk at EOF, err=",
                    chunk_err or "unknown")
        else
            core.log.error("failed to decode stream chunk, err=",
                    chunk_err or "unknown", ", data_prefix=", string_sub(data, 1, 200))
        end
        return
    end

    -- An error object inside the SSE data is converted to a Claude error
    -- event. `"error": null` decodes to cjson.null (truthy) and is not an error.
    local upstream_error = openai_chunk.error
    if upstream_error and upstream_error ~= cjson.null then
        local claude_err, err_msg = body_filter_claude_error(upstream_error)
        core.log.debug("[OpenAI->Claude] SSE chunk contains error: ", err_msg)
        if claude_err then
            table_insert(output_parts, "event: error\ndata: " .. claude_err .. "\n\n")
        end
        return
    end

    local ok, err = pcall(function()
        local events = build_claude_stream_events(sctx, openai_chunk, conf)
        core.log.debug("[OpenAI->Claude] Generated ", #events, " Claude stream events from OpenAI chunk")
        for i, evt in ipairs(events) do
            core.log.debug("[OpenAI->Claude] Stream event [", i, "/", #events, "]: ", evt.data)
            table_insert(output_parts,
                    "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
        end
    end)
    if not ok then
        if at_eof then
            core.log.error("stream event conversion error at EOF, err=", err)
        else
            core.log.error("stream event conversion error, err=",
                    err, ", data_prefix=", string_sub(data, 1, 200))
        end
    end
end

--- body_filter phase: convert the upstream OpenAI response body to Claude format.
--
-- Streaming responses are converted chunk by chunk; an unterminated trailing
-- line is kept in `ctx.claude2openai_line_buf` until the next chunk (or EOF).
-- Non-streaming responses are collected until EOF and converted as a whole;
-- if decoding or encoding fails the upstream body is passed through unchanged.
--
-- Note: in ai-proxy-multi bypass mode `lua_body_filter` has already done the
-- conversion and its output (sent via ngx_print) triggers this nginx
-- body_filter again. That case is detected through
-- `ctx.claude2openai_lua_body_filter_active` and skipped, so already-converted
-- Claude SSE data is not processed twice.
--
-- @param conf table  plugin configuration, passed to the converters
-- @param ctx  table  APISIX request context
function _M.body_filter(conf, ctx)
    if not ctx.claude2openai_converted then return end

    -- lua_body_filter is handling the stream: skip the nginx body_filter.
    if ctx.claude2openai_lua_body_filter_active then
        return
    end

    local chunk = ngx.arg[1]
    local eof = ngx.arg[2]

    core.log.debug("[APISIX] body_filter phase v0.6 (nginx), stream=", ctx.claude2openai_stream,
            ", chunk_len=", chunk and #chunk or 0, ", eof=", eof,
            ", status=", ngx.status)

    if ctx.claude2openai_stream then
        -- Streaming response: convert chunk by chunk.
        if not ctx.claude2openai_sctx then
            ctx.claude2openai_sctx = new_stream_ctx()
            ctx.claude2openai_line_buf = ""  -- SSE line buffer for lines split across chunks
        end
        local sctx = ctx.claude2openai_sctx

        local output_parts = {}

        -- A streaming request may get a non-streaming error JSON back
        -- (4xx/5xx). Checked once, on the first call: pass the real error on
        -- in Claude format.
        if not ctx.claude2openai_stream_error_checked then
            ctx.claude2openai_stream_error_checked = true
            if ngx.status >= 400 and chunk and chunk ~= "" then
                core.log.error("upstream error response (stream), status=", ngx.status,
                        ", body=", string_sub(chunk, 1, 4096))
                local err_resp = cjson.decode(chunk)
                if type(err_resp) == "table" and err_resp.error
                        and err_resp.error ~= cjson.null then
                    local claude_err, err_msg = body_filter_claude_error(err_resp.error)
                    core.log.debug("[OpenAI->Claude] Stream request got upstream error: ", err_msg)
                    if claude_err then
                        ngx.arg[1] = claude_err
                        return
                    end
                end
            end
        end

        if chunk and chunk ~= "" then
            core.log.debug("[OpenAI->Claude] Original OpenAI streaming chunk len=", #chunk)

            -- nginx may split a chunk in the middle of an SSE data line, which
            -- would make the incomplete JSON line unparseable. Join the
            -- fragment left over from the previous call with this chunk.
            local buf = chunk
            local pending = ctx.claude2openai_line_buf
            if pending and pending ~= "" then
                buf = pending .. chunk
                core.log.debug("[OpenAI->Claude] Prepended buffered line fragment, len=",
                        #pending)
            end

            -- Everything after the last line break is incomplete: keep it for
            -- the next call.
            local complete_buf, trailing = body_filter_split_lines(buf)
            ctx.claude2openai_line_buf = trailing
            if not complete_buf then
                core.log.debug("[OpenAI->Claude] Entire chunk buffered (no newline), len=", #buf)
            elseif trailing ~= "" then
                core.log.debug("[OpenAI->Claude] Buffered trailing incomplete line, len=", #trailing)
            end

            if complete_buf then
                for line in complete_buf:gmatch("[^\r\n]+") do
                    body_filter_convert_line(sctx, line, conf, output_parts, false)
                end
            end
        end

        -- At EOF, process whatever is left in the line buffer.
        if eof and ctx.claude2openai_line_buf and ctx.claude2openai_line_buf ~= "" then
            local remaining = ctx.claude2openai_line_buf
            ctx.claude2openai_line_buf = ""
            core.log.debug("[OpenAI->Claude] Processing remaining buffered line at EOF, len=", #remaining)
            for line in remaining:gmatch("[^\r\n]+") do
                body_filter_convert_line(sctx, line, conf, output_parts, true)
            end
        end

        if eof and not sctx.message_stop_sent then
            core.log.debug("[OpenAI->Claude] EOF reached without message_stop, sending final events")
            for _, evt in ipairs(build_claude_stream_done(sctx, conf)) do
                table_insert(output_parts,
                        "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
            end
        end

        ngx.arg[1] = #output_parts > 0 and table_concat(output_parts) or ""
        if #output_parts > 0 then
            core.log.debug("[OpenAI->Claude] Converted Claude streaming chunk: ", ngx.arg[1])
        end

    else
        -- Non-streaming response: collect the whole body, then convert it.
        if not ctx.claude2openai_resp_chunks then
            ctx.claude2openai_resp_chunks = {}
        end

        if chunk and chunk ~= "" then
            table_insert(ctx.claude2openai_resp_chunks, chunk)
            if not eof then
                -- Hold this chunk back until the full body is available.
                ngx.arg[1] = nil
                return
            end
        end

        if eof then
            local full_body = table_concat(ctx.claude2openai_resp_chunks)
            core.log.debug("[OpenAI->Claude] non-stream response complete, body_len=", #full_body,
                    ", status=", ngx.status)
            if ngx.status >= 400 then
                core.log.error("upstream error response, status=", ngx.status,
                        ", body=", string_sub(full_body, 1, 4096))
            end
            local openai_resp, resp_err = cjson.decode(full_body)
            -- Only a JSON object can be converted; anything else is passed
            -- through unchanged.
            if type(openai_resp) == "table" then
                local claude_resp = convert_openai_response_to_claude(openai_resp, conf)
                local encoded, enc_err = cjson.encode(claude_resp)
                if encoded then
                    ngx.arg[1] = encoded
                else
                    core.log.error("failed to encode Claude response, err=",
                            enc_err or "unknown")
                    ngx.arg[1] = full_body
                end
            else
                core.log.error("failed to decode upstream response, err=",
                        resp_err or "unknown", ", body_prefix=", string_sub(full_body, 1, 300))
                ngx.arg[1] = full_body
            end
        end
    end
end

------------------------------------------------------------------------
-- lua_body_filter: 供 ai-proxy-multi 的 lua_response_filter 调用
-- 签名: (conf, ctx, headers, body) -> (code, new_body)
-- body 是 ai-proxy-multi 从上游 body_reader() 读取的原始 SSE chunk
-- 返回 nil, new_body 替换输出内容
--
-- 关键:body_reader() 按 HTTP chunk 返回数据,chunk 边界可能在 SSE data 行
-- 中间截断,必须用行缓冲处理跨 chunk 的不完整行,否则会丢失事件。
------------------------------------------------------------------------

--- Convert one complete SSE line from the upstream into Claude SSE frames.
--
-- Lines that do not start with "data: " produce nothing. `[DONE]` finalizes
-- the stream via `build_claude_stream_done`. A payload carrying an `error`
-- value becomes a single Claude `error` event; any other payload goes through
-- `build_claude_stream_events`. Decode and conversion failures are logged at
-- warn level and produce no output.
--
-- @param sctx table   stream conversion state
-- @param line string  one SSE line without its line terminator
-- @param conf table   plugin configuration, passed to the converters
-- @return table  array of ready-to-send "event: ...\ndata: ...\n\n" strings
local function process_sse_line(sctx, line, conf)
    local parts = {}
    if line:sub(1, 6) ~= "data: " then
        return parts
    end
    local data = line:sub(7)

    if data == "[DONE]" then
        for _, evt in ipairs(build_claude_stream_done(sctx, conf)) do
            table_insert(parts,
                    "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
        end
        return parts
    end

    local openai_chunk, chunk_err = cjson.decode(data)
    -- Reject decode failures and payloads that are not JSON objects; indexing
    -- a number or cjson.null below would raise.
    if type(openai_chunk) ~= "table" then
        core.log.warn("[c2o v0.8] decode err: ", chunk_err or "not a JSON object",
                " data=", string_sub(data, 1, 200))
        return parts
    end

    -- `"error": null` decodes to cjson.null, which is truthy: not an error.
    local upstream_error = openai_chunk.error
    if upstream_error and upstream_error ~= cjson.null then
        local err_msg = "Unknown upstream error"
        local err_type = "api_error"
        if type(upstream_error) == "table" then
            local message = upstream_error.message
            if message and message ~= cjson.null then
                err_msg = message
            else
                err_msg = cjson.encode(upstream_error) or err_msg
            end
            local declared_type = upstream_error.type
            if declared_type and declared_type ~= cjson.null then
                err_type = declared_type
            end
        elseif type(upstream_error) == "string" then
            err_msg = upstream_error
        end
        local claude_err = cjson.encode({
            type = "error",
            error = {
                type = err_type,
                message = err_msg,
            }
        })
        if claude_err then
            table_insert(parts, "event: error\ndata: " .. claude_err .. "\n\n")
        end
        return parts
    end

    local ok, err = pcall(function()
        local events = build_claude_stream_events(sctx, openai_chunk, conf)
        for _, evt in ipairs(events) do
            table_insert(parts,
                    "event: " .. evt.event .. "\ndata: " .. evt.data .. "\n\n")
        end
    end)
    if not ok then
        core.log.warn("[c2o v0.8] convert err: ", err)
    end
    return parts
end

-- Streaming body hook: rewrites upstream SSE chunks into Claude-style SSE.
--
-- Acts only when the request was converted and is a streaming one; otherwise
-- it returns nothing. Upstream chunks may end in the middle of a line, so the
-- text after the last line terminator is kept on ctx and prepended to the
-- next chunk.
--
-- @param conf     plugin configuration, forwarded to process_sse_line
-- @param ctx      per-request context table holding the conversion state
-- @param headers  not used by this function
-- @param body     current response chunk (may be nil or empty)
-- @return         nothing when inactive; otherwise nil plus the converted text
--                 (presumably APISIX's (code, body) convention -- confirm)
function _M.lua_body_filter(conf, ctx, headers, body)
    core.log.debug("[c2o v0.8] ENTER converted=",
            tostring(ctx.claude2openai_converted),
            " stream=", tostring(ctx.claude2openai_stream),
            " body_len=", body and #body or 0,
            " ctx_id=", tostring(ctx))

    if not (ctx.claude2openai_converted and ctx.claude2openai_stream) then
        return
    end

    -- Mark lua_body_filter as active so the nginx body_filter does not
    -- process the same response a second time.
    ctx.claude2openai_lua_body_filter_active = true

    -- Lazily create the per-stream state on the first chunk.
    local sctx = ctx.claude2openai_sctx
    if not sctx then
        sctx = new_stream_ctx()
        ctx.claude2openai_sctx = sctx
        -- Line buffer for SSE lines that straddle two chunks.
        ctx.claude2openai_lua_line_buf = ""
        core.log.debug("[c2o v0.8] INIT stream ctx")
    end

    if not body or body == "" then
        return nil, ""
    end

    core.log.debug("[c2o v0.8] IN len=", #body,
            " first80=", string_sub(body, 1, 80))

    -- Glue the unfinished line left over from the previous chunk in front.
    local buf = body
    local pending = ctx.claude2openai_lua_line_buf
    if pending and pending ~= "" then
        buf = pending .. body
        core.log.debug("[c2o v0.8] prepend buf len=", #pending)
        ctx.claude2openai_lua_line_buf = ""
    end

    -- Position of the last "\r" or "\n": the pattern matches a terminator
    -- followed only by non-terminator bytes up to the end of the buffer.
    local last_nl = string_find(buf, "[\r\n][^\r\n]*$")
    if not last_nl then
        -- No terminator anywhere: keep everything for the next chunk.
        ctx.claude2openai_lua_line_buf = buf
        core.log.debug("[c2o v0.8] no newline, buffered all len=", #buf)
        return nil, ""
    end

    local complete_buf = buf
    if last_nl < #buf then
        -- Bytes after the last terminator form an incomplete line.
        local trailing = string_sub(buf, last_nl + 1)
        complete_buf = string_sub(buf, 1, last_nl)
        ctx.claude2openai_lua_line_buf = trailing
        core.log.debug("[c2o v0.8] trailing buf len=", #trailing,
                " content=", string_sub(trailing, 1, 80))
    end

    -- Convert every complete, non-empty line.
    local output_parts = {}
    for line in complete_buf:gmatch("[^\r\n]+") do
        for _, piece in ipairs(process_sse_line(sctx, line, conf)) do
            output_parts[#output_parts + 1] = piece
        end
    end

    local out = table_concat(output_parts)
    core.log.debug("[c2o v0.8] OUT len=", #out,
            " events=", #output_parts,
            " first100=", string_sub(out, 1, 100))
    return nil, out
end

-- Return the plugin table (metadata fields plus the hook functions defined
-- above) as the value of this module.
return _M

Logo

欢迎加入 DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索 AI 技术的奥秘。

更多推荐