1. 项目概述:一个轻量级、可复用的ChatGPT API调用工具

最近在GitHub上看到一个挺有意思的项目,叫“ChristopheZhao/ChaGPT-API-Call”。光看名字,你可能会觉得这又是一个简单的API调用示例,但实际扒开代码一看,发现它远不止于此。这是一个封装了OpenAI ChatGPT API调用的Python工具库,核心目标是提供一个稳定、易用且具备一定扩展性的接口,让开发者能快速、安全地将大语言模型的能力集成到自己的应用中。

我自己在对接各种AI服务API时,最头疼的就是那些重复性的工作:处理网络请求、管理API密钥、解析响应、处理错误重试、控制速率限制,还得考虑如何优雅地处理上下文对话。这个项目恰好把这些痛点都考虑进去了。它不是一个简单的脚本,而是一个结构清晰的工具包,把调用ChatGPT API的通用逻辑抽象出来,让你可以专注于业务逻辑本身,而不是底层通信的细节。对于需要频繁调用ChatGPT API的开发者,或者想快速搭建一个基于大语言模型的对话、内容生成、代码辅助等功能的项目来说,这个工具能省下不少时间。

2. 核心设计思路与架构拆解

2.1 为什么需要封装?从直接调用到工具化

直接使用 requests 库调用OpenAI的API当然可以,代码可能就十几行。但当你需要处理以下场景时,裸调API的代码就会迅速变得臃肿且难以维护:

  1. 错误处理与重试 :网络波动、API限流(429错误)、服务器内部错误(5xx)是家常便饭。一个健壮的应用需要自动重试,并且可能对不同错误采用不同策略(例如,限流错误等待更久)。
  2. 上下文管理 :ChatGPT的对话能力依赖于维护一个消息历史列表( messages )。手动管理这个列表,包括控制长度(因为token数有限制)、实现类似“系统指令”固定、用户与助手消息交替等功能,代码会变得很琐碎。
  3. 配置管理 :API密钥、基础URL、模型版本、超时时间、最大token数等参数,如果散落在代码各处,改动起来将是灾难。
  4. 日志与监控 :我们需要知道每次调用耗时、消耗的token数、是否成功,这对于调试和成本核算至关重要。
  5. 扩展性 :未来如果想支持异步调用、切换不同的模型提供商、或者增加流式响应(streaming)功能,直接写死的代码很难扩展。

ChaGPT-API-Call 这个项目的设计思路,正是为了解决上述问题。它采用了面向对象的设计,将API客户端、请求配置、上下文管理等功能模块化。

2.2 项目结构解析

虽然我无法看到项目实时的最新结构,但根据其命名和常见模式,可以推断其核心模块通常包括:

  • Client类 :这是核心。它封装了HTTP客户端(如 requests.Session ),内置了重试逻辑、错误处理、请求头构造(包含Authorization)、响应解析。它会暴露类似 chat_completion() 这样的主要方法。
  • Message/Context管理类 :负责维护一个对话会话(Session)。它可能提供一个类似 add_user_message() , add_assistant_message() , set_system_prompt() 的接口,并内部处理消息列表的构建,可能还会包含简单的token计数或截断逻辑(虽然精确的token计数需要依赖 tiktoken 库,但可以做一些基础的估算和长度控制)。
  • 配置类或字典 :集中管理所有可配置参数,如 api_key , base_url , model , temperature , max_tokens 等。这可以通过一个 Config 类或直接在Client初始化参数中实现。
  • 工具函数 :可能包含一些实用的函数,比如计算消息列表的大致token数、格式化响应、处理流式响应块等。

这种结构的好处是 高内聚、低耦合 。客户端只负责通信,上下文管理器只负责数据,配置集中管理。当你需要调整重试策略时,只需修改Client类;需要改变对话初始化方式时,只需修改Context类。

注意 :一个优秀的封装库应该在提供便利的同时,不隐藏原始API的灵活性。它应该允许高级用户直接传递原始的 messages 列表和参数,以覆盖默认行为。

3. 关键实现细节与源码级解读

3.1 客户端(Client)的核心实现

一个健壮的API客户端是工具的基石。我们来看看几个关键实现点。

3.1.1 请求重试与退避策略

