模型性能测试脚本
明白了,你们现在的环境非常“纯粹”:只有一个裸的GPT模型(比如gpt-4o-mini)接口,没有挂载内部知识库,也没有对话历史管理。这种情况下,前面的完整评估框架里有些项(如工具调用、基于私域知识的准确性)可以暂时跳过,我们把焦点缩到两个直接诉求上:
- 通过脚本测性能,具体能测哪些?怎么测?
- 这种纯GPT聊天机器人在内部能找到哪些落地的应用场景?
一、通过脚本可以测试哪些性能?(附带实用指标)
脚本测试的核心,就是把这个GPT接口当成一个“黑盒HTTP服务”,用并发的方式模拟用户发消息,然后收集数据。
1. 首Token延迟(TTFT,Time To First Token)
- 目标:从脚本发出请求,到服务端返回第一个有效内容(流式模式下第一个
data: {"choices":[{"delta":{"content":"..."}}]})的时间。 - 为什么重要:这是用户体感“快不快”的第一印象,超过2秒就会感到卡顿。
- 脚本关键点:记录请求发出时间 T0 和收到首个delta的时间 T1,差值就是TTFT。要开启流式(
stream: true)。
2. Token生成速率(TPS,Tokens Per Second)
- 目标:模型每秒钟能吐出多少个token(近似于每秒钟输出多少个单词/汉字)。
- 怎么算:在流式返回中,记录第一个token的时间 T1 和最后一个token的时间 T_end,用输出内容的总token数 除以 (T_end - T1)。注意不要包含首token前的等待时间。
- 直观体感:TPS太低,文字就像在“挤牙膏”,阅读体验很差。好的体验通常要求在 30~50 tokens/秒 以上。
3. 端到端完整响应时间
- 目标:从发完请求,到接收完整个回答的最后一个字符的时间。
- 细分:
- 短问短答(如:“你好”“今天星期几”):看基础延迟。
- 长文生成(如:“写一篇800字的产品介绍”):看流式持续时长是否在可接受范围。
4. 并发吞吐能力
- 目标:系统能同时撑住多少个用户在“同时生成内容”。
- 关键指标:
- 最大并发数:在错误率<1%的前提下,能同时维持的流式连接数。
- 每秒处理请求数(RPS):压力工具逐步加大请求频率,观察何时开始出现排队、延迟飙升或429限流错误。
5. 限流策略的行为表现
- 要观察的:
- 超出你的账号/部署的 RPM(每分钟请求数) 或 TPM(每分钟Token数) 时,接口是立刻返回HTTP 429错误,还是返回一个友好的“系统繁忙”消息?
- 返回429后,你的客户端脚本有没有合理的等待和重试?脚本可以故意触发限流,记录“被限流到成功恢复”的耗时。
6. 最大上下文长度与输入敏感性
- 测试:
- 故意发送接近模型上下文窗口长度的长文本(比如gpt-4o-mini是128k),看会不会报错、超时或反应极慢。
- 测试在超长输入下,首Token延迟是否显著恶化——这对后续场景(如长文总结、多轮对话)有重要影响。
脚本测试工具与最简单示例
推荐工具:k6(轻量,脚本化)或 Python + asyncio + aiohttp(灵活)。
以一个Python异步压测脚本的伪逻辑为例:
import asyncio, aiohttp, time, json
async def stream_request(session, url, headers, payload):
t_start = time.time()
first_token_time = None
token_count = 0
async with session.post(url, json=payload, headers=headers) as resp:
if resp.status != 200:
return {"error": resp.status, "time_total": time.time()-t_start}
async for line in resp.content:
if line.startswith(b'data: '):
data_str = line[6:].strip()
if data_str == b'[DONE]':
break
if first_token_time is None:
first_token_time = time.time()
# 粗略计数,实际可解析content并计算token
token_count += 1
t_end = time.time()
return {
"ttft": first_token_time - t_start if first_token_time else None,
"total_time": t_end - t_start,
"tokens": token_count,
"tps": token_count / (t_end - first_token_time) if first_token_time else 0
}
async def main():
# 构造用户消息,比如模拟不同长度的prompt
prompts = ["你好", "请用300字介绍人工智能", ...]
async with aiohttp.ClientSession() as session:
tasks = [stream_request(session, URL, HEADERS, {"messages":[{"role":"user","content":p}], "stream": True}) for p in prompts]
results = await asyncio.gather(*tasks)
# 统计分析 results 中的 ttft、tps 等
收集完数据后,主要看这些百分位:P50、P95、P99,尤其关注P99长尾延迟。
二、没有内部知识,纯GPT在内部能有哪些应用场景?
因为没有内部知识库,它不适合回答“公司今年年假政策”这类私域问题(会瞎编)。它的价值在于**“通用语言能力”,可以定位为一个内部通用的文本助手**。
以下是几个高价值、风险可控的内部应用场景:
-
邮件与公文润色 / 风格转换
- “帮我把这段汇报改得更正式/更有说服力。”
- “将这段技术周报翻译成英文,并总结三个要点。”
- 可预设系统提示词:
你是一名专业的商务写作助手,帮助员工优化邮件和文档。
-
代码解释、注释与生成(通用代码,非公司私库)
- “解释这段 Python 代码在做什么。”
- “写一个 bash 脚本,把文件夹里所有 pdf 文件移到子目录 archive 里。”
- “把这段 SQL 查询改写得更简洁。”
- 注意:需在内部明确禁止将包含密钥、内部架构的业务代码粘贴进去,可仅用于通用语言/框架问答。
-
头脑风暴与创意辅助
- “我们的产品是一个内部HR系统,请给我10个提升员工参与度的功能点子。”
- “要为年会视频起一个搞笑又得体的标题,提供20个选项。”
- 这类场景不在乎事实准确性,只激发灵感,非常安全。
-
通用知识问答(公共知识范围)
- “解释一下什么是零知识证明。”
- “比较react和vue在性能优化上的主要差异。”
- 它本质上是一个增强版的搜索引擎,可以帮员工快速扫盲陌生技术概念。
-
文本摘要与信息提取(处理已提供的文本)
- “将下面这篇会议纪要(用户刚粘贴的一段文字)总结为3条行动项。”
- “从这段产品反馈中提取出所有提到的bug,列表输出。”
- 信息源由用户在上下文中直接提供,不存在私域知识幻觉风险。
-
语言翻译
- 内部文档、技术白皮书的多语言快速翻译,比传统翻译工具更懂术语语境。
给这个机器人的定位建议
你可以直接用一句系统提示词给它定个位:
你是一个安全、通用的内部AI助手。你可以帮忙润色文本、解释代码、翻译、头脑风暴和回答公共知识问题。对于任何需要公司内部数据、人事财务信息、或你不知道确切答案的问题,请礼貌地说明你无法访问内部信息,并引导用户咨询相关部门。
这样边界清晰,既不会产生幻觉风险,又能实实在在省掉员工很多“爬格子”的时间。
下面提供可直接运行的脚本,用 Python (asyncio + aiohttp) 实现,均适配 OpenAI 兼容的 Chat Completions 流式接口。脚本会模拟并发用户、支持 ramp-up、收集延迟与吞吐指标,并在测试结束后输出统计摘要。
前置准备
- 被测接口地址:
https://your-endpoint/v1/chat/completions - API Key:通过环境变量
API_KEY传入 - 模型名称:通过环境变量
MODEL传入(默认gpt-4o-mini) - 需安装工具:
- k6: 官方安装方式(https://k6.io/docs/get-started/installation/)
- Python:
pip install aiohttp
脚本二:Python 异步脚本 (perf_test.py)
提供完整的并发控制、ramp-up 和指标统计,可离线分析。
import asyncio
import aiohttp
import time
import json
import os
import statistics
from dataclasses import dataclass, field
from typing import List
# 环境变量
BASE_URL = os.getenv("BASE_URL", "https://api.openai.com")
API_KEY = os.getenv("API_KEY", "")
MODEL = os.getenv("MODEL", "gpt-4o-mini")
URL = f"{BASE_URL}/v1/chat/completions"
# 测试配置(可按需修改)
CONCURRENT_USERS = 10 # 最大并发数
RAMP_UP_TIME = 30 # 爬升时间(秒)
STEADY_TIME = 60 # 稳定运行时间(秒)
COOLDOWN_TIME = 30 # 退出时间(秒)
TIMEOUT = aiohttp.ClientTimeout(total=120) # 单个请求超时
# 模拟消息列表
MESSAGES = [
{"role": "user", "content": "用300字介绍人工智能"},
{"role": "user", "content": "你好,请解释什么是机器学习"},
{"role": "user", "content": "写一段Python快速排序代码"},
{"role": "user", "content": "总结一下公司年会致辞的要点:……"},
]
@dataclass
class RequestResult:
start_time: float
ttft: float = 0.0 # 首Token延迟 (秒)
total_time: float = 0.0 # 总时间
token_count: int = 0 # 输出Token数(近似)
success: bool = False
error_msg: str = ""
@dataclass
class AggregatedStats:
total_requests: int = 0
success_count: int = 0
fail_count: int = 0
ttft_values: List[float] = field(default_factory=list)
total_time_values: List[float] = field(default_factory=list)
tps_values: List[float] = field(default_factory=list)
def add(self, r: RequestResult):
self.total_requests += 1
if r.success:
self.success_count += 1
self.ttft_values.append(r.ttft)
self.total_time_values.append(r.total_time)
if r.total_time > r.ttft and r.token_count > 0:
self.tps_values.append(r.token_count / (r.total_time - r.ttft))
else:
self.fail_count += 1
def report(self):
print("\n================= 压测报告 =================")
print(f"总请求数: {self.total_requests}")
print(f"成功: {self.success_count}, 失败: {self.fail_count}")
if self.ttft_values:
print(f"首Token延迟 (TTFT, 秒):")
print(f" 平均: {statistics.mean(self.ttft_values):.2f}")
print(f" P50: {percentile(self.ttft_values, 50):.2f}")
print(f" P95: {percentile(self.ttft_values, 95):.2f}")
print(f" P99: {percentile(self.ttft_values, 99):.2f}")
if self.total_time_values:
print(f"完整响应时间 (秒):")
print(f" 平均: {statistics.mean(self.total_time_values):.2f}")
print(f" P95: {percentile(self.total_time_values, 95):.2f}")
if self.tps_values:
print(f"输出速率 (Tokens/秒):")
print(f" 平均: {statistics.mean(self.tps_values):.2f}")
print(f" P95: {percentile(self.tps_values, 95):.2f}")
print("=============================================")
def percentile(data: List[float], p: int):
sorted_data = sorted(data)
k = (len(sorted_data)-1) * p / 100.0
f = int(k)
c = k - f
if f+1 < len(sorted_data):
return sorted_data[f] * (1-c) + sorted_data[f+1] * c
else:
return sorted_data[f]
async def single_request(session, idx):
msg = MESSAGES[idx % len(MESSAGES)]
payload = {
"model": MODEL,
"messages": [msg],
"stream": True,
"max_tokens": 500,
"stream_options": {"include_usage": True}
}
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
"Accept": "text/event-stream",
}
result = RequestResult(start_time=time.time())
try:
async with session.post(URL, json=payload, headers=headers, timeout=TIMEOUT) as resp:
if resp.status != 200:
body = await resp.text()
result.error_msg = f"HTTP {resp.status}: {body[:200]}"
return result
first_token = True
token_count = 0
last_token_time = 0.0
async for raw_line in resp.content:
# 分割按行处理,注意 chunk 可能不完整
pass
# 由于 aiohttp 流式迭代不方便处理行边界,采用更稳健的方式:
# 重组缓冲区读取
# 这里改为使用 resp.content.iter_any()
buffer = b""
async for chunk in resp.content.iter_any():
if not chunk:
break
buffer += chunk
while b'\n' in buffer:
line, buffer = buffer.split(b'\n', 1)
if line.startswith(b'data: '):
data = line[6:].strip()
if data == b'[DONE]':
break
try:
json_data = json.loads(data)
delta = json_data.get("choices", [{}])[0].get("delta", {}).get("content")
if delta:
now = time.time()
if first_token:
result.ttft = now - result.start_time
first_token = False
last_token_time = now
# 粗略 token 计数
token_count += len(delta) / 3
except:
pass
result.total_time = time.time() - result.start_time
result.token_count = int(token_count)
result.success = True
except Exception as e:
result.error_msg = str(e)[:200]
return result
async def worker(name, semaphore, stats, duration, start_event):
"""持续发送请求的单个 worker,并发控制由 semaphore 实现"""
await start_event.wait() # 同步开始时间
end_time = time.time() + duration
idx = 0
async with aiohttp.ClientSession() as session:
while time.time() < end_time:
async with semaphore:
res = await single_request(session, idx)
stats.add(res)
idx += 1
# 可选:打印实时进度
# print(f"[{name}] 完成第{idx}个请求, TTFT={res.ttft:.2f}s")
async def main():
# 检查必要环境变量
if not API_KEY:
print("请设置环境变量 API_KEY")
return
stats = AggregatedStats()
semaphore = asyncio.Semaphore(CONCURRENT_USERS)
start_event = asyncio.Event()
print(f"开始压测: 最大并发={CONCURRENT_USERS}, 爬升时间={RAMP_UP_TIME}s, 稳定时间={STEADY_TIME}s")
# 创建所有 worker(等于并发数)
workers = []
for i in range(CONCURRENT_USERS):
# 每个 worker 的工作持续时间 = ramp_up + steady + cooldown (简化,全部相同)
duration = RAMP_UP_TIME + STEADY_TIME + COOLDOWN_TIME
w = asyncio.create_task(worker(f"worker-{i}", semaphore, stats, duration, start_event))
workers.append(w)
# ramp-up 逐步释放信号量
delay_per_worker = RAMP_UP_TIME / CONCURRENT_USERS
print(f"每个 worker 启动间隔 {delay_per_worker:.1f}s")
start_event.set() # 所有 worker 可以开始等待信号量
# 初始信号量可以设为0,然后逐个增加?不,我们直接让 worker 争用 semaphore,但控制 worker 任务开始的时间。
# 这里采用更简单的策略:按时间差逐个创建 worker 任务。
# 重新设计:不使用统一 start_event,而是逐个启动 worker。
workers = []
for i in range(CONCURRENT_USERS):
# 每个 worker 在自身持续时间内工作
duration = RAMP_UP_TIME + STEADY_TIME + COOLDOWN_TIME
worker_task = asyncio.create_task(worker(f"worker-{i}", semaphore, stats, duration, asyncio.Event()))
# 立即触发 start_event
# 但需要限制并发,通过在 worker 内部用 semaphore 控制
# 在 worker 开始前 sleep 实现 ramp-up
# 简单方法:让每个 worker 在开始前先睡眠一个偏移量
# 修改 worker 函数,增加一个偏移量参数
# 为了方便,修改为调用另一个函数
async def delayed_worker(idx):
sleep_time = idx * delay_per_worker
await asyncio.sleep(sleep_time)
# 然后就和之前 worker 一样
await worker(f"worker-{idx}", semaphore, stats, STEADY_TIME, asyncio.Event())
# 重新启动
stats = AggregatedStats()
semaphore = asyncio.Semaphore(CONCURRENT_USERS)
tasks = []
for i in range(CONCURRENT_USERS):
tasks.append(asyncio.create_task(delayed_worker(i)))
# 等待所有 worker 完成
await asyncio.gather(*tasks)
stats.report()
if __name__ == "__main__":
asyncio.run(main())
注意:脚本中的
delayed_worker实现了 ramp-up,每个 worker 延迟启动,并在内部持续请求固定时长(STEADY_TIME)。最后所有 worker 结束时间可能略有差异,但基本覆盖了爬升+稳定阶段。
Python 运行命令
export API_KEY="sk-xxx"
export BASE_URL="https://your-proxy.com"
python perf_test.py
结果解读与后续
- TTFT P95 < 2s 表示用户体验流畅;若超过需检查网络、模型服务负载。
- TPS(输出速率):若平均低于 20 tokens/s,文字生成会有明显卡顿。
- 错误率:出现 429 说明触发限流,需调低并发或增加账号配额;出现 5xx 说明服务端不稳定。
- 最大并发:可逐步提高
CONCURRENT_USERS重复测试,直到错误率上升或延迟恶化,找到系统瓶颈。
两个脚本均可集成到 CI/CD 流程中,并可将指标推送到 Prometheus/Grafana 做长期监控。提供简单的阈值告警即可保障内部机器人的性能水位。
更多推荐


所有评论(0)