【Dify精讲】第8章:Agent能力实现原理
Dify的Agent系统架构解析:揭秘让AI具备"手脚"的工程智慧。文章深入探究了Dify Agent的核心架构,重点分析了位于agent_chat/目录下的模块化设计,特别是Agent执行器的实现细节。该系统采用ReAct范式(推理-行动-观察循环),通过状态管理、工具调用限制和错误处理机制,实现了智能的任务分解和执行。执行器模块展示了如何协调LLM推理、工具调用和结果观察的完整流程。
引言
还记得第一次看到 Dify 的 Agent 自动调用天气API、计算数学问题、搜索网络信息时的那种震撼吗?那种"AI真的在思考和行动"的感觉,让我这个老程序员都忍不住惊叹。
作为一个在AI领域深耕多年的工程师,我深知Agent系统的复杂性。它不仅要理解用户意图,还要学会选择合适的工具,制定执行计划,并根据结果动态调整策略。这背后的技术实现,远比表面看起来的"智能对话"复杂得多。
今天,让我们一起深入 Dify 的 Agent 系统,揭开这个让AI具备"手脚"的神秘面纱。从架构设计到工具调用,从推理引擎到自定义扩展,每一个细节都蕴含着深刻的工程智慧。
一、Agent架构设计
1.1 整体架构概览
打开 api/core/app/apps/agent_chat/ 目录,你会发现 Dify Agent 的架构设计相当优雅:
agent_chat/
├── agent/ # Agent 核心逻辑
│ ├── agent_executor.py # Agent 执行器
│ ├── output_parser.py # 输出解析器
│ └── agent_loop.py # Agent 循环控制
├── app_runner.py # 应用运行器
├── app_config_manager.py # 配置管理器
└── generate/ # 生成逻辑
    ├── conversation_message_task.py
    └── agent_chat_app_runner.py
这种架构体现了关注点分离的设计原则,每个模块都有明确的职责边界。
1.2 Agent执行器:大脑中枢
让我们从最核心的Agent执行器开始:
# api/core/app/apps/agent_chat/agent/agent_executor.py
class AgentExecutor:
    """Core controller of the agent: drives the ReAct think/act/observe loop.

    Per-run state (iteration counter, start time, intermediate steps) lives on
    the instance and is reset at the start of every ``run`` call, so
    overlapping ``run`` calls on one instance would overwrite each other's
    state.
    """

    def __init__(self,
                 app_config: AgentChatAppConfig,
                 model_config: ModelConfig,
                 tools: List[Tool],
                 memory: Optional[ConversationMemory] = None):
        """Store configuration, index the tools by name and reset run state.

        Args:
            app_config: Application config; ``app_config.agent`` supplies the
                iteration and execution-time limits.
            model_config: Model configuration.
            tools: Tools the agent may call; indexed here by ``tool.name``.
            memory: Conversation memory. A new ``ConversationMemory`` is
                created only when ``None`` is passed.
        """
        self.app_config = app_config
        self.model_config = model_config
        self.tools = {tool.name: tool for tool in tools}
        # Compare against None so that a falsy (e.g. empty) memory object
        # supplied by the caller is kept rather than silently replaced.
        self.memory = memory if memory is not None else ConversationMemory()

        # Reasoning engine.
        self.reasoning_engine = self._create_reasoning_engine()

        # Limits on tool-calling loops.
        self.max_iterations = app_config.agent.max_iteration
        self.max_execution_time = app_config.agent.max_execution_time

        # Per-run state.
        self.current_iteration = 0
        self.execution_start_time = None
        self.intermediate_steps = []

    def run(self, query: str, **kwargs) -> AgentExecutorResult:
        """Run the agent on ``query`` and return the outcome.

        An ``AgentExecutionError`` raised by the loop is converted into a
        result carrying the error text; other exceptions propagate.

        Args:
            query: The user's question.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The final output together with the recorded intermediate steps,
            the number of iterations performed and the elapsed time.
        """
        logger.info(f"Agent开始执行查询: {query}")
        self.execution_start_time = time.time()
        self.current_iteration = 0
        self.intermediate_steps = []
        try:
            # 1. Build the initial prompt.
            prompt = self._build_agent_prompt(query)
            # 2. Enter the agent loop.
            result = self._agent_loop(prompt, query)
            # 3. Assemble the final result.
            return AgentExecutorResult(
                output=result.output,
                intermediate_steps=self.intermediate_steps,
                iterations=self.current_iteration,
                execution_time=time.time() - self.execution_start_time
            )
        except AgentExecutionError as e:
            logger.error(f"Agent执行失败: {str(e)}")
            # Report the same progress metrics as the success path so that a
            # failed run can still be inspected (how far it got, how long).
            return AgentExecutorResult(
                output=f"抱歉,执行过程中遇到错误: {str(e)}",
                error=str(e),
                intermediate_steps=self.intermediate_steps,
                iterations=self.current_iteration,
                execution_time=time.time() - self.execution_start_time
            )

    def _agent_loop(self, prompt: str, original_query: str) -> AgentLoopResult:
        """Main loop implementing the ReAct pattern.

        Each iteration asks the model to think, parses its output, and either
        returns the final answer or executes the requested tool and feeds the
        observation back into the prompt.

        Args:
            prompt: The initial prompt.
            original_query: The user's question (not used inside the loop).

        Returns:
            The loop result once the model produces a final answer.

        Raises:
            AgentExecutionError: When the time limit is exceeded or the
                iteration limit is reached without a final answer.
        """
        while self.current_iteration < self.max_iterations:
            # Enforce the wall-clock limit before starting another round.
            elapsed = time.time() - self.execution_start_time
            if elapsed > self.max_execution_time:
                raise AgentExecutionError("执行时间超限")

            self.current_iteration += 1
            logger.info(f"Agent循环第 {self.current_iteration} 轮")

            # 1. Think.
            llm_response = self._think(prompt)
            # 2. Parse the model output.
            parsed_output = self._parse_output(llm_response)

            # 3. A final answer ends the loop.
            if parsed_output.is_final_answer:
                logger.info("Agent得出最终答案")
                return AgentLoopResult(
                    output=parsed_output.final_answer,
                    finish_reason="final_answer"
                )

            # No tool requested and no final answer: ask the model again.
            if not parsed_output.tool_name:
                logger.warning("Agent输出格式异常,尝试重新推理")
                prompt = self._handle_invalid_output(prompt, llm_response)
                continue

            # 4. Act.
            action_result = self._act(
                tool_name=parsed_output.tool_name,
                tool_input=parsed_output.tool_input
            )
            # 5. Observe.
            observation = self._observe(action_result)
            # 6. Extend the prompt for the next round.
            prompt = self._update_prompt_with_observation(
                prompt, parsed_output, observation
            )
            # Record the step for debugging and reporting.
            self.intermediate_steps.append(AgentStep(
                action=parsed_output,
                observation=observation,
                iteration=self.current_iteration
            ))

        # Iteration budget exhausted without a final answer.
        raise AgentExecutionError(f"达到最大迭代次数 {self.max_iterations}")