这是生产级应用必须考虑的。直接使用 requests 加上 try...except 是不够的。通常会集成 urllib3 tenacity 库来实现重试。一个简单的策略可能如下:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class ChatGPTClient:
    def __init__(self, api_key, max_retries=3, backoff_factor=0.5):
        self.api_key = api_key
        self.session = requests.Session()
        
        # 定义重试策略
        retry_strategy = Retry(
            total=max_retries, # 总重试次数
            status_forcelist=[429, 500, 502, 503, 504], # 遇到这些状态码重试
            allowed_methods=["POST"], # 只对POST请求重试
            backoff_factor=backoff_factor # 退避等待时间因子
        )
        
        # 为session的http和https适配器挂载重试策略
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)
        
        self.base_url = "https://api.openai.com/v1"
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

backoff_factor 的作用是让重试等待时间逐渐变长(指数退避)。例如,第一次重试等待 backoff_factor * (2^(0)) = 0.5秒 ,第二次等待 0.5 * (2^(1)) = 1秒 ,第三次等待 2秒 。这能有效避免在服务器恢复过程中继续“轰炸”API。

3.1.2 响应解析与错误处理

API的响应可能成功,也可能包含业务逻辑错误(如提示词违规、模型过载)或HTTP错误。需要统一处理。

    def chat_completion(self, messages, model="gpt-3.5-turbo", **kwargs):
        url = f"{self.base_url}/chat/completions"
        payload = {
            "model": model,
            "messages": messages,
            **kwargs # 允许用户覆盖或添加其他参数,如temperature, max_tokens
        }
        
        try:
            response = self.session.post(url, json=payload, headers=self.headers, timeout=30)
            response.raise_for_status() # 如果状态码不是200,抛出HTTPError
            result = response.json()
            
            # 即使HTTP状态码是200,OpenAI API也可能在返回体中包含错误信息
            # 但通常结构是固定的,我们主要检查是否有'choices'字段
            if "choices" not in result or len(result["choices"]) == 0:
                raise ValueError("Invalid response format from API")
                
            # 提取助手的回复内容
            assistant_message = result["choices"][0]["message"]["content"]
            # 通常还会返回完整的响应和usage信息,便于记录
            full_response = result
            return assistant_message, full_response
            
        except requests.exceptions.RequestException as e:
            # 处理网络超时、连接错误等
            print(f"Network error occurred: {e}")
            # 这里可以触发自定义的重试或告警逻辑
            raise
        except ValueError as e:
            # 处理响应格式错误
            print(f"Response parsing error: {e}")
            raise
        except Exception as e:
            # 捕获其他未知异常
            print(f"Unexpected error: {e}")
            raise

实操心得 :错误处理时,一定要区分“可重试的错误”(如网络超时、5xx错误、429限流)和“不可重试的错误”(如401密钥错误、400请求格式错误)。对于可重试错误,应由重试机制处理;对于不可重试错误,应立即失败并给出明确提示。上面的代码片段将重试委托给了 urllib3 ,而业务逻辑错误在 raise_for_status() 和后续检查中抛出。

3.2 对话上下文(Context)管理

这是让ChatGPT拥有“记忆”的关键。一个简单的上下文管理器需要实现以下功能:

class ConversationContext:
    def __init__(self, system_prompt="You are a helpful assistant."):
        self.messages = []
        if system_prompt:
            self.add_message("system", system_prompt)
    
    def add_message(self, role, content):
        """添加一条消息。role可以是'system', 'user', 'assistant'"""
        if role not in ["system", "user", "assistant"]:
            raise ValueError(f"Invalid role: {role}. Must be 'system', 'user', or 'assistant'")
        self.messages.append({"role": role, "content": content})
    
    def add_user_message(self, content):
        self.add_message("user", content)
    
    def add_assistant_message(self, content):
        self.add_message("assistant", content)
    
    def get_messages(self):
        """返回当前的消息历史,用于API调用"""
        return self.messages.copy() # 返回副本,防止外部修改
    
    def clear_messages(self, keep_system=True):
        """清空对话历史,默认保留系统指令"""
        system_msg = None
        if keep_system and self.messages and self.messages[0]["role"] == "system":
            system_msg = self.messages[0]
        self.messages = []
        if system_msg:
            self.messages.append(system_msg)

3.2.1 Token限制与历史消息裁剪

