通义千问1.5-1.8B-Chat-GPTQ-Int4项目实战:微信小程序智能客服后端开发

最近在做一个微信小程序项目,客户想在里面加个智能客服,能自动回答一些常见问题,比如产品咨询、订单状态查询这些。一开始想用现成的云服务,但考虑到数据隐私和长期成本,还是决定自己搭一个。正好手头有台带GPU的服务器,就琢磨着把通义千问这个轻量级模型用起来。

通义千问1.5-1.8B-Chat这个版本,经过GPTQ-Int4量化后,模型体积小,推理速度也快,对硬件要求不高,特别适合部署成API服务。今天我就把整个从模型服务封装到微信小程序集成的过程,以及中间遇到的一些坑和解决方案,跟大家详细聊聊。如果你也想在小程序里加个AI客服,或者想了解怎么把大模型变成可调用的后端服务,这篇应该能给你一些参考。

1. 项目整体思路与准备工作

做这个事,核心目标就一个:让微信小程序能像调用普通接口一样,和部署在自家服务器上的通义千问模型对话。听起来简单,但拆开来看,得搞定好几件事。

首先,模型得能跑起来,并且提供一个标准的HTTP接口。其次,微信小程序那边要能安全、稳定地调用这个接口。最后,对话不能是“一问一答”就忘,得有点记忆力,能联系上下文,这才像个客服。

我用的硬件是一台Ubuntu 20.04的服务器,显卡是RTX 3060(12GB显存),这个配置跑1.8B的Int4模型绰绰有余。软件环境方面,Python 3.8以上,装好CUDA和对应的PyTorch就行。

模型我用的是通义千问1.5-1.8B-Chat的GPTQ-Int4量化版。选择它,一是因为体积小,部署方便;二是因为Int4量化在几乎不损失精度的情况下,大幅提升了推理速度并降低了显存占用,这对响应速度要求高的客服场景很关键。

2. 模型服务的封装与启动

模型下载下来后,不能直接裸奔着用。我们需要把它包装成一个Web服务,这里我用的是FastAPI,因为它轻量、异步支持好,写起来也快。

首先,安装必要的库:

pip install fastapi uvicorn transformers torch accelerate

接下来是核心的服务端代码。我创建了一个app.py文件:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
from threading import Thread
import uvicorn
import torch

app = FastAPI(title="Qwen-Chat-API")

# 定义请求和响应的数据格式
class ChatRequest(BaseModel):
    message: str
    session_id: str = "default"  # 用于区分不同对话会话
    max_new_tokens: int = 512
    temperature: float = 0.7

class ChatResponse(BaseModel):
    response: str
    session_id: str

# 全局加载模型和分词器,避免每次请求重复加载
print("正在加载模型和分词器...")
MODEL_PATH = "./Qwen-1_8B-Chat-GPTQ-Int4"  # 替换为你的模型实际路径
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_PATH,
    device_map="auto",  # 自动分配模型层到GPU/CPU
    torch_dtype=torch.float16,
    trust_remote_code=True
)
print("模型加载完毕!")

# 一个简单的内存字典,用于存储不同会话的对话历史(生产环境请用Redis等)
conversation_histories = {}

def build_prompt(history, new_message):
    """构建符合通义千问Chat格式的对话Prompt"""
    prompt = ""
    for old_query, old_response in history:
        prompt += f"<|im_start|>user\n{old_query}<|im_end|>\n<|im_start|>assistant\n{old_response}<|im_end|>\n"
    prompt += f"<|im_start|>user\n{new_message}<|im_end|>\n<|im_start|>assistant\n"
    return prompt

@app.post("/chat", response_model=ChatResponse)
async def chat_with_model(request: ChatRequest):
    try:
        # 1. 获取或初始化当前会话的历史记录
        history = conversation_histories.get(request.session_id, [])
        
        # 2. 构建完整Prompt
        full_prompt = build_prompt(history, request.message)
        inputs = tokenizer(full_prompt, return_tensors="pt").to(model.device)
        
        # 3. 生成回复
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=request.max_new_tokens,
                temperature=request.temperature,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id
            )
        # 4. 解码并提取助理的新回复
        full_response = tokenizer.decode(outputs[0][inputs['input_ids'].shape[1]:], skip_special_tokens=True)
        
        # 5. 更新对话历史(限制历史长度,避免过长)
        history.append((request.message, full_response))
        if len(history) > 5:  # 只保留最近5轮对话历史
            history = history[-5:]
        conversation_histories[request.session_id] = history
        
        return ChatResponse(response=full_response, session_id=request.session_id)
    
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"模型生成错误: {str(e)}")

if __name__ == "__main__":
    # 启动服务,host='0.0.0.0'允许外部访问,生产环境务必放在Nginx等反向代理后
    uvicorn.run(app, host="0.0.0.0", port=8000)

