MCP(模型上下文协议)保姆级教程实战篇(二)----MCP客户端接入DeepSeek V3在线模型
这篇文章,我将教大家如何在客户端接入DeepSeek V3在线模型进行对话。这套Client代码代码同样适用于接入其他LLM大模型。
一、安装相关依赖
为了支持调用OpenAI模型,以及在环境变量中读取API-KEY等信息,需要先安装相关依赖,执行代码如下:
uv add mcp openai python-dotenv

其中,openai库提供了与OpenAI服务交互的接口,例如访问GPT-3等语言模型。而python-dotenv这个包是一个Python库,用于从 .env 文件中加载环境变量。.env 文件通常用于存储配置信息和敏感信息,如API密钥、数据库连接字符串等。python-dotenv 允许你在Python脚本中通过 os.getenv() 或 dotenv 模块访问这些环境变量,从而实现配置信息的隔离和管理。
二、创建.env文件
.env 文件(通常称为环境变量文件)主要用于存储配置信息和环境变量。在应用程序中,可以使用各种库(如Python的 dotenv 库)来加载 .env 文件中的环境变量,并在代码中使用它们。我以阿里云上的DeepSeek V3模型为示例:

目前,各大厂商都有免费的LLM大模型可以使用,大家可以登录各大云厂商,免费获取API KEY,我的示例中用的是阿里云的,链接:https://bailian.console.aliyun.com/?tab=api#/api/?type=model&url=https%3A%2F%2Fhelp.aliyun.com%2Fdocument_detail%2F2868565.html
链接中都有示例代码,大家可以参考。
阿里云的API KEY的获取,链接:https://bailian.console.aliyun.com/?tab=api#/api
三、修改client.py的文件
新增接入DeepSeek V3模型时,修改client.py文件,第一版的client.py文件,可以参考我写的第一篇MCP实战的文章:MCP(模型上下文协议)保姆级教程实战篇(一)----MCP客户端搭建-CSDN博客
所有新增的代码处,我都进行了备注,供大家参考。
#!/usr/bin/env python3 # 指定 Python 解释器路径
# -*- coding: utf-8 -*- # 指定文件编码为 UTF-8
# 导入必要的模块
import asyncio # 导入异步 IO 模块,用于处理异步操作
import logging # 导入日志模块,用于记录程序运行状态
from typing import Optional # 导入类型提示模块中的 Optional 类型
import os # 导入操作系统模块
from dotenv import load_dotenv # 导入环境变量加载模块
import json # 导入 JSON 处理模块
import aiohttp # 导入异步 HTTP 客户端
# 加载环境变量
load_dotenv() # 从 .env 文件加载环境变量
# 获取环境变量
BASE_URL = os.getenv('BASE_URL') # 获取 API 基础 URL
MODEL = os.getenv('MODEL') # 获取模型名称
API_KEY = os.getenv('OPENAI_API_KEY') # 获取 API 密钥
class MCPClient: # 定义 MCP 客户端类
def __init__(self): # 初始化方法
"""初始化 MCP 客户端"""
self.connected = False # 初始化连接状态为未连接
self.session: Optional[aiohttp.ClientSession] = None # 初始化会话为 None
self.logger = logging.getLogger(__name__) # 创建日志记录器
self.model = MODEL # 存储模型名称
self.api_key = API_KEY # 存储 API 密钥
async def connect(self) -> bool: # 异步连接方法,返回布尔值表示连接是否成功
"""异步连接到 MCP 服务器
Returns:
bool: 连接是否成功
"""
try: # 尝试建立连接
# 创建会话
self.session = aiohttp.ClientSession(
headers={
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
)
self.connected = True # 设置连接状态为已连接
self.logger.info("已连接到服务器") # 记录连接成功信息
return True # 返回连接成功
except Exception as e: # 捕获连接过程中的异常
self.logger.error(f"连接服务器失败: {e}") # 记录连接失败信息
return False # 返回连接失败
async def disconnect(self): # 异步断开连接方法
"""断开与服务器的连接"""
if self.session: # 如果会话存在
await self.session.close() # 关闭会话
self.connected = False # 设置连接状态为未连接
self.logger.info("已断开与服务器的连接") # 记录断开连接信息
async def send_message(self, message: str): # 异步发送消息方法
"""发送消息到服务器
Args:
message: 要发送的消息
"""
if not self.connected or not self.session: # 如果未连接或会话不存在
self.logger.error("未连接到服务器") # 记录错误信息
return # 直接返回
try: # 尝试发送消息
# 构建消息数据
data = {
"model": self.model,
"messages": [{"role": "user", "content": message}]
}
# 发送请求
async with self.session.post(f"{BASE_URL}/chat/completions", json=data) as response:
if response.status == 200: # 如果请求成功
result = await response.json() # 解析响应
return result.get("choices", [{}])[0].get("message", {}).get("content") # 返回消息内容
else: # 如果请求失败
self.logger.error(f"请求失败: {response.status}") # 记录错误信息
return None # 返回 None
except Exception as e: # 捕获发送过程中的异常
self.logger.error(f"发送消息失败: {e}") # 记录发送失败信息
return None # 返回 None
async def chat_loop(self): # 异步聊天循环方法
"""运行交互式聊天循环"""
while self.connected: # 当连接状态为已连接时循环
try: # 尝试执行聊天循环
# 接收用户输入
message = await asyncio.get_event_loop().run_in_executor( # 在事件循环中运行阻塞的输入操作
None, input, "请输入消息 (输入 'quit' 退出): " # 使用 input 函数获取用户输入
)
if message.lower() == 'quit': # 如果用户输入 'quit'
break # 退出循环
# 发送消息并获取响应
response = await self.send_message(message) # 发送消息并获取响应
if response: # 如果接收到响应
print(f"服务器响应: {response}") # 打印服务器响应
except Exception as e: # 捕获聊天循环中的异常
self.logger.error(f"聊天循环出错: {e}") # 记录错误信息
break # 退出循环
async def cleanup(self): # 异步清理资源方法
"""清理资源"""
await self.disconnect() # 异步断开连接
self.logger.info("资源清理完成") # 记录资源清理完成信息
async def main(): # 异步主函数
"""主函数"""
# 配置日志
logging.basicConfig( # 配置日志记录
level=logging.INFO, # 设置日志级别为 INFO
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' # 设置日志格式
)
# 新增:验证环境变量
if not all([BASE_URL, MODEL, API_KEY]):
logging.error("缺少必要的环境变量,请检查 .env 文件")
return
# 创建客户端实例
client = MCPClient() # 创建 MCP 客户端实例
try: # 尝试执行主要操作
# 连接到服务器
if await client.connect(): # 如果连接成功
# 运行聊天循环
await client.chat_loop() # 运行聊天循环
except KeyboardInterrupt: # 捕获键盘中断异常
logging.info("用户中断程序") # 记录用户中断信息
finally: # 无论是否发生异常,都执行清理操作
# 清理资源
await client.cleanup() # 清理资源
if __name__ == "__main__": # 如果直接运行此文件
# 运行主函数
asyncio.run(main()) # 运行异步主函数
以下是对client.py文件所做的修改及其作用:
- 添加 aiohttp 库
import aiohttp # 导入异步 HTTP 客户端
作用:引入异步 HTTP 客户端库,用于替代原始的 socket 连接,更好地处理 HTTP 请求和响应。
- 简化 MCPClient 类的初始化
def __init__(self): # 初始化方法
"""初始化 MCP 客户端"""
self.connected = False # 初始化连接状态为未连接
self.session: Optional[aiohttp.ClientSession] = None # 初始化会话为 None
self.logger = logging.getLogger(__name__) # 创建日志记录器
self.model = MODEL # 存储模型名称
self.api_key = API_KEY # 存储 API 密钥
作用:
- 移除了不必要的 host 和 port 参数,因为我们现在使用 HTTP 请求
- 将 reader 和 writer 替换为 aiohttp 的 ClientSession
- 直接使用环境变量中的 MODEL 和 API_KEY
- 修改连接方法
async def connect(self) -> bool:
try:
# 创建会话
self.session = aiohttp.ClientSession(
headers={
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
)
self.connected = True
self.logger.info("已连接到服务器")
return True
except Exception as e:
self.logger.error(f"连接服务器失败: {e}")
return False
作用:
- 使用 aiohttp 创建 HTTP 会话,而不是建立 socket 连接
- 在会话中设置认证头和内容类型
- 简化了连接逻辑
- 修改断开连接方法
async def disconnect(self):
if self.session:
await self.session.close()
self.connected = False
self.logger.info("已断开与服务器的连接")
作用:
- 关闭 aiohttp 会话,而不是关闭 socket 连接
- 简化了断开连接的逻辑
- 重写消息发送方法
async def send_message(self, message: str):
if not self.connected or not self.session:
self.logger.error("未连接到服务器")
return
try:
# 构建消息数据
data = {
"model": self.model,
"messages": [{"role": "user", "content": message}]
}
# 发送请求
async with self.session.post(f"{BASE_URL}/chat/completions", json=data) as response:
if response.status == 200:
result = await response.json()
return result.get("choices", [{}])[0].get("message", {}).get("content")
else:
self.logger.error(f"请求失败: {response.status}")
return None
except Exception as e:
self.logger.error(f"发送消息失败: {e}")
return None
作用:
- 使用 HTTP POST 请求发送消息,而不是通过 socket 发送
- 构建符合 OpenAI API 格式的请求数据
- 处理 HTTP 响应,提取消息内容
- 添加了错误处理和状态码检查
- 移除 receive_message 方法
作用:
- 将接收消息的逻辑整合到 send_message 方法中
- 简化了代码结构,减少了重复逻辑
- 修改聊天循环
async def chat_loop(self):
while self.connected:
try:
message = await asyncio.get_event_loop().run_in_executor(
None, input, "请输入消息 (输入 'quit' 退出): "
)
if message.lower() == 'quit':
break
# 发送消息并获取响应
response = await self.send_message(message)
if response:
print(f"服务器响应: {response}")
except Exception as e:
self.logger.error(f"聊天循环出错: {e}")
break
作用:
- 简化了聊天循环,直接使用 send_message 的返回值作为响应
- 修复了 this 关键字的使用,改为正确的 self
- 修复 this 关键字的使用
作用:
- 将所有 this 关键字替换为 self,这是 Python 中正确的实例引用方式
- 修复了导致 NameError 错误的代码
- 总结
这些修改使代码更加符合现代 Python 异步编程实践,特别是:
1. 使用 aiohttp 库进行 HTTP 通信,更适合与 Web API 交互
2. 简化了代码结构,减少了重复逻辑
3. 改进了错误处理和日志记录
4. 修复了 this 关键字的使用错误
5. 使代码更加符合 OpenAI API 的调用方式
这些修改使客户端能够正确地与 OpenAI API 进行交互,发送消息并接收响应。
更多推荐


所有评论(0)