更高级的上下文管理器还需要处理token限制。OpenAI的API有上下文窗口限制(例如 gpt-3.5-turbo 是16K tokens)。我们需要在消息列表总长度接近限制时,移除最早的一些对话轮次(但通常保留系统指令)。

实现精确的token计数需要 tiktoken 库。一个简化的裁剪策略可以是基于字符数或消息条数的估算:

import tiktoken

class SmartConversationContext(ConversationContext):
    def __init__(self, system_prompt="", model="gpt-3.5-turbo", max_tokens_limit=4000):
        super().__init__(system_prompt)
        self.model = model
        self.max_tokens_limit = max_tokens_limit # 设置一个安全上限,小于模型实际限制
        try:
            self.encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            # 如果模型未找到,使用cl100k_base作为后备(ChatGPT系列通用)
            self.encoding = tiktoken.get_encoding("cl100k_base")
    
    def _count_tokens(self, messages):
        """粗略计算messages列表的token数"""
        # 官方提供的计算方式:https://platform.openai.com/docs/guides/text-generation/managing-tokens
        tokens_per_message = 3 # 每条消息额外的token开销(角色、内容等)
        tokens_per_name = 1 # 名字字段的token(如果存在)
        
        num_tokens = 0
        for message in messages:
            num_tokens += tokens_per_message
            for key, value in message.items():
                num_tokens += len(self.encoding.encode(value))
                if key == "name":
                    num_tokens += tokens_per_name
        num_tokens += 3 # 回复开始的额外token
        return num_tokens
    
    def add_user_message(self, content, auto_truncate=True):
        """添加用户消息,如果开启自动裁剪,会在添加前检查并清理历史"""
        if auto_truncate:
            self._auto_truncate_history()
        super().add_user_message(content)
    
    def _auto_truncate_history(self):
        """当历史消息token数接近上限时,从最早的user/assistant对话对开始移除"""
        while len(self.messages) > 1: # 至少保留系统消息
            current_tokens = self._count_tokens(self.messages)
            if current_tokens < self.max_tokens_limit * 0.9: # 留10%余量给本次请求的回复
                break
            # 假设消息顺序是 [系统, (用户1, 助手1), (用户2, 助手2), ...]
            # 移除第二和第三条消息(第一对对话)
            if len(self.messages) >= 3:
                # 保留系统消息(索引0)
                removed = self.messages.pop(1) # 移除最早的user
                removed = self.messages.pop(1) # 移除对应的assistant
                print(f"Truncated history. Removed early Q&A pair.")
            else:
                # 如果只有系统消息和一条用户消息,无法再裁剪,可能需要警告或截断单条消息内容
                # 这里可以引入单条消息内容截断逻辑
                break

注意事项 :Token裁剪是一个复杂问题。上述策略(按对话轮次移除)比较通用,但可能会破坏对话的逻辑连贯性。对于需要超长上下文的应用,更好的方案是使用向量数据库进行检索,只将最相关的历史片段放入上下文,或者直接选用上下文窗口更大的模型(如GPT-4 Turbo 128K)。

4. 完整集成与使用示例

现在,我们将客户端和上下文管理器组合起来,形成一个完整的工作流。

4.1 基础使用:快速开始

假设项目已经将上述类封装好,并提供了简洁的入口。使用起来可能像这样:

# 假设库已安装或导入
from chatgpt_api_call import ChatGPTClient, ConversationContext

# 1. 初始化客户端(API_KEY应从环境变量或安全配置中读取,切勿硬编码)
client = ChatGPTClient(api_key="your-api-key-here")

# 2. 创建一个对话上下文,可以设置系统指令
conversation = ConversationContext(system_prompt="你是一位精通Python的编程助手,回答要简洁专业。")

# 3. 添加用户消息
conversation.add_user_message("请用Python写一个快速排序函数,并加上注释。")

# 4. 调用API,获取回复
try:
    reply, full_response = client.chat_completion(
        messages=conversation.get_messages(),
        model="gpt-3.5-turbo",
        temperature=0.7,
        max_tokens=500
    )
    print("助手回复:", reply)
    
    # 5. 将助手的回复加入上下文,以维持对话记忆
    conversation.add_assistant_message(reply)
    
    # 打印本次消耗的token数(从full_response中获取)
    if 'usage' in full_response:
        usage = full_response['usage']
        print(f"本次消耗: {usage['total_tokens']} tokens (Prompt: {usage['prompt_tokens']}, Completion: {usage['completion_tokens']})")
        