设计亮点:
- ReAct范式:Think(推理) → Act(行动) → Observe(观察)的经典循环
- 状态管理:完整记录执行过程,便于调试和优化
- 安全控制:迭代次数和执行时间的双重限制
1.3 输出解析器:理解AI的"意图"
Agent的输出解析是整个系统的关键环节:
# api/core/app/apps/agent_chat/agent/output_parser.py
class AgentOutputParser:
    """Parse ReAct-formatted LLM output into an ``AgentAction``.

    Parsing is attempted in three stages: a final answer, a strict
    ``Action`` / ``Action Input`` pair, and finally a fuzzy match on known
    tool names.
    """

    def __init__(self):
        """Compile the regular expressions for each section of the output."""
        self.thought_pattern = re.compile(
            r'Thought:\s*(.*?)(?=\n(?:Action|Final Answer))', re.DOTALL)
        self.action_pattern = re.compile(
            r'Action:\s*(.*?)(?=\nAction Input)', re.DOTALL)
        # The input ends at the next section header OR at the end of the
        # text. The end-of-text alternative must sit outside the "\n" part:
        # parse() strips the output, so a trailing "Action Input" line has no
        # newline after it and would otherwise never match.
        self.action_input_pattern = re.compile(
            r'Action Input:\s*(.*?)(?=\n(?:Observation|Thought|Final Answer)|\Z)',
            re.DOTALL)
        self.final_answer_pattern = re.compile(
            r'Final Answer:\s*(.*)', re.DOTALL)

    def parse(self, llm_output: str) -> AgentAction:
        """Parse the raw LLM output.

        Args:
            llm_output: Text produced by the model.

        Returns:
            The parsed action (final answer or tool call).

        Raises:
            OutputParsingError: When any parsing stage raises; the original
                exception is chained as the cause.
        """
        llm_output = llm_output.strip()
        try:
            # 1. A final answer takes precedence.
            final_answer_match = self.final_answer_pattern.search(llm_output)
            if final_answer_match:
                return AgentAction(
                    is_final_answer=True,
                    final_answer=final_answer_match.group(1).strip(),
                    thought=self._extract_thought(llm_output)
                )

            # 2. Strictly formatted tool call.
            action_match = self.action_pattern.search(llm_output)
            action_input_match = self.action_input_pattern.search(llm_output)
            if action_match and action_input_match:
                tool_name = action_match.group(1).strip()
                tool_input_str = action_input_match.group(1).strip()
                # The input may be JSON, key=value pairs or plain text.
                tool_input = self._parse_tool_input(tool_input_str)
                return AgentAction(
                    is_final_answer=False,
                    tool_name=tool_name,
                    tool_input=tool_input,
                    thought=self._extract_thought(llm_output)
                )

            # 3. Fall back to fuzzy matching.
            return self._fuzzy_parse(llm_output)
        except Exception as e:
            logger.error(f"输出解析失败: {str(e)}")
            raise OutputParsingError(
                f"无法解析Agent输出: {llm_output[:200]}...") from e

    def _parse_tool_input(self, input_str: str) -> Union[str, dict]:
        """Convert the tool-input text into structured parameters.

        Args:
            input_str: Raw text following ``Action Input:``.

        Returns:
            A dict for a JSON object or for comma-separated ``key=value``
            pairs; otherwise the stripped text. Invalid JSON is returned as
            the original string.
        """
        input_str = input_str.strip()

        # JSON object.
        if input_str.startswith('{') and input_str.endswith('}'):
            try:
                return json.loads(input_str)
            except json.JSONDecodeError:
                logger.warning(f"JSON解析失败,使用原始字符串: {input_str}")
                return input_str

        # Simple key=value pairs; values keep their string type and lose
        # surrounding quotes.
        if '=' in input_str and not input_str.startswith('"'):
            params = {}
            for pair in input_str.split(','):
                if '=' in pair:
                    key, value = pair.split('=', 1)
                    params[key.strip()] = value.strip().strip('"\'')
            return params

        return input_str

    def _fuzzy_parse(self, llm_output: str) -> AgentAction:
        """Best-effort parse of output that does not follow the format.

        Scans line by line for the name of an available tool
        (case-insensitive); the lines after the first hit are handed to
        ``_extract_fuzzy_input``. When no tool name is found the whole text
        is treated as the final answer.
        """
        lines = llm_output.split('\n')
        # The tool list does not depend on the line being inspected.
        available_tools = self._get_available_tools()

        for i, line in enumerate(lines):
            lowered = line.strip().lower()
            for tool_name in available_tools:
                if tool_name.lower() in lowered:
                    tool_input = self._extract_fuzzy_input(lines[i+1:])
                    return AgentAction(
                        is_final_answer=False,
                        tool_name=tool_name,
                        tool_input=tool_input,
                        thought="模糊解析的工具调用"
                    )

        # No recognizable tool call: the model probably answered directly.
        return AgentAction(
            is_final_answer=True,
            final_answer=llm_output,
            thought="无法识别明确的工具调用格式"
        )
工程智慧:
- 多层次解析:从严格格式到模糊匹配,提高容错性
- JSON容错:支持多种参数格式,适应不同模型的输出习惯
- 错误处理:解析失败时提供有意义的错误信息
二、工具调用机制
2.1 工具抽象基类
Dify 的工具系统采用了经典的模板方法模式:
# api/core/tools/tool/builtin_tool.py
class BuiltinTool:
    """Base class for built-in tools (template-method pattern).

    ``invoke`` fixes the overall call sequence; subclasses supply
    ``_invoke``, ``get_identity`` and ``get_parameters``.
    """

    def __init__(self):
        """Cache the identity and parameter schema supplied by the subclass."""
        self.identity = self.get_identity()
        self.parameters = self.get_parameters()

    def invoke(self, user_id: str, tool_parameters: dict) -> ToolInvokeMessage:
        """Validate, authorize and run the tool.

        Any exception raised along the way is converted into a text message
        rather than propagated.

        Args:
            user_id: ID of the calling user.
            tool_parameters: Parameters for the tool.

        Returns:
            The tool result, or a text message describing the failure.
        """
        try:
            # 1. Parameter validation.
            self._validate_parameters(tool_parameters)
            # 2. Permission check.
            self._check_permissions(user_id)
            # 3. Tool-specific implementation.
            result = self._invoke(user_id, tool_parameters)
            # 4. Post-processing.
            return self._post_process_result(result)
        except ToolProviderCredentialValidationError as e:
            return ToolInvokeMessage(
                type=ToolInvokeMessage.MessageType.TEXT,
                message=f"工具凭证验证失败: {str(e)}"
            )
        except Exception as e:
            logger.error(f"工具调用异常: {str(e)}", exc_info=True)
            return ToolInvokeMessage(
                type=ToolInvokeMessage.MessageType.TEXT,
                message=f"工具执行失败: {str(e)}"
            )

    def _invoke(self, user_id: str, tool_parameters: dict) -> ToolInvokeMessage:
        """Tool-specific logic; subclasses must implement this."""
        raise NotImplementedError

    def _validate_parameters(self, parameters: dict):
        """Check required parameters and declared types.

        Args:
            parameters: Parameters supplied by the caller.

        Raises:
            ValueError: When a required parameter is missing or a supplied
                value does not match the declared type.
        """
        for param_name, param_config in self.parameters.items():
            if param_name not in parameters:
                if param_config.get('required', False):
                    raise ValueError(f"缺少必填参数: {param_name}")
                continue

            expected_type = param_config.get('type')
            if not self._check_parameter_type(parameters[param_name], expected_type):
                raise ValueError(f"参数 {param_name} 类型错误,期望 {expected_type}")

    def _check_parameter_type(self, value: object, expected_type: str) -> bool:
        """Return whether ``value`` matches the schema type ``expected_type``.

        Unknown type names are accepted (returns ``True``).
        """
        type_mapping = {
            'string': str,
            'number': (int, float),
            'boolean': bool,
            'array': list,
            'object': dict
        }
        expected_python_type = type_mapping.get(expected_type)
        if not expected_python_type:
            return True
        # bool is a subclass of int, so True/False would otherwise be
        # accepted wherever a number is declared.
        if expected_type == 'number' and isinstance(value, bool):
            return False
        return isinstance(value, expected_python_type)

    def get_identity(self) -> ToolIdentity:
        """Return the tool's identity; subclasses must implement this."""
        raise NotImplementedError

    def get_parameters(self) -> dict:
        """Return the tool's parameter schema; subclasses must implement this."""
        raise NotImplementedError