这段代码干了这么几件事:

  1. 定义了一个标准的/chat的POST接口。
  2. session_id来区分不同用户的对话,这样每个用户的聊天记录是独立的。
  3. conversation_histories这个字典在内存里存历史对话(注意:这只是demo,服务器重启数据就没了,后面会讲怎么优化)。
  4. 按照通义千问Chat模型要求的格式(<|im_start|><|im_end|>标签)来拼接对话历史,让模型知道上下文。
  5. 生成回复后,把新的问答对存到历史里,并限制只保留最近5轮,防止Prompt太长。

启动服务很简单:

python app.py

服务跑起来后,你可以在浏览器访问 http://你的服务器IP:8000/docs 看到自动生成的API文档,并且可以在这里手动测试接口。

3. 微信小程序端API调用设计

服务端准备好了,小程序那头怎么调呢?直接调用wx.request当然可以,但不够优雅,也不好维护。我的做法是封装一个专门的聊天模块。

在小程序项目的utils目录下,我创建了一个aiChat.js文件:

// utils/aiChat.js
const API_BASE_URL = 'https://你的域名或IP:8000'; // 务必使用HTTPS!小程序要求。

class AIChatService {
  constructor(sessionId = null) {
    // 如果没有传入sessionId,则生成一个唯一ID,用于标识一次完整的对话会话
    this.sessionId = sessionId || 'mini_' + Date.now() + '_' + Math.random().toString(36).substr(2, 9);
    this.isGenerating = false;
  }

  /**
   * 发送消息给AI客服
   * @param {string} message 用户输入的消息
   * @param {function} onSuccess 成功回调
   * @param {function} onError 失败回调
   */
  async sendMessage(message, onSuccess, onError) {
    if (this.isGenerating) {
      wx.showToast({ title: 'AI正在思考中...', icon: 'none' });
      return;
    }

    this.isGenerating = true;
    wx.showLoading({ title: 'AI思考中...', mask: true });

    try {
      const response = await new Promise((resolve, reject) => {
        wx.request({
          url: `${API_BASE_URL}/chat`,
          method: 'POST',
          data: {
            message: message,
            session_id: this.sessionId,
            max_new_tokens: 256, // 小程序回复不宜过长
            temperature: 0.8, // 稍高的温度让回复更活泼
          },
          header: {
            'content-type': 'application/json',
          },
          success: (res) => {
            if (res.statusCode === 200) {
              resolve(res.data);
            } else {
              reject(new Error(`请求失败: ${res.statusCode}`));
            }
          },
          fail: (err) => {
            reject(err);
          },
        });
      });

      wx.hideLoading();
      this.isGenerating = false;
      if (onSuccess && typeof onSuccess === 'function') {
        onSuccess(response.response); // 返回AI的回复文本
      }
    } catch (error) {
      console.error('AI聊天请求错误:', error);
      wx.hideLoading();
      wx.showToast({ title: '客服暂时走神了,请稍后再试', icon: 'none' });
      this.isGenerating = false;
      if (onError && typeof onError === 'function') {
        onError(error);
      }
    }
  }

  // 可选:重置当前会话,清空历史
  resetSession(newSessionId = null) {
    this.sessionId = newSessionId || 'mini_' + Date.now() + '_' + Math.random().toString(36).substr(2, 9);
  }
}

module.exports = AIChatService;

然后在你的小程序页面(比如chatPage.js)里,就可以这样用了:

// pages/chat/chatPage.js
const AIChatService = require('../../utils/aiChat.js');

Page({
  data: {
    messageList: [], // 消息列表 {type: 'user'/'ai', content: '...'}
    inputValue: '',
    aiService: null,
  },

  onLoad() {
    // 初始化AI聊天服务,可以传入一个固定的sessionId实现“记住我”功能
    const sessionId = wx.getStorageSync('ai_chat_session_id') || null;
    this.setData({
      aiService: new AIChatService(sessionId)
    });
    // 保存sessionId,下次进入同一会话
    if (!sessionId) {
      wx.setStorageSync('ai_chat_session_id', this.data.aiService.sessionId);
    }
  },

  onInputChange(e) {
    this.setData({ inputValue: e.detail.value });
  },

  sendMessage() {
    const userMsg = this.data.inputValue.trim();
    if (!userMsg) return;

    // 1. 更新UI,显示用户消息
    const newList = this.data.messageList.concat([{ type: 'user', content: userMsg }]);
    this.setData({ messageList: newList, inputValue: '' });

    // 2. 调用AI服务
    this.data.aiService.sendMessage(
      userMsg,
      // 成功回调
      (aiResponse) => {
        const updatedList = this.data.messageList.concat([{ type: 'ai', content: aiResponse }]);
        this.setData({ messageList: updatedList });
        // 滚动到底部
        wx.pageScrollTo({ selector: '#chat-end', duration: 300 });
      },
      // 失败回调
      (error) => {
        console.error('发送失败', error);
      }
    );
  },
})

