万字复盘:从单 Agent 到 Multi-Agent 架构迁移的血泪史
万字复盘:从单 Agent 到 Multi-Agent 架构迁移的血泪史——踩过的坑、学到的招、重构的道
摘要/引言
你是否有过这样的经历:一开始用一个简单的单 Agent 搞定了所有业务,用户反馈还不错;但随着业务量暴增、需求复杂度飙升,那个曾经“小巧可爱”的 Agent 逐渐变成了“巨无霸”——提示词长达上千字,代码耦合得像“意大利面”,加一个新功能要回归所有测试,响应时间从秒级变成分钟级,偶尔还会因为一个 API 超时就全系统崩溃?
我和我的团队就经历了这样的“噩梦”。我们的电商客服 Agent 从最初每天几百个简单咨询,发展到每天几十万次复杂交互——订单查询、物流追踪、退换货、退款、投诉建议……单 Agent 彻底扛不住了。于是我们踏上了从单 Agent 到 Multi-Agent 架构的迁移之路,这一路踩了无数坑:Agent 职责划分不清导致“三个和尚没水喝”,通信机制混乱导致“消息满天飞找不到北”,容错性差导致“一荣俱荣一损俱损”,开发复杂度飙升导致“每个 Agent 都是独立项目维护要命”,知识共享困难导致“同一个问题不同回答”……
但踩坑的同时,我们也摸索出了一套行之有效的解决方案:用 DDD 划分 Agent 边界,用异步消息队列做通信,用服务注册与发现+熔断机制做容错,用 Agent 抽象层+统一基础设施降低开发成本,用统一向量数据库实现知识共享……最终我们成功完成了迁移,系统的耦合度降低了 90%,扩展性提升了 10 倍,响应时间从 15 秒降到 1 秒以内,容错性也大大增强。
本文将用万字篇幅,复盘我们从单 Agent 到 Multi-Agent 迁移的完整过程——从问题背景、迁移前的准备,到迁移中的核心坑与解决方案,再到迁移后的架构设计、代码实现、最佳实践,最后展望行业未来趋势。希望我们的血泪史能帮你少走弯路!
一、为什么要迁移?——业务增长下的单 Agent 困境
在讲迁移之前,我们得先搞清楚:单 Agent 到底是什么?它在什么场景下好用?又为什么会在业务增长时“掉链子”?
1.1 单 Agent 的定义与适用场景
核心概念:单 Agent
单 Agent 是指处于一定环境中,能够自主感知环境、自主决策、自主执行动作,以实现单一目标的智能体。它的核心要素包括:
- 感知器:接收用户输入或环境信号(如文本、语音、API 响应);
- 推理器:基于预定义规则、知识库或 LLM 进行决策;
- 执行器:调用外部工具(如数据库、第三方 API)执行动作;
- 知识库:存储业务规则、历史数据等知识。
适用场景
单 Agent 非常适合以下场景:
- 业务逻辑简单:流程单一,不需要复杂的分支或协作;
- 并发量小:每天请求量在几千以内;
- 无跨域需求:不需要和其他系统或模块深度协作。
比如我们最初的电商客服单 Agent:
- 只处理“发货时间”“运费多少”“商品尺码”等 10 类简单问题;
- 用 LangChain 的
LLMChain实现,提示词只有 200 字; - 连接一个小型商品知识库,用 FAISS 做本地向量存储;
- 每天请求量 300-500,响应时间 0.5-1 秒,用户满意度 95%。
那时候的代码真的“优雅”:
# 最初的单 Agent 代码(简化版)
from langchain_openai import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
# 1. 加载知识库
embeddings = OpenAIEmbeddings()
vector_store = FAISS.load_local("product_knowledge", embeddings)
# 2. 定义提示词模板
prompt = PromptTemplate(
input_variables=["context", "question"],
template="""你是电商客服小助手,请根据以下知识库回答用户问题。
知识库:{context}
用户问题:{question}
回答:"""
)
# 3. 初始化 LLMChain
llm = OpenAI(temperature=0)
chain = LLMChain(llm=llm, prompt=prompt)
# 4. 处理用户请求
def handle_user_question(question):
docs = vector_store.similarity_search(question, k=3)
context = "\n".join([doc.page_content for doc in docs])
response = chain.run(context=context, question=question)
return response
# 测试
print(handle_user_question("这件衣服的发货时间是多久?"))
1.2 业务增长:单 Agent 的“至暗时刻”
好景不长,我们的电商平台业务爆发了:
- 用户量增长:从 10 万涨到 100 万;
- 请求量增长:从每天 500 涨到每天 50 万;
- 需求复杂度增长:用户不再只问简单问题,而是问“我买的衣服尺码不对,想换大一号,同时退掉另一个订单的裤子,退款能一起到账吗?”“我的快递显示签收但没收到,帮我查快递员电话,同时问问能不能补发?”
这时候,单 Agent 开始出现各种问题,我们的“噩梦”开始了。
问题 1:业务逻辑严重耦合,代码变成“意大利面”
为了处理复杂需求,我们不断往提示词里加规则:
- 加订单查询逻辑:“如果用户问订单,先调用订单 API 查询 order_id=xxx 的状态”;
- 加物流查询逻辑:“如果用户问物流,先从订单里拿到 tracking_number,再调用物流 API”;
- 加退换货逻辑:“如果用户申请退换货,先查订单是否在 7 天内,再查商品是否影响二次销售……”
到最后,提示词长达 2000 多字,像一本“小手册”;代码也从 50 行涨到 5000 行,所有逻辑耦合在一起:
- 退换货逻辑里直接调用了订单 API、物流 API、库存 API、支付 API;
- 加一个“积分兑换”功能,要改提示词、改订单查询代码、改支付代码;
- 三个人同时改同一个代码库,每周合并代码都要解决 10+ 冲突。
有一次,我们改了订单 API 的返回字段(把 status 改成了 order_status),结果忘记改退换货逻辑里的字段引用,导致所有退换货请求都失败——用户反馈“系统崩溃了”,我们紧急回滚,花了 2 小时才恢复。
问题 2:性能瓶颈:响应时间从 1 秒变成 15 秒
单 Agent 是串行处理所有逻辑的:
比如用户问“退换货+退款”,单 Agent 要做以下步骤:
- 调用订单 API 查询订单 1 的状态(2 秒);
- 调用订单 API 查询订单 2 的状态(2 秒);
- 调用物流 API 安排取件(3 秒);
- 调用库存 API 更新库存(2 秒);
- 调用支付 API 处理退款(4 秒);
- 生成回答(1 秒)。
串行下来,总响应时间是 14 秒——用户等得不耐烦,纷纷投诉“客服太慢了”。
更糟的是,并发量上来后,单 Agent 的 LLM 和 API 调用都成了瓶颈:
- LLM 的 RPM(每分钟请求数)有限制,50 并发下,很多请求要排队等 10 秒以上;
- 订单 API 的连接池满了,很多请求直接超时。
那段时间,我们的监控仪表盘上全是“红条”——错误率从 0.1% 涨到 20%,用户满意度从 95% 掉到 60%。
问题 3:容错性差:“一个错误,全系统崩溃”
单 Agent 没有任何容错机制:
- 调用物流 API 超时,整个 Agent 就直接报错“系统错误,请稍后再试”;
- 向量数据库挂了,所有需要知识库的请求都失败;
- 代码里有一个未捕获的异常,整个 Agent 进程就崩溃了,要手动重启。
有一次,第三方物流 API 因为维护停机了 2 小时——我们的电商客服系统也跟着“瘫痪”了 2 小时,用户只能打电话投诉,客服电话被打爆了。
问题 4:团队协作困难:“一个人扛所有,其他人插不上手”
最初只有一个开发负责这个单 Agent,后来业务复杂了,加了两个开发——但三个人根本没法协作:
- 所有逻辑都在一个代码库里,改一个地方可能影响所有功能;
- 测试要回归所有 100+ 测试用例,每次测试要花 4 小时;
- 老开发离职了,新开发要花 1 个月才能看懂代码。
团队成员都很痛苦:“这哪是开发,这是在‘修古董’!”
1.3 破局:为什么 Multi-Agent 是解决方案?
就在我们快要“崩溃”的时候,Multi-Agent 架构进入了我们的视野。我们先搞清楚:Multi-Agent 到底是什么?它能解决单 Agent 的哪些问题?
核心概念:Multi-Agent System(MAS)
Multi-Agent System 是指由多个相互作用的 Agent 组成的系统,这些 Agent 通过协作、竞争或协商来完成单个 Agent 无法完成的任务。它的核心要素包括:
- Agent 群体:多个职责单一的 Agent(如用户交互 Agent、订单管理 Agent、物流追踪 Agent);
- 环境:Agent 所处的外部环境(如数据库、第三方 API、用户);
- 交互机制:Agent 之间的通信(消息队列)和协调(合同网、拍卖);
- 协作模式:中心化(有中央控制 Agent)、去中心化(无中央控制)、混合式。
单 Agent vs Multi-Agent:核心属性对比
为了更直观地看到差异,我们做了一个对比表格:
| 核心属性 | 单 Agent | Multi-Agent |
|---|---|---|
| 耦合度 | 高,所有业务逻辑耦合在一起 | 低,每个 Agent 职责单一,松耦合 |
| 扩展性 | 差,加新功能要改整个 Agent | 好,加新功能只需加新 Agent 或修改现有 Agent |
| 容错性 | 差,一个错误导致全系统崩溃 | 好,一个 Agent 崩溃不影响其他 Agent,有容错机制 |
| 性能 | 串行处理,性能受限于单 Agent 能力 | 并行处理,性能可水平扩展 |
| 开发复杂度 | 低,适合简单业务 | 高,需要处理协作、通信、容错等问题 |
| 维护成本 | 高,业务复杂后代码难以维护 | 低,每个 Agent 独立维护 |
| 适用场景 | 业务逻辑简单、流程单一、并发量小 | 业务逻辑复杂、流程多样、并发量大、需要协作 |
我们为什么选 Multi-Agent?
基于对比,Multi-Agent 刚好能解决我们的所有痛点:
- 解耦:把“巨无霸”拆成多个职责单一的 Agent,每个 Agent 只负责自己的业务;
- 扩展:加新功能只需加新 Agent,不用改现有代码;
- 性能:多个 Agent 并行处理任务,响应时间大幅降低;
- 容错:一个 Agent 崩溃,其他 Agent 还能工作,有容错机制;
- 协作:团队成员可以各自负责一个 Agent,互不干扰。
就这样,我们下定决心:迁移到 Multi-Agent 架构!
二、迁移前的准备:踩过的认知坑与技术选型坑
“不打无准备之仗”——但我们一开始的准备工作做得并不好,踩了很多认知和技术选型的坑。
2.1 认知坑:我们对 Multi-Agent 的 3 个误解
误解 1:Multi-Agent 就是“把代码拆成几个函数,放在不同服务里”
我们一开始的想法很简单:“不就是拆分吗?把订单查询、物流追踪、退换货分别拆成 3 个服务,用 HTTP 调用不就行了?”
说干就干,我们花了 1 周时间拆分:
- 订单服务:提供
GET /api/order/{order_id}接口; - 物流服务:提供
GET /api/logistics/{tracking_number}接口; - 退换货服务:直接调用订单服务和物流服务的 HTTP 接口。
结果呢?这根本不是 Multi-Agent——只是“分布式的单 Agent”:
- 耦合度还是很高:退换货服务直接依赖订单服务的 API 接口,订单服务改了接口字段,退换货服务也要跟着改;
- 没有协作机制:退换货服务调用订单服务超时了,不知道该重试还是该降级;
- 没有容错机制:订单服务挂了,退换货服务也跟着挂。
那次尝试以失败告终,我们才意识到:Multi-Agent 不是简单的代码拆分,而是架构和思维的双重转变。
误解 2:Multi-Agent 必须用最新的 LLM 框架(比如 AutoGen)
AutoGen 刚出来的时候,我们很兴奋:“微软出的框架,肯定好用!”于是花了 2 周时间研究 AutoGen,跑通了“两个 Agent 协作写代码”的例子。
但当我们想把 AutoGen 用到电商场景时,发现了问题:
- 文档不完善:AutoGen 的文档很简单,很多高级功能(比如持久化、监控)没有示例;
- 社区支持不足:遇到问题搜 Stack Overflow,几乎没有相关回答;
- 业务场景不匹配:AutoGen 更适合开放式的协作(比如写代码、做研究),而我们的电商场景是结构化的业务流程,需要更可控的协作模式。
比如 AutoGen 的对话是“自然语言驱动”的:Agent A 说“帮我查一下订单 123 的状态”,Agent B 理解后去查——但我们的电商场景需要严格的消息格式,不能靠“自然语言理解”来处理订单查询,否则容易出错。
最后我们放弃了 AutoGen,决定自己开发一个轻量级的 Agent 框架——更可控,更适合我们的业务。
误解 3:Multi-Agent 的性能一定比单 Agent 好
我们一开始以为:“把任务拆成多个 Agent 并行处理,性能肯定提升 10 倍!”
但实际测试后发现:如果任务之间有依赖关系,并行处理反而会增加通信开销,性能还不如单 Agent。
比如退换货流程:
- 必须先查订单状态(依赖 1);
- 订单状态验证通过后,才能安排物流取件(依赖 2);
- 物流取件成功后,才能更新库存(依赖 3);
- 库存更新成功后,才能处理退款(依赖 4)。
这 4 个步骤是完全串行的,拆成 4 个 Agent 后,每个步骤都要通过消息队列通信——通信开销反而让总响应时间从 14 秒变成了 16 秒!
那次测试给了我们一个教训:不是所有任务都要并行,要根据任务的依赖关系合理安排 Agent 的协作顺序。
2.2 技术选型坑:从“盲目跟风”到“按需选择”
踩了认知坑后,我们开始认真做技术选型——但还是踩了不少坑。
坑 1:通信机制选了共享内存(Redis),并发高了数据冲突严重
我们一开始想:“Agent 之间要共享数据,用 Redis 做共享内存不就行了?”
于是我们用 Redis 做共享存储:
- 订单管理 Agent 把订单状态写入 Redis:
SET order:123 status "已发货"; - 物流追踪 Agent 从 Redis 读取订单状态:
GET order:123。
结果并发量上来后(500 QPS),Redis 的写冲突很严重:
- 订单管理 Agent 刚把
order:123的状态改成“已发货”; - 物流追踪 Agent 还没读取到,就给用户返回了“未发货”;
- 库存管理 Agent 又把状态改成“已入库”——数据彻底乱了!
而且 Redis 的数据一致性很难保证:没有事务的话,多个 Agent 同时写同一个 key,数据就会覆盖;有事务的话,性能又会下降 80%。
那次事故让我们损失惨重:有 1000+ 订单的状态错误,我们花了 3 天才修复完数据。
坑 2:通信机制选了同步 HTTP,并发量上不去
放弃 Redis 后,我们选了同步 HTTP 做 Agent 之间的通信——毕竟大家都熟悉 HTTP。
但同步 HTTP 有一个致命问题:阻塞执行。
比如用户交互 Agent 发送请求给订单管理 Agent 后,要一直等待响应——这段时间内,用户交互 Agent 不能处理其他用户的请求,并发量最多只能到 50。
而且同步 HTTP 的容错性很差:订单管理 Agent 响应慢了,用户交互 Agent 就会超时;订单管理 Agent 挂了,用户交互 Agent 就会报错。
我们做了压力测试:500 并发下,同步 HTTP 的错误率是 50%,响应时间是 30 秒——完全不能用!
最终选择:异步消息队列(RabbitMQ)+ Protobuf
踩了两个通信机制的坑后,我们终于找到了合适的方案:异步消息队列(RabbitMQ)+ Protobuf。
为什么选 RabbitMQ?
- 异步通信:Agent 发送消息后不用等待响应,继续处理其他任务;
- 消息持久化:交换机、队列、消息都可以持久化,RabbitMQ 重启后消息不丢失;
- 重试机制:支持死信队列(DLQ)和延迟队列,失败的消息可以重试;
- 路由灵活:支持直连交换机、主题交换机、扇出交换机,灵活路由消息;
- 社区成熟:文档完善,社区支持好,很多公司都在用。
为什么选 Protobuf?
- 高效:比 JSON 体积小 30-50%,序列化/反序列化速度快 3-10 倍;
- 类型安全:有严格的类型检查,不容易出错;
- 跨语言:支持 Python、Java、Go 等多种语言,方便以后扩展。
其他技术选型的最终方案
除了通信机制,我们还做了以下技术选型:
| 技术类别 | 最终选择 | 选择理由 |
|---|---|---|
| Agent 框架 | 自研轻量级框架(SimpleMAS) | 更可控,更适合电商结构化业务流程 |
| 服务注册与发现 | Consul | 支持健康检查、服务发现、KV 存储,功能全面 |
| 配置中心 | Apollo | 支持配置热更新、多环境、权限管理,国内公司用得多 |
| 向量数据库 | Chroma | 轻量级,易于部署,支持多种嵌入模型 |
| 缓存 | Redis | 性能好,支持多种数据结构,用于缓存热点数据 |
| 日志系统 | ELK Stack(Elasticsearch+Logstash+Kibana) | 功能全面,支持日志收集、存储、分析、可视化 |
| 监控系统 | Prometheus + Grafana | Prometheus 适合时序数据监控,Grafana 可视化效果好 |
| 链路追踪 | Jaeger | 兼容 OpenTelemetry,追踪效果好,易于部署 |
2.3 核心概念梳理:为迁移打下理论基础
在开始迁移之前,我们梳理了 Multi-Agent 的核心概念,画了 ER 图和交互关系图,让团队成员对系统有一个清晰的认识。
核心概念 ER 图(Mermaid)
我们定义了 5 个核心实体:Agent、Message、Task、Knowledge Base、Environment,它们之间的关系如下:
核心交互流程:电商退换货场景(Mermaid 序列图)
为了让团队成员更直观地理解 Agent 之间的协作,我们画了电商退换货场景的序列图:
三、迁移过程中的核心坑与解决方案:从理论到实践的踩坑实录
准备工作做好后,我们开始了正式的迁移——这一路踩的坑最多,但也学到了最多。
3.1 坑 1:Agent 职责划分不清——“三个和尚没水喝”
问题背景
我们一开始划分 Agent 的时候,是“按功能模块随便拆”的:
- 拆了一个“退换货 Agent”;
- 拆了一个“退款 Agent”;
- 拆了一个“订单管理 Agent”。
结果发现:
- “退款”既属于“退换货”的一部分,又属于“支付”的一部分——两个 Agent 都写了退款逻辑,重复开发;
- 用户问“退款什么时候到账”,退换货 Agent 推给退款 Agent,退款 Agent 又推给支付 Agent——职责不清;
- 退款规则更新了,要同时改退换货 Agent、退款 Agent、支付 Agent——维护成本高。
有一次,用户申请退货退款,退换货 Agent 处理了退货,但忘记通知退款 Agent——用户等了 3 天还没收到退款,投诉到了消协!
问题描述
核心问题是:没有明确的 Agent 边界,导致职责重叠、推诿扯皮、重复开发。
问题解决:用领域驱动设计(DDD)划分边界上下文
痛定思痛,我们决定用 领域驱动设计(DDD) 来划分 Agent 的边界——这是我们迁移过程中做的最正确的决定之一!
什么是 DDD 的边界上下文?
边界上下文(Bounded Context)是 DDD 中的核心概念:它是一个语义边界,边界内的模型(概念、规则、逻辑)是统一的、一致的;边界之间通过明确的接口交互。
简单来说:每个边界上下文对应一个独立的业务领域,每个 Agent 对应一个边界上下文。
我们的 DDD 工作坊:梳理核心领域
我们组织了一次 DDD 工作坊,邀请了产品经理、开发、测试、客服一起参与,用了 2 天时间梳理电商客服的核心领域。
梳理步骤:
- 收集用户故事:列出所有用户需求(比如“查询订单”“申请退换货”“查物流”);
- 识别核心概念:从用户故事中提取核心概念(比如“订单”“物流”“库存”“支付”);
- 划分边界上下文:把相关的概念和规则放在同一个边界上下文里;
- 定义上下文映射:明确边界上下文之间的交互方式。
最终的边界上下文与 Agent 映射
经过梳理,我们划分了 5 个核心边界上下文,每个对应一个 Agent:
| 边界上下文 | 对应 Agent | 核心职责 | 对外接口(消息类型) |
|---|---|---|---|
| 用户交互域(User Interaction) | 用户交互 Agent(UIA) | 1. 与用户交互;2. 识别用户意图;3. 拆解任务;4. 协调其他 Agent;5. 生成回答 | user_chat、query_order_response 等 |
| 订单域(Order) | 订单管理 Agent(OMA) | 1. 订单全生命周期管理;2. 退换货状态管理;3. 订单验证 | query_order、update_order、return_order |
| 物流域(Logistics) | 物流追踪 Agent(LTA) | 1. 物流全生命周期管理;2. 取件安排;3. 物流追踪 | query_logistics、arrange_pickup |
| 库存域(Inventory) | 库存管理 Agent(IMA) | 1. 库存查询;2. 库存更新;3. 库存预警 | query_inventory、update_inventory |
| 支付域(Payment) | 支付 Agent(PMA) | 1. 支付全生命周期管理;2. 退款处理;3. 对账 | query_payment、refund_request |
关键原则:Agent 职责单一,“高内聚、低耦合”
每个 Agent 只负责自己边界上下文内的事情:
- 高内聚:Agent 内部的逻辑是紧密相关的(比如订单管理 Agent 只做订单相关的事);
- 低耦合:Agent 之间只通过明确的消息接口交互,不直接依赖对方的代码(比如订单管理 Agent 不需要知道物流追踪 Agent 的内部实现,只需要知道
arrange_pickup消息的格式)。
边界与外延
- 边界:每个 Agent 的边界就是它对应的边界上下文——订单管理 Agent 不负责物流,物流追踪 Agent 不负责支付;
- 外延:如果以后要加新业务(比如营销),只需要加一个“营销管理 Agent”,和现有的 Agent 通过消息队列交互即可,不需要修改现有代码。
3.2 坑 2:通信机制混乱——“消息满天飞,找不到北”
问题背景
虽然我们选了 RabbitMQ + Protobuf,但一开始没有定义统一的消息规范:
- 每个 Agent 自己定义消息格式:订单管理 Agent 的
QueryOrderRequest是{"order_id": "123"},物流追踪 Agent 的QueryLogisticsRequest是{"tracking_number": "456", "user_id": "789"}; - 没有统一的消息头:不知道消息是谁发的、发给谁的、什么时候发的;
- 没有消息状态追踪:消息发出去后,不知道有没有被收到、有没有被处理、处理成功还是失败。
有一次,用户交互 Agent 发送了一个退款请求给支付 Agent——但消息因为 RabbitMQ 队列满了丢失了,用户交互 Agent 不知道,等了 1 小时没收到响应,才发现消息丢了!
问题描述
核心问题是:没有统一的消息规范和状态追踪机制,导致消息丢失、混乱、不可控。
问题解决:定义统一的消息协议 + 消息状态追踪
步骤 1:定义统一的 Protobuf 消息协议
我们定义了一个统一的 Protobuf 消息协议,分为消息头和消息体两部分:
syntax = "proto3";
package agent.message;
// 消息优先级
enum MessagePriority {
LOW = 0;
NORMAL = 1;
HIGH = 2;
}
// 消息状态
enum MessageStatus {
PENDING = 0;
SENDING = 1;
SENT = 2;
RECEIVED = 3;
PROCESSED = 4;
FAILED = 5;
}
// 统一消息头
message MessageHeader {
string message_id = 1; // 消息唯一标识(UUID)
string sender_id = 2; // 发送者 Agent ID
string receiver_id = 3; // 接收者 Agent ID
string message_type = 4; // 消息类型(如 query_order)
int64 timestamp = 5; // 时间戳(毫秒)
MessagePriority priority = 6; // 优先级
MessageStatus status = 7; // 消息状态
optional string response_to = 8; // 响应的消息 ID(如果是响应消息)
int32 retry_count = 9; // 重试次数
}
// 消息体:用 oneof 支持多种业务消息
message MessageBody {
oneof payload {
// 用户聊天相关
UserChatRequest user_chat_request = 1;
UserChatResponse user_chat_response = 2;
// 订单相关
QueryOrderRequest query_order_request = 3;
QueryOrderResponse query_order_response = 4;
// 物流相关
QueryLogisticsRequest query_logistics_request = 5;
QueryLogisticsResponse query_logistics_response = 6;
// 支付相关
RefundRequest refund_request = 7;
RefundResponse refund_response = 8;
// 知识查询相关
KnowledgeQueryRequest knowledge_query_request = 9;
KnowledgeQueryResponse knowledge_query_response = 10;
}
}
// 统一消息格式
message AgentMessage {
MessageHeader header = 1;
MessageBody body = 2;
}
// ------------------------------
// 业务消息定义(示例)
// ------------------------------
// 用户聊天请求
message UserChatRequest {
string user_id = 1;
string session_id = 2;
string message = 3;
string channel = 4; // web/app/wx
}
// 用户聊天响应
message UserChatResponse {
string user_id = 1;
string session_id = 2;
string response = 3;
}
// 订单查询请求
message QueryOrderRequest {
string order_id = 1;
string user_id = 2;
}
// 订单查询响应
message QueryOrderResponse {
string order_id = 1;
string user_id = 2;
string order_status = 3;
int64 created_at = 4;
optional string error_message = 5;
}
// 退款请求
message RefundRequest {
string order_id = 1;
string user_id = 2;
double amount = 3;
string reason = 4;
}
// 退款响应
message RefundResponse {
string order_id = 1;
string user_id = 2;
string refund_status = 3; // processing/success/failed
optional string error_message = 4;
}
步骤 2:设计 RabbitMQ 的交换机和队列架构
我们设计了一个清晰的 RabbitMQ 交换机和队列架构:
架构说明:
- agent.exchange:直连交换机,根据
receiver_id路由消息到对应的队列; - 业务队列:每个 Agent 有一个专属队列(如
uia.queue),设置了死信交换机agent.exchange.dlx; - 死信交换机(DLX):处理失败的消息;
- 延迟交换机:让失败的消息延迟一段时间后重新发送到业务队列(重试);
- 失败队列:重试超过阈值(如 3 次)的消息进入失败队列,人工介入处理。
步骤 3:实现消息状态追踪
我们用 Redis 实现了消息状态追踪:
- 发送消息时,把消息状态写入 Redis:
SET message:{message_id} "SENDING"; - 消息被接收时,更新状态:
SET message:{message_id} "RECEIVED"; - 消息被处理时,更新状态:
SET message:{message_id} "PROCESSED"; - 消息失败时,更新状态:
SET message:{message_id} "FAILED"; - 同时设置过期时间(24 小时),避免 Redis 内存溢出。
数学模型:消息队列的吞吐量与延迟
为了优化消息队列的性能,我们引入了两个数学模型:
1. 吞吐量模型
吞吐量(Throughput)是指单位时间内处理的消息数量,公式为:
Throughput=NT Throughput = \frac{N}{T} Throughput=TN
其中:
- NNN:处理的消息总数;
- TTT:处理时间。
优化措施:
- 增加 Agent 的并发消费者数量(比如订单管理 Agent 启动 5 个消费者);
- 优化消息大小(用 Protobuf 代替 JSON);
- 调整 RabbitMQ 的配置(比如增加队列长度、调整 QoS)。
2. 延迟模型
延迟(Latency)是指消息从发送到被处理的时间,公式为:
Latency=Tqueue+Ttransmit+Tprocess Latency = T_{queue} + T_{transmit} + T_{process} Latency=Tqueue+Ttransmit+Tprocess
其中:
- TqueueT_{queue}Tqueue:消息在队列中的等待时间;
- TtransmitT_{transmit}Ttransmit:消息的传输时间;
- TprocessT_{process}Tprocess:Agent 处理消息的时间。
优化措施:
- 用优先级队列处理高优先级消息(比如用户投诉);
- 优化 Agent 的处理逻辑(比如减少数据库查询,用 Redis 缓存热点数据);
- 用更快的网络(比如内网通信)。
算法流程图:消息消费的容错流程(Mermaid)
我们实现了一个完善的消息消费容错流程:
3.3 坑 3:容错性差——“一荣俱荣,一损俱损”
问题背景
虽然我们有了消息队列的重试机制,但还是不够:
- 订单管理 Agent 因为数据库连接池满了而崩溃——所有订单查询请求都失败了;
- 支付 Agent 的第三方 API 超时率达到 80%——但用户交互 Agent 还是不断给它发消息,导致它彻底“挂掉”;
- 没有服务发现——用户交互 Agent 只知道一个订单管理 Agent 的 IP,它挂了之后,用户交互 Agent 就找不到其他健康的实例了。
有一次,第三方支付 API 维护了 2 小时——我们的支付 Agent 收到了 10 万+ 退款请求,全部超时,最后支付 Agent 的进程崩溃了,消息队列里积压了 20 万+ 消息,花了 1 天才处理完!
问题描述
核心问题是:没有完善的服务治理机制(服务发现、健康检查、熔断、降级),导致故障扩散、系统雪崩。
问题解决:引入完整的服务治理机制
我们引入了 **Consul(服务
更多推荐
所有评论(0)