2.2 具体工具实现:以搜索工具为例
让我们看看一个具体的工具是如何实现的:
# api/core/tools/provider/builtin/searxng/tools/searxng_search.py
class SearxngSearchTool(BuiltinTool):
    """Web search tool backed by a SearXNG instance."""

    def get_identity(self) -> ToolIdentity:
        """Return the tool's identity metadata."""
        return ToolIdentity(
            author="Dify",
            name="searxng_search",
            label=I18nObject(
                en_US="SearXNG Search",
                zh_Hans="SearXNG搜索"
            ),
            icon="searxng.svg"
        )

    def get_parameters(self) -> dict:
        """Return the parameter schema: query, categories and language."""
        return {
            "query": {
                "type": "string",
                "required": True,
                "label": I18nObject(
                    en_US="Search Query",
                    zh_Hans="搜索查询"
                ),
                "human_description": I18nObject(
                    en_US="Enter search query",
                    zh_Hans="输入搜索查询"
                ),
                "llm_description": "用于搜索的查询词"
            },
            "categories": {
                "type": "string",
                "required": False,
                "default": "general",
                "options": ["general", "news", "social media", "images", "videos"],
                "label": I18nObject(
                    en_US="Search Categories",
                    zh_Hans="搜索类别"
                )
            },
            "language": {
                "type": "string",
                "required": False,
                "default": "auto",
                "label": I18nObject(
                    en_US="Search Language",
                    zh_Hans="搜索语言"
                )
            }
        }

    def _invoke(self, user_id: str, tool_parameters: dict) -> ToolInvokeMessage:
        """Run the search and return the formatted results as text.

        Request failures (``requests.RequestException``) are reported as a
        text message instead of being raised.
        """
        query = tool_parameters.get('query', '')
        categories = tool_parameters.get('categories', 'general')
        language = tool_parameters.get('language', 'auto')

        # 1. Search engine configuration.
        searxng_config = self._get_searxng_config()

        # 2. Request parameters.
        search_params = {
            'q': query,
            'categories': categories,
            'format': 'json',
            'language': language
        }

        try:
            # 3. Send the request.
            response = requests.get(
                f"{searxng_config['base_url']}/search",
                params=search_params,
                timeout=searxng_config.get('timeout', 10),
                headers={'User-Agent': 'Dify-Agent/1.0'}
            )
            response.raise_for_status()

            # 4. Parse and 5. format the results.
            results = self._parse_search_results(response.json())
            return ToolInvokeMessage(
                type=ToolInvokeMessage.MessageType.TEXT,
                message=self._format_results(results, query)
            )
        except requests.RequestException as e:
            logger.error(f"搜索请求失败: {str(e)}")
            return ToolInvokeMessage(
                type=ToolInvokeMessage.MessageType.TEXT,
                message=f"搜索服务暂时不可用: {str(e)}"
            )

    def _parse_search_results(self, search_data: dict) -> List[dict]:
        """Extract up to ten results with HTML stripped from title and content.

        Args:
            search_data: Decoded JSON response.

        Returns:
            A list of dicts with ``title``, ``url``, ``content``, ``engine``
            and ``score`` keys.
        """
        results = []
        for item in search_data.get('results', [])[:10]:  # first 10 only
            results.append({
                # ``or ''`` also covers keys that are present but null in the
                # JSON, which would otherwise make _clean_html raise.
                'title': self._clean_html(item.get('title') or ''),
                'url': item.get('url', ''),
                'content': self._clean_html(item.get('content') or ''),
                'engine': item.get('engine', ''),
                'score': item.get('score', 0)
            })
        return results

    def _format_results(self, results: List[dict], query: str) -> str:
        """Render the results as a numbered text list.

        Content longer than 200 characters is truncated and suffixed with
        ``...``; results without content get no summary line.
        """
        if not results:
            return f"没有找到关于 '{query}' 的相关信息。"

        parts = [f"搜索 '{query}' 的结果:\n\n"]
        for i, result in enumerate(results, 1):
            parts.append(f"{i}. **{result['title']}**\n")
            parts.append(f" URL: {result['url']}\n")
            content = result['content']
            if content:
                if len(content) > 200:
                    content = content[:200] + "..."
                parts.append(f" 摘要: {content}\n")
            parts.append("\n")
        return "".join(parts)

    def _clean_html(self, text: str) -> str:
        """Remove HTML tags from ``text`` and strip surrounding whitespace."""
        import re
        return re.sub(r'<.*?>', '', text).strip()
实现特点:
- 配置驱动:工具参数和元信息通过配置定义
- 错误处理:网络异常、解析错误的完善处理
- 结果格式化:友好的输出格式,便于Agent理解
2.3 工具管理器:统一调度中心
# api/core/tools/tool_manager.py
class ToolManager:
    """Central registry and dispatcher for built-in and custom tools."""

    def __init__(self):
        """Create empty registries and load every tool."""
        self.builtin_tools = {}   # built-in tools, keyed "provider:tool"
        self.custom_tools = {}    # custom tools
        self.tool_providers = {}  # tool providers
        self._load_builtin_tools()
        self._load_custom_tools()

    def get_tool(self, provider_name: str, tool_name: str) -> Optional[Tool]:
        """Look up a tool by provider and name.

        Built-in tools are searched before custom tools.

        Returns:
            The tool instance, or ``None`` when neither registry has the key.
        """
        tool_key = f"{provider_name}:{tool_name}"
        for registry in (self.builtin_tools, self.custom_tools):
            if tool_key in registry:
                return registry[tool_key]
        return None

    def invoke_tool(self, user_id: str, provider_name: str, tool_name: str,
                    parameters: dict) -> ToolInvokeMessage:
        """Invoke a tool, logging the call and its outcome.

        An unknown tool or an exception raised by the tool is reported as a
        text message rather than raised.
        """
        tool = self.get_tool(provider_name, tool_name)
        if not tool:
            return ToolInvokeMessage(
                type=ToolInvokeMessage.MessageType.TEXT,
                message=f"工具 {provider_name}:{tool_name} 不存在"
            )

        # Record the invocation before running it.
        self._log_tool_invocation(user_id, provider_name, tool_name, parameters)

        try:
            started_at = time.time()
            result = tool.invoke(user_id, parameters)
            elapsed = time.time() - started_at
            # Record the outcome.
            self._log_tool_result(user_id, provider_name, tool_name,
                                  elapsed, result.type)
            return result
        except Exception as e:
            logger.error(f"工具调用异常: {str(e)}", exc_info=True)
            return ToolInvokeMessage(
                type=ToolInvokeMessage.MessageType.TEXT,
                message=f"工具执行失败: {str(e)}"
            )

    def get_available_tools(self, user_id: str) -> List[ToolSummary]:
        """List summaries of the tools ``user_id`` is permitted to use.

        Built-in tools come first, followed by custom tools.
        """
        summaries = [
            ToolSummary(
                provider=tool.identity.author,
                name=tool.identity.name,
                label=tool.identity.label,
                description=tool.get_llm_description(),
                parameters=tool.parameters
            )
            for tool in self.builtin_tools.values()
            if self._check_tool_permission(user_id, tool)
        ]
        summaries.extend(
            ToolSummary(
                provider="custom",
                name=tool.name,
                label=tool.label,
                description=tool.description,
                parameters=tool.parameters
            )
            for tool in self.custom_tools.values()
            if self._check_tool_permission(user_id, tool)
        )
        return summaries

    def _load_builtin_tools(self):
        """Load tools from every provider directory under provider/builtin.

        Entries that are not directories, or whose name starts with an
        underscore, are skipped.
        """
        builtin_root = Path(__file__).parent / "provider" / "builtin"
        for entry in builtin_root.iterdir():
            if not entry.is_dir() or entry.name.startswith('_'):
                continue
            self._load_provider_tools(entry)

    def _load_provider_tools(self, provider_path: Path):
        """Import each module in the provider's ``tools`` directory and
        register every ``BuiltinTool`` subclass found in it.

        A failure while loading one file is logged and does not stop the
        remaining files from loading.
        """
        provider_name = provider_path.name
        tools_dir = provider_path / "tools"
        if not tools_dir.exists():
            return

        for tool_file in tools_dir.glob("*.py"):
            if tool_file.name.startswith('_'):
                continue
            try:
                # Import the tool module dynamically.
                module_name = f"core.tools.provider.builtin.{provider_name}.tools.{tool_file.stem}"
                module = importlib.import_module(module_name)

                for attr_name in dir(module):
                    candidate = getattr(module, attr_name)
                    is_tool_class = (isinstance(candidate, type) and
                                     issubclass(candidate, BuiltinTool) and
                                     candidate != BuiltinTool)
                    if not is_tool_class:
                        continue
                    # Instantiate and register the tool.
                    tool_instance = candidate()
                    tool_key = f"{provider_name}:{tool_instance.identity.name}"
                    self.builtin_tools[tool_key] = tool_instance
                    logger.info(f"加载内置工具: {tool_key}")
            except Exception as e:
                logger.error(f"加载工具失败 {tool_file}: {str(e)}")
