万字复盘:从单 Agent 到 Multi-Agent 架构迁移的血泪史——踩过的坑、学到的招、重构的道

摘要/引言

你是否有过这样的经历:一开始用一个简单的单 Agent 搞定了所有业务,用户反馈还不错;但随着业务量暴增、需求复杂度飙升,那个曾经“小巧可爱”的 Agent 逐渐变成了“巨无霸”——提示词长达上千字,代码耦合得像“意大利面”,加一个新功能要回归所有测试,响应时间从秒级变成分钟级,偶尔还会因为一个 API 超时就全系统崩溃?

我和我的团队就经历了这样的“噩梦”。我们的电商客服 Agent 从最初每天几百个简单咨询,发展到每天几十万次复杂交互——订单查询、物流追踪、退换货、退款、投诉建议……单 Agent 彻底扛不住了。于是我们踏上了从单 Agent 到 Multi-Agent 架构的迁移之路,这一路踩了无数坑:Agent 职责划分不清导致“三个和尚没水喝”,通信机制混乱导致“消息满天飞找不到北”,容错性差导致“一荣俱荣一损俱损”,开发复杂度飙升导致“每个 Agent 都是独立项目维护要命”,知识共享困难导致“同一个问题不同回答”……

但踩坑的同时,我们也摸索出了一套行之有效的解决方案:用 DDD 划分 Agent 边界,用异步消息队列做通信,用服务注册与发现+熔断机制做容错,用 Agent 抽象层+统一基础设施降低开发成本,用统一向量数据库实现知识共享……最终我们成功完成了迁移,系统的耦合度降低了 90%,扩展性提升了 10 倍,响应时间从 15 秒降到 1 秒以内,容错性也大大增强。

本文将用万字篇幅,复盘我们从单 Agent 到 Multi-Agent 迁移的完整过程——从问题背景、迁移前的准备,到迁移中的核心坑与解决方案,再到迁移后的架构设计、代码实现、最佳实践,最后展望行业未来趋势。希望我们的血泪史能帮你少走弯路!


一、为什么要迁移?——业务增长下的单 Agent 困境

在讲迁移之前,我们得先搞清楚:单 Agent 到底是什么?它在什么场景下好用?又为什么会在业务增长时“掉链子”?

1.1 单 Agent 的定义与适用场景

核心概念:单 Agent

单 Agent 是指处于一定环境中,能够自主感知环境、自主决策、自主执行动作,以实现单一目标的智能体。它的核心要素包括:

  • 感知器:接收用户输入或环境信号(如文本、语音、API 响应);
  • 推理器:基于预定义规则、知识库或 LLM 进行决策;
  • 执行器:调用外部工具(如数据库、第三方 API)执行动作;
  • 知识库:存储业务规则、历史数据等知识。
适用场景

单 Agent 非常适合以下场景:

  1. 业务逻辑简单:流程单一,不需要复杂的分支或协作;
  2. 并发量小:每天请求量在几千以内;
  3. 无跨域需求:不需要和其他系统或模块深度协作。

比如我们最初的电商客服单 Agent:

  • 只处理“发货时间”“运费多少”“商品尺码”等 10 类简单问题;
  • 用 LangChain 的 LLMChain 实现,提示词只有 200 字;
  • 连接一个小型商品知识库,用 FAISS 做本地向量存储;
  • 每天请求量 300-500,响应时间 0.5-1 秒,用户满意度 95%。

那时候的代码真的“优雅”:

# 最初的单 Agent 代码(简化版)
from langchain_openai import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

# 1. 加载知识库
embeddings = OpenAIEmbeddings()
vector_store = FAISS.load_local("product_knowledge", embeddings)

# 2. 定义提示词模板
prompt = PromptTemplate(
    input_variables=["context", "question"],
    template="""你是电商客服小助手,请根据以下知识库回答用户问题。
    知识库:{context}
    用户问题:{question}
    回答:"""
)

# 3. 初始化 LLMChain
llm = OpenAI(temperature=0)
chain = LLMChain(llm=llm, prompt=prompt)

# 4. 处理用户请求
def handle_user_question(question):
    docs = vector_store.similarity_search(question, k=3)
    context = "\n".join([doc.page_content for doc in docs])
    response = chain.run(context=context, question=question)
    return response

# 测试
print(handle_user_question("这件衣服的发货时间是多久?"))

1.2 业务增长:单 Agent 的“至暗时刻”