except Exception as e:
    print(f"调用失败: {e}")

4.2 进阶功能:流式响应处理

对于生成较长内容时,为了提升用户体验(实现打字机效果),可以使用流式响应(Streaming)。这需要处理服务器发送的(Server-Sent Events, SSE)。一个处理流式响应的客户端方法可能如下:

def chat_completion_stream(self, messages, model="gpt-3.5-turbo", **kwargs):
    url = f"{self.base_url}/chat/completions"
    payload = {
        "model": model,
        "messages": messages,
        "stream": True, # 关键参数
        **kwargs
    }
    
    response = self.session.post(url, json=payload, headers=self.headers, stream=True, timeout=60)
    response.raise_for_status()
    
    collected_content = ""
    for line in response.iter_lines():
        if line:
            decoded_line = line.decode('utf-8')
            if decoded_line.startswith('data: '):
                data_str = decoded_line[6:] # 去掉'data: '前缀
                if data_str == '[DONE]':
                    break
                try:
                    data = json.loads(data_str)
                    delta = data['choices'][0]['delta']
                    # delta中可能包含'role'或'content'
                    if 'content' in delta:
                        chunk = delta['content']
                        collected_content += chunk
                        yield chunk # 将每个内容块通过生成器返回
                except json.JSONDecodeError:
                    print(f"Failed to parse SSE data: {data_str}")
    # 流式结束后,可以返回完整内容(可选)
    # return collected_content

使用时:

print("助手回复(流式): ", end="", flush=True)
full_reply = ""
for chunk in client.chat_completion_stream(conversation.get_messages()):
    print(chunk, end="", flush=True)
    full_reply += chunk
print() # 换行
conversation.add_assistant_message(full_reply)

4.3 配置与最佳实践

4.3.1 安全地管理API密钥

绝对不要将API密钥硬编码在源代码中!推荐的做法:

  1. 环境变量 :这是最简单通用的方式。

    # 在终端中设置(临时)
    export OPENAI_API_KEY='sk-...'
    # 或写入 ~/.bashrc 或 ~/.zshrc
    
    import os
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("请设置 OPENAI_API_KEY 环境变量")
    
  2. 配置文件 :使用 .env 文件配合 python-dotenv 库。

    # .env 文件
    OPENAI_API_KEY=sk-...
    MODEL=gpt-3.5-turbo
    
    from dotenv import load_dotenv
    load_dotenv() # 加载 .env 文件中的变量到环境变量
    api_key = os.getenv("OPENAI_API_KEY")
    
  3. 密钥管理服务 :对于生产环境,使用AWS Secrets Manager、HashiCorp Vault等专业服务。

4.3.2 参数调优建议

  • temperature (温度,默认0.7) : 控制输出的随机性。值越高(如0.9),输出越多样、有创意;值越低(如0.2),输出越确定、一致。对于代码生成、事实问答,建议较低(0.1-0.3);对于创意写作,可以调高(0.7-0.9)。
  • max_tokens (最大token数) : 限制模型生成回复的长度。 务必设置 ,特别是对于开放域对话,防止生成过长的回复消耗不必要的token。根据你的需求合理设定,例如简短回答设100,长文设1000。
  • top_p (核采样,默认1) : 另一种控制随机性的方式,与 temperature 二选一即可,通常不需要同时调整。
  • frequency_penalty presence_penalty (频率惩罚和存在惩罚,默认0) : 用于降低重复用词的可能性。轻微的正值(如0.1-0.2)可以使文本更丰富,但过高可能导致语句不通顺。

5. 常见问题排查与实战技巧

在实际集成和使用过程中,你肯定会遇到各种问题。下面是我总结的一些常见坑点和解决思路。

5.1 网络与连接问题

