LangGraph 对应参数详解
workflow = StateGraph(AgentState)
# 添加所有节点(全部使用异步版本)
workflow.add_node("classifier", async_classifier_node)
workflow.add_node("fetch_position_node", async_virtual_position_tools)
workflow.add_node("fetch_resume_optimize_node", async_resume_optimize_tools)
workflow.add_node("generate_response_node", async_generate_response_node)
workflow.add_node("error_handler", async_error_handler_node)
workflow.set_entry_point("classifier")
workflow.add_edge("error_handler", END)
# 条件边
workflow.add_conditional_edges(
"classifier",
decide_next_node,
{
"fetch_position_node": "fetch_position_node",
"fetch_resume_optimize_node": "fetch_resume_optimize_node",
"generate_response_node": "generate_response_node",
"error_handler": "error_handler",
}
)
# 普通边(工具节点执行后回到分类器)
workflow.add_edge("fetch_position_node", "classifier")
workflow.add_edge("fetch_resume_optimize_node", "classifier")
workflow.add_edge("generate_response_node", END)
# 编译图
app = workflow.compile()
def decide_next_node(state: AgentState):
"""决策路由 - 扩展支持 MCP"""
iteration = state.get('iteration_count', 0)
max_iterations = state.get('max_iterations', AgentConfig.MAX_ITERATIONS)
print(f"🔀 决策路由检查 - 迭代: {iteration}/{max_iterations}")
if iteration >= max_iterations:
return "generate_response_node"
if state.get("error"):
return "error_handler"
action = state.get("next_step")
# 扩展的动作映射
action_map = {
"fetch_position": "fetch_position_node",
"fetch_resume_optimize": "fetch_resume_optimize_node",
"generate_response": "generate_response_node"
}
next_node = action_map.get(action, "generate_response_node")
print(f"路由到: {next_node}")
return next_node
状态图工作流的各个组件和参数含义:
1. 基础结构
workflow = StateGraph(AgentState)
作用:创建一个状态图工作流
参数:AgentState - 定义工作流中传递的状态数据类型
2. 添加节点 (add_node)
workflow.add_node("classifier", async_classifier_node)
作用:向工作流中添加一个处理节点 参数:
"classifier" - 节点名称(字符串标识符)
async_classifier_node - 异步处理函数,接收和返回 AgentState
3. 设置入口点 (set_entry_point)
workflow.set_entry_point("classifier")
作用:指定工作流的起始节点
参数:"classifier" - 入口节点的名称
4. 添加普通边 (add_edge)
workflow.add_edge("fetch_position_node", "classifier")
作用:定义节点间的无条件转移路径
参数:
"fetch_position_node" - 源节点名称
"classifier" - 目标节点名称
含义:当 fetch_position_node 执行完成后,无条件转移到 classifier
5. 添加条件边 (add_conditional_edges)
workflow.add_conditional_edges(
"classifier", # 源节点
decide_next_node, # 条件判断函数
{
"fetch_position_node": "fetch_position_node",
"fetch_resume_optimize_node": "fetch_resume_optimize_node",
"generate_response_node": "generate_response_node",
"error_handler": "error_handler",
}
)
作用:根据条件动态决定下一个节点
参数:
"classifier" - 源节点名称
decide_next_node - 条件判断函数,返回下一个节点的名称
字典 - 映射判断函数返回值到实际节点名称
键(Key):decide_next_node 函数的返回值
这是条件判断函数返回的逻辑决策结果
比如:decide_next_node(state) 返回 "fetch_position_node"
值(Value):实际要跳转的节点名称
这是工作流中注册的具体节点名称
6. 特殊节点
workflow.add_edge("error_handler", END)
workflow.add_edge("generate_response_node", END)
END:特殊标识符,表示工作流结束
decide_next_node 函数是一个智能路由决策器,我来详细解释每个部分的含义:
1. 迭代控制机制
iteration = state.get('iteration_count', 0)
max_iterations = state.get('max_iterations', AgentConfig.MAX_ITERATIONS)
print(f"决策路由检查 - 迭代: {iteration}/{max_iterations}")
if iteration >= max_iterations:
return "generate_response_node"
作用:防止无限循环
iteration_count:当前循环次数
max_iterations:最大允许循环次数(如5次)
安全机制:超过最大次数就强制结束,避免死循环
2. 错误处理
if state.get("error"):
return "error_handler"
作用:全局错误捕获
如果状态中有错误信息,立即跳转到错误处理节点
3. 动作路由逻辑
action = state.get("next_step")
action_map = {
"fetch_position": "fetch_position_node",
"fetch_resume_optimize": "fetch_resume_optimize_node",
"generate_response": "generate_response_node"
}
next_node = action_map.get(action, "generate_response_node")
作用:根据 next_step 决定下一步
action_map:动作到节点的映射字典
get(action, "generate_response_node"):默认安全回退
完整流程解析
这个工作流实现了以下逻辑:
开始 → classifier(分类器节点)
分类器 根据 decide_next_node 的结果选择路径:
职位查询 → fetch_position_node → 返回 classifier
简历优化 → fetch_resume_optimize_node → 返回 classifier
生成响应 → generate_response_node → 结束
错误处理 → error_handler → 结束
状态流转示例
# 示例状态流转
开始 → classifier → (根据条件)
→ fetch_position_node → classifier →
→ fetch_resume_optimize_node → classifier →
→ generate_response_node → END
# 或者错误情况
开始 → classifier → error_handler → END
这种设计允许工作流在多个工具节点间循环,直到最终生成响应或处理错误。
非特殊说明,本博所有文章均为博主原创。