明白了,你们现在的环境非常“纯粹”:只有一个裸的GPT模型(比如gpt-4o-mini)接口,没有挂载内部知识库,也没有对话历史管理。这种情况下,前面的完整评估框架里有些项(如工具调用、基于私域知识的准确性)可以暂时跳过,我们把焦点缩到两个直接诉求上:

  1. 通过脚本测性能,具体能测哪些?怎么测?
  2. 这种纯GPT聊天机器人在内部能找到哪些落地的应用场景?

一、通过脚本可以测试哪些性能?(附带实用指标)

脚本测试的核心,就是把这个GPT接口当成一个“黑盒HTTP服务”,用并发的方式模拟用户发消息,然后收集数据。

1. 首Token延迟(TTFT,Time To First Token)

  • 目标:从脚本发出请求,到服务端返回第一个有效内容(流式模式下第一个data: {"choices":[{"delta":{"content":"..."}}]})的时间。
  • 为什么重要:这是用户体感“快不快”的第一印象,超过2秒就会感到卡顿。
  • 脚本关键点:记录请求发出时间 T0 和收到首个delta的时间 T1,差值就是TTFT。要开启流式(stream: true)。

2. Token生成速率(TPS,Tokens Per Second)

  • 目标:模型每秒钟能吐出多少个token(近似于每秒钟输出多少个单词/汉字)。
  • 怎么算:在流式返回中,记录第一个token的时间 T1 和最后一个token的时间 T_end,用输出内容的总token数 除以 (T_end - T1)。注意不要包含首token前的等待时间。
  • 直观体感:TPS太低,文字就像在“挤牙膏”,阅读体验很差。好的体验通常要求在 30~50 tokens/秒 以上。

3. 端到端完整响应时间

  • 目标:从发完请求,到接收完整个回答的最后一个字符的时间。
  • 细分
    • 短问短答(如:“你好”“今天星期几”):看基础延迟。
    • 长文生成(如:“写一篇800字的产品介绍”):看流式持续时长是否在可接受范围。

4. 并发吞吐能力

  • 目标:系统能同时撑住多少个用户在“同时生成内容”。
  • 关键指标
    • 最大并发数:在错误率<1%的前提下,能同时维持的流式连接数。
    • 每秒处理请求数(RPS):压力工具逐步加大请求频率,观察何时开始出现排队、延迟飙升或429限流错误。

5. 限流策略的行为表现

  • 要观察的
    • 超出你的账号/部署的 RPM(每分钟请求数)TPM(每分钟Token数) 时,接口是立刻返回HTTP 429错误,还是返回一个友好的“系统繁忙”消息?
    • 返回429后,你的客户端脚本有没有合理的等待和重试?脚本可以故意触发限流,记录“被限流到成功恢复”的耗时。

6. 最大上下文长度与输入敏感性

  • 测试
    • 故意发送接近模型上下文窗口长度的长文本(比如gpt-4o-mini是128k),看会不会报错、超时或反应极慢。
    • 测试在超长输入下,首Token延迟是否显著恶化——这对后续场景(如长文总结、多轮对话)有重要影响。

脚本测试工具与最简单示例

推荐工具k6(轻量,脚本化)或 Python + asyncio + aiohttp(灵活)。

以一个Python异步压测脚本的伪逻辑为例:

import asyncio, aiohttp, time, json

async def stream_request(session, url, headers, payload):
    t_start = time.time()
    first_token_time = None
    token_count = 0
    async with session.post(url, json=payload, headers=headers) as resp:
        if resp.status != 200:
            return {"error": resp.status, "time_total": time.time()-t_start}
        async for line in resp.content:
            if line.startswith(b'data: '):
                data_str = line[6:].strip()
                if data_str == b'[DONE]':
                    break
                if first_token_time is None:
                    first_token_time = time.time()
                # 粗略计数,实际可解析content并计算token
                token_count += 1
    t_end = time.time()
    return {
        "ttft": first_token_time - t_start if first_token_time else None,
        "total_time": t_end - t_start,
        "tokens": token_count,
        "tps": token_count / (t_end - first_token_time) if first_token_time else 0
    }

async def main():
    # 构造用户消息,比如模拟不同长度的prompt
    prompts = ["你好", "请用300字介绍人工智能", ...]
    async with aiohttp.ClientSession() as session:
        tasks = [stream_request(session, URL, HEADERS, {"messages":[{"role":"user","content":p}], "stream": True}) for p in prompts]
        results = await asyncio.gather(*tasks)
    # 统计分析 results 中的 ttft、tps 等