问题现象 可能原因 排查步骤与解决方案
requests.exceptions.ConnectionError 或超时 1. 本地网络不稳定
2. 代理设置问题
3. OpenAI服务暂时不可用
1. 检查本地网络连接。
2. 如果你在公司网络或使用代理,需要在 requests.Session 中配置代理: session.proxies.update({"https": "your-proxy:port"})
3. 访问 status.openai.com 查看API服务状态。
4. 增加 timeout 参数(如 timeout=(10, 30) 表示连接超时10秒,读取超时30秒)。
ssl.SSLCertVerificationError Python环境SSL证书问题 1. (不推荐临时方案) 在请求中设置 verify=False ,但这会降低安全性。
2. (推荐) 更新你的CA证书包。可以尝试运行 /Applications/Python\ 3.x/Install\ Certificates.command (Mac) 或通过系统包管理器更新。
间歇性失败,错误码不固定 网络波动或服务器负载高 1. 启用重试机制 ,这是最重要的。使用上文提到的 urllib3.Retry 配置合理的重试次数和退避策略。
2. 考虑在应用层增加一个备用的重试循环,并记录失败日志用于分析。

5.2 API错误与限流

错误码/信息 含义 解决方案
401 Invalid Authentication API密钥错误、过期或格式不对。 1. 检查密钥是否正确复制,确保包含 sk- 前缀。
2. 在OpenAI平台检查该密钥是否被禁用或额度已用完。
3. 确保请求头是 Authorization: Bearer <your-key>
429 Rate limit exceeded 请求速率超过限制。分两种:RPM(每分钟请求数)和TPM(每分钟token数)。 1. 降低请求频率 :在客户端实现请求队列或延迟。
2. 处理TPM限制 :估算你请求的token数,如果单个请求消耗token过多,也会触发TPM限流。需要优化提示词,减少上下文长度,或升级到拥有更高限额的账户。
3. 实现指数退避重试 :遇到429错误时,等待一段时间(可从响应头 Retry-After 中读取建议时间)再重试。
400 Invalid Request 请求参数错误。常见于 messages 格式不对、 model 不存在、参数值超出范围等。 1. 仔细检查请求体JSON格式,特别是 messages 列表,确保每个元素都有 role content 字段,且 role 值正确。
2. 核对 model 参数名称,例如是 gpt-3.5-turbo 而不是 gpt-3.5
3. 确保 max_tokens 等数值参数在合理范围内。
500 / 502 / 503 OpenAI服务器内部错误。 1. 这些错误通常是暂时的。 启用重试机制 是关键。
2. 如果持续出现,需要联系OpenAI支持或查看服务状态页。

5.3 内容与逻辑问题

问题 分析与技巧
回复不符合预期或“胡言乱语” 1. 检查系统指令 :系统指令对模型行为有深远影响。确保你的 system_prompt 清晰、明确地定义了助手的角色和任务边界。
2. 调整 temperature :过高的 temperature 会导致输出随机性大。对于需要准确性的任务,将其调低至0.1或0.2。
3. 优化用户提示 :遵循“清晰、具体、提供上下文”的原则。将复杂任务拆解,或提供少量示例(Few-shot Learning)。
上下文丢失,模型忘记之前对话 1. 确认消息列表 :每次调用API时,必须将 完整的 历史消息列表(包括之前的所有user和assistant消息)传入 messages 参数。只传最新的一条用户消息会导致模型失忆。
2. 检查上下文管理器 :确保你的 ConversationContext 类在 add_assistant_message 后正确保存了回复。
3. 注意token限制 :如果启用了自动裁剪,可能历史消息被过早移除。可以调高 max_tokens_limit 或关闭自动裁剪,手动管理历史长度。
处理长文本时被截断 1. 模型有上下文窗口限制 :例如 gpt-3.5-turbo 约16K tokens。你的输入(提示词+历史)和输出( max_tokens )之和不能超过此限制。
2. 主动管理上下文 :对于长文档处理,不要一次性全部喂给模型。可以总结、分段,或使用“Map-Reduce”等模式。
3. 使用支持更长上下文的模型 :如 gpt-3.5-turbo-16k gpt-4-turbo-preview
如何计算成本? 成本由消耗的token数决定。 full_response 中的 usage 字段会明确给出 prompt_tokens (输入)和 completion_tokens (输出)。
计算公式: 总成本 = (prompt_tokens * 输入单价 + completion_tokens * 输出单价) / 1000 。单价需查阅OpenAI最新定价页。务必在代码中记录 usage 数据,以便后续分析和成本控制。