三、ReAct框架实现
3.1 Prompt工程:Agent的"操作手册"
Agent的推理能力很大程度上依赖于精心设计的Prompt:
# api/core/app/apps/agent_chat/agent/structed_multi_dataset_router_agent.py
class StructuredMultiDatasetRouterAgent:
    """Builds the ReAct system prompt handed to the model.

    The prompt literals below are runtime text and are deliberately kept
    flush-left so that their content is unchanged.
    """

    def _get_agent_prompt(self, query: str, tools: List[Tool]) -> str:
        """Assemble the full prompt for ``query``.

        The prompt consists of the role description, the tool descriptions,
        the output-format instructions, worked examples and finally the user
        question followed by an open ``Thought:`` line.

        Args:
            query: The user's question.
            tools: Tools to describe to the model.

        Returns:
            The complete prompt text.
        """
        # 1. Role definition and rules.
        system_prompt = """你是一个智能助手,能够使用多种工具来帮助用户解决问题。
你的工作流程是:
1. 仔细分析用户的问题
2. 思考需要使用什么工具来获取信息
3. 调用合适的工具
4. 基于工具返回的结果进行推理
5. 给出最终答案
重要规则:
- 必须按照指定格式输出你的思考过程和行动
- 如果工具调用失败,要尝试其他方法或承认无法完成
- 最终答案要准确、有用、友好"""

        # 2. Tool descriptions.
        tools_description = self._build_tools_description(tools)

        # 3. Output format instructions.
        format_instructions = """
请严格按照以下格式输出:
Thought: 你对当前情况的思考和分析
Action: 要使用的工具名称
Action Input: 工具的输入参数(JSON格式)
接下来你会看到:
Observation: 工具执行的结果
然后你可以继续思考和行动,或者给出最终答案:
Thought: 基于观察结果的进一步思考
Final Answer: 你的最终回答
"""

        # 4. Worked examples.
        examples = self._get_reasoning_examples()

        # 5. Assemble.
        full_prompt = f"""{system_prompt}
可用工具:
{tools_description}
{format_instructions}
示例:
{examples}
现在开始:
用户问题: {query}
Thought:"""
        return full_prompt

    def _build_tools_description(self, tools: List[Tool]) -> str:
        """Describe each tool (name, purpose, parameters) for the prompt.

        Args:
            tools: Objects exposing ``identity.name``,
                ``get_llm_description()`` and a ``parameters`` dict.

        Returns:
            One description per tool, joined by newlines; an empty string
            when ``tools`` is empty.
        """
        descriptions = []
        for tool in tools:
            # Collect the pieces and join once instead of growing a string
            # with repeated ``+=``.
            pieces = [f"""
工具名称: {tool.identity.name}
功能描述: {tool.get_llm_description()}
参数说明:"""]
            for param_name, param_config in tool.parameters.items():
                required = "必填" if param_config.get('required', False) else "可选"
                param_type = param_config.get('type', 'string')
                param_desc = param_config.get('llm_description', '')
                pieces.append(f"\n - {param_name} ({param_type}, {required}): {param_desc}")
            descriptions.append("".join(pieces))
        return "\n".join(descriptions)

    def _get_reasoning_examples(self) -> str:
        """Return the few-shot reasoning examples embedded in the prompt."""
        return """
示例1 - 搜索问题:
用户问题: 今天北京的天气怎么样?
Thought: 用户询问北京今天的天气情况,我需要使用天气查询工具来获取实时天气信息。
Action: weather_query
Action Input: {"location": "北京", "date": "today"}
Observation: 北京今天多云,气温15-25℃,东南风3级,湿度60%
Thought: 我已经获取了北京今天的天气信息,现在可以给出完整的回答。
Final Answer: 根据最新的天气信息,北京今天是多云天气,气温在15-25℃之间,东南风3级,湿度为60%。建议外出时可以穿轻薄的长袖衣物,无需担心降雨。
示例2 - 计算问题:
用户问题: 请帮我计算一下复利投资的收益
Thought: 用户想要计算复利投资收益,但没有提供具体的参数。我需要先询问相关信息。
Final Answer: 要计算复利投资收益,我需要知道以下信息:
1. 初始投资金额
2. 年利率
3. 投资期限
4. 复利计算频率(年复利/月复利等)
请提供这些信息,我就能帮您计算具体的收益了。
"""
3.2 推理引擎:思维链的实现
# api/core/app/apps/agent_chat/agent/agent_loop.py
class AgentLoop:
    """Controller for individual reasoning steps of the agent.

    Tracks consecutive failures and, once ``reflection_threshold`` is
    reached, asks the model to reflect on its approach.
    """

    def __init__(self, llm_client, output_parser, tool_manager):
        """Store collaborators and initialise the control parameters."""
        self.llm_client = llm_client
        self.output_parser = output_parser
        self.tool_manager = tool_manager
        # Reasoning control parameters.
        self.max_thought_length = 2000
        self.reflection_threshold = 3  # consecutive failures before reflecting
        self.current_failures = 0

    def execute_reasoning_step(self, prompt: str, context: ReasoningContext) -> ReasoningResult:
        """Run one reasoning step: call the LLM, parse, validate and act.

        Args:
            prompt: Prompt for this step.
            context: Reasoning context (history, available tools, query).

        Returns:
            The step outcome. A failing step yields ``success=False``
            unless the failure streak triggers reflection, in which case
            the reflection result is returned.
        """
        try:
            # 1. Ask the LLM.
            llm_response = self._call_llm_with_retry(prompt)
            # 2. Parse its output.
            parsed_action = self.output_parser.parse(llm_response)
            # 3. Sanity-check the output.
            if not self._validate_reasoning_output(parsed_action, context):
                return self._handle_invalid_reasoning(parsed_action, context)

            # 4. Execute the action when one is requested.
            needs_action = not parsed_action.is_final_answer
            observation = (self._execute_action(parsed_action, context)
                           if needs_action else None)

            # The threshold counts *consecutive* failures, so a successful
            # step must end the current streak.
            self.current_failures = 0
            return ReasoningResult(
                action=parsed_action,
                observation=observation,
                success=True,
                continue_reasoning=needs_action
            )
        except Exception as e:
            logger.error(f"推理步骤执行失败: {str(e)}")
            self.current_failures += 1
            # Too many failures in a row: trigger reflection.
            if self.current_failures >= self.reflection_threshold:
                return self._trigger_reflection(context, str(e))
            return ReasoningResult(
                action=None,
                observation=f"推理失败: {str(e)}",
                success=False,
                continue_reasoning=True,
                error=str(e)
            )

    def _validate_reasoning_output(self, action: AgentAction, context: ReasoningContext) -> bool:
        """Check that a parsed action is plausible.

        Rejects over-long thoughts, tool calls without a name or naming a
        tool that is not in ``context.available_tools``, and actions that
        repeat recent history.
        """
        # 1. Thought length.
        if action.thought and len(action.thought) > self.max_thought_length:
            logger.warning("思考内容过长,可能存在循环推理")
            return False

        # 2. Tool call sanity.
        if not action.is_final_answer:
            if not action.tool_name:
                logger.warning("缺少工具名称")
                return False
            available_tools = {tool.identity.name for tool in context.available_tools}
            if action.tool_name not in available_tools:
                logger.warning(f"尝试调用不存在的工具: {action.tool_name}")
                return False

        # 3. Loop detection.
        if self._detect_reasoning_loop(action, context):
            logger.warning("检测到推理循环")
            return False
        return True

    def _detect_reasoning_loop(self, current_action: AgentAction, context: ReasoningContext) -> bool:
        """Return True when at least two of the last three actions in
        ``context.history`` have the same signature as ``current_action``.

        Histories shorter than three entries never count as a loop.
        """
        if len(context.history) < 3:
            return False
        current_signature = self._get_action_signature(current_action)
        repeats = sum(
            1 for past_action in context.history[-3:]
            if self._get_action_signature(past_action) == current_signature
        )
        return repeats >= 2

    def _get_action_signature(self, action: AgentAction) -> str:
        """Return a short string identifying an action for loop detection.

        Final answers share one signature; tool calls use the tool name plus
        the first 100 characters of the stringified input.
        """
        if action.is_final_answer:
            return "final_answer"
        return f"{action.tool_name}:{str(action.tool_input)[:100]}"

    def _trigger_reflection(self, context: ReasoningContext, error: str) -> ReasoningResult:
        """Ask the model to reflect on the failed attempts.

        On success the failure counter is reset and reasoning continues; if
        the reflection call itself fails, a terminal apology answer is
        returned.
        """
        logger.info("触发Agent反思机制")
        reflection_prompt = f"""
你在解决问题时遇到了困难,让我们停下来反思一下:
原始问题: {context.original_query}
执行历史:
{self._format_execution_history(context.history)}
当前错误: {error}
请思考:
1. 你的方法是否正确?
2. 是否遗漏了什么重要信息?
3. 是否需要换一个角度或方法?
4. 你能否直接基于已有信息给出答案?
Thought:"""
        try:
            reflection_response = self.llm_client.chat([
                {"role": "user", "content": reflection_prompt}
            ])
            # Reset the failure streak.
            self.current_failures = 0
            return ReasoningResult(
                action=AgentAction(
                    is_final_answer=False,
                    thought=reflection_response.strip(),
                    reflection=True
                ),
                observation="已完成反思,准备调整策略",
                success=True,
                continue_reasoning=True
            )
        except Exception as e:
            logger.error(f"反思机制执行失败: {str(e)}")
            return ReasoningResult(
                action=AgentAction(
                    is_final_answer=True,
                    final_answer="抱歉,我在处理这个问题时遇到了困难,无法继续执行。",
                    thought="反思失败,终止执行"
                ),
                observation=None,
                success=False,
                continue_reasoning=False
            )