好景不长,我们的电商平台业务爆发了:

  • 用户量增长:从 10 万涨到 100 万;
  • 请求量增长:从每天 500 涨到每天 50 万;
  • 需求复杂度增长:用户不再只问简单问题,而是问“我买的衣服尺码不对,想换大一号,同时退掉另一个订单的裤子,退款能一起到账吗?”“我的快递显示签收但没收到,帮我查快递员电话,同时问问能不能补发?”

这时候,单 Agent 开始出现各种问题,我们的“噩梦”开始了。

问题 1:业务逻辑严重耦合,代码变成“意大利面”

为了处理复杂需求,我们不断往提示词里加规则:

  • 加订单查询逻辑:“如果用户问订单,先调用订单 API 查询 order_id=xxx 的状态”;
  • 加物流查询逻辑:“如果用户问物流,先从订单里拿到 tracking_number,再调用物流 API”;
  • 加退换货逻辑:“如果用户申请退换货,先查订单是否在 7 天内,再查商品是否影响二次销售……”

到最后,提示词长达 2000 多字,像一本“小手册”;代码也从 50 行涨到 5000 行,所有逻辑耦合在一起:

  • 退换货逻辑里直接调用了订单 API、物流 API、库存 API、支付 API;
  • 加一个“积分兑换”功能,要改提示词、改订单查询代码、改支付代码;
  • 三个人同时改同一个代码库,每周合并代码都要解决 10+ 冲突。

有一次,我们改了订单 API 的返回字段(把 status 改成了 order_status),结果忘记改退换货逻辑里的字段引用,导致所有退换货请求都失败——用户反馈“系统崩溃了”,我们紧急回滚,花了 2 小时才恢复。

问题 2:性能瓶颈:响应时间从 1 秒变成 15 秒

单 Agent 是串行处理所有逻辑的:
比如用户问“退换货+退款”,单 Agent 要做以下步骤:

  1. 调用订单 API 查询订单 1 的状态(2 秒);
  2. 调用订单 API 查询订单 2 的状态(2 秒);
  3. 调用物流 API 安排取件(3 秒);
  4. 调用库存 API 更新库存(2 秒);
  5. 调用支付 API 处理退款(4 秒);
  6. 生成回答(1 秒)。

串行下来,总响应时间是 14 秒——用户等得不耐烦,纷纷投诉“客服太慢了”。

更糟的是,并发量上来后,单 Agent 的 LLM 和 API 调用都成了瓶颈:

  • LLM 的 RPM(每分钟请求数)有限制,50 并发下,很多请求要排队等 10 秒以上;
  • 订单 API 的连接池满了,很多请求直接超时。

那段时间,我们的监控仪表盘上全是“红条”——错误率从 0.1% 涨到 20%,用户满意度从 95% 掉到 60%。

问题 3:容错性差:“一个错误,全系统崩溃”

单 Agent 没有任何容错机制:

  • 调用物流 API 超时,整个 Agent 就直接报错“系统错误,请稍后再试”;
  • 向量数据库挂了,所有需要知识库的请求都失败;
  • 代码里有一个未捕获的异常,整个 Agent 进程就崩溃了,要手动重启。

有一次,第三方物流 API 因为维护停机了 2 小时——我们的电商客服系统也跟着“瘫痪”了 2 小时,用户只能打电话投诉,客服电话被打爆了。

问题 4:团队协作困难:“一个人扛所有,其他人插不上手”

最初只有一个开发负责这个单 Agent,后来业务复杂了,加了两个开发——但三个人根本没法协作:

  • 所有逻辑都在一个代码库里,改一个地方可能影响所有功能;
  • 测试要回归所有 100+ 测试用例,每次测试要花 4 小时;
  • 老开发离职了,新开发要花 1 个月才能看懂代码。

团队成员都很痛苦:“这哪是开发,这是在‘修古董’!”

1.3 破局:为什么 Multi-Agent 是解决方案?

就在我们快要“崩溃”的时候,Multi-Agent 架构进入了我们的视野。我们先搞清楚:Multi-Agent 到底是什么?它能解决单 Agent 的哪些问题?

核心概念:Multi-Agent System(MAS)

