这篇文章,我将教大家如何在客户端接入DeepSeek V3在线模型进行对话。这套Client代码代码同样适用于接入其他LLM大模型。

一、安装相关依赖

为了支持调用OpenAI模型,以及在环境变量中读取API-KEY等信息,需要先安装相关依赖,执行代码如下:

uv add mcp openai python-dotenv

其中,openai库提供了与OpenAI服务交互的接口,例如访问GPT-3等语言模型。而python-dotenv这个包是一个Python库,用于从 .env 文件中加载环境变量。.env 文件通常用于存储配置信息和敏感信息,如API密钥、数据库连接字符串等。python-dotenv 允许你在Python脚本中通过 os.getenv()dotenv 模块访问这些环境变量,从而实现配置信息的隔离和管理。

二、创建.env文件

.env 文件(通常称为环境变量文件)主要用于存储配置信息和环境变量。在应用程序中,可以使用各种库(如Python的 dotenv 库)来加载 .env 文件中的环境变量,并在代码中使用它们。我以阿里云上的DeepSeek V3模型为示例:

目前,各大厂商都有免费的LLM大模型可以使用,大家可以登录各大云厂商,免费获取API KEY,我的示例中用的是阿里云的,链接:https://bailian.console.aliyun.com/?tab=api#/api/?type=model&url=https%3A%2F%2Fhelp.aliyun.com%2Fdocument_detail%2F2868565.html

链接中都有示例代码,大家可以参考。

阿里云的API KEY的获取,链接:https://bailian.console.aliyun.com/?tab=api#/api

三、修改client.py的文件

新增接入DeepSeek V3模型时,修改client.py文件,第一版的client.py文件,可以参考我写的第一篇MCP实战的文章:MCP(模型上下文协议)保姆级教程实战篇(一)----MCP客户端搭建-CSDN博客

所有新增的代码处,我都进行了备注,供大家参考。

#!/usr/bin/env python3  # 指定 Python 解释器路径
# -*- coding: utf-8 -*-  # 指定文件编码为 UTF-8

# 导入必要的模块
import asyncio  # 导入异步 IO 模块,用于处理异步操作
import logging  # 导入日志模块,用于记录程序运行状态
from typing import Optional  # 导入类型提示模块中的 Optional 类型
import os  # 导入操作系统模块
from dotenv import load_dotenv  # 导入环境变量加载模块
import json  # 导入 JSON 处理模块
import aiohttp  # 导入异步 HTTP 客户端

# 加载环境变量
load_dotenv()  # 从 .env 文件加载环境变量

# 获取环境变量
BASE_URL = os.getenv('BASE_URL')  # 获取 API 基础 URL
MODEL = os.getenv('MODEL')  # 获取模型名称
API_KEY = os.getenv('OPENAI_API_KEY')  # 获取 API 密钥

