【learn-claude-code】S09AgentTeams - Agent 团队:一个 Lead + 多个 Teammate,收件箱通信
多Agent协作架构 本文介绍了基于"领导+团队成员"的多Agent协作系统设计。核心架构采用一个领导Agent(Lead)与多个成员Agent(Teammate)通过异步消息总线(MessageBus)进行协作的模式。系统实现了以下关键特性: 组件化重构:将系统划分为Agent入口、核心组件(循环、工具、团队管理等)和公共设施 状态管理:通过State接口和ThreadLoc
·
核心理念
“一个 Lead + 多个 Teammate,通过收件箱异步通信协作” – 多 Agent 协作的基础架构。
- 源码:https://github.com/xiayongchao/learn-claude-code-4j/blob/main/src/main/java/org/jc/agents/S09AgentTeams.java
- 原版:https://github.com/shareAI-lab/learn-claude-code
- 上篇:S08BackgroundTasks - 后台任务
上篇回顾
上篇文章我们实现了后台任务系统,让慢操作不阻塞 Agent 继续工作。
问题
之前所有 Agent 都是独立工作的,单一 Agent 无法:
- 并行处理多个任务
- 分工协作完成复杂目标
- 维护团队成员状态
我们需要:多 Agent 协作架构。
解决方案
┌─────────────────┐
│ Lead │ (领导:协调者)
│ (Agent Loop) │
└────────┬────────┘
│
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Teammate │ │ Teammate │ │ Teammate │
│ Alice │ │ Bob │ │ Charlie │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
└──────────────┴──────────────┘
│
┌────────▼────────┐
│ MessageBus │ (消息总线)
│ (收件箱系统) │
└─────────────────┘
重构概览
从第9课开始,项目进行了全面的组件化重构:
src/main/java/org/jc/
├── agents/ # Agent 入口类
├── component/ # 核心组件
│ ├── loop/ # ReAct 循环
│ ├── tool/ # 工具集
│ ├── team/ # 团队管理
│ ├── state/ # 状态管理
│ ├── inbox/ # 消息收件箱
│ └── util/ # 工具类
└── Commons.java # 公共常量
依赖注入:Google Guice
使用 Guice 进行依赖管理:
public class S09AgentTeams extends AbstractModule {
@Override
protected void configure() {
// 领导工具
Multibinder<LeadTool> leadToolBinder = Multibinder.newSetBinder(binder(), LeadTool.class);
leadToolBinder.addBinding().to(BashTool.class);
leadToolBinder.addBinding().to(ReadFileTool.class);
// ...
// 队员工具
Multibinder<TeammateTool> teammateToolBinder = Multibinder.newSetBinder(binder(), TeammateTool.class);
// ...
// 核心服务
bind(OpenAIClient.class).toInstance(Commons.getClient());
bind(MessageBus.class).in(Singleton.class);
bind(Team.class).in(Singleton.class);
bind(ReActs.class).to(ReActsImpl.class);
}
}
核心组件详解
1. State 接口与 BaseState 基类
public interface State {
String getName();
String getRole();
String getModel();
String getPrompt();
String getWorkDir();
List<ChatCompletionMessageParam> getMessages();
ReentrantLock getShutdownLock();
ReentrantLock getPlanLock();
ReentrantLock getClaimTaskLock(); // 新增:任务认领锁
}
public class BaseState implements State {
protected String name;
protected String role;
protected String model;
protected String prompt;
protected String workDir;
protected List<ChatCompletionMessageParam> messages;
protected ReentrantLock shutdownLock;
protected ReentrantLock planLock;
protected ReentrantLock claimTaskLock;
// ... getter/setter
}
LeadState vs TeammateState:
| 属性 | LeadState | TeammateState |
|---|---|---|
| 继承 | BaseState | BaseState |
| messages | ✓ | ✓ |
| shutdown/idle | - | ✓ |
| maxLoopTimes | - | ✓ |
| 锁 | 3个共享 | 继承自 Lead |
2. States ThreadLocal 状态传递
public class States {
private static final ThreadLocal<State> CTX = new ThreadLocal<>();
public static void set(State state) { CTX.set(state); }
public static State get() { return CTX.get(); }
public static LeadState lead() { return (LeadState) get(); }
public static TeammateState teammate() { return (TeammateState) get(); }
public static void clear() { CTX.remove(); }
}
3. ReActs 接口
public interface ReActs {
ChatCompletionMessageParam start(LeadState state); // 领导同步执行
void start(TeammateState state); // 队员异步执行
}
4. ReActsImpl 实现
public class ReActsImpl implements ReActs {
// 线程池配置:核心5,最大10
private final ThreadPoolExecutor theadPools = new ThreadPoolExecutor(5, 10,
300, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50),
AiThreadFactory.create("loop", true));
private final LeadReAct leadReAct;
private final TeammateReAct teammateReAct;
public ChatCompletionMessageParam start(LeadState state) {
try {
States.set(state);
leadReAct.loop();
return States.lead().getLastMessage();
} finally {
States.clear();
}
}
public void start(TeammateState state) {
theadPools.submit(() -> {
try {
States.set(state);
teammateReAct.loop();
} finally {
States.clear();
}
});
}
}
5. FileUtils 工具类
public class FileUtils {
// 新增:创建目录路径
public static Path getOrCreateDirPath(Path path) throws IOException {
if (!FileUtils.exists(path)) {
Files.createDirectories(path);
}
return path;
}
// 签名变更:resolve(path, subPath, isFile, init)
// isFile: true=文件, false=目录
// init: true=不存在则创建
public static Path resolve(Path path, String subPath, boolean isFile, boolean init) throws IOException {
Path resolvePath = path.resolve(subPath);
if (init) {
if (isFile) {
return FileUtils.getOrCreateFilePath(resolvePath);
}
return FileUtils.getOrCreateDirPath(resolvePath);
}
return resolvePath;
}
}
6. MessageBus 消息总线
public class MessageBus {
private Path getInboxPath(String to) {
return FileUtils.resolve(workDir,
String.format("inbox/%s.jsonl", to), true, true); // true=文件, true=初始化
}
public void send(String to, InBoxMessage message) throws IOException {
Path inboxPath = this.getInboxPath(to);
FileUtils.write(inboxPath, JSON.toJSONString(message) + "\n");
}
public List<InBoxMessage> readInbox(String name, boolean clear) throws IOException {
List<InBoxMessage> messages = FileUtils.readList(inboxPath, InBoxMessage.class);
if (clear) {
FileUtils.clear(inboxPath); // 读取后清空
}
return messages;
}
}
7. Team 团队管理
public class Team {
public void writeTeamConfig(TeamConfig teamConfig) {
Path teamConfigPath = FileUtils.resolve(workDir, "team/config.json", true, true);
FileUtils.write(teamConfigPath, teamConfig);
}
public TeamConfig readTeamConfig() {
Path teamConfigPath = FileUtils.resolve(workDir, "team/config.json", true, true);
return FileUtils.read(teamConfigPath, TeamConfig.class);
}
public void setTeammateIdle() { /* 更新状态为 idle */ }
public void setTeammateShutdown() { /* 更新状态为 shutdown */ }
public void setTeammateWorking() { /* 更新状态为 working */ }
public String render() { /* 列出团队成员 */ }
public List<String> getTeammateNames() { /* 获取成员名称列表 */ }
}
配置文件 team/config.json:
{
"teamName": "default",
"teammates": [
{"name": "alice", "role": "programmer", "status": "working"},
{"name": "bob", "role": "tester", "status": "idle"}
]
}
8. Tool 工具体系
public interface Tool extends LeadTool, TeammateTool {
String name();
ChatCompletionTool definition();
String call(String arguments);
}
public abstract class BaseTool<T> implements Tool {
private final String name;
private final Class<T> tClass;
private final ChatCompletionTool definition;
@Override
public String call(String arguments) {
return this.doCall(JSON.parseObject(arguments, tClass));
}
public abstract String doCall(T arguments);
}
9. LeadReAct 领导循环
public class LeadReAct {
public void loop() {
List<ChatCompletionMessageParam> messages = States.get().getMessages();
while (true) {
this.readInbox(messages); // 读取收件箱
ChatCompletionAssistantMessageParam assistantMessage = this.chat(messages);
Optional<List<ChatCompletionMessageToolCall>> toolCallsOptional = assistantMessage.toolCalls();
if (toolCallsOptional.isEmpty()) break;
this.callTools(toolCallsOptional.get(), messages);
}
}
}
10. S09SpawnTeammateTool 创建队员
public class S09SpawnTeammateTool extends BaseTool<SpawnTeammateToolArgs> {
public String doCall(SpawnTeammateToolArgs arguments) {
TeammateState state = new TeammateState();
state.setName(name);
state.setRole(role);
state.setPrompt(String.format("你是 '%s', 角色: %s, 工作目录 %s...",
name, role, workDir));
state.setMaxLoopTimes(50);
this.teammate(name, role); // 注册到团队
this.reActs.start(state); // 异步启动
return String.format("创建 '%s' (角色: %s)", name, role);
}
}
执行流程图
1. Lead 调用 spawnTeammate("alice", "programmer", "编写代码")
│
▼
2. 创建 TeammateState,设置 name/role/prompt
│
▼
3. Team 注册 alice 到 team/config.json
│
▼
4. ReActs.start() 提交到线程池异步执行
│
▼
5. S09TeammateReAct.loop() 开始工作
│
▼
6. Alice 可通过 sendMessage 与 Lead 通信
新增工具清单
| 工具 | 所有者 | 功能 |
|---|---|---|
| spawnTeammate | Lead | 创建团队成员 |
| sendMessage | Both | 发送消息给队友 |
| readInbox | Both | 读取收件箱 |
| broadcast | Lead | 广播消息给所有队友 |
| listTeammates | Lead | 列出团队成员 |
相对 s08 的变更
| 组件 | s08 | s09 |
|---|---|---|
| Agent 模式 | 单 Agent | Lead + 多 Teammate |
| 通信方式 | - | MessageBus 收件箱 |
| 依赖注入 | 无 | Guice |
| 代码组织 | 单文件 | 组件化 |
| 执行方式 | 同步 | 同步 + 异步线程池 |
| 状态管理 | - | BaseState + 三把锁 |
| 线程池 | - | 核心5,最大10 |
试试看
生成 alice(程序员)和 bob(测试人员)让 alice 给 bob 发送一条消息 "你好 Bob"向所有队友广播 "状态更新:第一阶段已完成"查看收件箱中的所有消息
核心要义
“One Lead + Multiple Teammates, communicating via async inbox”
领导协调,队员协作,收件箱异步通信
设计原则:
- 组件化:职责分离,易于扩展
- ThreadLocal:跨线程传递状态
- 线程池:队员异步并行工作(核心5,最大10)
- 文件存储:团队配置持久化
- 锁机制:BaseState 提供 shutdownLock/planLock/claimTaskLock
更多推荐



所有评论(0)