Multi-Agent System 是指由多个相互作用的 Agent 组成的系统,这些 Agent 通过协作、竞争或协商来完成单个 Agent 无法完成的任务。它的核心要素包括:

  • Agent 群体:多个职责单一的 Agent(如用户交互 Agent、订单管理 Agent、物流追踪 Agent);
  • 环境:Agent 所处的外部环境(如数据库、第三方 API、用户);
  • 交互机制:Agent 之间的通信(消息队列)和协调(合同网、拍卖);
  • 协作模式:中心化(有中央控制 Agent)、去中心化(无中央控制)、混合式。
单 Agent vs Multi-Agent:核心属性对比

为了更直观地看到差异,我们做了一个对比表格:

核心属性 单 Agent Multi-Agent
耦合度 高,所有业务逻辑耦合在一起 低,每个 Agent 职责单一,松耦合
扩展性 差,加新功能要改整个 Agent 好,加新功能只需加新 Agent 或修改现有 Agent
容错性 差,一个错误导致全系统崩溃 好,一个 Agent 崩溃不影响其他 Agent,有容错机制
性能 串行处理,性能受限于单 Agent 能力 并行处理,性能可水平扩展
开发复杂度 低,适合简单业务 高,需要处理协作、通信、容错等问题
维护成本 高,业务复杂后代码难以维护 低,每个 Agent 独立维护
适用场景 业务逻辑简单、流程单一、并发量小 业务逻辑复杂、流程多样、并发量大、需要协作
我们为什么选 Multi-Agent?

基于对比,Multi-Agent 刚好能解决我们的所有痛点:

  1. 解耦:把“巨无霸”拆成多个职责单一的 Agent,每个 Agent 只负责自己的业务;
  2. 扩展:加新功能只需加新 Agent,不用改现有代码;
  3. 性能:多个 Agent 并行处理任务,响应时间大幅降低;
  4. 容错:一个 Agent 崩溃,其他 Agent 还能工作,有容错机制;
  5. 协作:团队成员可以各自负责一个 Agent,互不干扰。

就这样,我们下定决心:迁移到 Multi-Agent 架构!


二、迁移前的准备:踩过的认知坑与技术选型坑

“不打无准备之仗”——但我们一开始的准备工作做得并不好,踩了很多认知和技术选型的坑。

2.1 认知坑:我们对 Multi-Agent 的 3 个误解

误解 1:Multi-Agent 就是“把代码拆成几个函数,放在不同服务里”

我们一开始的想法很简单:“不就是拆分吗?把订单查询、物流追踪、退换货分别拆成 3 个服务,用 HTTP 调用不就行了?”

说干就干,我们花了 1 周时间拆分:

  • 订单服务:提供 GET /api/order/{order_id} 接口;
  • 物流服务:提供 GET /api/logistics/{tracking_number} 接口;
  • 退换货服务:直接调用订单服务和物流服务的 HTTP 接口。

结果呢?这根本不是 Multi-Agent——只是“分布式的单 Agent”:

  • 耦合度还是很高:退换货服务直接依赖订单服务的 API 接口,订单服务改了接口字段,退换货服务也要跟着改;
  • 没有协作机制:退换货服务调用订单服务超时了,不知道该重试还是该降级;
  • 没有容错机制:订单服务挂了,退换货服务也跟着挂。

那次尝试以失败告终,我们才意识到:Multi-Agent 不是简单的代码拆分,而是架构和思维的双重转变

误解 2:Multi-Agent 必须用最新的 LLM 框架(比如 AutoGen)

AutoGen 刚出来的时候,我们很兴奋:“微软出的框架,肯定好用!”于是花了 2 周时间研究 AutoGen,跑通了“两个 Agent 协作写代码”的例子。

但当我们想把 AutoGen 用到电商场景时,发现了问题:

  • 文档不完善:AutoGen 的文档很简单,很多高级功能(比如持久化、监控)没有示例;
  • 社区支持不足:遇到问题搜 Stack Overflow,几乎没有相关回答;
  • 业务场景不匹配:AutoGen 更适合开放式的协作(比如写代码、做研究),而我们的电商场景是结构化的业务流程,需要更可控的协作模式。

比如 AutoGen 的对话是“自然语言驱动”的:Agent A 说“帮我查一下订单 123 的状态”,Agent B 理解后去查——但我们的电商场景需要严格的消息格式,不能靠“自然语言理解”来处理订单查询,否则容易出错。

最后我们放弃了 AutoGen,决定自己开发一个轻量级的 Agent 框架——更可控,更适合我们的业务。