收集完数据后,主要看这些百分位:P50、P95、P99,尤其关注P99长尾延迟。


二、没有内部知识,纯GPT在内部能有哪些应用场景?

因为没有内部知识库,它不适合回答“公司今年年假政策”这类私域问题(会瞎编)。它的价值在于**“通用语言能力”,可以定位为一个内部通用的文本助手**。

以下是几个高价值、风险可控的内部应用场景:

  1. 邮件与公文润色 / 风格转换

    • “帮我把这段汇报改得更正式/更有说服力。”
    • “将这段技术周报翻译成英文,并总结三个要点。”
    • 可预设系统提示词:你是一名专业的商务写作助手,帮助员工优化邮件和文档。
  2. 代码解释、注释与生成(通用代码,非公司私库)

    • “解释这段 Python 代码在做什么。”
    • “写一个 bash 脚本,把文件夹里所有 pdf 文件移到子目录 archive 里。”
    • “把这段 SQL 查询改写得更简洁。”
    • 注意:需在内部明确禁止将包含密钥、内部架构的业务代码粘贴进去,可仅用于通用语言/框架问答。
  3. 头脑风暴与创意辅助

    • “我们的产品是一个内部HR系统,请给我10个提升员工参与度的功能点子。”
    • “要为年会视频起一个搞笑又得体的标题,提供20个选项。”
    • 这类场景不在乎事实准确性,只激发灵感,非常安全。
  4. 通用知识问答(公共知识范围)

    • “解释一下什么是零知识证明。”
    • “比较react和vue在性能优化上的主要差异。”
    • 它本质上是一个增强版的搜索引擎,可以帮员工快速扫盲陌生技术概念。
  5. 文本摘要与信息提取(处理已提供的文本)

    • “将下面这篇会议纪要(用户刚粘贴的一段文字)总结为3条行动项。”
    • “从这段产品反馈中提取出所有提到的bug,列表输出。”
    • 信息源由用户在上下文中直接提供,不存在私域知识幻觉风险。
  6. 语言翻译

    • 内部文档、技术白皮书的多语言快速翻译,比传统翻译工具更懂术语语境。

给这个机器人的定位建议

你可以直接用一句系统提示词给它定个位:

你是一个安全、通用的内部AI助手。你可以帮忙润色文本、解释代码、翻译、头脑风暴和回答公共知识问题。对于任何需要公司内部数据、人事财务信息、或你不知道确切答案的问题,请礼貌地说明你无法访问内部信息,并引导用户咨询相关部门。

这样边界清晰,既不会产生幻觉风险,又能实实在在省掉员工很多“爬格子”的时间。

下面提供可直接运行的脚本,用 Python (asyncio + aiohttp) 实现,均适配 OpenAI 兼容的 Chat Completions 流式接口。脚本会模拟并发用户、支持 ramp-up、收集延迟与吞吐指标,并在测试结束后输出统计摘要。


