通义千问1.5-1.8B-Chat-GPTQ-Int4项目实战:微信小程序智能客服后端开发
本文介绍了如何在星图GPU平台上自动化部署通义千问1.5-1.8B-Chat-GPTQ-Int4镜像,并以此构建微信小程序智能客服后端。该方案将轻量级大模型封装为API服务,可高效处理用户的产品咨询、订单查询等常见问题,实现自主可控的智能对话功能。
通义千问1.5-1.8B-Chat-GPTQ-Int4项目实战:微信小程序智能客服后端开发
最近在做一个微信小程序项目,客户想在里面加个智能客服,能自动回答一些常见问题,比如产品咨询、订单状态查询这些。一开始想用现成的云服务,但考虑到数据隐私和长期成本,还是决定自己搭一个。正好手头有台带GPU的服务器,就琢磨着把通义千问这个轻量级模型用起来。
通义千问1.5-1.8B-Chat这个版本,经过GPTQ-Int4量化后,模型体积小,推理速度也快,对硬件要求不高,特别适合部署成API服务。今天我就把整个从模型服务封装到微信小程序集成的过程,以及中间遇到的一些坑和解决方案,跟大家详细聊聊。如果你也想在小程序里加个AI客服,或者想了解怎么把大模型变成可调用的后端服务,这篇应该能给你一些参考。
1. 项目整体思路与准备工作
做这个事,核心目标就一个:让微信小程序能像调用普通接口一样,和部署在自家服务器上的通义千问模型对话。听起来简单,但拆开来看,得搞定好几件事。
首先,模型得能跑起来,并且提供一个标准的HTTP接口。其次,微信小程序那边要能安全、稳定地调用这个接口。最后,对话不能是“一问一答”就忘,得有点记忆力,能联系上下文,这才像个客服。
我用的硬件是一台Ubuntu 20.04的服务器,显卡是RTX 3060(12GB显存),这个配置跑1.8B的Int4模型绰绰有余。软件环境方面,Python 3.8以上,装好CUDA和对应的PyTorch就行。
模型我用的是通义千问1.5-1.8B-Chat的GPTQ-Int4量化版。选择它,一是因为体积小,部署方便;二是因为Int4量化在几乎不损失精度的情况下,大幅提升了推理速度并降低了显存占用,这对响应速度要求高的客服场景很关键。
2. 模型服务的封装与启动
模型下载下来后,不能直接裸奔着用。我们需要把它包装成一个Web服务,这里我用的是FastAPI,因为它轻量、异步支持好,写起来也快。
首先,安装必要的库:
pip install fastapi uvicorn transformers torch accelerate
接下来是核心的服务端代码。我创建了一个app.py文件:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
from threading import Thread
import uvicorn
import torch
app = FastAPI(title="Qwen-Chat-API")
# 定义请求和响应的数据格式
class ChatRequest(BaseModel):
message: str
session_id: str = "default" # 用于区分不同对话会话
max_new_tokens: int = 512
temperature: float = 0.7
class ChatResponse(BaseModel):
response: str
session_id: str
# 全局加载模型和分词器,避免每次请求重复加载
print("正在加载模型和分词器...")
MODEL_PATH = "./Qwen-1_8B-Chat-GPTQ-Int4" # 替换为你的模型实际路径
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
MODEL_PATH,
device_map="auto", # 自动分配模型层到GPU/CPU
torch_dtype=torch.float16,
trust_remote_code=True
)
print("模型加载完毕!")
# 一个简单的内存字典,用于存储不同会话的对话历史(生产环境请用Redis等)
conversation_histories = {}
def build_prompt(history, new_message):
"""构建符合通义千问Chat格式的对话Prompt"""
prompt = ""
for old_query, old_response in history:
prompt += f"<|im_start|>user\n{old_query}<|im_end|>\n<|im_start|>assistant\n{old_response}<|im_end|>\n"
prompt += f"<|im_start|>user\n{new_message}<|im_end|>\n<|im_start|>assistant\n"
return prompt
@app.post("/chat", response_model=ChatResponse)
async def chat_with_model(request: ChatRequest):
try:
# 1. 获取或初始化当前会话的历史记录
history = conversation_histories.get(request.session_id, [])
# 2. 构建完整Prompt
full_prompt = build_prompt(history, request.message)
inputs = tokenizer(full_prompt, return_tensors="pt").to(model.device)
# 3. 生成回复
with torch.no_grad():
outputs = model.generate(
**inputs,
max_new_tokens=request.max_new_tokens,
temperature=request.temperature,
do_sample=True,
pad_token_id=tokenizer.eos_token_id
)
# 4. 解码并提取助理的新回复
full_response = tokenizer.decode(outputs[0][inputs['input_ids'].shape[1]:], skip_special_tokens=True)
# 5. 更新对话历史(限制历史长度,避免过长)
history.append((request.message, full_response))
if len(history) > 5: # 只保留最近5轮对话历史
history = history[-5:]
conversation_histories[request.session_id] = history
return ChatResponse(response=full_response, session_id=request.session_id)
except Exception as e:
raise HTTPException(status_code=500, detail=f"模型生成错误: {str(e)}")
if __name__ == "__main__":
# 启动服务,host='0.0.0.0'允许外部访问,生产环境务必放在Nginx等反向代理后
uvicorn.run(app, host="0.0.0.0", port=8000)
这段代码干了这么几件事:
- 定义了一个标准的
/chat的POST接口。 - 用
session_id来区分不同用户的对话,这样每个用户的聊天记录是独立的。 - 用
conversation_histories这个字典在内存里存历史对话(注意:这只是demo,服务器重启数据就没了,后面会讲怎么优化)。 - 按照通义千问Chat模型要求的格式(
<|im_start|>和<|im_end|>标签)来拼接对话历史,让模型知道上下文。 - 生成回复后,把新的问答对存到历史里,并限制只保留最近5轮,防止Prompt太长。
启动服务很简单:
python app.py
服务跑起来后,你可以在浏览器访问 http://你的服务器IP:8000/docs 看到自动生成的API文档,并且可以在这里手动测试接口。
3. 微信小程序端API调用设计
服务端准备好了,小程序那头怎么调呢?直接调用wx.request当然可以,但不够优雅,也不好维护。我的做法是封装一个专门的聊天模块。
在小程序项目的utils目录下,我创建了一个aiChat.js文件:
// utils/aiChat.js
const API_BASE_URL = 'https://你的域名或IP:8000'; // 务必使用HTTPS!小程序要求。
class AIChatService {
constructor(sessionId = null) {
// 如果没有传入sessionId,则生成一个唯一ID,用于标识一次完整的对话会话
this.sessionId = sessionId || 'mini_' + Date.now() + '_' + Math.random().toString(36).substr(2, 9);
this.isGenerating = false;
}
/**
* 发送消息给AI客服
* @param {string} message 用户输入的消息
* @param {function} onSuccess 成功回调
* @param {function} onError 失败回调
*/
async sendMessage(message, onSuccess, onError) {
if (this.isGenerating) {
wx.showToast({ title: 'AI正在思考中...', icon: 'none' });
return;
}
this.isGenerating = true;
wx.showLoading({ title: 'AI思考中...', mask: true });
try {
const response = await new Promise((resolve, reject) => {
wx.request({
url: `${API_BASE_URL}/chat`,
method: 'POST',
data: {
message: message,
session_id: this.sessionId,
max_new_tokens: 256, // 小程序回复不宜过长
temperature: 0.8, // 稍高的温度让回复更活泼
},
header: {
'content-type': 'application/json',
},
success: (res) => {
if (res.statusCode === 200) {
resolve(res.data);
} else {
reject(new Error(`请求失败: ${res.statusCode}`));
}
},
fail: (err) => {
reject(err);
},
});
});
wx.hideLoading();
this.isGenerating = false;
if (onSuccess && typeof onSuccess === 'function') {
onSuccess(response.response); // 返回AI的回复文本
}
} catch (error) {
console.error('AI聊天请求错误:', error);
wx.hideLoading();
wx.showToast({ title: '客服暂时走神了,请稍后再试', icon: 'none' });
this.isGenerating = false;
if (onError && typeof onError === 'function') {
onError(error);
}
}
}
// 可选:重置当前会话,清空历史
resetSession(newSessionId = null) {
this.sessionId = newSessionId || 'mini_' + Date.now() + '_' + Math.random().toString(36).substr(2, 9);
}
}
module.exports = AIChatService;
然后在你的小程序页面(比如chatPage.js)里,就可以这样用了:
// pages/chat/chatPage.js
const AIChatService = require('../../utils/aiChat.js');
Page({
data: {
messageList: [], // 消息列表 {type: 'user'/'ai', content: '...'}
inputValue: '',
aiService: null,
},
onLoad() {
// 初始化AI聊天服务,可以传入一个固定的sessionId实现“记住我”功能
const sessionId = wx.getStorageSync('ai_chat_session_id') || null;
this.setData({
aiService: new AIChatService(sessionId)
});
// 保存sessionId,下次进入同一会话
if (!sessionId) {
wx.setStorageSync('ai_chat_session_id', this.data.aiService.sessionId);
}
},
onInputChange(e) {
this.setData({ inputValue: e.detail.value });
},
sendMessage() {
const userMsg = this.data.inputValue.trim();
if (!userMsg) return;
// 1. 更新UI,显示用户消息
const newList = this.data.messageList.concat([{ type: 'user', content: userMsg }]);
this.setData({ messageList: newList, inputValue: '' });
// 2. 调用AI服务
this.data.aiService.sendMessage(
userMsg,
// 成功回调
(aiResponse) => {
const updatedList = this.data.messageList.concat([{ type: 'ai', content: aiResponse }]);
this.setData({ messageList: updatedList });
// 滚动到底部
wx.pageScrollTo({ selector: '#chat-end', duration: 300 });
},
// 失败回调
(error) => {
console.error('发送失败', error);
}
);
},
})
这样封装的好处很明显:业务逻辑清晰,调用简单,还能方便地控制加载状态、错误提示,以及管理对话会话。
4. 关键问题:对话上下文管理与性能优化
刚才的Demo版用内存字典存历史,问题很大。一是数据丢,二是如果用户多了,内存肯定撑不住。在实际项目里,我换成了Redis。
# 改进后的历史管理部分 (utils/redis_manager.py)
import redis
import json
import hashlib
class ConversationManager:
def __init__(self, redis_url='redis://localhost:6379', max_history_len=5):
self.redis_client = redis.from_url(redis_url, decode_responses=True)
self.max_len = max_history_len
self.key_prefix = "chat_history:"
def _get_key(self, session_id):
# 对session_id做简单哈希,作为Redis key的一部分
return self.key_prefix + hashlib.md5(session_id.encode()).hexdigest()
def get_history(self, session_id):
key = self._get_key(session_id)
history_json = self.redis_client.get(key)
if history_json:
return json.loads(history_json)
return []
def save_history(self, session_id, history):
key = self._get_key(session_id)
# 只保存最近N轮对话
trimmed_history = history[-self.max_len:]
self.redis_client.setex(key, 3600 * 24, json.dumps(trimmed_history)) # 设置24小时过期
# 然后在FastAPI app中替换掉原来的字典
from utils.redis_manager import ConversationManager
conv_manager = ConversationManager()
@app.post("/chat")
async def chat_with_model(request: ChatRequest):
history = conv_manager.get_history(request.session_id)
# ... 构建prompt、生成回复 ...
history.append((request.message, full_response))
conv_manager.save_history(request.session_id, history)
# ...
用了Redis之后,对话历史能持久化保存,并且可以设置过期时间,管理起来方便多了。
另一个问题是性能。虽然1.8B模型不算大,但如果小程序用户突然暴增,一个GPU实例可能处理不过来请求。我采取了两个简单的策略:
- 异步生成与超时控制:使用
asyncio和模型本身的流式生成或异步支持,避免一个长请求阻塞其他请求。同时在代码里设置合理的超时时间。 - 请求队列与限流:在
FastAPI前面加一个简单的内存队列(比如用asyncio.Queue),或者用更专业的celery,把生成任务丢到后台队列里处理,前端轮询结果。对于公开服务,一定要在Nginx或API网关层做限流,防止被刷。
# 简单的异步处理思路(伪代码)
from fastapi import BackgroundTasks
import asyncio
task_queue = asyncio.Queue()
result_cache = {} # 用redis更好
async def worker():
while True:
session_id, message = await task_queue.get()
# ... 调用模型生成 ...
result_cache[session_id] = generated_text
task_queue.task_done()
@app.post("/chat/async")
async def async_chat(request: ChatRequest, background_tasks: BackgroundTasks):
task_id = request.session_id + "_" + str(time.time())
await task_queue.put((task_id, request.message))
return {"task_id": task_id, "status": "queued"}
@app.get("/chat/result/{task_id}")
async def get_result(task_id: str):
result = result_cache.get(task_id)
if result:
return {"status": "completed", "response": result}
else:
return {"status": "processing"}
对于小程序来说,短轮询(每隔几秒查一次结果)是可以接受的,这样能有效缓解瞬时高并发对模型推理的直接冲击。
5. 总结
把通义千问这样的模型集成到微信小程序里做客服,技术上已经非常可行了。整个过程就像搭积木:先把模型用FastAPI包装成一个可靠的HTTP服务,处理好对话上下文;然后在小程序端用清晰的方式封装调用逻辑;最后根据实际访问量,考虑用Redis存历史、用队列缓解并发压力。
我按照这个思路做的客服模块,已经稳定运行了一段时间。对于常见问题,比如“你们的服务怎么收费?”、“我的订单到哪了?”,模型都能给出准确、流畅的回答,用户体验还不错。当然,它毕竟不是万能的,对于特别复杂或者需要实时查数据库的问题,我们设定了规则,让AI引导用户转接人工客服。
这个方案的好处是自主可控,数据都在自己服务器上,心里踏实。成本也主要就是一台带GPU的云服务器,如果访问量不大,甚至中等配置的显卡都够用。如果你也想试试,建议先从简单的内存版本开始,跑通整个流程,然后再逐步引入Redis、队列这些组件,这样迭代起来会更顺畅。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。
更多推荐



所有评论(0)