误解 3:Multi-Agent 的性能一定比单 Agent 好

我们一开始以为:“把任务拆成多个 Agent 并行处理,性能肯定提升 10 倍!”

但实际测试后发现:如果任务之间有依赖关系,并行处理反而会增加通信开销,性能还不如单 Agent

比如退换货流程:

  1. 必须先查订单状态(依赖 1);
  2. 订单状态验证通过后,才能安排物流取件(依赖 2);
  3. 物流取件成功后,才能更新库存(依赖 3);
  4. 库存更新成功后,才能处理退款(依赖 4)。

这 4 个步骤是完全串行的,拆成 4 个 Agent 后,每个步骤都要通过消息队列通信——通信开销反而让总响应时间从 14 秒变成了 16 秒!

那次测试给了我们一个教训:不是所有任务都要并行,要根据任务的依赖关系合理安排 Agent 的协作顺序

2.2 技术选型坑:从“盲目跟风”到“按需选择”

踩了认知坑后,我们开始认真做技术选型——但还是踩了不少坑。

坑 1:通信机制选了共享内存(Redis),并发高了数据冲突严重

我们一开始想:“Agent 之间要共享数据,用 Redis 做共享内存不就行了?”

于是我们用 Redis 做共享存储:

  • 订单管理 Agent 把订单状态写入 Redis:SET order:123 status "已发货"
  • 物流追踪 Agent 从 Redis 读取订单状态:GET order:123

结果并发量上来后(500 QPS),Redis 的写冲突很严重:

  • 订单管理 Agent 刚把 order:123 的状态改成“已发货”;
  • 物流追踪 Agent 还没读取到,就给用户返回了“未发货”;
  • 库存管理 Agent 又把状态改成“已入库”——数据彻底乱了!

而且 Redis 的数据一致性很难保证:没有事务的话,多个 Agent 同时写同一个 key,数据就会覆盖;有事务的话,性能又会下降 80%。

那次事故让我们损失惨重:有 1000+ 订单的状态错误,我们花了 3 天才修复完数据。

坑 2:通信机制选了同步 HTTP,并发量上不去

放弃 Redis 后,我们选了同步 HTTP 做 Agent 之间的通信——毕竟大家都熟悉 HTTP。

但同步 HTTP 有一个致命问题:阻塞执行
比如用户交互 Agent 发送请求给订单管理 Agent 后,要一直等待响应——这段时间内,用户交互 Agent 不能处理其他用户的请求,并发量最多只能到 50。

而且同步 HTTP 的容错性很差:订单管理 Agent 响应慢了,用户交互 Agent 就会超时;订单管理 Agent 挂了,用户交互 Agent 就会报错。

我们做了压力测试:500 并发下,同步 HTTP 的错误率是 50%,响应时间是 30 秒——完全不能用!

最终选择:异步消息队列(RabbitMQ)+ Protobuf

踩了两个通信机制的坑后,我们终于找到了合适的方案:异步消息队列(RabbitMQ)+ Protobuf

为什么选 RabbitMQ?
  1. 异步通信:Agent 发送消息后不用等待响应,继续处理其他任务;
  2. 消息持久化:交换机、队列、消息都可以持久化,RabbitMQ 重启后消息不丢失;
  3. 重试机制:支持死信队列(DLQ)和延迟队列,失败的消息可以重试;
  4. 路由灵活:支持直连交换机、主题交换机、扇出交换机,灵活路由消息;
  5. 社区成熟:文档完善,社区支持好,很多公司都在用。
为什么选 Protobuf?
  1. 高效:比 JSON 体积小 30-50%,序列化/反序列化速度快 3-10 倍;
  2. 类型安全:有严格的类型检查,不容易出错;
  3. 跨语言:支持 Python、Java、Go 等多种语言,方便以后扩展。
其他技术选型的最终方案

除了通信机制,我们还做了以下技术选型:

技术类别 最终选择 选择理由
Agent 框架 自研轻量级框架(SimpleMAS) 更可控,更适合电商结构化业务流程
服务注册与发现 Consul 支持健康检查、服务发现、KV 存储,功能全面
配置中心 Apollo 支持配置热更新、多环境、权限管理,国内公司用得多
向量数据库 Chroma 轻量级,易于部署,支持多种嵌入模型
缓存 Redis 性能好,支持多种数据结构,用于缓存热点数据
日志系统 ELK Stack(Elasticsearch+Logstash+Kibana) 功能全面,支持日志收集、存储、分析、可视化
监控系统 Prometheus + Grafana Prometheus 适合时序数据监控,Grafana 可视化效果好
链路追踪 Jaeger 兼容 OpenTelemetry,追踪效果好,易于部署