class MCPClient:  # 定义 MCP 客户端类
    def __init__(self):  # 初始化方法
        """初始化 MCP 客户端"""
        self.connected = False  # 初始化连接状态为未连接
        self.session: Optional[aiohttp.ClientSession] = None  # 初始化会话为 None
        self.logger = logging.getLogger(__name__)  # 创建日志记录器
        self.model = MODEL  # 存储模型名称
        self.api_key = API_KEY  # 存储 API 密钥
        
    async def connect(self) -> bool:  # 异步连接方法,返回布尔值表示连接是否成功
        """异步连接到 MCP 服务器
        
        Returns:
            bool: 连接是否成功
        """
        try:  # 尝试建立连接
            # 创建会话
            self.session = aiohttp.ClientSession(
                headers={
                    'Authorization': f'Bearer {self.api_key}',
                    'Content-Type': 'application/json'
                }
            )
            self.connected = True  # 设置连接状态为已连接
            self.logger.info("已连接到服务器")  # 记录连接成功信息
            return True  # 返回连接成功
        except Exception as e:  # 捕获连接过程中的异常
            self.logger.error(f"连接服务器失败: {e}")  # 记录连接失败信息
            return False  # 返回连接失败
            
    async def disconnect(self):  # 异步断开连接方法
        """断开与服务器的连接"""
        if self.session:  # 如果会话存在
            await self.session.close()  # 关闭会话
            self.connected = False  # 设置连接状态为未连接
            self.logger.info("已断开与服务器的连接")  # 记录断开连接信息
            
    async def send_message(self, message: str):  # 异步发送消息方法
        """发送消息到服务器
        
        Args:
            message: 要发送的消息
        """
        if not self.connected or not self.session:  # 如果未连接或会话不存在
            self.logger.error("未连接到服务器")  # 记录错误信息
            return  # 直接返回
            
        try:  # 尝试发送消息
            # 构建消息数据
            data = {
                "model": self.model,
                "messages": [{"role": "user", "content": message}]
            }
            # 发送请求
            async with self.session.post(f"{BASE_URL}/chat/completions", json=data) as response:
                if response.status == 200:  # 如果请求成功
                    result = await response.json()  # 解析响应
                    return result.get("choices", [{}])[0].get("message", {}).get("content")  # 返回消息内容
                else:  # 如果请求失败
                    self.logger.error(f"请求失败: {response.status}")  # 记录错误信息
                    return None  # 返回 None
        except Exception as e:  # 捕获发送过程中的异常
            self.logger.error(f"发送消息失败: {e}")  # 记录发送失败信息
            return None  # 返回 None
            
    async def chat_loop(self):  # 异步聊天循环方法
        """运行交互式聊天循环"""
        while self.connected:  # 当连接状态为已连接时循环
            try:  # 尝试执行聊天循环
                # 接收用户输入
                message = await asyncio.get_event_loop().run_in_executor(  # 在事件循环中运行阻塞的输入操作
                    None, input, "请输入消息 (输入 'quit' 退出): "  # 使用 input 函数获取用户输入
                )
                
                if message.lower() == 'quit':  # 如果用户输入 'quit'
                    break  # 退出循环
                    
                # 发送消息并获取响应
                response = await self.send_message(message)  # 发送消息并获取响应
                if response:  # 如果接收到响应
                    print(f"服务器响应: {response}")  # 打印服务器响应
                    
            except Exception as e:  # 捕获聊天循环中的异常
                self.logger.error(f"聊天循环出错: {e}")  # 记录错误信息
                break  # 退出循环
                
    async def cleanup(self):  # 异步清理资源方法
        """清理资源"""
        await self.disconnect()  # 异步断开连接
        self.logger.info("资源清理完成")  # 记录资源清理完成信息

async def main():  # 异步主函数
    """主函数"""
    # 配置日志
    logging.basicConfig(  # 配置日志记录
        level=logging.INFO,  # 设置日志级别为 INFO
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'  # 设置日志格式
    )
    
    # 新增:验证环境变量
    if not all([BASE_URL, MODEL, API_KEY]):
        logging.error("缺少必要的环境变量,请检查 .env 文件")
        return
    
    # 创建客户端实例
    client = MCPClient()  # 创建 MCP 客户端实例
    
    try:  # 尝试执行主要操作
        # 连接到服务器
        if await client.connect():  # 如果连接成功
            # 运行聊天循环
            await client.chat_loop()  # 运行聊天循环
    except KeyboardInterrupt:  # 捕获键盘中断异常
        logging.info("用户中断程序")  # 记录用户中断信息
    finally:  # 无论是否发生异常,都执行清理操作
        # 清理资源
        await client.cleanup()  # 清理资源

if __name__ == "__main__":  # 如果直接运行此文件
    # 运行主函数
    asyncio.run(main())  # 运行异步主函数 

以下是对client.py文件所做的修改及其作用:

  • 添加 aiohttp 库
import aiohttp  # 导入异步 HTTP 客户端

