山东大学项目实训(2) AI响应服务器的搭建

往期文章

山东大学项目实训(1) DeepSeek API调用

1. AI服务器的作用

AI服务器就是下与接受deepseek模型交互, 获取AI回答后返回到应用中的中间服务器. 其功能如下

服务器功能

其实从上图看出AI服务器是一个很"鸡肋"的功能, 貌似后端同学可以直接调用deepseek模型, 但是这样的情形不利于我们分工开发和我进行模型的研究, 故我暂时实现了一个AI服务器, 其在后期可能融入后端当中, 详见下部分架构的介绍.

2. 我们的项目架构

我提出了1, 2两种架构, 加上我们组员提出的一种架构共三种

项目架构

这三种架构各有优劣, 我列举如下

  • 方案1
    • 几个优点: 开发各自独立, 很适合我们这样的小组作业, 且可扩展性高, 代码耦合度低. 适合跨语言作业, 像我们组后端使用java且AI服务使用python的情况下也可简单地使用http等通用协议调用.
    • 缺点: 后期问题追踪不易, 且显而易见的效率低, 但实则不然, 其效率在以下几种情形有优势:
      1. 后端和服务器在一台服务器上运行, 此时二者间通信耗时很小, 当然速度还是不如方案2, 但初步考虑我们的用户需求应该够用.
      2. 后端性能积压, 比如要处理搜题等复杂任务, 这时将AI服务器分离到别的服务器上可以减缓后端性能压力.
      3. AI模型本地部署, 当我们将AI部署到本地时, AI服务器可直接承担AI模型的运作. 相当于架构方案图中的本地部署部分.
  • 方案2
    • 优点:效率较高.
    • 缺点:代码耦合度高, 后续扩展困难. 在本地部署时需要后端自行处理AI的输出.
  • 方案3
    • 优点:集成了方案1的架构优势和方案2的高效率
    • 缺点:前端工作量大, 且需要在数据统一处理和AI服务器不存数据间做出选择.

由于我们现在的需求较简单, 只在多用户对话的上下文处理间有些许疑问, 故三种架构均可, 我们暂定方案1. 因此下文中我都假设是后端调用AI服务器.

大模型的所有调用参数基本都可以在对话内容中给出, 其强大的语言处理能力使得用户个性化信息的传递非常容易. 例如:用户在数据结构领域答题正确率是70%, 我们只需要把这个信息告诉deepseek即可, 无需其它处理. 因此这使得AI服务器与后端的交互很简单. 换句话说, 这印证了AI服务器很"鸡肋" .

3. AI服务器编写

AI服务器问答交互

我们的大语言模型需求先定为智能问答, 智能答题和智能出题. 由于大语言模型的API参数较少, 因此我们只需设置不同的temperature和引导词即可. 引导词例如: {"role":"system", "content":"你在为用户出题, 请按照'<题目>题目内容</题目><答案>答案内容</答案>'的格式出题"} 引导词的设置可由后端或服务器端完成. 返回即为一条json对话或者流数据或者纯粹是AI回答的文本, 内容为AI的回答.

多轮对话处理

我们目前设定一个用户可以有若干个对话, 每个对话都有其唯一ID, 因此AI服务器只关注对话ID而不关注用户, 对于用户信息后端许通过参数传递或者直接放在与AI的对话中, 例如"用户在数据结构领域历史正确率为70%, 请据此出一道题目, 使其符合并能提高用户水平" . 这种个性化工作可由后端或者AI服务器完成, 我们组在后期合并阶段可由双方同学合作完成.

用户对话信息存储是个大问题. 我们暂时决定将对话记录存到用户本地, 还有一种可选方案是存到后端服务器上的数据库中.总而言之, 其大概率不会存在AI服务器本地. 因此, 我们需要在对话开始时将内容加载进AI服务器中. 考虑到服务器关断后重启等情况, 我选择处理以下两种情况:

  1. 后端调用初始化函数, 在AI服务器缓存中注册对话ID, 并将后端给的历史记录存入服务器缓存中.
  2. 后端进行对话请求, 这时缓存中若没有此对话的ID记录, 则新建一个 ,若有, 则读取历史对话记录.

代码编写

类的初始化, 其中save函数负责将内存中历史记录传回后端, 考虑到前/后端自主维护用户对话记录的设计, 一般用不到.

class DeepSeekChatServer:
    def __init__(self):
        #flask框架
        self.app = Flask(__name__)
        CORS(self.app)
        #对话内容内存,重要
        self.conversations = {}  # {conversation_id: [messages]}
        #api调用相关
        #一般人用deepseek官网,这是我们学校的api
        #记得把/v1/chat/completions部分加上
        #openai的库会自动给你添加而你自己写的request不会
        self.api_base = "http://学校给的api/v1/chat/completions"
        self.api_key = "sk-差点忘了删" #填写你自己的key
        #model类型也按官网来,这个是自己api上的类型
        self.AI_model="DeepSeek-R1"
        # 注册路由
        self.app.add_url_rule('/chat', 'chat_endpoint', self.chat_endpoint, methods=['POST'])
        self.app.add_url_rule('/save', 'save_conversation', self.save_conversation, methods=['POST'])
        self.app.add_url_rule('/load', 'load_conversation', self.load_conversation, methods=['POST'])