3.3 记忆管理:上下文的艺术
# api/core/app/apps/agent_chat/agent/agent_memory.py
class AgentMemory:
    """Memory manager for the agent.

    Keeps recent interactions in working memory and, when the token budget
    is exceeded, moves older ones into episodic memory as a compressed
    summary.
    """

    def __init__(self, max_tokens: int = 8000, summary_ratio: float = 0.3):
        """Initialise empty memories.

        Args:
            max_tokens: Token budget that triggers compression when exceeded.
            summary_ratio: Fraction of working memory eligible for
                summarisation on each compression.
        """
        self.max_tokens = max_tokens
        self.summary_ratio = summary_ratio
        self.working_memory = []   # current-session interactions
        self.episodic_memory = []  # summaries of older interactions
        self.semantic_memory = {}  # extracted knowledge
        self.memory_compressor = MemoryCompressor()

    def add_interaction(self, query: str, response: str, intermediate_steps: List[AgentStep]):
        """Record an interaction and compress memory if over budget.

        The stored token estimate covers the query and response only.
        """
        interaction = {
            'timestamp': datetime.utcnow(),
            'query': query,
            'response': response,
            'steps': intermediate_steps,
            'tokens': self._estimate_tokens(query + response)
        }
        self.working_memory.append(interaction)
        if self._get_total_tokens() > self.max_tokens:
            self._compress_memory()

    def get_relevant_context(self, current_query: str, max_context_tokens: int = 4000) -> str:
        """Build a context string from interactions relevant to the query.

        Interactions are added in relevance order until the next one would
        exceed ``max_context_tokens``; collection stops at that point.

        Returns:
            The selected interactions joined by blank lines.
        """
        relevant_interactions = self._find_relevant_interactions(current_query)
        context_parts = []
        current_tokens = 0
        for interaction in relevant_interactions:
            interaction_text = self._format_interaction(interaction)
            interaction_tokens = self._estimate_tokens(interaction_text)
            if current_tokens + interaction_tokens > max_context_tokens:
                break
            context_parts.append(interaction_text)
            current_tokens += interaction_tokens
        return "\n\n".join(context_parts)

    def _find_relevant_interactions(self, query: str) -> List[dict]:
        """Rank working-memory interactions by TF-IDF cosine similarity.

        Returns:
            Interactions whose similarity to ``query`` exceeds 0.1, most
            similar (then most recent) first. If the similarity computation
            raises, all interactions are returned newest first.
        """
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.metrics.pairwise import cosine_similarity

        if not self.working_memory:
            return []

        # Row 0 is the query; the remaining rows are the stored queries.
        documents = [query] + [interaction['query'] for interaction in self.working_memory]
        try:
            vectorizer = TfidfVectorizer(stop_words='english', max_features=1000)
            tfidf_matrix = vectorizer.fit_transform(documents)
            similarities = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:]).flatten()
            interactions_with_scores = list(zip(self.working_memory, similarities))
            interactions_with_scores.sort(key=lambda x: (x[1], x[0]['timestamp']), reverse=True)
            return [interaction for interaction, score in interactions_with_scores if score > 0.1]
        except Exception as e:
            logger.warning(f"相关性计算失败,使用时间排序: {str(e)}")
            return sorted(self.working_memory, key=lambda x: x['timestamp'], reverse=True)

    def _compress_memory(self):
        """Summarise older interactions and keep only the most recent ones.

        At least three interactions are always kept in working memory.
        """
        logger.info("开始压缩Agent记忆")
        recent_count = max(3, int(len(self.working_memory) * (1 - self.summary_ratio)))
        recent_interactions = self.working_memory[-recent_count:]
        old_interactions = self.working_memory[:-recent_count]

        if old_interactions:
            compressed_summary = self.memory_compressor.compress_interactions(old_interactions)
            self.episodic_memory.append({
                'timestamp': datetime.utcnow(),
                'summary': compressed_summary,
                'interaction_count': len(old_interactions),
                'time_range': (old_interactions[0]['timestamp'], old_interactions[-1]['timestamp'])
            })

        self.working_memory = recent_interactions
        logger.info(f"记忆压缩完成,保留 {len(recent_interactions)} 个最新交互")

    def extract_knowledge(self, interactions: List[dict]) -> dict:
        """Extract simple knowledge from interactions.

        Entities are capitalised ASCII words found in the query or response;
        preferences are queries containing a preference keyword.

        Args:
            interactions: Dicts with ``query``, ``response`` and
                ``timestamp`` keys.

        Returns:
            A dict with ``entities`` (set), ``relationships``, ``facts`` and
            ``preferences`` (lists). ``relationships`` and ``facts`` are
            never populated here.
        """
        import re

        extracted_knowledge = {
            'entities': set(),
            'relationships': [],
            'facts': [],
            'preferences': []
        }
        for interaction in interactions:
            # Entities: naive capitalised-word extraction.
            combined_text = interaction['query'] + ' ' + interaction['response']
            extracted_knowledge['entities'].update(
                re.findall(r'\b[A-Z][a-z]+\b', combined_text))

            # Preferences: keyword match on the query.
            query = interaction['query'].lower()
            if any(word in query for word in ['喜欢', '偏好', '习惯']):
                extracted_knowledge['preferences'].append({
                    'statement': interaction['query'],
                    'timestamp': interaction['timestamp']
                })
        return extracted_knowledge