作用:引入异步 HTTP 客户端库,用于替代原始的 socket 连接,更好地处理 HTTP 请求和响应。

  • 简化 MCPClient 类的初始化
   def __init__(self):  # 初始化方法
       """初始化 MCP 客户端"""
       self.connected = False  # 初始化连接状态为未连接
       self.session: Optional[aiohttp.ClientSession] = None  # 初始化会话为 None
       self.logger = logging.getLogger(__name__)  # 创建日志记录器
       self.model = MODEL  # 存储模型名称
       self.api_key = API_KEY  # 存储 API 密钥

作用:
   - 移除了不必要的 host 和 port 参数,因为我们现在使用 HTTP 请求
   - 将 reader 和 writer 替换为 aiohttp 的 ClientSession
   - 直接使用环境变量中的 MODEL 和 API_KEY

  • 修改连接方法
   async def connect(self) -> bool:
       try:
           # 创建会话
           self.session = aiohttp.ClientSession(
               headers={
                   'Authorization': f'Bearer {self.api_key}',
                   'Content-Type': 'application/json'
               }
           )
           self.connected = True
           self.logger.info("已连接到服务器")
           return True
       except Exception as e:
           self.logger.error(f"连接服务器失败: {e}")
           return False

作用:
   - 使用 aiohttp 创建 HTTP 会话,而不是建立 socket 连接
   - 在会话中设置认证头和内容类型
   - 简化了连接逻辑

  • 修改断开连接方法
   async def disconnect(self):
       if self.session:
           await self.session.close()
           self.connected = False
           self.logger.info("已断开与服务器的连接")

作用:
   - 关闭 aiohttp 会话,而不是关闭 socket 连接
   - 简化了断开连接的逻辑

  • 重写消息发送方法
  async def send_message(self, message: str):
       if not self.connected or not self.session:
           self.logger.error("未连接到服务器")
           return
           
       try:
           # 构建消息数据
           data = {
               "model": self.model,
               "messages": [{"role": "user", "content": message}]
           }
           # 发送请求
           async with self.session.post(f"{BASE_URL}/chat/completions", json=data) as response:
               if response.status == 200:
                   result = await response.json()
                   return result.get("choices", [{}])[0].get("message", {}).get("content")
               else:
                   self.logger.error(f"请求失败: {response.status}")
                   return None
       except Exception as e:
           self.logger.error(f"发送消息失败: {e}")
           return None

作用:
   - 使用 HTTP POST 请求发送消息,而不是通过 socket 发送
   - 构建符合 OpenAI API 格式的请求数据
   - 处理 HTTP 响应,提取消息内容
   - 添加了错误处理和状态码检查

  • 移除 receive_message 方法

   作用:
   - 将接收消息的逻辑整合到 send_message 方法中
   - 简化了代码结构,减少了重复逻辑

  • 修改聊天循环

   async def chat_loop(self):
       while self.connected:
           try:
               message = await asyncio.get_event_loop().run_in_executor(
                   None, input, "请输入消息 (输入 'quit' 退出): "
               )
               
               if message.lower() == 'quit':
                   break
                   
               # 发送消息并获取响应
               response = await self.send_message(message)
               if response:
                   print(f"服务器响应: {response}")
                   
           except Exception as e:
               self.logger.error(f"聊天循环出错: {e}")
               break

作用:
   - 简化了聊天循环,直接使用 send_message 的返回值作为响应
   - 修复了 this 关键字的使用,改为正确的 self

  • 修复 this 关键字的使用

   作用:
   - 将所有 this 关键字替换为 self,这是 Python 中正确的实例引用方式
   - 修复了导致 NameError 错误的代码

  • 总结

这些修改使代码更加符合现代 Python 异步编程实践,特别是:

1. 使用 aiohttp 库进行 HTTP 通信,更适合与 Web API 交互
2. 简化了代码结构,减少了重复逻辑
3. 改进了错误处理和日志记录
4. 修复了 this 关键字的使用错误
5. 使代码更加符合 OpenAI API 的调用方式

这些修改使客户端能够正确地与 OpenAI API 进行交互,发送消息并接收响应。
 

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