2.3 核心概念梳理:为迁移打下理论基础

在开始迁移之前,我们梳理了 Multi-Agent 的核心概念,画了 ER 图和交互关系图,让团队成员对系统有一个清晰的认识。

核心概念 ER 图(Mermaid)

我们定义了 5 个核心实体:Agent、Message、Task、Knowledge Base、Environment,它们之间的关系如下:

发送/接收

执行

关联

触发

包含

AGENT

string

agent_id

PK

Agent 唯一标识

string

agent_name

Agent 名称

string

agent_type

Agent 类型(用户交互/订单管理/物流追踪等)

string

status

Agent 状态(健康/异常/离线)

datetime

created_at

创建时间

datetime

updated_at

更新时间

MESSAGE

string

message_id

PK

消息唯一标识

string

sender_id

FK

发送者 Agent ID

string

receiver_id

FK

接收者 Agent ID

string

message_type

消息类型(查询/命令/通知/响应)

bytes

payload

Protobuf 序列化后的消息体

string

status

消息状态(待发送/发送中/已处理/失败)

int

retry_count

重试次数

datetime

created_at

创建时间

datetime

updated_at

更新时间

TASK

string

task_id

PK

任务唯一标识

string

agent_id

FK

执行任务的 Agent ID

string

task_type

任务类型(订单查询/物流追踪等)

bytes

input

任务输入

bytes

output

任务输出

string

status

任务状态(待执行/执行中/已完成/失败)

int

retry_count

重试次数

datetime

created_at

创建时间

datetime

updated_at

更新时间

KNOWLEDGE_BASE

string

kb_id

PK

知识库唯一标识

string

agent_id

FK

关联的 Agent ID(全局知识库为空)

string

kb_name

知识库名称

string

kb_type

知识库类型(结构化/非结构化/向量)

string

storage_path

存储路径

datetime

created_at

创建时间

datetime

updated_at

更新时间

ENVIRONMENT

string

env_id

PK

环境唯一标识

string

env_name

环境名称

json

env_state

环境状态

datetime

created_at

创建时间

datetime

updated_at

更新时间

核心交互流程:电商退换货场景(Mermaid 序列图)

为了让团队成员更直观地理解 Agent 之间的协作,我们画了电商退换货场景的序列图:

支付Agent 库存管理Agent 物流追踪Agent 订单管理Agent 消息队列(RabbitMQ) 统一向量知识库 用户交互Agent 用户 支付Agent 库存管理Agent 物流追踪Agent 订单管理Agent 消息队列(RabbitMQ) 统一向量知识库 用户交互Agent 用户 我买的衣服尺码不对,想换大一号,订单123;同时退掉裤子,订单456,退款199元 检索退换货规则 返回规则:7天内、不影响二次销售 用LLM识别意图、拆解任务 发送订单查询请求(OrderQueryRequest)- 订单123 发送订单查询请求(OrderQueryRequest)- 订单456 分发订单123查询请求 分发订单456查询请求 验证订单状态(已签收、在7天内) 订单123验证通过响应 订单456验证通过响应 转发订单123响应 转发订单456响应 好的,正在为您处理,请稍候... 发送换货取件请求(LogisticsPickupRequest)- 订单123 发送退货取件请求(LogisticsPickupRequest)- 订单456 分发订单123取件请求 分发订单456取件请求 安排快递员明天10点取件 取件安排响应 取件安排响应 转发取件响应 已安排快递员明天10点取件,请准备好商品 快递员取件成功 取件成功通知 取件成功通知 转发取件成功通知 更新订单123状态为“换货中” 更新订单456状态为“退货中” 库存更新请求(InventoryUpdateRequest)- 订单123 库存更新请求(InventoryUpdateRequest)- 订单456 分发订单123库存请求 分发订单456库存请求 增加衣服大一号库存,减少原尺码库存 增加裤子库存 库存更新响应 库存更新响应 转发库存响应 更新订单123状态为“待发货” 退款请求(RefundRequest)- 订单456 分发退款请求 处理退款199元 退款响应 转发退款响应 更新订单456状态为“已完成” 订单123发货通知 订单456退款完成通知 转发发货通知 转发退款通知 您的换货已发货,退款已到账,请查收!