前置准备

  1. 被测接口地址https://your-endpoint/v1/chat/completions
  2. API Key:通过环境变量 API_KEY 传入
  3. 模型名称:通过环境变量 MODEL 传入(默认 gpt-4o-mini
  4. 需安装工具:
    • k6: 官方安装方式(https://k6.io/docs/get-started/installation/)
    • Python: pip install aiohttp

脚本二:Python 异步脚本 (perf_test.py)

提供完整的并发控制、ramp-up 和指标统计,可离线分析。

import asyncio
import aiohttp
import time
import json
import os
import statistics
from dataclasses import dataclass, field
from typing import List

# 环境变量
BASE_URL = os.getenv("BASE_URL", "https://api.openai.com")
API_KEY = os.getenv("API_KEY", "")
MODEL = os.getenv("MODEL", "gpt-4o-mini")
URL = f"{BASE_URL}/v1/chat/completions"

# 测试配置(可按需修改)
CONCURRENT_USERS = 10        # 最大并发数
RAMP_UP_TIME = 30            # 爬升时间(秒)
STEADY_TIME = 60             # 稳定运行时间(秒)
COOLDOWN_TIME = 30           # 退出时间(秒)
TIMEOUT = aiohttp.ClientTimeout(total=120)  # 单个请求超时

# 模拟消息列表
MESSAGES = [
    {"role": "user", "content": "用300字介绍人工智能"},
    {"role": "user", "content": "你好,请解释什么是机器学习"},
    {"role": "user", "content": "写一段Python快速排序代码"},
    {"role": "user", "content": "总结一下公司年会致辞的要点:……"},
]

@dataclass
class RequestResult:
    start_time: float
    ttft: float = 0.0          # 首Token延迟 (秒)
    total_time: float = 0.0    # 总时间
    token_count: int = 0       # 输出Token数(近似)
    success: bool = False
    error_msg: str = ""

@dataclass
class AggregatedStats:
    total_requests: int = 0
    success_count: int = 0
    fail_count: int = 0
    ttft_values: List[float] = field(default_factory=list)
    total_time_values: List[float] = field(default_factory=list)
    tps_values: List[float] = field(default_factory=list)
    
    def add(self, r: RequestResult):
        self.total_requests += 1
        if r.success:
            self.success_count += 1
            self.ttft_values.append(r.ttft)
            self.total_time_values.append(r.total_time)
            if r.total_time > r.ttft and r.token_count > 0:
                self.tps_values.append(r.token_count / (r.total_time - r.ttft))
        else:
            self.fail_count += 1

    def report(self):
        print("\n================= 压测报告 =================")
        print(f"总请求数: {self.total_requests}")
        print(f"成功: {self.success_count}, 失败: {self.fail_count}")
        if self.ttft_values:
            print(f"首Token延迟 (TTFT, 秒):")
            print(f"  平均: {statistics.mean(self.ttft_values):.2f}")
            print(f"  P50: {percentile(self.ttft_values, 50):.2f}")
            print(f"  P95: {percentile(self.ttft_values, 95):.2f}")
            print(f"  P99: {percentile(self.ttft_values, 99):.2f}")
        if self.total_time_values:
            print(f"完整响应时间 (秒):")
            print(f"  平均: {statistics.mean(self.total_time_values):.2f}")
            print(f"  P95: {percentile(self.total_time_values, 95):.2f}")
        if self.tps_values:
            print(f"输出速率 (Tokens/秒):")
            print(f"  平均: {statistics.mean(self.tps_values):.2f}")
            print(f"  P95: {percentile(self.tps_values, 95):.2f}")
        print("=============================================")

def percentile(data: List[float], p: int):
    sorted_data = sorted(data)
    k = (len(sorted_data)-1) * p / 100.0
    f = int(k)
    c = k - f
    if f+1 < len(sorted_data):
        return sorted_data[f] * (1-c) + sorted_data[f+1] * c
    else:
        return sorted_data[f]

async def single_request(session, idx):
    msg = MESSAGES[idx % len(MESSAGES)]
    payload = {
        "model": MODEL,
        "messages": [msg],
        "stream": True,
        "max_tokens": 500,
        "stream_options": {"include_usage": True}
    }
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
        "Accept": "text/event-stream",
    }
    result = RequestResult(start_time=time.time())
    try:
        async with session.post(URL, json=payload, headers=headers, timeout=TIMEOUT) as resp:
            if resp.status != 200:
                body = await resp.text()
                result.error_msg = f"HTTP {resp.status}: {body[:200]}"
                return result
            first_token = True
            token_count = 0
            last_token_time = 0.0
            async for raw_line in resp.content:
                # 分割按行处理,注意 chunk 可能不完整
                pass
            # 由于 aiohttp 流式迭代不方便处理行边界,采用更稳健的方式:
            # 重组缓冲区读取
            # 这里改为使用 resp.content.iter_any()
            buffer = b""
            async for chunk in resp.content.iter_any():
                if not chunk:
                    break
                buffer += chunk
                while b'\n' in buffer:
                    line, buffer = buffer.split(b'\n', 1)
                    if line.startswith(b'data: '):
                        data = line[6:].strip()
                        if data == b'[DONE]':
                            break
                        try:
                            json_data = json.loads(data)
                            delta = json_data.get("choices", [{}])[0].get("delta", {}).get("content")
                            if delta:
                                now = time.time()
                                if first_token:
                                    result.ttft = now - result.start_time
                                    first_token = False
                                last_token_time = now
                                # 粗略 token 计数
                                token_count += len(delta) / 3
                        except:
                            pass
            result.total_time = time.time() - result.start_time
            result.token_count = int(token_count)
            result.success = True
    except Exception as e:
        result.error_msg = str(e)[:200]
    return result

async def worker(name, semaphore, stats, duration, start_event):
    """持续发送请求的单个 worker,并发控制由 semaphore 实现"""
    await start_event.wait()  # 同步开始时间
    end_time = time.time() + duration
    idx = 0
    async with aiohttp.ClientSession() as session:
        while time.time() < end_time:
            async with semaphore:
                res = await single_request(session, idx)
                stats.add(res)
                idx += 1
                # 可选:打印实时进度
                # print(f"[{name}] 完成第{idx}个请求, TTFT={res.ttft:.2f}s")

async def main():
    # 检查必要环境变量
    if not API_KEY:
        print("请设置环境变量 API_KEY")
        return

    stats = AggregatedStats()
    semaphore = asyncio.Semaphore(CONCURRENT_USERS)
    start_event = asyncio.Event()

    print(f"开始压测: 最大并发={CONCURRENT_USERS}, 爬升时间={RAMP_UP_TIME}s, 稳定时间={STEADY_TIME}s")
    # 创建所有 worker(等于并发数)
    workers = []
    for i in range(CONCURRENT_USERS):
        # 每个 worker 的工作持续时间 = ramp_up + steady + cooldown (简化,全部相同)
        duration = RAMP_UP_TIME + STEADY_TIME + COOLDOWN_TIME
        w = asyncio.create_task(worker(f"worker-{i}", semaphore, stats, duration, start_event))
        workers.append(w)

    # ramp-up 逐步释放信号量
    delay_per_worker = RAMP_UP_TIME / CONCURRENT_USERS
    print(f"每个 worker 启动间隔 {delay_per_worker:.1f}s")
    start_event.set()  # 所有 worker 可以开始等待信号量
    # 初始信号量可以设为0,然后逐个增加?不,我们直接让 worker 争用 semaphore,但控制 worker 任务开始的时间。
    # 这里采用更简单的策略:按时间差逐个创建 worker 任务。
    # 重新设计:不使用统一 start_event,而是逐个启动 worker。
    workers = []
    for i in range(CONCURRENT_USERS):
        # 每个 worker 在自身持续时间内工作
        duration = RAMP_UP_TIME + STEADY_TIME + COOLDOWN_TIME
        worker_task = asyncio.create_task(worker(f"worker-{i}", semaphore, stats, duration, asyncio.Event()))
        # 立即触发 start_event
        # 但需要限制并发,通过在 worker 内部用 semaphore 控制
        # 在 worker 开始前 sleep 实现 ramp-up
        # 简单方法:让每个 worker 在开始前先睡眠一个偏移量
        # 修改 worker 函数,增加一个偏移量参数

    # 为了方便,修改为调用另一个函数
    async def delayed_worker(idx):
        sleep_time = idx * delay_per_worker
        await asyncio.sleep(sleep_time)
        # 然后就和之前 worker 一样
        await worker(f"worker-{idx}", semaphore, stats, STEADY_TIME, asyncio.Event())

    # 重新启动
    stats = AggregatedStats()
    semaphore = asyncio.Semaphore(CONCURRENT_USERS)
    tasks = []
    for i in range(CONCURRENT_USERS):
        tasks.append(asyncio.create_task(delayed_worker(i)))
    # 等待所有 worker 完成
    await asyncio.gather(*tasks)

    stats.report()

if __name__ == "__main__":
    asyncio.run(main())

注意:脚本中的 delayed_worker 实现了 ramp-up,每个 worker 延迟启动,并在内部持续请求固定时长(STEADY_TIME)。最后所有 worker 结束时间可能略有差异,但基本覆盖了爬升+稳定阶段。

Python 运行命令

export API_KEY="sk-xxx"
export BASE_URL="https://your-proxy.com"
python perf_test.py

结果解读与后续

  • TTFT P95 < 2s 表示用户体验流畅;若超过需检查网络、模型服务负载。
  • TPS(输出速率):若平均低于 20 tokens/s,文字生成会有明显卡顿。
  • 错误率:出现 429 说明触发限流,需调低并发或增加账号配额;出现 5xx 说明服务端不稳定。
  • 最大并发:可逐步提高 CONCURRENT_USERS 重复测试,直到错误率上升或延迟恶化,找到系统瓶颈。

两个脚本均可集成到 CI/CD 流程中,并可将指标推送到 Prometheus/Grafana 做长期监控。提供简单的阈值告警即可保障内部机器人的性能水位。

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