这样封装的好处很明显:业务逻辑清晰,调用简单,还能方便地控制加载状态、错误提示,以及管理对话会话。

4. 关键问题:对话上下文管理与性能优化

刚才的Demo版用内存字典存历史,问题很大。一是数据丢,二是如果用户多了,内存肯定撑不住。在实际项目里,我换成了Redis。

# 改进后的历史管理部分 (utils/redis_manager.py)
import redis
import json
import hashlib

class ConversationManager:
    def __init__(self, redis_url='redis://localhost:6379', max_history_len=5):
        self.redis_client = redis.from_url(redis_url, decode_responses=True)
        self.max_len = max_history_len
        self.key_prefix = "chat_history:"
    
    def _get_key(self, session_id):
        # 对session_id做简单哈希,作为Redis key的一部分
        return self.key_prefix + hashlib.md5(session_id.encode()).hexdigest()
    
    def get_history(self, session_id):
        key = self._get_key(session_id)
        history_json = self.redis_client.get(key)
        if history_json:
            return json.loads(history_json)
        return []
    
    def save_history(self, session_id, history):
        key = self._get_key(session_id)
        # 只保存最近N轮对话
        trimmed_history = history[-self.max_len:]
        self.redis_client.setex(key, 3600 * 24, json.dumps(trimmed_history)) # 设置24小时过期

# 然后在FastAPI app中替换掉原来的字典
from utils.redis_manager import ConversationManager
conv_manager = ConversationManager()

@app.post("/chat")
async def chat_with_model(request: ChatRequest):
    history = conv_manager.get_history(request.session_id)
    # ... 构建prompt、生成回复 ...
    history.append((request.message, full_response))
    conv_manager.save_history(request.session_id, history)
    # ...

用了Redis之后,对话历史能持久化保存,并且可以设置过期时间,管理起来方便多了。

另一个问题是性能。虽然1.8B模型不算大,但如果小程序用户突然暴增,一个GPU实例可能处理不过来请求。我采取了两个简单的策略:

  1. 异步生成与超时控制:使用asyncio和模型本身的流式生成或异步支持,避免一个长请求阻塞其他请求。同时在代码里设置合理的超时时间。
  2. 请求队列与限流:在FastAPI前面加一个简单的内存队列(比如用asyncio.Queue),或者用更专业的celery,把生成任务丢到后台队列里处理,前端轮询结果。对于公开服务,一定要在Nginx或API网关层做限流,防止被刷。
# 简单的异步处理思路(伪代码)
from fastapi import BackgroundTasks
import asyncio

task_queue = asyncio.Queue()
result_cache = {} # 用redis更好

async def worker():
    while True:
        session_id, message = await task_queue.get()
        # ... 调用模型生成 ...
        result_cache[session_id] = generated_text
        task_queue.task_done()

@app.post("/chat/async")
async def async_chat(request: ChatRequest, background_tasks: BackgroundTasks):
    task_id = request.session_id + "_" + str(time.time())
    await task_queue.put((task_id, request.message))
    return {"task_id": task_id, "status": "queued"}

@app.get("/chat/result/{task_id}")
async def get_result(task_id: str):
    result = result_cache.get(task_id)
    if result:
        return {"status": "completed", "response": result}
    else:
        return {"status": "processing"}

对于小程序来说,短轮询(每隔几秒查一次结果)是可以接受的,这样能有效缓解瞬时高并发对模型推理的直接冲击。

5. 总结

把通义千问这样的模型集成到微信小程序里做客服,技术上已经非常可行了。整个过程就像搭积木:先把模型用FastAPI包装成一个可靠的HTTP服务,处理好对话上下文;然后在小程序端用清晰的方式封装调用逻辑;最后根据实际访问量,考虑用Redis存历史、用队列缓解并发压力。

我按照这个思路做的客服模块,已经稳定运行了一段时间。对于常见问题,比如“你们的服务怎么收费?”、“我的订单到哪了?”,模型都能给出准确、流畅的回答,用户体验还不错。当然,它毕竟不是万能的,对于特别复杂或者需要实时查数据库的问题,我们设定了规则,让AI引导用户转接人工客服。

这个方案的好处是自主可控,数据都在自己服务器上,心里踏实。成本也主要就是一台带GPU的云服务器,如果访问量不大,甚至中等配置的显卡都够用。如果你也想试试,建议先从简单的内存版本开始,跑通整个流程,然后再逐步引入Redis、队列这些组件,这样迭代起来会更顺畅。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