LangGraph 对应参数详解

TwoAdmin 2025-9-28 74 9/28

LangGraph 对应参数详解

workflow = StateGraph(AgentState)

# 添加所有节点(全部使用异步版本)
workflow.add_node("classifier", async_classifier_node)
workflow.add_node("fetch_position_node", async_virtual_position_tools)
workflow.add_node("fetch_resume_optimize_node", async_resume_optimize_tools)
workflow.add_node("generate_response_node", async_generate_response_node)
workflow.add_node("error_handler", async_error_handler_node)

workflow.set_entry_point("classifier")
workflow.add_edge("error_handler", END)

# 条件边
workflow.add_conditional_edges(
    "classifier",
    decide_next_node,
    {
        "fetch_position_node": "fetch_position_node",
        "fetch_resume_optimize_node": "fetch_resume_optimize_node",
        "generate_response_node": "generate_response_node",
        "error_handler": "error_handler",
    }
)

# 普通边(工具节点执行后回到分类器)
workflow.add_edge("fetch_position_node", "classifier")
workflow.add_edge("fetch_resume_optimize_node", "classifier")
workflow.add_edge("generate_response_node", END)

# 编译图
app = workflow.compile()

def decide_next_node(state: AgentState):
    """决策路由 - 扩展支持 MCP"""
    iteration = state.get('iteration_count', 0)
    max_iterations = state.get('max_iterations', AgentConfig.MAX_ITERATIONS)

    print(f"🔀 决策路由检查 - 迭代: {iteration}/{max_iterations}")

    if iteration >= max_iterations:
        return "generate_response_node"

    if state.get("error"):
        return "error_handler"

    action = state.get("next_step")

    # 扩展的动作映射
    action_map = {
        "fetch_position": "fetch_position_node",
        "fetch_resume_optimize": "fetch_resume_optimize_node",
        "generate_response": "generate_response_node"
    }

    next_node = action_map.get(action, "generate_response_node")
    print(f"路由到: {next_node}")
    return next_node

状态图工作流的各个组件和参数含义:

1. 基础结构

workflow = StateGraph(AgentState)

作用:创建一个状态图工作流
参数:AgentState - 定义工作流中传递的状态数据类型
2. 添加节点 (add_node)

workflow.add_node("classifier", async_classifier_node)

作用:向工作流中添加一个处理节点 参数:

"classifier" - 节点名称(字符串标识符)
async_classifier_node - 异步处理函数,接收和返回 AgentState
3. 设置入口点 (set_entry_point)

workflow.set_entry_point("classifier")

作用:指定工作流的起始节点
参数:"classifier" - 入口节点的名称
4. 添加普通边 (add_edge)

workflow.add_edge("fetch_position_node", "classifier")

作用:定义节点间的无条件转移路径
参数:

"fetch_position_node" - 源节点名称
"classifier" - 目标节点名称
含义:当 fetch_position_node 执行完成后,无条件转移到 classifier
5. 添加条件边 (add_conditional_edges)

workflow.add_conditional_edges(
"classifier", # 源节点
decide_next_node, # 条件判断函数
{
"fetch_position_node": "fetch_position_node",
"fetch_resume_optimize_node": "fetch_resume_optimize_node",
"generate_response_node": "generate_response_node",
"error_handler": "error_handler",
}
)

作用:根据条件动态决定下一个节点
参数:

"classifier" - 源节点名称
decide_next_node - 条件判断函数,返回下一个节点的名称
字典 - 映射判断函数返回值到实际节点名称

键(Key):decide_next_node 函数的返回值

这是条件判断函数返回的逻辑决策结果
比如:decide_next_node(state) 返回 "fetch_position_node"
值(Value):实际要跳转的节点名称

这是工作流中注册的具体节点名称

6. 特殊节点

workflow.add_edge("error_handler", END)
workflow.add_edge("generate_response_node", END)

END:特殊标识符,表示工作流结束

decide_next_node 函数是一个智能路由决策器,我来详细解释每个部分的含义:

1. 迭代控制机制

iteration = state.get('iteration_count', 0)
max_iterations = state.get('max_iterations', AgentConfig.MAX_ITERATIONS)

print(f"决策路由检查 - 迭代: {iteration}/{max_iterations}")

if iteration >= max_iterations:
return "generate_response_node"

作用:防止无限循环

iteration_count:当前循环次数
max_iterations:最大允许循环次数(如5次)
安全机制:超过最大次数就强制结束,避免死循环
2. 错误处理

if state.get("error"):
return "error_handler"

作用:全局错误捕获

如果状态中有错误信息,立即跳转到错误处理节点
3. 动作路由逻辑

action = state.get("next_step")

action_map = {
"fetch_position": "fetch_position_node",
"fetch_resume_optimize": "fetch_resume_optimize_node",
"generate_response": "generate_response_node"
}

next_node = action_map.get(action, "generate_response_node")

作用:根据 next_step 决定下一步

action_map:动作到节点的映射字典
get(action, "generate_response_node"):默认安全回退

完整流程解析

这个工作流实现了以下逻辑:

开始 → classifier(分类器节点)
分类器 根据 decide_next_node 的结果选择路径:

职位查询 → fetch_position_node → 返回 classifier
简历优化 → fetch_resume_optimize_node → 返回 classifier
生成响应 → generate_response_node → 结束
错误处理 → error_handler → 结束
状态流转示例

# 示例状态流转
开始 → classifier → (根据条件)
→ fetch_position_node → classifier →
→ fetch_resume_optimize_node → classifier →
→ generate_response_node → END

# 或者错误情况
开始 → classifier → error_handler → END
这种设计允许工作流在多个工具节点间循环,直到最终生成响应或处理错误。

- THE END -
Tag:

TwoAdmin

10月21日14:42

最后修改:2025年10月21日
0

非特殊说明,本博所有文章均为博主原创。