读取历史记录与初始化对话记录

    def get_or_create_conversation(self, conversation_id=None):
        cid = conversation_id or str(uuid.uuid4())
        if cid not in self.conversations:
            self.conversations[cid] = []
        return cid
    def init_conversation(
        self,
        conversation_id: Optional[str] = None,
        init_messages: Optional[List[Dict]] = None
    ) -> str:
        """初始化或重置对话,严格校验消息格式"""
        cid = conversation_id or str(uuid.uuid4())
        
        # 校验初始化消息
        if init_messages is not None:
            if not isinstance(init_messages, list):
                raise ValueError("初始化消息必须是消息列表")
            if not all(self._validate_message(msg) for msg in init_messages):
                raise ValueError("消息格式不合法")
        
        self.conversations[cid] = init_messages.copy() if init_messages else []
        return cid

传输信息合法性检验,如果是信任的后端传来的,建议不使用以提高效率.

    def _validate_message(self, message: Dict) -> bool:
        required_keys = {"role", "content"}
        return (
            isinstance(message, dict)
            and all(key in message for key in required_keys)
            and message["role"] in ("user", "assistant", "system")
            and isinstance(message["content"], str)
            and len(message["content"].strip()) > 0
        )

主要的请求处理部分, 支持流式和非流式输出, 让deepseek给我加了很多错误处理部分. 注意data, header等部分其实可以写到类的属性里以提高效率, 不用每次都在代码里生成, 但是我们的项目还没固定, 先这么写着.

    def _build_headers(self):
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
            "Accept": "application/json"
        }
    def generate_response(self, conversation_id, stream=False):
        messages = self.conversations.get(conversation_id, [])
        
        data = {
            "model": self.AI_model,
            "messages": messages,
            "temperature": 0.7,
            "stream": stream
        }

        response = requests.post(
            self.api_base,
            headers=self._build_headers(),
            json=data,
            stream=stream
        )

        if response.status_code != 200:
            return jsonify({"error": f"API Error: {response.text}"}), 500

        if stream:
            def generate():
                full_content = ""
                for line in response.iter_lines():
                    if not line:
                        continue
                    
                    decoded_line = line.decode('utf-8')
                    if not decoded_line.startswith("data: "):
                        continue
                    
                    chunk = json.loads(decoded_line[6:])
                    if chunk=="[DONE]" :
                        break
                    
                    try:
                        if "content" in chunk["choices"][0]["delta"]:
                            delta = chunk["choices"][0]["delta"]["content"]
                            full_content += delta
                            yield delta
                    except json.JSONDecodeError:
                        yield "[数据解析错误]"
                self.conversations[conversation_id].append({
                    "role": "assistant",
                    "content": full_content
                })
            return Response(generate(), mimetype='text/event-stream')
        else:
            content = response.json()["choices"][0]["message"]["content"]
            self.conversations[conversation_id].append({
                "role": "assistant",
                "content": content
            })
            return jsonify({
                "conversation_id": conversation_id,
                "response": content
            })

向外暴露的接口与运行, 也让deepseek添加了很多错误处理.

    def chat_endpoint(self):
        data = request.json
        try:
            # 校验基础参数
            if 'message' not in data or not data['message'].strip():
                return jsonify({"error": "消息内容不能为空"}), 400
            # 处理对话ID
            cid = self.get_or_create_conversation(data.get('conversation_id'))
            # 构造消息
            new_message = {
                "role": "user",
                "content": data['message'].strip()
            }
            if not self._validate_message(new_message):
                return jsonify({"error": "无效的消息结构"}), 400
            # 添加消息
            self.conversations[cid].append(new_message)

            return self.generate_response(cid, data.get('stream', False))
            
        except KeyError:
            return jsonify({"error": "请求格式错误"}), 400

    def save_conversation(self):
        data = request.json
        cid = data['conversation_id']
        return jsonify({
            "conversation_id": cid,
            "messages": self.conversations.get(cid, [])
        })

    def load_conversation(self):
        data = request.json
        try:
            cid = self.init_conversation(
                data.get('conversation_id'),
                data['messages']
            )
            return jsonify({"conversation_id": cid}), 200
        except KeyError:
            return jsonify({"error": "缺少必要参数 messages"}), 400
        except ValueError as e:
            return jsonify({"error": str(e)}), 400

    def run(self, host='127.0.0.1', port=5000, debug=True):
        self.app.run(host=host, port=port, debug=debug)

运行代码

if __name__ == '__main__':
    server = DeepSeekChatServer()
    server.run()

完整代码

import os
from typing import Dict, List, Optional
import uuid
import json
from flask import Flask, request, jsonify, Response
from flask_cors import CORS
import requests