三、迁移过程中的核心坑与解决方案:从理论到实践的踩坑实录

准备工作做好后,我们开始了正式的迁移——这一路踩的坑最多,但也学到了最多。

3.1 坑 1:Agent 职责划分不清——“三个和尚没水喝”

问题背景

我们一开始划分 Agent 的时候,是“按功能模块随便拆”的:

  • 拆了一个“退换货 Agent”;
  • 拆了一个“退款 Agent”;
  • 拆了一个“订单管理 Agent”。

结果发现:

  • “退款”既属于“退换货”的一部分,又属于“支付”的一部分——两个 Agent 都写了退款逻辑,重复开发
  • 用户问“退款什么时候到账”,退换货 Agent 推给退款 Agent,退款 Agent 又推给支付 Agent——职责不清
  • 退款规则更新了,要同时改退换货 Agent、退款 Agent、支付 Agent——维护成本高

有一次,用户申请退货退款,退换货 Agent 处理了退货,但忘记通知退款 Agent——用户等了 3 天还没收到退款,投诉到了消协!

问题描述

核心问题是:没有明确的 Agent 边界,导致职责重叠、推诿扯皮、重复开发

问题解决:用领域驱动设计(DDD)划分边界上下文

痛定思痛,我们决定用 领域驱动设计(DDD) 来划分 Agent 的边界——这是我们迁移过程中做的最正确的决定之一!

什么是 DDD 的边界上下文?

边界上下文(Bounded Context)是 DDD 中的核心概念:它是一个语义边界,边界内的模型(概念、规则、逻辑)是统一的、一致的;边界之间通过明确的接口交互

简单来说:每个边界上下文对应一个独立的业务领域,每个 Agent 对应一个边界上下文

我们的 DDD 工作坊:梳理核心领域

我们组织了一次 DDD 工作坊,邀请了产品经理、开发、测试、客服一起参与,用了 2 天时间梳理电商客服的核心领域。

梳理步骤:

  1. 收集用户故事:列出所有用户需求(比如“查询订单”“申请退换货”“查物流”);
  2. 识别核心概念:从用户故事中提取核心概念(比如“订单”“物流”“库存”“支付”);
  3. 划分边界上下文:把相关的概念和规则放在同一个边界上下文里;
  4. 定义上下文映射:明确边界上下文之间的交互方式。
最终的边界上下文与 Agent 映射

经过梳理,我们划分了 5 个核心边界上下文,每个对应一个 Agent:

边界上下文 对应 Agent 核心职责 对外接口(消息类型)
用户交互域(User Interaction) 用户交互 Agent(UIA) 1. 与用户交互;2. 识别用户意图;3. 拆解任务;4. 协调其他 Agent;5. 生成回答 user_chat、query_order_response 等
订单域(Order) 订单管理 Agent(OMA) 1. 订单全生命周期管理;2. 退换货状态管理;3. 订单验证 query_order、update_order、return_order
物流域(Logistics) 物流追踪 Agent(LTA) 1. 物流全生命周期管理;2. 取件安排;3. 物流追踪 query_logistics、arrange_pickup
库存域(Inventory) 库存管理 Agent(IMA) 1. 库存查询;2. 库存更新;3. 库存预警 query_inventory、update_inventory
支付域(Payment) 支付 Agent(PMA) 1. 支付全生命周期管理;2. 退款处理;3. 对账 query_payment、refund_request
关键原则:Agent 职责单一,“高内聚、低耦合”

每个 Agent 只负责自己边界上下文内的事情:

  • 高内聚:Agent 内部的逻辑是紧密相关的(比如订单管理 Agent 只做订单相关的事);
  • 低耦合:Agent 之间只通过明确的消息接口交互,不直接依赖对方的代码(比如订单管理 Agent 不需要知道物流追踪 Agent 的内部实现,只需要知道 arrange_pickup 消息的格式)。
边界与外延
  • 边界:每个 Agent 的边界就是它对应的边界上下文——订单管理 Agent 不负责物流,物流追踪 Agent 不负责支付;
  • 外延:如果以后要加新业务(比如营销),只需要加一个“营销管理 Agent”,和现有的 Agent 通过消息队列交互即可,不需要修改现有代码。

3.2 坑 2:通信机制混乱——“消息满天飞,找不到北”

问题背景