四、自定义Agent开发
4.1 Agent开发框架
# api/core/app/apps/agent_chat/agent/custom_agent_base.py
class CustomAgentBase:
    """Base class for custom agents.

    Subclasses customise behaviour by overriding the ``create_*`` factory
    methods and ``analyze_task``. The factories are called from
    ``__init__``, so anything they need must exist before
    ``super().__init__()`` is called.
    """

    def __init__(self, config: AgentConfig):
        """Create the default components for ``config``."""
        self.config = config
        self.tools = []
        self.memory = AgentMemory()
        self.metrics = AgentMetrics()
        # Overridable components.
        self.reasoner = self.create_reasoner()
        self.planner = self.create_planner()
        self.executor = self.create_executor()

    def create_reasoner(self) -> BaseReasoner:
        """Create the reasoner; subclasses may override."""
        return DefaultReasoner(self.config.model_config)

    def create_planner(self) -> BasePlanner:
        """Create the planner; subclasses may override."""
        return DefaultPlanner()

    def create_executor(self) -> BaseExecutor:
        """Create the executor; subclasses may override.

        The executor receives the ``self.tools`` list object itself.
        """
        return DefaultExecutor(self.tools)

    def run(self, query: str, **kwargs) -> AgentResult:
        """Analyse the query, plan, execute the plan and record the run."""
        # 1. Task analysis.
        task = self.analyze_task(query, **kwargs)
        # 2. Planning.
        plan = self.planner.create_plan(task)
        # 3. Execution.
        result = self.executor.execute_plan(plan)
        # 4. Record for later learning.
        self._record_execution(query, result)
        return result

    def analyze_task(self, query: str, **kwargs) -> Task:
        """Turn a query into a ``Task``; subclasses may override."""
        return Task(
            query=query,
            context=kwargs,
            complexity=self._estimate_complexity(query),
            required_capabilities=self._identify_capabilities(query)
        )

    def _estimate_complexity(self, query: str) -> str:
        """Estimate task complexity from the length of the query.

        Length is counted in whitespace-separated words, with every CJK
        ideograph counted as one unit. Without this, Chinese text (which has
        no spaces between words) would always count as a single word.

        Returns:
            ``"simple"`` for fewer than 10 units, ``"medium"`` for fewer
            than 30, otherwise ``"complex"``.
        """
        import re

        # Either a single CJK ideograph or a run of other non-space chars.
        units = len(re.findall(r'[\u4e00-\u9fff]|[^\s\u4e00-\u9fff]+', query))
        if units < 10:
            return "simple"
        if units < 30:
            return "medium"
        return "complex"

    def _identify_capabilities(self, query: str) -> List[str]:
        """Identify required capabilities by keyword matching.

        Returns:
            Capability names whose keywords occur in the lower-cased query,
            in declaration order.
        """
        capability_keywords = {
            'search': ['搜索', '查找', '搜', 'search', 'find'],
            'calculation': ['计算', '算', 'calculate', '数学'],
            'analysis': ['分析', '评估', 'analyze', 'evaluate'],
            'generation': ['生成', '创建', '写', 'generate', 'create', 'write']
        }
        query_lower = query.lower()
        return [
            capability
            for capability, keywords in capability_keywords.items()
            if any(keyword in query_lower for keyword in keywords)
        ]
4.2 专门化Agent实现:代码助手
# api/core/app/apps/agent_chat/agent/code_assistant_agent.py
class CodeAssistantAgent(CustomAgentBase):
    """Agent specialised for programming questions."""

    def __init__(self, config: AgentConfig):
        """Set up the code context and tools.

        ``code_context`` must exist before ``super().__init__()`` runs: the
        base constructor calls ``create_reasoner()``, and the override below
        reads ``self.code_context``.
        """
        # Code-related memory, created first (see docstring).
        self.code_context = CodeContext()
        super().__init__(config)
        # Specialised tool set.
        self.code_tools = [
            CodeExecutionTool(),
            CodeAnalysisTool(),
            DocumentationSearchTool(),
            StackOverflowSearchTool()
        ]
        self.tools.extend(self.code_tools)

    def create_reasoner(self) -> BaseReasoner:
        """Create the code-specific reasoner."""
        return CodeReasoner(
            model_config=self.config.model_config,
            code_context=self.code_context
        )

    def analyze_task(self, query: str, **kwargs) -> Task:
        """Analyse a code task, adding language and task type to the metadata."""
        task = super().analyze_task(query, **kwargs)
        # Programming language, when one can be detected.
        detected_language = self._detect_programming_language(query)
        if detected_language:
            task.metadata['language'] = detected_language
        # Task type.
        task.metadata['task_type'] = self._classify_code_task(query)
        return task

    def _detect_programming_language(self, query: str) -> Optional[str]:
        """Guess the programming language the query is about.

        Each language is scored by how many of its indicator tokens occur in
        the query as whole tokens; the best score wins and ties go to the
        language declared first. Scoring avoids deciding on a single generic
        word: an SQL statement contains "from", which is also a Python
        indicator, but it matches more SQL indicators overall.

        Returns:
            The language key, or ``None`` when nothing matches.
        """
        import re

        language_patterns = {
            'python': ['python', 'py', 'def ', 'import ', 'from '],
            'javascript': ['javascript', 'js', 'function', 'const ', 'let '],
            'java': ['java', 'public class', 'public static'],
            'cpp': ['c++', 'cpp', '#include', 'std::'],
            'sql': ['sql', 'select', 'from', 'where', 'join']
        }
        query_lower = query.lower()

        def occurs(pattern: str) -> bool:
            # Require an ASCII identifier boundary on each side that starts
            # or ends with an alphanumeric character, so "py" does not match
            # inside "copy" and "java" does not match inside "javascript".
            # Symbol edges ("c++", "std::") need no boundary.
            token = pattern.strip()
            prefix = r'(?<![a-z0-9_])' if token[0].isalnum() else ''
            suffix = r'(?![a-z0-9_])' if token[-1].isalnum() else ''
            return re.search(prefix + re.escape(token) + suffix, query_lower) is not None

        best_language = None
        best_score = 0
        for language, patterns in language_patterns.items():
            score = sum(1 for pattern in patterns if occurs(pattern))
            if score > best_score:
                best_language, best_score = language, score
        return best_language

    def _classify_code_task(self, query: str) -> str:
        """Classify the kind of code task by keyword.

        Returns:
            The first task type whose keyword occurs in the lower-cased
            query, or ``'general'`` when none does.
        """
        task_patterns = {
            'debug': ['错误', '调试', 'bug', 'error', 'debug'],
            'optimize': ['优化', '改进', 'optimize', 'improve'],
            'explain': ['解释', '说明', 'explain', 'what does'],
            'generate': ['写', '生成', '创建', 'write', 'generate', 'create'],
            'review': ['审查', '检查', 'review', 'check']
        }
        query_lower = query.lower()
        for task_type, patterns in task_patterns.items():
            if any(pattern in query_lower for pattern in patterns):
                return task_type
        return 'general'