5.4 性能优化技巧

  1. 连接池复用 :使用 requests.Session() 可以复用TCP连接,显著减少高频调用时的开销。 ChaGPT-API-Call 项目中的客户端应该已经做到了这一点。
  2. 异步调用 :如果你的应用是异步框架(如FastAPI, Sanic),或者需要同时发起多个不依赖彼此结果的API调用,使用 aiohttp 实现异步客户端可以极大提升吞吐量。这可以作为该库的一个高级扩展方向。
  3. 批量处理 :对于大量独立的文本处理任务(如情感分析、分类),可以考虑将多个任务合并到一个提示词中,让模型一次处理,然后解析多个结果。这比发起N次独立API调用更高效、更经济。但这需要精心设计提示词和结果解析逻辑。
  4. 缓存机制 :对于输入相同、预期输出也相同的确定性请求(例如, temperature=0 时,固定提示词的翻译任务),可以在客户端或应用层增加缓存,避免重复调用,节省成本和时间。

6. 项目扩展与高级应用场景

一个基础的API调用工具已经能解决大部分问题,但我们可以基于此构建更强大的应用。

6.1 构建一个简单的命令行聊天机器人

利用这个工具库,几十行代码就能实现一个持续对话的CLI工具:

import os
from chatgpt_api_call import ChatGPTClient, SmartConversationContext

def main():
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("错误:未设置 OPENAI_API_KEY 环境变量。")
        return
    
    client = ChatGPTClient(api_key=api_key)
    # 使用智能上下文,设置16K模型,限制上下文为12K tokens以留出回复空间
    context = SmartConversationContext(
        system_prompt="你是一个有用的助手。",
        model="gpt-3.5-turbo-16k",
        max_tokens_limit=12000
    )
    
    print("命令行ChatGPT助手已启动。输入内容开始对话,输入 '/quit' 退出,输入 '/clear' 清空历史。")
    while True:
        try:
            user_input = input("\nYou: ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\n再见!")
            break
            
        if not user_input:
            continue
        if user_input.lower() == '/quit':
            print("再见!")
            break
        if user_input.lower() == '/clear':
            context.clear_messages()
            print("对话历史已清空。")
            continue
            
        context.add_user_message(user_input)
        try:
            print("Assistant: ", end="", flush=True)
            reply, full_resp = client.chat_completion(
                messages=context.get_messages(),
                model=context.model,
                temperature=0.7,
                stream=True # 假设我们的client支持返回生成器的stream模式
            )
            # 这里假设client.chat_completion在stream=True时返回一个生成器
            # 实际处理流式响应的代码略,参考4.2节
            # 为简化,我们这里用非流式
            reply, full_resp = client.chat_completion(
                messages=context.get_messages(),
                model=context.model,
                temperature=0.7,
                stream=False
            )
            print(reply)
            context.add_assistant_message(reply)
            
        except Exception as e:
            print(f"\n调用API时出错: {e}")
            # 可选:移除最后一条用户消息,因为这次对话失败了
            if context.messages and context.messages[-1]['role'] == 'user':
                context.messages.pop()

if __name__ == "__main__":
    main()

6.2 集成到Web应用(如Flask)

将其作为后端服务的一部分也非常简单:

from flask import Flask, request, jsonify, render_template
from chatgpt_api_call import ChatGPTClient, ConversationContext
import os

app = Flask(__name__)
client = ChatGPTClient(api_key=os.getenv("OPENAI_API_KEY"))
# 使用字典在内存中存储不同会话的上下文(生产环境应使用Redis等)
sessions = {}

@app.route('/')
def index():
    return render_template('chat.html') # 一个简单的聊天页面