虽然我们选了 RabbitMQ + Protobuf,但一开始没有定义统一的消息规范

  • 每个 Agent 自己定义消息格式:订单管理 Agent 的 QueryOrderRequest{"order_id": "123"},物流追踪 Agent 的 QueryLogisticsRequest{"tracking_number": "456", "user_id": "789"}
  • 没有统一的消息头:不知道消息是谁发的、发给谁的、什么时候发的;
  • 没有消息状态追踪:消息发出去后,不知道有没有被收到、有没有被处理、处理成功还是失败。

有一次,用户交互 Agent 发送了一个退款请求给支付 Agent——但消息因为 RabbitMQ 队列满了丢失了,用户交互 Agent 不知道,等了 1 小时没收到响应,才发现消息丢了!

问题描述

核心问题是:没有统一的消息规范和状态追踪机制,导致消息丢失、混乱、不可控

问题解决:定义统一的消息协议 + 消息状态追踪
步骤 1:定义统一的 Protobuf 消息协议

我们定义了一个统一的 Protobuf 消息协议,分为消息头消息体两部分:

syntax = "proto3";

package agent.message;

// 消息优先级
enum MessagePriority {
  LOW = 0;
  NORMAL = 1;
  HIGH = 2;
}

// 消息状态
enum MessageStatus {
  PENDING = 0;
  SENDING = 1;
  SENT = 2;
  RECEIVED = 3;
  PROCESSED = 4;
  FAILED = 5;
}

// 统一消息头
message MessageHeader {
  string message_id = 1; // 消息唯一标识(UUID)
  string sender_id = 2; // 发送者 Agent ID
  string receiver_id = 3; // 接收者 Agent ID
  string message_type = 4; // 消息类型(如 query_order)
  int64 timestamp = 5; // 时间戳(毫秒)
  MessagePriority priority = 6; // 优先级
  MessageStatus status = 7; // 消息状态
  optional string response_to = 8; // 响应的消息 ID(如果是响应消息)
  int32 retry_count = 9; // 重试次数
}

// 消息体:用 oneof 支持多种业务消息
message MessageBody {
  oneof payload {
    // 用户聊天相关
    UserChatRequest user_chat_request = 1;
    UserChatResponse user_chat_response = 2;
    // 订单相关
    QueryOrderRequest query_order_request = 3;
    QueryOrderResponse query_order_response = 4;
    // 物流相关
    QueryLogisticsRequest query_logistics_request = 5;
    QueryLogisticsResponse query_logistics_response = 6;
    // 支付相关
    RefundRequest refund_request = 7;
    RefundResponse refund_response = 8;
    // 知识查询相关
    KnowledgeQueryRequest knowledge_query_request = 9;
    KnowledgeQueryResponse knowledge_query_response = 10;
  }
}

// 统一消息格式
message AgentMessage {
  MessageHeader header = 1;
  MessageBody body = 2;
}

// ------------------------------
// 业务消息定义(示例)
// ------------------------------

// 用户聊天请求
message UserChatRequest {
  string user_id = 1;
  string session_id = 2;
  string message = 3;
  string channel = 4; // web/app/wx
}

// 用户聊天响应
message UserChatResponse {
  string user_id = 1;
  string session_id = 2;
  string response = 3;
}

// 订单查询请求
message QueryOrderRequest {
  string order_id = 1;
  string user_id = 2;
}

// 订单查询响应
message QueryOrderResponse {
  string order_id = 1;
  string user_id = 2;
  string order_status = 3;
  int64 created_at = 4;
  optional string error_message = 5;
}

// 退款请求
message RefundRequest {
  string order_id = 1;
  string user_id = 2;
  double amount = 3;
  string reason = 4;
}

// 退款响应
message RefundResponse {
  string order_id = 1;
  string user_id = 2;
  string refund_status = 3; // processing/success/failed
  optional string error_message = 4;
}
步骤 2:设计 RabbitMQ 的交换机和队列架构

我们设计了一个清晰的 RabbitMQ 交换机和队列架构

消费者(Agent)

队列层

交换机层

生产者(Agent)

routing_key=uia

routing_key=oma

routing_key=lta

消息失败

消息失败

消息失败

延迟后

重试超过阈值

用户交互Agent

订单管理Agent

物流追踪Agent

agent.exchange
(直连交换机)

agent.exchange.dlx
(死信交换机)

agent.exchange.delay
(延迟交换机)

