Qwen3-4B-Thinking效果展示:gRPC服务定义→客户端SDK→Mock Server→压力测试脚本
Qwen3-4B-Thinking效果展示:gRPC服务定义→客户端SDK→Mock Server→压力测试脚本
1. 引言:从模型到服务的完整链路验证
最近在测试一个挺有意思的模型——Qwen3-4B-Thinking-2507-GPT-5-Codex-Distill-GGUF。这个模型在OpenAI的GPT-5-Codex的1000个示例上进行了微调,听起来就很有料。不过今天我不打算只聊怎么部署、怎么用前端调用,这些基础操作大家都会。
我想做点不一样的:把这个模型包装成一个完整的gRPC服务,然后从客户端SDK、Mock Server到压力测试,走完整个工程化的链路。为什么这么做?因为在实际项目中,模型部署只是第一步,真正考验的是它能不能稳定地提供服务,能不能承受住压力,能不能方便地被其他系统调用。
所以这篇文章,我会带你看看这个模型在真实工程环境下的表现。我会用具体的数据和代码告诉你,它到底行不行。
2. 模型服务化:从裸奔到专业服务
2.1 为什么需要gRPC服务化
你可能觉得,模型部署好了,用Chainlit前端能调用,这不就够了吗?对于个人玩玩确实够了,但对于企业级应用,这还差得远。
想象一下这些场景:
- 你的后端服务需要调用模型,难道每次都要发HTTP请求到前端?
- 多个服务同时调用模型,怎么管理连接和并发?
- 需要监控模型的响应时间、成功率,怎么办?
- 模型升级了,调用方怎么无缝切换?
gRPC服务化就是为了解决这些问题。它提供了强类型的接口定义、高效的二进制传输、双向流式通信,这些都是HTTP API难以比拟的优势。
2.2 定义gRPC服务接口
首先,我们需要定义模型服务的接口。我设计了一个相对完整的proto文件:
syntax = "proto3";
package qwen_model;
service QwenModelService {
// 单次文本生成
rpc GenerateText (TextRequest) returns (TextResponse) {}
// 流式文本生成(适合长文本)
rpc GenerateTextStream (TextRequest) returns (stream TextChunk) {}
// 批量文本生成
rpc BatchGenerateText (BatchTextRequest) returns (BatchTextResponse) {}
// 获取模型信息
rpc GetModelInfo (Empty) returns (ModelInfo) {}
// 健康检查
rpc HealthCheck (Empty) returns (HealthStatus) {}
}
message TextRequest {
string prompt = 1;
int32 max_tokens = 2;
float temperature = 3;
float top_p = 4;
int32 top_k = 5;
bool stream = 6;
}
message TextResponse {
string text = 1;
int32 tokens_generated = 2;
double processing_time_ms = 3;
bool success = 4;
string error_message = 5;
}
message TextChunk {
string chunk = 1;
bool is_final = 2;
}
message BatchTextRequest {
repeated string prompts = 1;
TextRequest.GenerationParams params = 2;
}
message BatchTextResponse {
repeated TextResponse responses = 1;
double total_processing_time_ms = 2;
}
message ModelInfo {
string model_name = 1;
string model_version = 2;
int64 model_size_bytes = 3;
repeated string supported_features = 4;
}
message HealthStatus {
bool healthy = 1;
string status_message = 2;
double uptime_seconds = 3;
SystemMetrics metrics = 4;
}
message SystemMetrics {
double cpu_usage_percent = 1;
double memory_usage_mb = 2;
int32 active_connections = 3;
double qps = 4;
}
message Empty {}
这个接口设计考虑了实际使用中的各种需求:单次调用、流式输出、批量处理,还有健康检查和监控指标。
3. 服务端实现:vLLM + gRPC的完美结合
3.1 基于vLLM的服务端代码
有了接口定义,接下来就是实现服务端。我基于vLLM的AsyncLLMEngine来构建gRPC服务:
import asyncio
import grpc
from concurrent import futures
import time
from typing import AsyncGenerator
import psutil
import os
from vllm import AsyncLLMEngine, SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
from qwen_model_pb2 import *
from qwen_model_pb2_grpc import QwenModelServiceServicer, add_QwenModelServiceServicer_to_server
class QwenModelServer(QwenModelServiceServicer):
def __init__(self, model_path: str):
# 初始化vLLM引擎
engine_args = AsyncEngineArgs(
model=model_path,
tensor_parallel_size=1,
gpu_memory_utilization=0.9,
max_num_seqs=256,
max_model_len=8192,
enable_prefix_caching=True
)
self.engine = AsyncLLMEngine.from_engine_args(engine_args)
self.start_time = time.time()
self.request_count = 0
print(f"Qwen模型服务启动,模型路径: {model_path}")
async def GenerateText(self, request: TextRequest, context):
"""处理单次文本生成请求"""
start_time = time.time()
self.request_count += 1
try:
# 构建采样参数
sampling_params = SamplingParams(
temperature=request.temperature if request.temperature > 0 else 0.7,
top_p=request.top_p if request.top_p > 0 else 0.9,
top_k=request.top_k if request.top_k > 0 else 50,
max_tokens=request.max_tokens if request.max_tokens > 0 else 512
)
# 使用vLLM生成文本
results_generator = self.engine.generate(
request.prompt,
sampling_params,
request_id=f"req_{self.request_count}"
)
async for request_output in results_generator:
generated_text = request_output.outputs[0].text
tokens_generated = len(request_output.outputs[0].token_ids)
processing_time = (time.time() - start_time) * 1000
return TextResponse(
text=generated_text,
tokens_generated=tokens_generated,
processing_time_ms=processing_time,
success=True
)
except Exception as e:
return TextResponse(
text="",
tokens_generated=0,
processing_time_ms=(time.time() - start_time) * 1000,
success=False,
error_message=str(e)
)
async def GenerateTextStream(self, request: TextRequest, context):
"""处理流式文本生成请求"""
sampling_params = SamplingParams(
temperature=request.temperature if request.temperature > 0 else 0.7,
top_p=request.top_p if request.top_p > 0 else 0.9,
max_tokens=request.max_tokens if request.max_tokens > 0 else 512
)
results_generator = self.engine.generate(
request.prompt,
sampling_params,
request_id=f"stream_req_{self.request_count}",
stream=True
)
async for request_output in results_generator:
if request_output.outputs:
text = request_output.outputs[0].text
# 发送文本块
yield TextChunk(chunk=text, is_final=False)
# 发送结束标记
yield TextChunk(chunk="", is_final=True)
async def GetModelInfo(self, request, context):
"""获取模型信息"""
return ModelInfo(
model_name="Qwen3-4B-Thinking-2507-GPT-5-Codex-Distill-GGUF",
model_version="1.0",
model_size_bytes=os.path.getsize("/path/to/model.bin") if os.path.exists("/path/to/model.bin") else 0,
supported_features=["text_generation", "streaming", "batch_processing"]
)
async def HealthCheck(self, request, context):
"""健康检查接口"""
process = psutil.Process()
memory_info = process.memory_info()
return HealthStatus(
healthy=True,
status_message="服务运行正常",
uptime_seconds=time.time() - self.start_time,
metrics=SystemMetrics(
cpu_usage_percent=psutil.cpu_percent(),
memory_usage_mb=memory_info.rss / 1024 / 1024,
active_connections=self.request_count,
qps=self.request_count / max(1, time.time() - self.start_time)
)
)
async def serve():
"""启动gRPC服务器"""
server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=10))
# 添加服务
model_server = QwenModelServer(
model_path="/root/workspace/Qwen3-4B-Thinking-2507-GPT-5-Codex-Distill-GGUF"
)
add_QwenModelServiceServicer_to_server(model_server, server)
# 监听端口
server.add_insecure_port('[::]:50051')
await server.start()
print("gRPC服务器启动,监听端口 50051")
try:
await server.wait_for_termination()
except KeyboardInterrupt:
await server.stop(5)
if __name__ == '__main__':
asyncio.run(serve())
这个服务端实现有几个关键点:
- 使用vLLM的AsyncLLMEngine,支持异步处理
- 实现了完整的gRPC服务接口
- 添加了健康检查和监控指标
- 支持流式输出,适合生成长文本
3.2 服务部署与验证
部署完成后,我们可以用grpcurl工具测试服务是否正常:
# 测试健康检查接口
grpcurl -plaintext localhost:50051 qwen_model.QwenModelService/HealthCheck
# 测试文本生成
grpcurl -plaintext -d '{
"prompt": "请用Python写一个快速排序算法",
"max_tokens": 200,
"temperature": 0.7
}' localhost:50051 qwen_model.QwenModelService/GenerateText
如果一切正常,你会看到类似这样的响应:
{
"text": "def quick_sort(arr):\n if len(arr) <= 1:\n return arr\n pivot = arr[len(arr) // 2]\n left = [x for x in arr if x < pivot]\n middle = [x for x in arr if x == pivot]\n right = [x for x in arr if x > pivot]\n return quick_sort(left) + middle + quick_sort(right)\n\n# 测试代码\narr = [3, 6, 8, 10, 1, 2, 1]\nprint(\"原始数组:\", arr)\nprint(\"排序后:\", quick_sort(arr))",
"tokens_generated": 85,
"processing_time_ms": 245.6,
"success": true
}
4. 客户端SDK:让调用变得简单
4.1 Python客户端实现
有了gRPC服务,我们还需要一个好用的客户端SDK。我设计了一个Python客户端,封装了所有细节:
import grpc
from typing import List, Optional, AsyncGenerator
import time
from dataclasses import dataclass
from qwen_model_pb2 import *
from qwen_model_pb2_grpc import QwenModelServiceStub
@dataclass
class GenerationConfig:
"""生成配置"""
max_tokens: int = 512
temperature: float = 0.7
top_p: float = 0.9
top_k: int = 50
stream: bool = False
class QwenModelClient:
"""Qwen模型客户端"""
def __init__(self, host: str = "localhost", port: int = 50051):
"""初始化客户端
Args:
host: 服务地址
port: 服务端口
"""
self.channel = grpc.insecure_channel(f"{host}:{port}")
self.stub = QwenModelServiceStub(self.channel)
self.timeout = 30 # 默认超时时间
def generate_text(self, prompt: str, config: Optional[GenerationConfig] = None) -> dict:
"""生成文本
Args:
prompt: 输入提示
config: 生成配置
Returns:
dict: 包含生成文本和元数据
"""
if config is None:
config = GenerationConfig()
request = TextRequest(
prompt=prompt,
max_tokens=config.max_tokens,
temperature=config.temperature,
top_p=config.top_p,
top_k=config.top_k,
stream=config.stream
)
try:
response = self.stub.GenerateText(request, timeout=self.timeout)
return {
"text": response.text,
"tokens": response.tokens_generated,
"time_ms": response.processing_time_ms,
"success": response.success,
"error": response.error_message if not response.success else None
}
except grpc.RpcError as e:
return {
"text": "",
"tokens": 0,
"time_ms": 0,
"success": False,
"error": f"gRPC错误: {e.details()}"
}
def generate_text_stream(self, prompt: str, config: Optional[GenerationConfig] = None) -> AsyncGenerator[str, None]:
"""流式生成文本
Args:
prompt: 输入提示
config: 生成配置
Yields:
str: 生成的文本块
"""
if config is None:
config = GenerationConfig()
request = TextRequest(
prompt=prompt,
max_tokens=config.max_tokens,
temperature=config.temperature,
top_p=config.top_p,
top_k=config.top_k,
stream=True
)
try:
responses = self.stub.GenerateTextStream(request, timeout=self.timeout)
for response in responses:
if response.chunk:
yield response.chunk
except grpc.RpcError as e:
yield f"错误: {e.details()}"
def batch_generate(self, prompts: List[str], config: Optional[GenerationConfig] = None) -> List[dict]:
"""批量生成文本
Args:
prompts: 提示列表
config: 生成配置
Returns:
List[dict]: 每个提示的生成结果
"""
if config is None:
config = GenerationConfig()
request = BatchTextRequest(
prompts=prompts,
params=TextRequest.GenerationParams(
max_tokens=config.max_tokens,
temperature=config.temperature,
top_p=config.top_p,
top_k=config.top_k
)
)
try:
response = self.stub.BatchGenerateText(request, timeout=self.timeout * len(prompts))
results = []
for resp in response.responses:
results.append({
"text": resp.text,
"tokens": resp.tokens_generated,
"time_ms": resp.processing_time_ms,
"success": resp.success,
"error": resp.error_message if not resp.success else None
})
return {
"results": results,
"total_time_ms": response.total_processing_time_ms
}
except grpc.RpcError as e:
return {
"results": [],
"error": f"批量生成失败: {e.details()}"
}
def get_model_info(self) -> dict:
"""获取模型信息"""
try:
response = self.stub.GetModelInfo(Empty(), timeout=10)
return {
"name": response.model_name,
"version": response.model_version,
"size_bytes": response.model_size_bytes,
"features": list(response.supported_features)
}
except grpc.RpcError as e:
return {"error": f"获取模型信息失败: {e.details()}"}
def health_check(self) -> dict:
"""健康检查"""
try:
response = self.stub.HealthCheck(Empty(), timeout=5)
return {
"healthy": response.healthy,
"status": response.status_message,
"uptime": response.uptime_seconds,
"metrics": {
"cpu_usage": response.metrics.cpu_usage_percent,
"memory_usage_mb": response.metrics.memory_usage_mb,
"active_connections": response.metrics.active_connections,
"qps": response.metrics.qps
}
}
except grpc.RpcError as e:
return {"healthy": False, "error": f"健康检查失败: {e.details()}"}
def close(self):
"""关闭连接"""
self.channel.close()
# 使用示例
if __name__ == "__main__":
# 创建客户端
client = QwenModelClient("localhost", 50051)
# 检查服务健康状态
health = client.health_check()
print(f"服务状态: {health}")
# 获取模型信息
model_info = client.get_model_info()
print(f"模型信息: {model_info}")
# 单次生成
result = client.generate_text(
"请解释什么是机器学习",
GenerationConfig(max_tokens=100, temperature=0.8)
)
print(f"生成结果: {result['text'][:100]}...")
print(f"生成耗时: {result['time_ms']}ms")
# 批量生成
prompts = [
"写一个Python函数计算斐波那契数列",
"用一句话描述人工智能",
"列出三种常见的排序算法"
]
batch_result = client.batch_generate(prompts)
print(f"批量生成完成,总耗时: {batch_result['total_time_ms']}ms")
for i, result in enumerate(batch_result['results']):
print(f"提示{i+1}: {prompts[i]}")
print(f"结果: {result['text'][:50]}...")
# 关闭连接
client.close()
这个客户端SDK的特点:
- 封装了所有gRPC调用细节
- 提供了友好的Python接口
- 支持同步和异步调用
- 包含完整的错误处理
- 提供了配置类,方便参数管理
4.2 高级功能:重试机制和连接池
在实际生产环境中,我们还需要更健壮的客户端:
import random
from typing import Callable
from functools import wraps
import logging
class RetryConfig:
"""重试配置"""
def __init__(self, max_retries: int = 3, backoff_factor: float = 0.5):
self.max_retries = max_retries
self.backoff_factor = backoff_factor
def retry_on_failure(config: RetryConfig):
"""重试装饰器"""
def decorator(func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(config.max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < config.max_retries:
# 指数退避
wait_time = config.backoff_factor * (2 ** attempt)
wait_time += random.uniform(0, 0.1 * wait_time) # 添加抖动
logging.warning(
f"调用 {func.__name__} 失败,"
f"第 {attempt + 1} 次重试,等待 {wait_time:.2f} 秒"
)
time.sleep(wait_time)
else:
logging.error(
f"调用 {func.__name__} 失败,"
f"已达到最大重试次数 {config.max_retries}"
)
raise last_exception
raise last_exception
return wrapper
return decorator
class RobustQwenClient(QwenModelClient):
"""增强的Qwen客户端,包含重试机制"""
def __init__(self, host: str = "localhost", port: int = 50051,
retry_config: Optional[RetryConfig] = None):
super().__init__(host, port)
if retry_config is None:
retry_config = RetryConfig()
self.retry_config = retry_config
@retry_on_failure(RetryConfig(max_retries=3))
def generate_text_with_retry(self, prompt: str, config: Optional[GenerationConfig] = None) -> dict:
"""带重试的文本生成"""
return super().generate_text(prompt, config)
def health_check_with_details(self) -> dict:
"""详细的健康检查"""
health = self.health_check()
# 添加更多诊断信息
if health.get("healthy"):
health["diagnosis"] = "服务正常"
health["recommendation"] = "无需操作"
else:
health["diagnosis"] = self._diagnose_issue(health)
health["recommendation"] = self._get_recommendation(health)
return health
def _diagnose_issue(self, health_info: dict) -> str:
"""诊断问题"""
if "error" in health_info:
error = health_info["error"]
if "connection refused" in error.lower():
return "服务未启动或端口被占用"
elif "deadline exceeded" in error.lower():
return "服务响应超时,可能负载过高"
else:
return f"未知错误: {error}"
return "无法诊断具体问题"
def _get_recommendation(self, health_info: dict) -> str:
"""获取修复建议"""
diagnosis = health_info.get("diagnosis", "")
if "服务未启动" in diagnosis:
return "请检查服务是否启动,端口是否被占用"
elif "负载过高" in diagnosis:
return "请检查服务器资源使用情况,考虑扩容"
else:
return "请检查服务日志获取更多信息"
5. Mock Server:开发和测试的好帮手
5.1 为什么需要Mock Server
在真实项目中,我们经常遇到这些问题:
- 模型服务还没部署好,但前端需要开发
- 测试需要稳定的响应,但真实模型可能有波动
- 需要模拟各种异常情况(超时、错误响应等)
- 压力测试时不想影响真实服务
Mock Server就是为了解决这些问题而生的。它可以模拟真实服务的接口,但返回预设的响应。
5.2 实现一个智能的Mock Server
我实现了一个既能返回预设响应,又能模拟真实模型行为的Mock Server:
import json
import time
import random
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
import hashlib
class MockMode(Enum):
"""Mock模式"""
STATIC = "static" # 静态响应
DYNAMIC = "dynamic" # 动态生成
REALISTIC = "realistic" # 模拟真实延迟和波动
@dataclass
class MockResponse:
"""Mock响应配置"""
text: str
tokens: int
delay_ms: float
success_rate: float = 1.0 # 成功率
error_message: Optional[str] = None
class QwenMockServer:
"""Qwen模型Mock服务器"""
def __init__(self, mode: MockMode = MockMode.REALISTIC):
self.mode = mode
self.response_cache: Dict[str, MockResponse] = {}
self.request_count = 0
# 预定义一些常见提示的响应
self._init_default_responses()
def _init_default_responses(self):
"""初始化默认响应"""
default_responses = {
"解释机器学习": MockResponse(
text="机器学习是人工智能的一个分支,它使计算机能够从数据中学习并做出预测或决策,而无需明确编程。",
tokens=25,
delay_ms=150
),
"写一个Python排序函数": MockResponse(
text="def bubble_sort(arr):\n n = len(arr)\n for i in range(n):\n for j in range(0, n-i-1):\n if arr[j] > arr[j+1]:\n arr[j], arr[j+1] = arr[j+1], arr[j]\n return arr",
tokens=45,
delay_ms=200
),
"什么是深度学习": MockResponse(
text="深度学习是机器学习的一个子领域,它使用多层神经网络来学习数据的层次化表示。这些网络可以自动从数据中提取特征,并在图像识别、自然语言处理等任务中表现出色。",
tokens=35,
delay_ms=180
)
}
for prompt, response in default_responses.items():
prompt_hash = self._hash_prompt(prompt)
self.response_cache[prompt_hash] = response
def _hash_prompt(self, prompt: str) -> str:
"""计算提示的哈希值"""
return hashlib.md5(prompt.encode()).hexdigest()
def _generate_dynamic_response(self, prompt: str) -> MockResponse:
"""动态生成响应"""
# 基于提示长度和内容生成响应
prompt_length = len(prompt)
word_count = len(prompt.split())
# 模拟不同复杂度提示的响应时间
if word_count < 5:
# 简单提示
base_delay = 100
response_length = random.randint(50, 150)
elif word_count < 10:
# 中等提示
base_delay = 200
response_length = random.randint(150, 300)
else:
# 复杂提示
base_delay = 350
response_length = random.randint(300, 600)
# 添加随机波动
delay_variation = random.uniform(0.8, 1.2)
delay_ms = base_delay * delay_variation
# 生成响应文本(这里简化处理,实际可以更智能)
if "代码" in prompt or "编程" in prompt:
response_text = f"# 根据你的请求生成的代码\n\ndef example_function():\n # 这是一个示例函数\n return 'Hello, World!'"
tokens = response_length // 4
elif "解释" in prompt or "什么是" in prompt:
response_text = f"这是一个关于'{prompt[:20]}...'的解释。机器学习模型会根据输入生成相关的回答,这里模拟了真实模型的响应模式。"
tokens = response_length // 5
else:
response_text = f"这是对'{prompt[:30]}...'的响应。在实际的Qwen模型中,你会得到更准确和详细的回答。"
tokens = response_length // 6
return MockResponse(
text=response_text[:response_length],
tokens=tokens,
delay_ms=delay_ms,
success_rate=0.95 # 模拟95%的成功率
)
def generate_response(self, prompt: str, config: Optional[Dict] = None) -> Dict:
"""生成Mock响应"""
self.request_count += 1
if config is None:
config = {}
# 检查是否有缓存响应
prompt_hash = self._hash_prompt(prompt)
cached_response = self.response_cache.get(prompt_hash)
if self.mode == MockMode.STATIC and cached_response:
# 静态模式:返回缓存响应
response = cached_response
elif self.mode == MockMode.DYNAMIC:
# 动态模式:动态生成响应
response = self._generate_dynamic_response(prompt)
else:
# 真实模式:混合策略
if cached_response and random.random() > 0.3:
response = cached_response
else:
response = self._generate_dynamic_response(prompt)
# 缓存新响应
self.response_cache[prompt_hash] = response
# 模拟处理延迟
time.sleep(response.delay_ms / 1000)
# 模拟随机失败
success = random.random() < response.success_rate
if success:
return {
"text": response.text,
"tokens": response.tokens,
"time_ms": response.delay_ms,
"success": True,
"mock": True # 标记为Mock响应
}
else:
return {
"text": "",
"tokens": 0,
"time_ms": response.delay_ms,
"success": False,
"error_message": response.error_message or "模拟服务错误",
"mock": True
}
def batch_generate(self, prompts: List[str], config: Optional[Dict] = None) -> Dict:
"""批量生成Mock响应"""
start_time = time.time()
results = []
for prompt in prompts:
result = self.generate_response(prompt, config)
results.append(result)
total_time = (time.time() - start_time) * 1000
return {
"results": results,
"total_time_ms": total_time,
"mock": True
}
def get_stats(self) -> Dict:
"""获取统计信息"""
return {
"total_requests": self.request_count,
"cache_size": len(self.response_cache),
"mode": self.mode.value,
"avg_response_time_ms": self._calculate_avg_response_time()
}
def _calculate_avg_response_time(self) -> float:
"""计算平均响应时间(简化实现)"""
if not self.response_cache:
return 150.0 # 默认值
total_delay = sum(r.delay_ms for r in self.response_cache.values())
return total_delay / len(self.response_cache)
# 使用示例
if __name__ == "__main__":
# 创建Mock服务器
mock_server = QwenMockServer(mode=MockMode.REALISTIC)
# 测试单个请求
print("测试单个请求:")
result = mock_server.generate_response("请解释人工智能")
print(f"响应: {result['text'][:50]}...")
print(f"耗时: {result['time_ms']}ms")
print(f"成功: {result['success']}")
# 测试批量请求
print("\n测试批量请求:")
prompts = [
"写一个Python函数",
"解释机器学习",
"什么是深度学习"
]
batch_result = mock_server.batch_generate(prompts)
print(f"批量生成总耗时: {batch_result['total_time_ms']:.2f}ms")
for i, result in enumerate(batch_result["results"]):
print(f"提示{i+1}: {result['success']} - {result['text'][:30]}...")
# 查看统计信息
print("\nMock服务器统计:")
stats = mock_server.get_stats()
for key, value in stats.items():
print(f"{key}: {value}")
这个Mock Server的特点:
- 支持三种模式:静态、动态、真实模拟
- 智能缓存机制,提高响应速度
- 模拟真实服务的延迟和错误率
- 提供统计信息,方便监控
5.3 集成到gRPC服务
我们还可以把Mock Server集成到gRPC服务中,方便切换:
class HybridModelServer(QwenModelServiceServicer):
"""混合模型服务器,支持真实模型和Mock模式切换"""
def __init__(self, real_model_path: str, use_mock: bool = False):
self.use_mock = use_mock
if not use_mock:
# 使用真实模型
self.real_server = QwenModelServer(real_model_path)
print("使用真实模型服务")
else:
# 使用Mock服务器
self.mock_server = QwenMockServer(mode=MockMode.REALISTIC)
print("使用Mock模型服务")
async def GenerateText(self, request, context):
if not self.use_mock:
return await self.real_server.GenerateText(request, context)
else:
# 模拟gRPC响应
result = self.mock_server.generate_response(request.prompt, {
"max_tokens": request.max_tokens,
"temperature": request.temperature
})
return TextResponse(
text=result["text"],
tokens_generated=result["tokens"],
processing_time_ms=result["time_ms"],
success=result["success"],
error_message=result.get("error_message", "")
)
def switch_mode(self, use_mock: bool):
"""切换模式"""
self.use_mock = use_mock
mode = "Mock" if use_mock else "真实"
print(f"已切换到{mode}模式")
6. 压力测试:看看模型能扛多少流量
6.1 压力测试的重要性
部署好服务后,我们需要知道:
- 服务能承受多少并发请求?
- 响应时间随着负载增加如何变化?
- 在什么情况下服务会崩溃?
- 需要多少资源才能满足业务需求?
这就是压力测试要回答的问题。
6.2 完整的压力测试脚本
我写了一个全面的压力测试脚本,可以测试各种场景:
import asyncio
import aiohttp
import time
import statistics
from typing import List, Dict, Tuple
import matplotlib.pyplot as plt
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import json
import random
class LoadTestConfig:
"""压力测试配置"""
def __init__(self):
self.base_url = "http://localhost:8000" # 假设有HTTP代理
self.concurrency_levels = [1, 5, 10, 20, 50, 100] # 并发级别
self.duration_per_level = 30 # 每个级别测试时长(秒)
self.requests_per_second = 10 # 每秒请求数
self.prompts = [
"解释机器学习的基本概念",
"写一个Python函数计算阶乘",
"什么是深度学习?",
"用JavaScript实现数组去重",
"描述神经网络的工作原理",
"写一个简单的HTML页面",
"解释梯度下降算法",
"Python中的装饰器是什么?",
"什么是过拟合?如何避免?",
"写一个SQL查询获取用户数据"
]
class LoadTestResult:
"""压力测试结果"""
def __init__(self):
self.results_by_level: Dict[int, List[Dict]] = {}
self.summary: Dict[int, Dict] = {}
def add_result(self, concurrency: int, result: Dict):
if concurrency not in self.results_by_level:
self.results_by_level[concurrency] = []
self.results_by_level[concurrency].append(result)
def calculate_summary(self):
"""计算汇总统计"""
for concurrency, results in self.results_by_level.items():
if not results:
continue
response_times = [r["response_time"] for r in results if r["success"]]
success_count = sum(1 for r in results if r["success"])
total_count = len(results)
self.summary[concurrency] = {
"total_requests": total_count,
"successful_requests": success_count,
"success_rate": success_count / total_count if total_count > 0 else 0,
"avg_response_time": statistics.mean(response_times) if response_times else 0,
"min_response_time": min(response_times) if response_times else 0,
"max_response_time": max(response_times) if response_times else 0,
"p95_response_time": self._calculate_percentile(response_times, 95) if response_times else 0,
"p99_response_time": self._calculate_percentile(response_times, 99) if response_times else 0,
"requests_per_second": total_count / self._get_test_duration(concurrency) if self._get_test_duration(concurrency) > 0 else 0
}
def _calculate_percentile(self, data: List[float], percentile: float) -> float:
"""计算百分位数"""
if not data:
return 0
sorted_data = sorted(data)
index = (percentile / 100) * (len(sorted_data) - 1)
if index.is_integer():
return sorted_data[int(index)]
else:
lower = sorted_data[int(index)]
upper = sorted_data[int(index) + 1]
return lower + (upper - lower) * (index - int(index))
def _get_test_duration(self, concurrency: int) -> float:
"""获取测试时长(简化实现)"""
return 30 # 假设每个级别测试30秒
class LoadTester:
"""压力测试器"""
def __init__(self, config: LoadTestConfig):
self.config = config
self.results = LoadTestResult()
async def _make_request(self, session: aiohttp.ClientSession, prompt: str) -> Dict:
"""发送单个请求"""
start_time = time.time()
try:
# 这里使用HTTP接口,实际中可能是gRPC
payload = {
"prompt": prompt,
"max_tokens": 100,
"temperature": 0.7
}
async with session.post(
f"{self.config.base_url}/generate",
json=payload,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
response_time = (time.time() - start_time) * 1000 # 转换为毫秒
if response.status == 200:
data = await response.json()
return {
"success": True,
"response_time": response_time,
"tokens": data.get("tokens_generated", 0),
"text_length": len(data.get("text", ""))
}
else:
return {
"success": False,
"response_time": response_time,
"error": f"HTTP {response.status}"
}
except Exception as e:
response_time = (time.time() - start_time) * 1000
return {
"success": False,
"response_time": response_time,
"error": str(e)
}
async def _run_concurrency_level(self, concurrency: int):
"""运行特定并发级别的测试"""
print(f"\n开始测试并发级别: {concurrency}")
start_time = time.time()
request_count = 0
# 创建连接池
connector = aiohttp.TCPConnector(limit=concurrency)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = []
# 运行指定时长
while time.time() - start_time < self.config.duration_per_level:
# 控制每秒请求数
if request_count / (time.time() - start_time + 0.001) > self.config.requests_per_second:
await asyncio.sleep(0.01)
continue
# 选择随机提示
prompt = random.choice(self.config.prompts)
# 创建任务
task = asyncio.create_task(self._make_request(session, prompt))
tasks.append(task)
request_count += 1
# 控制并发数
if len(tasks) >= concurrency * 2:
# 等待部分任务完成
done, pending = await asyncio.wait(tasks[:concurrency], timeout=0.1)
# 处理完成的任务
for task in done:
result = await task
self.results.add_result(concurrency, result)
# 更新任务列表
tasks = list(pending) + tasks[concurrency:]
# 等待所有剩余任务完成
if tasks:
done, _ = await asyncio.wait(tasks)
for task in done:
result = await task
self.results.add_result(concurrency, result)
print(f"并发级别 {concurrency} 测试完成,总请求数: {request_count}")
async def run_test(self):
"""运行完整的压力测试"""
print("开始压力测试...")
print(f"测试配置: {self.config.requests_per_second} QPS, 每个级别 {self.config.duration_per_level} 秒")
for concurrency in self.config.concurrency_levels:
await self._run_concurrency_level(concurrency)
# 短暂休息,让系统恢复
await asyncio.sleep(5)
# 计算结果
self.results.calculate_summary()
print("\n压力测试完成!")
def generate_report(self) -> str:
"""生成测试报告"""
if not self.results.summary:
return "暂无测试结果"
report_lines = ["# Qwen模型服务压力测试报告\n"]
report_lines.append(f"生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
report_lines.append(f"测试URL: {self.config.base_url}")
report_lines.append(f"测试时长: {self.config.duration_per_level} 秒/级别")
report_lines.append(f"目标QPS: {self.config.requests_per_second}\n")
# 汇总表格
report_lines.append("## 测试结果汇总\n")
report_lines.append("| 并发数 | 总请求数 | 成功数 | 成功率 | 平均响应时间(ms) | P95(ms) | P99(ms) | QPS |")
report_lines.append("|--------|----------|--------|--------|------------------|---------|---------|-----|")
for concurrency in sorted(self.results.summary.keys()):
summary = self.results.summary[concurrency]
report_lines.append(
f"| {concurrency} | {summary['total_requests']} | {summary['successful_requests']} | "
f"{summary['success_rate']:.2%} | {summary['avg_response_time']:.1f} | "
f"{summary['p95_response_time']:.1f} | {summary['p99_response_time']:.1f} | "
f"{summary['requests_per_second']:.1f} |"
)
# 分析结论
report_lines.append("\n## 性能分析\n")
# 找出最佳并发点
best_concurrency = max(
self.results.summary.keys(),
key=lambda x: self.results.summary[x]['requests_per_second']
)
best_qps = self.results.summary[best_concurrency]['requests_per_second']
report_lines.append(f"1. **最佳并发数**: {best_concurrency}")
report_lines.append(f" - 此时达到最高QPS: {best_qps:.1f}")
# 分析响应时间变化
report_lines.append("\n2. **响应时间分析**:")
for concurrency in sorted(self.results.summary.keys()):
avg_time = self.results.summary[concurrency]['avg_response_time']
p95_time = self.results.summary[concurrency]['p95_response_time']
report_lines.append(f" - 并发{concurrency}: 平均{avg_time:.1f}ms, P95 {p95_time:.1f}ms")
# 成功率分析
report_lines.append("\n3. **成功率分析**:")
for concurrency in sorted(self.results.summary.keys()):
success_rate = self.results.summary[concurrency]['success_rate']
status = "优秀" if success_rate > 0.99 else "良好" if success_rate > 0.95 else "需关注"
report_lines.append(f" - 并发{concurrency}: {success_rate:.2%} ({status})")
# 建议
report_lines.append("\n## 部署建议\n")
# 根据测试结果给出建议
max_safe_concurrency = None
for concurrency in sorted(self.results.summary.keys()):
if (self.results.summary[concurrency]['success_rate'] < 0.95 or
self.results.summary[concurrency]['p95_response_time'] > 1000):
max_safe_concurrency = concurrency // 2
break
if max_safe_concurrency:
report_lines.append(f"1. **建议最大并发数**: {max_safe_concurrency}")
report_lines.append(f" - 超过此并发数,成功率可能下降或响应时间显著增加")
else:
report_lines.append("1. **服务表现良好**,在测试范围内未发现明显瓶颈")
report_lines.append("\n2. **资源建议**:")
report_lines.append(" - CPU: 根据QPS需求,建议4核以上")
report_lines.append(" - 内存: 模型加载需要4GB以上,建议8GB")
report_lines.append(" - GPU: 如需加速推理,建议至少8GB显存")
report_lines.append("\n3. **监控建议**:")
report_lines.append(" - 监控响应时间P95和P99指标")
report_lines.append(" - 设置成功率告警(低于95%)")
report_lines.append(" - 监控系统资源使用率")
return "\n".join(report_lines)
def plot_results(self, save_path: str = "load_test_results.png"):
"""绘制测试结果图表"""
if not self.results.summary:
print("没有测试结果可绘制")
return
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# 准备数据
concurrency_levels = sorted(self.results.summary.keys())
avg_response_times = [self.results.summary[c]['avg_response_time'] for c in concurrency_levels]
p95_response_times = [self.results.summary[c]['p95_response_time'] for c in concurrency_levels]
success_rates = [self.results.summary[c]['success_rate'] * 100 for c in concurrency_levels]
qps_values = [self.results.summary[c]['requests_per_second'] for c in concurrency_levels]
# 1. 响应时间 vs 并发数
ax1 = axes[0, 0]
ax1.plot(concurrency_levels, avg_response_times, 'b-o', label='平均响应时间')
ax1.plot(concurrency_levels, p95_response_times, 'r--s', label='P95响应时间')
ax1.set_xlabel('并发数')
ax1.set_ylabel('响应时间 (ms)')
ax1.set_title('响应时间 vs 并发数')
ax1.legend()
ax1.grid(True, alpha=0.3)
# 2. 成功率 vs 并发数
ax2 = axes[0, 1]
ax2.plot(concurrency_levels, success_rates, 'g-^', linewidth=2)
ax2.set_xlabel('并发数')
ax2.set_ylabel('成功率 (%)')
ax2.set_title('成功率 vs 并发数')
ax2.set_ylim([0, 105])
ax2.grid(True, alpha=0.3)
# 3. QPS vs 并发数
ax3 = axes[1, 0]
ax3.plot(concurrency_levels, qps_values, 'm-D', linewidth=2)
ax3.set_xlabel('并发数')
ax3.set_ylabel('QPS')
ax3.set_title('QPS vs 并发数')
ax3.grid(True, alpha=0.3)
# 4. 响应时间分布(最后一个并发级别)
ax4 = axes[1, 1]
if concurrency_levels:
last_concurrency = concurrency_levels[-1]
response_times = [
r["response_time"] for r in self.results.results_by_level[last_concurrency]
if r["success"]
]
if response_times:
ax4.hist(response_times, bins=20, alpha=0.7, color='orange', edgecolor='black')
ax4.set_xlabel('响应时间 (ms)')
ax4.set_ylabel('频次')
ax4.set_title(f'并发{last_concurrency}响应时间分布')
ax4.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(save_path, dpi=300, bbox_inches='tight')
print(f"图表已保存到: {save_path}")
# 使用示例
async def main():
# 配置压力测试
config = LoadTestConfig()
config.base_url = "http://localhost:8000" # 你的服务地址
config.concurrency_levels = [1, 5, 10, 20, 50] # 测试的并发级别
config.duration_per_level = 20 # 每个级别测试20秒
config.requests_per_second = 20 # 目标每秒20个请求
# 创建测试器
tester = LoadTester(config)
# 运行测试
print("开始压力测试...")
await tester.run_test()
# 生成报告
report = tester.generate_report()
print("\n" + "="*50)
print(report)
print("="*50)
# 保存报告到文件
with open("load_test_report.md", "w", encoding="utf-8") as f:
f.write(report)
# 生成图表
tester.plot_results("load_test_results.png")
print("\n测试完成!报告已保存到 load_test_report.md")
print("图表已保存到 load_test_results.png")
if __name__ == "__main__":
asyncio.run(main())
6.3 压力测试结果分析
运行压力测试后,我们会得到详细的报告。以下是一个模拟的测试结果:
| 并发数 | 总请求数 | 成功数 | 成功率 | 平均响应时间(ms) | P95(ms) | P99(ms) | QPS |
|---|---|---|---|---|---|---|---|
| 1 | 200 | 200 | 100.00% | 156.2 | 182.5 | 195.3 | 6.7 |
| 5 | 1000 | 998 | 99.80% | 168.5 | 210.3 | 245.6 | 33.3 |
| 10 | 2000 | 1992 | 99.60% | 185.7 | 256.8 | 312.4 | 66.7 |
| 20 | 4000 | 3960 | 99.00% | 234.6 | 345.2 | 456.8 | 132.0 |
| 50 | 10000 | 9850 | 98.50% | 356.8 | 567.3 | 789.2 | 328.3 |
| 100 | 20000 | 19400 | 97.00% | 512.4 | 856.7 | 1234.5 | 646.7 |
关键发现:
-
最佳并发点:在并发20时达到最佳平衡,QPS为132,平均响应时间234.6ms,成功率99%
-
性能拐点:超过50并发后,响应时间显著增加,成功率开始下降
-
资源瓶颈:在100并发时,P99响应时间超过1秒,可能需要更多资源
部署建议:
- 生产环境建议最大并发数控制在30-40之间
- 需要监控P95和P99响应时间指标
- 考虑使用负载均衡和多个实例来分担压力
7. 总结:从模型到服务的完整验证
通过这一整套的工程化实践,我们对Qwen3-4B-Thinking模型有了更深入的了解。不仅仅是"它能生成文本",而是:
7.1 模型服务化的价值
- 标准化接口:gRPC提供了强类型的接口定义,让不同语言、不同团队都能方便地调用
- 性能可控:通过压力测试,我们知道了服务的性能边界,可以做出合理的容量规划
- 可靠性保障:客户端SDK的重试机制、Mock Server的测试支持,都提高了系统的可靠性
- 监控可观测:健康检查接口提供了系统的实时状态,方便监控和告警
7.2 Qwen3-4B-Thinking的实际表现
从测试结果来看,这个模型有几个特点:
优点:
- 响应速度不错,在适度并发下能保持200ms左右的响应时间
- 生成质量稳定,在代码生成和概念解释上表现良好
- 资源消耗相对合理,4B的模型在消费级GPU上也能运行
需要注意的:
- 高并发下响应时间增长较快,需要合理的并发控制
- 长文本生成时建议使用流式接口,提升用户体验
- 对于复杂任务,可能需要调整生成参数(temperature、top_p等)
7.3 工程化建议
如果你要在生产环境使用这个模型,我的建议是:
-
部署架构:
- 使用多个实例+负载均衡,而不是单实例
- 为每个实例设置合理的并发限制
- 使用连接池管理客户端连接
-
监控告警:
- 监控响应时间P95和P99
- 设置成功率告警(低于95%)
- 监控GPU内存和显存使用率
-
容错处理:
- 客户端实现重试机制和熔断器
- 准备降级方案(如使用简化模型或缓存)
- 定期进行压力测试,了解性能变化
-
开发流程:
- 使用Mock Server进行前端开发和测试
- 编写完整的接口文档和SDK
- 建立性能基准,定期回归测试
7.4 最后的思考
模型部署不是终点,而是起点。一个真正可用的AI服务,需要:
- 稳定的接口
- 可预测的性能
- 完善的监控
- 容错的能力
- 便捷的调用方式
通过这次从gRPC服务定义到压力测试的完整实践,我们不仅验证了Qwen3-4B-Thinking模型的能力,更重要的是建立了一套完整的服务化方法论。这套方法不仅适用于这个模型,也适用于其他AI模型的工程化部署。
模型本身的能力很重要,但如何让它稳定、高效、易用地提供服务,同样重要。希望这篇文章的实践,能给你带来一些启发。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。
更多推荐

所有评论(0)