创新实训(三)——基于 FastAPI 与 WebSocket 实现 DeepSeek API 大模型接入及实时输出显示
本周开始攻克项目的核心——DeepSeek的接入与使用,我们项目采用了直接接入DeepSeek API,之后会通过RAG技术来使模型更加完善,减轻大模型幻觉问题,使大模型生成的内容更加符合用户的需要。
本周开始攻克项目的核心——DeepSeek的接入与使用,我们项目采用了直接接入DeepSeek API,之后会通过RAG技术来使模型更加完善,减轻大模型幻觉问题,使大模型生成的内容更加符合用户的需要。
目录
一、DeepSeek API的接入
DeepSeek 是一个国产先进的大语言模型,能够处理自然语言理解、文本生成等多种任务。通过调用 DeepSeek API,我们可以利用其强大的语言处理能力,为我们的智能辩论教练项目赋予智能交互的功能。
1.依赖安装
由于提供的接口部署在oneAPI平台,查看该平台的Github仓库的readme文件可知,接口可以通过标准的OpenAI API格式进行访问,因此在PyCharm中安装openapi包。
pip install openapi
2.DeepSeek API接入
首先 ,创建一个 OpenAI 客户端实例。base_url 参数指定了服务的基础 URL,这里指向的是本地的 DeepSeek 服务端点 http://10.2.8.77:3000/v1;api_key 是用于身份验证的密钥,用于确保只有授权用户可以访问服务。
随后,向模型发起对话请求并获取回复,设置其中的参数model为DeepSeek-R1,message是一个包含多个消息对象的列表,用于定义对话的上下文,其中system角色的消息通常用于给模型设定一些背景信息或任务要求,而user 角色的消息包含了用户输入的、实际要点评的辩论内容。将stream设置为True,意味着大模型给出的回答将采用流式输出,而不是等回答生成完再统一输出。
import time
from openai import OpenAI
client = OpenAI(base_url="http://10.2.8.77:3000/v1", api_key="your_api_key")
response = client.chat.completions.create(
model="DeepSeek-R1",
messages=[
{"role": "system", "content": "你是一个辩论教练,请为user表述的内容进行全方位智能点评"},
{"role": "user", "content": "同时我方还是要指出的是:就时间和理论两者的内在关系而言,如果在大学阶段以时间 锻炼为重,理论学习则是水到渠成,必然会有所提高,但是如果更加注重理论而轻视实践锻炼,理论呢学习必然会下降,因此,我们看到,如果能够更加注重实践锻炼,就能实现实践锻炼与理论学习比翼双飞。"}
],
stream=True
)
for chunk in response:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end='', flush=True)
time.sleep(0.1)
测试DeepSeek接入情况,运行该方法后,得到了大模型的回答。
3.在项目中的应用
将上述代码中的message中user对应的content换成从前端传入的prompt,即可针对用户输入的信息调用大模型,生成回答。由于实现过程中用到websocket通信技术,具体细节在下面展开讲解。
二、WebSocket连接实现
1.为什么使用WebSocket进行连接?
在大模型应用中,由于大模型生成内容需要较长时间,传统的 HTTP 请求 - 响应模式下,客户端必须等待整个响应生成完成后才能获取结果,这会造成明显的延迟,给用户带来不佳的体验。
而 WebSocket 支持全双工通信,服务器能够在大模型生成内容的过程中,当后端调用 DeepSeek 模型得到点评内容的一个数据块时,能够立即将其发送给前端,前端可以马上显示出来,让用户几乎实时看到点评的进展,就像与真人对话一样流畅,大大提升了交互的即时性和真实感。
WebSocket是一种协议,用于在Web应用程序和服务器之间建立实时、双向的通信连接。它通过一个单一的TCP连接提供了持久化连接,这使得Web应用程序可以更加实时地传递数据。WebSocket协议最初由W3C开发,并于2011年成为标准。
WebSocket的优势包括:
双向通信: WebSocket协议支持双向通信,这意味着服务器可以主动向客户端发送数据,而不需要客户端发送请求。
实时性: WebSocket的持久化连接,可以实现实时的数据传输,避免了Web应用程序需要不断地发送请求以获取最新数据的情况。
减轻网络负载:WebSocket的持久化连接,减少了HTTP请求的数量,从而有效减轻了网络负载。
2.WebSocket连接的后端处理
首先,基于上面测试接入DeepSeek API的代码,将其改造成生成回答的方法,可以根据用户输入的prompt生成对应的回答。
利用@router.websocket("/stream")定义一个 WebSocket 路由,路径为/stream,在该方法中接受客户端的 WebSocket 连接,建立连接并利用while循环持续接收客户端发送的消息。
await websocket.send_json({ "type": "start", "content": "" })意味着向客户端发送开始信号,表示大模型回答过程开始,随后异步遍历llm_streamer生成器,获取流式点评内容,将每个点评内容块以 JSON 格式发送给客户端。向客户端发送结束信号,表示点评过程结束,最后关闭WebSocket连接。
from fastapi import APIRouter, WebSocket
from typing import Generator
import asyncio
import json
import time
from openai import OpenAI
router = APIRouter()
client = OpenAI(base_url="http://10.2.8.77:3000/v1", api_key="your_api_key")
async def llm_streamer(prompt: str) -> Generator[str, None, None]:
response = client.chat.completions.create(
model="DeepSeek-R1",
messages=[
{"role": "system", "content": "你是一个辩论教练,请为user表述的内容进行全方位智能点"},
{"role": "user",
"content": prompt}
],
stream=True
)
for chunk in response:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
await asyncio.sleep(0.1)
@router.websocket("/stream")
async def websocket_chat(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
message = json.loads(data)
# 发送开始信号
await websocket.send_json({
"type": "start",
"content": ""
})
# 流式生成响应
async for chunk in llm_streamer(message["message"]):
await websocket.send_json({
"type": "stream",
"content": chunk
})
# 发送结束信号
await websocket.send_json({
"type": "end",
"content": ""
})
except Exception as e:
await websocket.send_json({
"type": "error",
"content": f"处理错误: {str(e)}"
})
finally:
await websocket.close()
3.前端WebSocket连接
首先,创建一个新的 WebSocket 实例,连接到指定的服务器地址ws://localhost:8000/api/chat/stream。ws:// 是 WebSocket 协议的前缀,localhost:8000 表示服务器的主机和端口,/api/chat/stream 是具体的 WebSocket 端点路径。
当接收到的消息类型为 'start' 时,表示服务器开始生成响应内容。 向 messages.value 数组中添加一个新的消息对象,该对象表示助手(role: 'assistant')的回复,初始内容为空,chunks 数组用于存储流式传输的内容块,loading 标志设置为 true 表示正在加载。
当接收到的消息类型为 'stream' 且最后一个消息正在加载时,将当前消息内容data.content添加到 lastMessage.chunks 数组中。 使用 join('') 方法将 chunks 数组中的所有内容块拼接成完整的内容,并更新 lastMessage.content。 调用 scrollToBottom 函数,确保聊天界面滚动到最新消息的位置,方便用户查看。
当接收到的消息类型为 'end' 时,表示服务器已经完成响应内容的生成。 将最后一个消息的 loading 标志设置为 false,表示加载完成,并将 completed 标志设置为 true。 将 isLoading.value 设置为 false,表示整个加载过程结束。
// 初始化WebSocket连接
const initWebSocket = () => {
socket = new WebSocket(`ws://localhost:8000/api/chat/stream`)
socket.onmessage = (event) => {
const data: WebSocketMessage = JSON.parse(event.data)
// 查找当前进行中的消息
const lastMessage = messages.value[messages.value.length - 1]
if (data.type === 'start') {
messages.value.push({
role: 'assistant',
content: '',
chunks: [],
loading: true
})
}
else if (data.type === 'stream' && lastMessage?.loading) {
lastMessage.chunks.push(data.content)
lastMessage.content = lastMessage.chunks.join('')
scrollToBottom()
}
else if (data.type === 'end') {
if (lastMessage) {
lastMessage.loading = false
lastMessage.completed = true
}
isLoading.value = false
}
else if (data.type === 'error') {
console.error('Error:', data.content)
messages.value.push({
role: 'assistant',
content: `错误: ${data.content}`,
chunks: [],
completed: true
})
isLoading.value = false
}
}
socket.onclose = () => {
console.log('WebSocket连接关闭')
isLoading.value = false
}
}
启动前端应用,发现请求中的状态码为101 Switching Protocols,这表明客户端和服务器成功从 HTTP 协议切换到了 WebSocket 协议,意味着连接成功建立。
在简易搭建的前端页面发送信息,DeepSeek可以以较快的速度进行响应,并流式输出回答生成的内容(目前还存在DeepSeek推理过程的显示问题以及格式调整问题)
更多推荐
所有评论(0)