class CodeReasoner(BaseReasoner):
    """Reasoner specialised for code-related tasks."""

    def __init__(self, model_config: ModelConfig, code_context: CodeContext):
        """Initialise the base reasoner and keep the code context."""
        super().__init__(model_config)
        self.code_context = code_context

    def reason(self, task: Task) -> ReasoningResult:
        """Reason about a code task.

        Builds a prompt from the task's ``language`` and ``task_type``
        metadata (either may be absent), sends it to the LLM together with
        the code-assistant system prompt, and parses the reply.
        """
        metadata = task.metadata
        language = metadata.get('language')
        task_type = metadata.get('task_type')

        # Specialised reasoning prompt for this task.
        reasoning_prompt = self._build_code_reasoning_prompt(task, language, task_type)

        # The system message is assembled before the user message.
        messages = [
            {"role": "system", "content": self._get_code_system_prompt()},
            {"role": "user", "content": reasoning_prompt}
        ]
        response = self.llm_client.chat(messages)
        return self._parse_code_reasoning_response(response)

    def _get_code_system_prompt(self) -> str:
        """Return the system prompt describing the code assistant's role."""
        return """你是一个专业的代码助手,具备以下能力:
1. 理解和分析各种编程语言的代码
2. 发现并修复代码中的bug
3. 优化代码性能和可读性
4. 解释复杂的代码逻辑
5. 生成高质量的代码
在回答时,请:
- 提供清晰的解释和分析
- 给出具体的代码示例
- 考虑最佳实践和性能优化
- 如果需要更多信息,主动询问"""
4.3 工具开发指南
# api/core/tools/tool/custom_code_execution_tool.py
class CodeExecutionTool(BuiltinTool):
    """Tool that executes a code snippet in one of the supported languages."""

    def get_identity(self) -> ToolIdentity:
        """Return the tool's identity metadata."""
        return ToolIdentity(
            author="Dify",
            name="code_execution",
            label=I18nObject(
                en_US="Code Execution",
                zh_Hans="代码执行"
            ),
            icon="code.svg"
        )

    def get_parameters(self) -> dict:
        """Return the parameter schema: language, code and optional timeout."""
        return {
            "language": {
                "type": "string",
                "required": True,
                "options": ["python", "javascript", "bash"],
                "label": I18nObject(en_US="Programming Language", zh_Hans="编程语言"),
                "llm_description": "要执行的代码语言"
            },
            "code": {
                "type": "string",
                "required": True,
                "label": I18nObject(en_US="Code", zh_Hans="代码"),
                "llm_description": "要执行的代码内容"
            },
            "timeout": {
                "type": "number",
                "required": False,
                "default": 30,
                "label": I18nObject(en_US="Timeout (seconds)", zh_Hans="超时时间(秒)"),
                "llm_description": "代码执行超时时间"
            }
        }

    def _invoke(self, user_id: str, tool_parameters: dict) -> ToolInvokeMessage:
        """Execute the supplied code and report the output as text.

        Code matching the blocklist is refused. A ``CodeExecutionError`` is
        reported as a text message. The ``ValueError`` raised for an
        unsupported language is not caught here.
        """
        language = tool_parameters.get('language')
        code = tool_parameters.get('code')
        timeout = tool_parameters.get('timeout', 30)

        # Safety check.
        if self._is_dangerous_code(code, language):
            return ToolInvokeMessage(
                type=ToolInvokeMessage.MessageType.TEXT,
                message="检测到危险代码,拒绝执行"
            )

        try:
            # Pick the executor for the language.
            executor = self._get_code_executor(language)
            result = executor.execute(code, timeout=timeout)
            return ToolInvokeMessage(
                type=ToolInvokeMessage.MessageType.TEXT,
                message=f"执行结果:\n```\n{result.output}\n```\n\n执行时间: {result.execution_time:.2f}秒"
            )
        except CodeExecutionError as e:
            return ToolInvokeMessage(
                type=ToolInvokeMessage.MessageType.TEXT,
                message=f"代码执行失败:{str(e)}"
            )

    def _is_dangerous_code(self, code: str, language: str) -> bool:
        """Return True when ``code`` contains a blocklisted substring.

        NOTE(security): a substring blocklist is easy to bypass and is not a
        sandbox. It only screens out obvious cases; the executors must
        isolate the code themselves.

        Python and JavaScript patterns are matched case-sensitively because
        those languages are case-sensitive; lower-casing ``Function(`` would
        flag every ordinary ``function(`` expression. Bash patterns keep the
        original case-insensitive match. Unknown languages have no patterns
        and therefore never match.
        """
        dangerous_patterns = {
            'python': [
                'os.system', 'subprocess', 'eval(', 'exec(',
                '__import__', 'open(', 'file(', 'input(',
                'raw_input(', 'compile('
            ],
            'javascript': [
                'require(', 'process.', 'global.',
                'eval(', 'Function(', 'setTimeout',
                'setInterval'
            ],
            'bash': [
                'rm ', 'sudo', 'chmod', 'wget', 'curl',
                '>', '>>', '|', '&', ';'
            ]
        }
        patterns = dangerous_patterns.get(language, [])
        if language == 'bash':
            code_lower = code.lower()
            return any(pattern.lower() in code_lower for pattern in patterns)
        return any(pattern in code for pattern in patterns)

    def _get_code_executor(self, language: str):
        """Return a new executor for ``language``.

        Only the requested executor is instantiated.

        Raises:
            ValueError: When the language is not supported.
        """
        if language == 'python':
            return PythonExecutor()
        if language == 'javascript':
            return JavaScriptExecutor()
        if language == 'bash':
            return BashExecutor()
        raise ValueError(f"不支持的编程语言: {language}")