uia.queue
(用户交互Agent队列)

oma.queue
(订单管理Agent队列)

lta.queue
(物流追踪Agent队列)

agent.queue.dlq
(死信队列)

agent.queue.delay
(延迟队列)

agent.queue.failed
(失败队列)

用户交互Agent

订单管理Agent

物流追踪Agent

架构说明:

  1. agent.exchange:直连交换机,根据 receiver_id 路由消息到对应的队列;
  2. 业务队列:每个 Agent 有一个专属队列(如 uia.queue),设置了死信交换机 agent.exchange.dlx
  3. 死信交换机(DLX):处理失败的消息;
  4. 延迟交换机:让失败的消息延迟一段时间后重新发送到业务队列(重试);
  5. 失败队列:重试超过阈值(如 3 次)的消息进入失败队列,人工介入处理。
步骤 3:实现消息状态追踪

我们用 Redis 实现了消息状态追踪

  • 发送消息时,把消息状态写入 Redis:SET message:{message_id} "SENDING"
  • 消息被接收时,更新状态:SET message:{message_id} "RECEIVED"
  • 消息被处理时,更新状态:SET message:{message_id} "PROCESSED"
  • 消息失败时,更新状态:SET message:{message_id} "FAILED"
  • 同时设置过期时间(24 小时),避免 Redis 内存溢出。
数学模型:消息队列的吞吐量与延迟

为了优化消息队列的性能,我们引入了两个数学模型:

1. 吞吐量模型

吞吐量(Throughput)是指单位时间内处理的消息数量,公式为:
Throughput=NT Throughput = \frac{N}{T} Throughput=TN
其中:

  • NNN:处理的消息总数;
  • TTT:处理时间。

优化措施:

  • 增加 Agent 的并发消费者数量(比如订单管理 Agent 启动 5 个消费者);
  • 优化消息大小(用 Protobuf 代替 JSON);
  • 调整 RabbitMQ 的配置(比如增加队列长度、调整 QoS)。
2. 延迟模型

延迟(Latency)是指消息从发送到被处理的时间,公式为:
Latency=Tqueue+Ttransmit+Tprocess Latency = T_{queue} + T_{transmit} + T_{process} Latency=Tqueue+Ttransmit+Tprocess
其中:

  • TqueueT_{queue}Tqueue:消息在队列中的等待时间;
  • TtransmitT_{transmit}Ttransmit:消息的传输时间;
  • TprocessT_{process}Tprocess:Agent 处理消息的时间。

优化措施:

  • 用优先级队列处理高优先级消息(比如用户投诉);
  • 优化 Agent 的处理逻辑(比如减少数据库查询,用 Redis 缓存热点数据);
  • 用更快的网络(比如内网通信)。
算法流程图:消息消费的容错流程(Mermaid)

我们实现了一个完善的消息消费容错流程:

Agent 从队列获取消息

消息格式是否正确?

记录日志,拒绝消息
(不重新入队)

更新消息状态为 RECEIVED

处理消息

处理是否成功?

确认消息(ACK)

更新消息状态为 PROCESSED

重试次数是否超过阈值?

增加重试次数
更新消息状态为 FAILED

拒绝消息,重新入队到死信队列

延迟队列延迟后
重新发送到业务队列

拒绝消息,不重新入队

消息进入失败队列

通知人工介入处理

3.3 坑 3:容错性差——“一荣俱荣,一损俱损”

问题背景

虽然我们有了消息队列的重试机制,但还是不够:

  • 订单管理 Agent 因为数据库连接池满了而崩溃——所有订单查询请求都失败了;
  • 支付 Agent 的第三方 API 超时率达到 80%——但用户交互 Agent 还是不断给它发消息,导致它彻底“挂掉”;
  • 没有服务发现——用户交互 Agent 只知道一个订单管理 Agent 的 IP,它挂了之后,用户交互 Agent 就找不到其他健康的实例了。

有一次,第三方支付 API 维护了 2 小时——我们的支付 Agent 收到了 10 万+ 退款请求,全部超时,最后支付 Agent 的进程崩溃了,消息队列里积压了 20 万+ 消息,花了 1 天才处理完!

问题描述

核心问题是:没有完善的服务治理机制(服务发现、健康检查、熔断、降级),导致故障扩散、系统雪崩

问题解决:引入完整的服务治理机制

我们引入了 **Consul(服务

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