class DeepSeekChatServer:
    def __init__(self):
        #flask框架
        self.app = Flask(__name__)
        CORS(self.app)
        #对话内容内存,重要
        self.conversations = {}  # {conversation_id: [messages]}
        #api调用相关
        #一般人用deepseek官网,这是我们学校的api
        #记得把/v1/chat/completions部分加上
        #openai的库会自动给你添加而你自己写的request不会
        self.api_base = "http://某个api:某个端口/v1/chat/completions"
        self.api_key = "sk-又差点忘了删"
        self.AI_model="DeepSeek-R1"
        # 注册路由
        self.app.add_url_rule('/chat', 'chat_endpoint', self.chat_endpoint, methods=['POST'])
        self.app.add_url_rule('/save', 'save_conversation', self.save_conversation, methods=['POST'])
        self.app.add_url_rule('/load', 'load_conversation', self.load_conversation, methods=['POST'])

    def get_or_create_conversation(self, conversation_id=None):
        cid = conversation_id or str(uuid.uuid4())
        if cid not in self.conversations:
            self.conversations[cid] = []
        return cid
    def init_conversation(
        self,
        conversation_id: Optional[str] = None,
        init_messages: Optional[List[Dict]] = None
    ) -> str:
        """初始化或重置对话,严格校验消息格式"""
        cid = conversation_id or str(uuid.uuid4())
        
        # 校验初始化消息
        if init_messages is not None:
            if not isinstance(init_messages, list):
                raise ValueError("初始化消息必须是消息列表")
            if not all(self._validate_message(msg) for msg in init_messages):
                raise ValueError("消息格式不合法")
        
        self.conversations[cid] = init_messages.copy() if init_messages else []
        return cid
    
    
    def _validate_message(self, message: Dict) -> bool:
        required_keys = {"role", "content"}
        return (
            isinstance(message, dict)
            and all(key in message for key in required_keys)
            and message["role"] in ("user", "assistant", "system")
            and isinstance(message["content"], str)
            and len(message["content"].strip()) > 0
        )
    def _build_headers(self):
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
            "Accept": "application/json"
        }
    def generate_response(self, conversation_id, stream=False):
        messages = self.conversations.get(conversation_id, [])
        
        data = {
            "model": self.AI_model,
            "messages": messages,
            "temperature": 0.7,
            "stream": stream
        }

        response = requests.post(
            self.api_base,
            headers=self._build_headers(),
            json=data,
            stream=stream
        )

        if response.status_code != 200:
            return jsonify({"error": f"API Error: {response.text}"}), 500

        if stream:
            def generate():
                full_content = ""
                for line in response.iter_lines():
                    if not line:
                        continue
                    
                    decoded_line = line.decode('utf-8')
                    if not decoded_line.startswith("data: "):
                        continue
                    
                    chunk = json.loads(decoded_line[6:])
                    if chunk=="[DONE]" :
                        break
                    
                    try:
                        if "content" in chunk["choices"][0]["delta"]:
                            delta = chunk["choices"][0]["delta"]["content"]
                            full_content += delta
                            yield delta
                    except json.JSONDecodeError:
                        yield "[数据解析错误]"
                self.conversations[conversation_id].append({
                    "role": "assistant",
                    "content": full_content
                })
            return Response(generate(), mimetype='text/event-stream')
        else:
            content = response.json()["choices"][0]["message"]["content"]
            self.conversations[conversation_id].append({
                "role": "assistant",
                "content": content
            })
            return jsonify({
                "conversation_id": conversation_id,
                "response": content
            })

    def chat_endpoint(self):
        data = request.json
        try:
            # 校验基础参数
            if 'message' not in data or not data['message'].strip():
                return jsonify({"error": "消息内容不能为空"}), 400
            # 处理对话ID
            cid = self.get_or_create_conversation(data.get('conversation_id'))
            # 构造消息
            new_message = {
                "role": "user",
                "content": data['message'].strip()
            }
            if not self._validate_message(new_message):
                return jsonify({"error": "无效的消息结构"}), 400
            # 添加消息
            self.conversations[cid].append(new_message)

            return self.generate_response(cid, data.get('stream', False))
            
        except KeyError:
            return jsonify({"error": "请求格式错误"}), 400

    def save_conversation(self):
        data = request.json
        cid = data['conversation_id']
        return jsonify({
            "conversation_id": cid,
            "messages": self.conversations.get(cid, [])
        })

    def load_conversation(self):
        data = request.json
        try:
            cid = self.init_conversation(
                data.get('conversation_id'),
                data['messages']
            )
            return jsonify({"conversation_id": cid}), 200
        except KeyError:
            return jsonify({"error": "缺少必要参数 messages"}), 400
        except ValueError as e:
            return jsonify({"error": str(e)}), 400

    def run(self, host='127.0.0.1', port=5000, debug=True):
        self.app.run(host=host, port=port, debug=debug)

if __name__ == '__main__':
    server = DeepSeekChatServer()
    server.run()
Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