@app.route('/api/chat', methods=['POST'])
def chat():
    data = request.json
    session_id = data.get('session_id', 'default')
    user_message = data.get('message')
    if not user_message:
        return jsonify({'error': 'No message provided'}), 400
    
    # 获取或创建会话上下文
    if session_id not in sessions:
        sessions[session_id] = ConversationContext()
    context = sessions[session_id]
    
    context.add_user_message(user_message)
    try:
        reply, full_resp = client.chat_completion(
            messages=context.get_messages(),
            model="gpt-3.5-turbo"
        )
        context.add_assistant_message(reply)
        
        return jsonify({
            'reply': reply,
            'session_id': session_id,
            'usage': full_resp.get('usage', {})
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/api/session/clear', methods=['POST'])
def clear_session():
    data = request.json
    session_id = data.get('session_id', 'default')
    if session_id in sessions:
        sessions[session_id].clear_messages()
        return jsonify({'status': 'cleared'})
    return jsonify({'error': 'Session not found'}), 404

if __name__ == '__main__':
    app.run(debug=True)

6.3 实现函数调用(Function Calling)

OpenAI的Chat Completions API支持函数调用功能,这让模型能够根据对话内容,决定是否需要调用你预先定义好的函数(工具),并返回结构化的参数。这对于构建智能代理、连接外部数据和工具至关重要。一个封装良好的库应该支持此功能。

在客户端中,我们需要增加处理 functions function_call 参数的能力,并解析模型返回的特殊消息。

def chat_completion_with_functions(self, messages, functions=None, function_call="auto", **kwargs):
    """
    支持函数调用的聊天补全。
    :param functions: 列表,定义可供模型调用的函数。
    :param function_call: 控制模型行为,"auto"(默认,由模型决定),"none"(不调用函数),或指定函数名。
    """
    url = f"{self.base_url}/chat/completions"
    payload = {
        "model": kwargs.get("model", "gpt-3.5-turbo"),
        "messages": messages,
        **kwargs
    }
    if functions:
        payload["functions"] = functions
    if function_call:
        payload["function_call"] = function_call
        
    response = self.session.post(url, json=payload, headers=self.headers, timeout=30)
    response.raise_for_status()
    result = response.json()
    
    choice = result["choices"][0]
    message = choice["message"]
    
    # 检查模型是否想要调用函数
    if "function_call" in message:
        # 模型返回了函数调用请求
        func_name = message["function_call"]["name"]
        func_args_str = message["function_call"]["arguments"]
        try:
            func_args = json.loads(func_args_str)
        except json.JSONDecodeError:
            func_args = {}
        # 这里不执行函数,而是将信息返回给调用者
        return {
            "type": "function_call",
            "function_name": func_name,
            "arguments": func_args,
            "message": message # 包含原始消息,用于后续将函数执行结果传回模型
        }
    else:
        # 模型返回了普通文本回复
        return {
            "type": "text",
            "content": message["content"],
            "message": message
        }

使用模式通常是一个循环:

  1. 用户发送消息。
  2. 调用 chat_completion_with_functions ,传入历史消息和定义好的 functions 列表。
  3. 如果返回类型是 function_call ,则在你的代码中执行对应的真实函数。
  4. 将函数执行的结果作为一条新的消息( role: "function" , name: <函数名> , content: <结果JSON字符串> )添加到历史中。
  5. 再次调用API,让模型基于函数执行结果生成面向用户的回答。

这个模式能实现强大的自动化流程,比如让模型帮你查天气、发邮件、查询数据库等。

7. 总结与项目价值

回过头来看“ChristopheZhao/ChaGPT-API-Call”这类项目,它的价值远不止是几行封装代码。它降低了大语言模型的应用门槛,将开发者从繁琐的HTTP通信、错误处理和状态管理中解放出来。通过良好的抽象,它使得集成AI能力变得像调用一个本地函数一样简单。

在实际选型时,除了这个项目,你也可以考虑更成熟、功能更全的官方 openai Python库,或者社区维护的 langchain 等框架。但这个轻量级项目的优势在于 透明、可控、易于定制 。你能完全理解每一行代码在做什么,可以根据自己的业务需求轻松修改,例如增加特定的日志格式、集成内部的监控系统、实现自定义的缓存策略等。

我个人在几个快速原型项目中使用了类似的封装,最大的体会是: 前期花一点时间搭建好稳健的基础设施(客户端、上下文管理、错误处理),后期开发效率会成倍提升,并且线上问题的排查也会清晰很多 。尤其是在处理API限流和网络抖动时,一个具备智能重试的客户端就是应用的“稳定器”。

最后一个小技巧:无论使用哪个库,一定要为你的API调用添加详细的日志记录,至少包括请求时间、消耗的token数、响应时间和是否成功。这些数据对于监控成本、分析性能和排查问题至关重要。你可以很容易地在客户端类的 chat_completion 方法里加入日志记录逻辑,这是自定义封装带来的另一个便利。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