五、性能优化与监控
5.1 Agent性能监控
# api/core/app/apps/agent_chat/monitoring/agent_monitor.py
class AgentPerformanceMonitor:
    """Collect in-memory success/failure statistics for Agent executions.

    ``metrics`` holds running counters and averages, and ``execution_logs``
    holds one dict per recorded execution. Both live only on this instance.
    """

    # Ordered (category, keywords) pairs used by _classify_error; the first
    # category with a keyword found in the lower-cased message wins.
    _ERROR_CATEGORIES = (
        ('timeout', ('timeout', 'timed out', '超时')),
        ('rate_limit', ('rate limit', 'too many requests', '限流')),
        ('tool_error', ('tool', '工具')),
    )

    def __init__(self):
        self.metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'avg_response_time': 0.0,
            'avg_iterations': 0.0,
            'tool_usage_stats': {},
            'error_patterns': {}
        }
        self.execution_logs = []

    def monitor_execution(self, func):
        """Decorator that times ``func`` and records its outcome.

        A normal return is recorded as a success and the value is passed
        through. An exception is recorded as a failure and re-raised
        unchanged.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Monotonic clock: the measured duration cannot go negative if
            # the wall clock is adjusted while func runs.
            start_time = time.perf_counter()
            execution_id = str(uuid.uuid4())
            logger.info(f"开始监控Agent执行: {execution_id}")
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                execution_time = time.perf_counter() - start_time
                self._record_failed_execution(execution_id, str(e), execution_time)
                raise
            execution_time = time.perf_counter() - start_time
            self._record_successful_execution(execution_id, result, execution_time)
            return result
        return wrapper

    def _record_successful_execution(self, execution_id: str, result: "AgentExecutorResult", execution_time: float):
        """Update counters, averages, tool usage and the log for a success.

        Reads ``result.iterations`` and, for every entry of
        ``result.intermediate_steps``, ``step.action.tool_name``.
        """
        self.metrics['total_requests'] += 1
        self.metrics['successful_requests'] += 1

        # Averages are taken over successful executions only, so the counter
        # above must already include this execution.
        self._update_average('avg_response_time', execution_time)
        self._update_average('avg_iterations', result.iterations)

        # Steps without a tool name are skipped.
        tools_used = [
            step.action.tool_name
            for step in result.intermediate_steps
            if step.action.tool_name
        ]
        usage = self.metrics['tool_usage_stats']
        for tool_name in tools_used:
            usage[tool_name] = usage.get(tool_name, 0) + 1

        # NOTE(review): datetime.utcnow() yields a naive datetime and is
        # deprecated since Python 3.12; kept so stored timestamps keep their
        # current shape. Consider datetime.now(timezone.utc) once consumers
        # are confirmed to accept aware values.
        self.execution_logs.append({
            'execution_id': execution_id,
            'timestamp': datetime.utcnow(),
            'success': True,
            'execution_time': execution_time,
            'iterations': result.iterations,
            'tools_used': tools_used
        })

    def _record_failed_execution(self, execution_id: str, error: str, execution_time: float):
        """Update counters, error categories and the log for a failure."""
        self.metrics['total_requests'] += 1
        self.metrics['failed_requests'] += 1

        error_type = self._classify_error(error)
        patterns = self.metrics['error_patterns']
        patterns[error_type] = patterns.get(error_type, 0) + 1

        self.execution_logs.append({
            'execution_id': execution_id,
            'timestamp': datetime.utcnow(),
            'success': False,
            'error': error,
            'execution_time': execution_time
        })

    def _update_average(self, metric_name: str, new_value: float) -> None:
        """Fold ``new_value`` into the running mean stored at ``metric_name``.

        The mean is computed over ``metrics['successful_requests']``, which
        the caller must have incremented for the current sample already.
        """
        sample_count = self.metrics['successful_requests']
        if sample_count <= 0:
            # No successful sample counted yet; nothing to average over.
            return
        previous = self.metrics[metric_name]
        # Incremental mean: avoids keeping every past sample.
        self.metrics[metric_name] = previous + (new_value - previous) / sample_count

    def _classify_error(self, error: str) -> str:
        """Map an error message to a coarse category by keyword.

        Returns the first category in ``_ERROR_CATEGORIES`` whose keyword
        occurs in the lower-cased message, or ``'unknown'`` if none does.
        """
        message = (error or '').lower()
        for category, keywords in self._ERROR_CATEGORIES:
            if any(keyword in message for keyword in keywords):
                return category
        return 'unknown'

    def get_performance_report(self) -> dict:
        """Return a summary dict, or a placeholder when nothing was recorded."""
        total = self.metrics['total_requests']
        if total == 0:
            return {'message': '暂无执行数据'}
        success_rate = self.metrics['successful_requests'] / total
        return {
            'overview': {
                'total_requests': total,
                'success_rate': f"{success_rate:.2%}",
                'avg_response_time': f"{self.metrics['avg_response_time']:.2f}s",
                'avg_iterations': f"{self.metrics['avg_iterations']:.1f}"
            },
            'tool_usage': self.metrics['tool_usage_stats'],
            'error_analysis': self.metrics['error_patterns'],
            'recommendations': self._generate_recommendations()
        }

    def _generate_recommendations(self) -> List[str]:
        """Return tuning hints derived from fixed thresholds on the metrics."""
        recommendations = []
        # Average response time above 10.
        if self.metrics['avg_response_time'] > 10:
            recommendations.append("平均响应时间较长,建议优化工具调用或增加缓存")
        # Average iteration count above 5.
        if self.metrics['avg_iterations'] > 5:
            recommendations.append("平均迭代次数较多,建议优化Prompt或工具选择逻辑")
        # Success rate below 90%; max() guards the division when no request
        # has been recorded.
        success_rate = self.metrics['successful_requests'] / max(self.metrics['total_requests'], 1)
        if success_rate < 0.9:
            recommendations.append("成功率较低,建议检查错误处理和工具可靠性")
        return recommendations
六、总结与实战建议
通过深入剖析 Dify 的 Agent 系统,我们看到了一个生产级 AI Agent 的完整实现:
核心设计原理
- ReAct框架:Think-Act-Observe的经典循环,让AI具备推理和行动能力
- 工具抽象:统一的工具接口,支持灵活扩展和组合
- 记忆管理:多层次的记忆结构,平衡性能和上下文完整性
- 安全控制:完善的执行限制和危险代码检测机制
实战经验总结
Prompt工程关键点:
- 清晰的角色定义和工作流程说明
- 具体的输出格式约束和示例演示
- 错误恢复和异常处理的指导
工具设计最佳实践:
- 参数验证和类型检查必不可少
- 统一的错误处理和日志记录
- 考虑安全性,特别是代码执行类工具
性能优化要点:
- 合理的迭代次数和执行时间限制
- 智能的记忆压缩和上下文管理
- 工具调用的缓存和批量处理
常见陷阱及解决方案:
# 陷阱1:推理循环
# 解决方案:循环检测和反思机制
def detect_and_break_loop(self, action_history):
    """Trigger a reflection when the last three actions are all the same.

    Returns whatever ``self.trigger_reflection()`` returns in that case,
    and False when there are fewer than three actions or they differ.
    """
    last_three = action_history[-3:]
    # A full window of three collapsing to one distinct value means the
    # same action was repeated three times in a row.
    if len(last_three) == 3 and len(set(last_three)) == 1:
        return self.trigger_reflection()
    return False
# 陷阱2:工具调用失败导致整个流程中断
# 解决方案:优雅降级和重试机制
def robust_tool_call(self, tool_name, parameters, max_retries=3, base_delay=1.0):
    """Invoke a tool with retries and exponential backoff.

    Args:
        tool_name: Name passed to ``self.tool_manager.invoke_tool``.
        parameters: Parameters passed to ``self.tool_manager.invoke_tool``.
        max_retries: Total number of attempts; must be at least 1.
        base_delay: Sleep before the second attempt, doubled after every
            further failure. The defaults give waits of 1 and 2.

    Returns:
        The tool's result on the first successful attempt, or a fallback
        message string containing the last error once every attempt failed.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            return self.tool_manager.invoke_tool(tool_name, parameters)
        except Exception as e:
            # Deliberately broad: any tool failure degrades to a text reply
            # rather than aborting the surrounding flow.
            if attempt == max_retries - 1:
                return f"工具调用失败,但我会尝试基于现有信息回答:{str(e)}"
            time.sleep(base_delay * 2 ** attempt)  # exponential backoff
# 陷阱3:Token消耗过快
# 解决方案:智能上下文管理
def manage_context_efficiently(self, conversation_history):
    """Build a reduced context from the conversation history.

    Returns the result of ``extract_key_interactions`` followed by the
    result of ``compress_non_critical_content``, joined with ``+``.
    """
    # Both helpers receive the full history; extraction runs first.
    essentials = self.extract_key_interactions(conversation_history)
    condensed = self.compress_non_critical_content(conversation_history)
    return essentials + condensed
扩展开发指南
开发自定义Agent的步骤:
- 需求分析:明确Agent要解决的具体问题域
- 工具准备:开发或集成必要的工具和API
- 推理逻辑:设计适合领域的推理流程
- 测试验证:构建完整的测试用例集
- 性能优化:监控和调优Agent表现
工具开发建议:
# 好的工具设计示例
class WeatherQueryTool(BuiltinTool):
    """Example tool: look up current weather with a per-day, per-location cache."""

    def _invoke(self, user_id: str, tool_parameters: dict) -> ToolInvokeMessage:
        """Return formatted weather for ``tool_parameters['location']``.

        A truthy cache entry is returned as is. Otherwise the weather API is
        queried, the result is formatted, cached and returned. Any exception
        raised along the way is passed to ``self._handle_error`` together
        with ``tool_parameters``, and that call's result is returned.
        """
        try:
            # Normalise the requested location first so equivalent inputs
            # share one cache entry.
            place = self._normalize_location(tool_parameters.get('location'))
            key = f"weather:{place}:{date.today()}"

            answer = self.cache.get(key)
            if not answer:
                # Cache miss: query the API, format, then store.
                raw = self.weather_api.get_current_weather(place)
                answer = self._format_weather_data(raw)
                self.cache.set(key, answer, ttl=1800)  # 30 minutes per the original note
            return answer
        except Exception as e:
            return self._handle_error(e, tool_parameters)
未来发展方向
Dify Agent系统的设计为未来扩展留下了充足空间:
- 多Agent协作:支持多个Agent并行工作和协商
- 学习能力增强:从历史交互中学习和改进
- 更智能的规划:基于强化学习的动态规划能力
- 跨模态能力:整合视觉、语音等多模态输入
下一章,我们将深入探讨 Dify 的插件系统与扩展机制,看看如何让这个平台具备无限的可扩展性。相信那里会有更多关于系统架构和插件设计的精彩内容等着我们!
记住,一个优秀的Agent不仅要能够"思考",更要能够"行动",而且要能够从每次行动中"学习"。在你的实践中,始终关注这三个维度的平衡,你就能构建出真正智能的AI助手。
如果你在开发自己的Agent系统时遇到问题,或者对某些设计有独特见解,欢迎在评论区分享。让我们一起推动AI Agent技术的发展!
更多推荐
所有评论(0)