前言:Agent从"玩具"走向"生产"

2024年,LLM Agent是开发者最热的实验方向;2026年,它已经变成了企业的核心生产力工具。但从实验到生产,中间有一道深壑——如何构建一个能稳定运行、可观测、可回滚的多步骤Agent系统?LangGraph给出了目前最接近工程化答案的方案。本文将深入拆解LangGraph的核心设计哲学,并给出一套可落地的生产级Agent工作流构建指南。—## 一、为什么需要LangGraph?LangChain的局限LangChain早期的Chain抽象适合线性任务:A→B→C。但真实业务中,Agent需要:- 条件分支:根据工具调用结果决定下一步- 循环执行:反复检索直到找到满意答案- 并行处理:同时调用多个工具- 状态持久化:跨轮次保存中间状态- 人工介入点:在关键节点暂停等待人工确认LangChain的AgentExecutor在这些场景下显得力不从心。LangGraph用图(Graph)替代链(Chain),用状态机的思维重新定义了Agent的执行逻辑。—## 二、LangGraph核心概念速查### 2.1 节点(Node)节点是图中的执行单元,本质是一个Python函数:pythonfrom langgraph.graph import StateGraph, ENDfrom typing import TypedDict, Annotatedimport operatorclass AgentState(TypedDict): messages: Annotated[list, operator.add] current_step: str tool_results: dict final_answer: str | Nonedef call_model(state: AgentState) -> AgentState: """调用LLM节点""" messages = state["messages"] response = llm.invoke(messages) return {"messages": [response], "current_step": "model_called"}def call_tool(state: AgentState) -> AgentState: """工具调用节点""" last_message = state["messages"][-1] tool_name = last_message.additional_kwargs["tool_calls"][0]["function"]["name"] tool_args = json.loads(last_message.additional_kwargs["tool_calls"][0]["function"]["arguments"]) result = tools_map[tool_name].invoke(tool_args) tool_message = ToolMessage(content=str(result), tool_call_id=last_message.additional_kwargs["tool_calls"][0]["id"]) return { "messages": [tool_message], "tool_results": {tool_name: result}, "current_step": "tool_called" }### 2.2 边(Edge)与条件边边定义了节点之间的流转逻辑:pythondef should_continue(state: AgentState) -> str: """条件路由函数""" last_message = state["messages"][-1] # 如果LLM没有调用工具,说明已得出结论 if not last_message.additional_kwargs.get("tool_calls"): return "end" # 检查工具调用类型 tool_name = last_message.additional_kwargs["tool_calls"][0]["function"]["name"] if tool_name == "final_answer": return "end" return "continue_tool"# 构建图workflow = StateGraph(AgentState)workflow.add_node("model", call_model)workflow.add_node("tool", call_tool)workflow.set_entry_point("model")workflow.add_conditional_edges( "model", should_continue, { "continue_tool": "tool", "end": END })workflow.add_edge("tool", "model")app = workflow.compile()### 2.3 State管理:Agent的记忆中枢LangGraph的State是整个执行流的共享上下文:pythonclass ProductionAgentState(TypedDict): # 对话历史(使用add reducer自动追加) messages: Annotated[list[BaseMessage], operator.add] # 任务信息 task_id: str task_type: str input_data: dict # 执行状态 iteration_count: int error_count: int is_completed: bool # 工具调用结果缓存 search_results: list[str] code_execution_results: list[dict] # 最终输出 final_report: str | None confidence_score: float—## 三、生产级设计模式### 3.1 ReAct模式(推理-行动-观察)pythonSYSTEM_PROMPT = """你是一个专业的数据分析Agent。按照以下格式执行任务:思考:分析当前情况,决定下一步行动行动:调用指定工具观察:记录工具返回结果...重复直到得出最终答案...最终答案:[完整的分析结论]可用工具:- search_web(query): 网络搜索- execute_python(code): 执行Python代码 - read_file(path): 读取文件内容- write_report(content): 生成最终报告"""def build_react_agent(tools: list, llm) -> CompiledGraph: model_with_tools = llm.bind_tools(tools) def reasoning_node(state: AgentState) -> AgentState: response = model_with_tools.invoke(state["messages"]) return { "messages": [response], "iteration_count": state["iteration_count"] + 1 } def should_act(state: AgentState) -> str: if state["iteration_count"] > 20: # 防止无限循环 return "force_end" last_msg = state["messages"][-1] if hasattr(last_msg, "tool_calls") and last_msg.tool_calls: return "act" return "end" graph = StateGraph(AgentState) graph.add_node("reason", reasoning_node) graph.add_node("act", ToolNode(tools)) graph.set_entry_point("reason") graph.add_conditional_edges("reason", should_act, { "act": "act", "end": END, "force_end": END }) graph.add_edge("act", "reason") return graph.compile()### 3.2 Plan-and-Execute模式(规划-执行分离)对于复杂长任务,先规划再执行更可靠:pythonclass PlanExecuteState(TypedDict): input: str plan: list[str] past_steps: Annotated[list[tuple], operator.add] response: str | Nonedef planner_node(state: PlanExecuteState) -> PlanExecuteState: """规划节点:将复杂任务分解为子任务列表""" plan_prompt = f""" 请将以下任务分解为5步以内的具体执行步骤,每步必须是可独立执行的原子操作: 任务:{state["input"]} 以JSON列表形式返回步骤,例如:["步骤1:...", "步骤2:...", "步骤3:..."] """ response = llm.invoke([HumanMessage(content=plan_prompt)]) steps = json.loads(response.content) return {"plan": steps}def executor_node(state: PlanExecuteState) -> PlanExecuteState: """执行节点:执行当前待完成步骤""" task = state["plan"][0] past = "\n".join([f"已完成:{s} -> 结果:{r}" for s, r in state["past_steps"]]) execution_result = executor_agent.invoke({ "input": f"执行步骤:{task}\n上下文:{past}" }) return {"past_steps": [(task, execution_result["output"])]}def replan_or_end(state: PlanExecuteState) -> str: """判断是否需要重新规划""" if len(state["plan"]) <= 1: return "generate_response" # 检查执行结果,决定是否继续或重新规划 last_result = state["past_steps"][-1][1] if "错误" in last_result or "失败" in last_result: return "replan" return "continue_execute"### 3.3 Multi-Agent协作模式python# 主控Agent(Supervisor)def supervisor_node(state: SupervisorState) -> SupervisorState: """分配任务给专业子Agent""" TEAM = ["researcher", "analyst", "writer"] supervisor_prompt = f""" 你是团队协调者,根据任务需求决定由哪个专家处理: - researcher:负责信息收集和网络搜索 - analyst:负责数据分析和代码执行 - writer:负责文档撰写和报告生成 - FINISH:所有工作已完成 当前任务状态:{state["messages"][-5:]} 下一步应该交给: """ response = llm_with_structured_output.invoke(supervisor_prompt) return {"next": response.next}—## 四、生产必备:持久化与检查点### 4.1 使用Checkpoint实现状态持久化pythonfrom langgraph.checkpoint.postgres import PostgresSaver# 使用PostgreSQL保存检查点DB_URI = "postgresql://user:password@localhost/langgraph_checkpoints"checkpointer = PostgresSaver.from_conn_string(DB_URI)# 编译时注入checkpointerapp = workflow.compile(checkpointer=checkpointer)# 运行时指定thread_id实现会话隔离config = {"configurable": {"thread_id": "user-123-task-456"}}result = await app.ainvoke(initial_state, config=config)# 可以随时恢复指定会话的状态saved_state = await app.aget_state(config)print(f"当前步骤:{saved_state.values['current_step']}")### 4.2 Human-in-the-Loop(人工干预节点)pythonfrom langgraph.types import interruptdef sensitive_operation_node(state: AgentState) -> AgentState: """需要人工审核的操作""" operation_summary = f""" 即将执行高风险操作: - 操作类型:{state['pending_action']['type']} - 影响范围:{state['pending_action']['scope']} - 预计成本:{state['pending_action']['estimated_cost']} 请确认是否继续(yes/no): """ # interrupt()会暂停图的执行,等待人工输入 human_input = interrupt(operation_summary) if human_input.lower() == "yes": result = execute_operation(state['pending_action']) return {"operation_result": result, "human_approved": True} else: return {"operation_result": "用户取消操作", "human_approved": False}—## 五、可观测性:让Agent行为可追踪### 5.1 集成LangSmith追踪pythonimport osos.environ["LANGCHAIN_TRACING_V2"] = "true"os.environ["LANGCHAIN_API_KEY"] = "your-api-key"os.environ["LANGCHAIN_PROJECT"] = "production-agent-v2"# 自定义回调收集关键指标from langchain.callbacks.base import BaseCallbackHandlerclass ProductionMetricsCallback(BaseCallbackHandler): def on_llm_start(self, serialized, prompts, **kwargs): self.start_time = time.time() def on_llm_end(self, response, **kwargs): latency = time.time() - self.start_time tokens = response.llm_output.get("token_usage", {}) # 上报到监控系统 metrics.record("llm_latency", latency) metrics.record("token_usage", tokens.get("total_tokens", 0)) def on_tool_error(self, error, **kwargs): logger.error(f"工具调用失败: {error}") metrics.increment("tool_error_count")### 5.2 结构化日志输出pythonimport structloglogger = structlog.get_logger()def instrumented_node(node_func): """装饰器:为所有节点自动添加结构化日志""" def wrapper(state: AgentState) -> AgentState: logger.info("node_start", node=node_func.__name__, iteration=state.get("iteration_count", 0), task_id=state.get("task_id")) try: result = node_func(state) logger.info("node_success", node=node_func.__name__, output_keys=list(result.keys())) return result except Exception as e: logger.error("node_error", node=node_func.__name__, error=str(e), traceback=traceback.format_exc()) raise return wrapper—## 六、部署方案:LangGraph Platform vs 自建### 6.1 LangGraph Platform(云托管)- 优点:开箱即用,内置持久化、调度、监控- 适合:快速验证、中小规模应用- 成本:按API调用量计费### 6.2 自建部署(Docker + Kubernetes)dockerfile# DockerfileFROM python:3.11-slimWORKDIR /appCOPY requirements.txt .RUN pip install -r requirements.txtCOPY . .# 启动LangGraph ServerCMD ["python", "-m", "langgraph", "server", "--host", "0.0.0.0", "--port", "8080"]``````yaml# k8s-deployment.yamlapiVersion: apps/v1kind: Deploymentmetadata: name: langgraph-agentspec: replicas: 3 selector: matchLabels: app: langgraph-agent template: spec: containers: - name: agent image: myregistry/langgraph-agent:v2.1.0 resources: requests: memory: "1Gi" cpu: "500m" limits: memory: "4Gi" cpu: "2000m" env: - name: OPENAI_API_KEY valueFrom: secretKeyRef: name: api-secrets key: openai-key—## 七、性能优化实践### 7.1 异步并行节点pythonimport asynciofrom langgraph.graph import StateGraphasync def parallel_search_node(state: AgentState) -> AgentState: """并行执行多个搜索任务""" queries = state["search_queries"] # 并发执行所有搜索 tasks = [search_tool.ainvoke(q) for q in queries] results = await asyncio.gather(*tasks, return_exceptions=True) # 过滤失败的结果 valid_results = [r for r in results if not isinstance(r, Exception)] return {"search_results": valid_results}### 7.2 缓存策略pythonfrom functools import lru_cacheimport hashlibclass CachedLLM: def __init__(self, llm, cache_ttl=3600): self.llm = llm self.cache = {} self.cache_ttl = cache_ttl def invoke(self, messages): cache_key = hashlib.md5(str(messages).encode()).hexdigest() if cache_key in self.cache: cached_at, result = self.cache[cache_key] if time.time() - cached_at < self.cache_ttl: return result result = self.llm.invoke(messages) self.cache[cache_key] = (time.time(), result) return result—## 八、总结与选型建议| 场景 | 推荐方案 ||------|---------|| 简单线性任务 | LangChain LCEL || 需要条件分支 | LangGraph基础图 || 复杂多步骤任务 | LangGraph + Plan-Execute || 多Agent协作 | LangGraph + Supervisor || 需要人工审核 | LangGraph + interrupt() || 生产级部署 | LangGraph Platform 或自建K8s |LangGraph的核心价值在于:用图结构化了Agent的执行逻辑,让不可预测的LLM行为变得可控、可调试、可回滚。2026年的AI应用开发,LangGraph已经成为构建生产级Agent系统的事实标准。—参考资料:LangGraph官方文档 v0.2.x、LangChain Blog 2026 Agent Architecture系列

Logo

欢迎加入DeepSeek 技术社区。在这里,你可以找到志同道合的朋友,共同探索AI技术的奥秘。

更多推荐