LangGraph 可以很好地与 MCP Server 集成。这种组合非常强大,LangGraph 作为工作流编排器,MCP Server 提供具体的能力,共同构建复杂的 AI 应用。
集成架构概述
安装依赖
pip install langgraph langchain modelcontextprotocol
完整示例:数据分析工作流
1. 首先创建 MCP Server
// mcp_server.go
package main
import (
"context"
"fmt"
"log"
mcp "github.com/modelcontextprotocol/sdk-go"
)
func main() {
server := mcp.NewServer("data-analytics-server", "1.0.0")
// 提供销售数据资源
server.AddResourceHandler("sales-data", func(ctx context.Context, method string, params map[string]any) (mcp.ResourceResult, error) {
if method == "list" {
return mcp.ResourceResult{
Contents: []mcp.ResourceContent{
{URI: "sales:///2024/q1.csv"},
{URI: "sales:///2024/q2.csv"},
},
}, nil
}
// 模拟销售数据
salesData := `日期,产品,销售额,数量
2024-01-10,产品A,12000,24
2024-01-15,产品B,8500,17
2024-02-08,产品A,15000,30
2024-02-20,产品C,9200,18
2024-03-05,产品B,11000,22
2024-03-18,产品A,13500,27`
return mcp.ResourceResult{
Contents: []mcp.ResourceContent{
{
URI: "sales:///2024/q1.csv",
Content: salesData,
MimeType: "text/csv",
},
},
}, nil
})
// 添加数据分析工具
server.AddToolHandler("calculate-metrics", func(ctx context.Context, params map[string]any) (mcp.ToolResult, error) {
data, ok := params["data"].(string)
if !ok {
return mcp.ToolResult{}, fmt.Errorf("data parameter is required")
}
// 简单的指标计算(实际中会更复杂)
totalSales, avgSale, productCount := calculateSalesMetrics(data)
result := fmt.Sprintf("总销售额: $%.2f\n平均销售额: $%.2f\n产品种类数: %d",
totalSales, avgSale, productCount)
return mcp.ToolResult{
Content: []mcp.Content{
{
Type: "text",
Text: result,
},
},
}, nil
}, mcp.ParamSchema{
Type: mcp.JSONSchemaObject,
Properties: map[string]mcp.ParamSchema{
"data": {
Type: mcp.JSONSchemaString,
Description: "CSV格式的销售数据",
},
},
Required: []string{"data"},
})
if err := server.Run(); err != nil {
log.Fatalf("Server error: %v", err)
}
}
func calculateSalesMetrics(data string) (float64, float64, int) {
// 简化的计算逻辑
lines := strings.Split(data, "\n")
total := 0.0
count := 0
products := make(map[string]bool)
for i, line := range lines {
if i == 0 { // 跳过标题行
continue
}
fields := strings.Split(line, ",")
if len(fields) >= 3 {
sales, _ := strconv.ParseFloat(fields[2], 64)
total += sales
count++
products[fields[1]] = true
}
}
avg := 0.0
if count > 0 {
avg = total / float64(count)
}
return total, avg, len(products)
}
2. LangGraph 工作流集成
# langgraph_mcp_integration.py
from langgraph.graph import StateGraph, END
from typing import TypedDict, List
import requests
import json
import subprocess
import asyncio
# 状态定义
class AnalysisState(TypedDict):
user_query: str
raw_data: str
metrics: str
analysis_result: str
next_step: str
# 启动 MCP Server
def start_mcp_server():
"""启动 Go MCP Server"""
process = subprocess.Popen(
["./mcp_server"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return process
# MCP 客户端类
class MCPClient:
def __init__(self, process):
self.process = process
def send_request(self, method: str, params: dict = None):
"""发送 JSON-RPC 请求到 MCP Server"""
request = {
"jsonrpc": "2.0",
"id": 1,
"method": method,
"params": params or {}
}
# 发送请求
self.process.stdin.write(json.dumps(request) + "\n")
self.process.stdin.flush()
# 读取响应
response_line = self.process.stdout.readline()
return json.loads(response_line)
def get_sales_data(self):
"""获取销售数据"""
response = self.send_request("readResource", {
"name": "sales-data",
"arguments": {"name": "sales:///2024/q1.csv"}
})
return response.get("result", {}).get("contents", [{}])[0].get("content", "")
def calculate_metrics(self, data: str):
"""计算指标"""
response = self.send_request("callTool", {
"name": "calculate-metrics",
"arguments": {"data": data}
})
return response.get("result", {}).get("content", [{}])[0].get("text", "")
# 定义工作流节点
def retrieve_data(state: AnalysisState, mcp_client: MCPClient):
"""从 MCP Server 获取数据"""
print("📊 从 MCP Server 获取数据...")
raw_data = mcp_client.get_sales_data()
return {"raw_data": raw_data}
def calculate_metrics(state: AnalysisState, mcp_client: MCPClient):
"""使用 MCP Server 计算指标"""
print("🧮 计算销售指标...")
metrics = mcp_client.calculate_metrics(state["raw_data"])
return {"metrics": metrics}
def analyze_results(state: AnalysisState, llm):
"""使用大模型分析结果"""
print("🤖 AI 分析数据...")
prompt = f"""
根据以下销售数据和指标,为用户的问题提供详细分析:
用户问题: {state['user_query']}
原始数据:
{state['raw_data']}
计算指标:
{state['metrics']}
请提供:
1. 关键洞察和趋势分析
2. 业务建议
3. 潜在的风险和机会
用中文回复,保持专业但易于理解。
"""
# 这里使用 LangChain 的 LLM,实际中可以是 OpenAI、Anthropic 等
response = llm.invoke(prompt)
return {"analysis_result": response.content}
def decide_next_step(state: AnalysisState):
"""决定下一步操作"""
print("🔍 决定下一步...")
# 简单的决策逻辑
if "详细" in state["user_query"] or "深入" in state["user_query"]:
return {"next_step": "detailed_analysis"}
elif "预测" in state["user_query"] or "未来" in state["user_query"]:
return {"next_step": "forecast"}
else:
return {"next_step": "complete"}
def detailed_analysis(state: AnalysisState, llm):
"""详细分析(如果需要)"""
print("🔎 进行详细分析...")
prompt = f"""
基于之前的分析,进行更深入的研究:
{state['analysis_result']}
请提供更详细的数据洞察和具体的行动建议。
"""
response = llm.invoke(prompt)
return {"analysis_result": state["analysis_result"] + "\n\n详细分析:\n" + response.content}
# 构建工作流
def create_workflow(llm):
"""创建 LangGraph 工作流"""
workflow = StateGraph(AnalysisState)
# 添加节点
workflow.add_node("retrieve_data", lambda state: retrieve_data(state, mcp_client))
workflow.add_node("calculate_metrics", lambda state: calculate_metrics(state, mcp_client))
workflow.add_node("analyze_results", lambda state: analyze_results(state, llm))
workflow.add_node("decide_next_step", decide_next_step)
workflow.add_node("detailed_analysis", lambda state: detailed_analysis(state, llm))
# 设置入口点
workflow.set_entry_point("retrieve_data")
# 添加边
workflow.add_edge("retrieve_data", "calculate_metrics")
workflow.add_edge("calculate_metrics", "analyze_results")
workflow.add_edge("analyze_results", "decide_next_step")
# 条件边
workflow.add_conditional_edges(
"decide_next_step",
lambda state: state["next_step"],
{
"detailed_analysis": "detailed_analysis",
"forecast": "detailed_analysis", # 简化处理
"complete": END
}
)
workflow.add_edge("detailed_analysis", END)
return workflow.compile()
# 主执行函数
async def main():
# 启动 MCP Server
mcp_process = start_mcp_server()
mcp_client = MCPClient(mcp_process)
# 初始化 LLM (这里用假定的 LangChain LLM)
from langchain.llms import FakeLLM # 实际中使用真实的 LLM
llm = FakeLLM()
# 创建和工作流
workflow = create_workflow(llm)
# 执行工作流
user_query = "请分析2024年第一季度的销售表现,并提供详细建议"
initial_state = {
"user_query": user_query,
"raw_data": "",
"metrics": "",
"analysis_result": "",
"next_step": ""
}
print("🚀 开始执行工作流...")
result = await workflow.ainvoke(initial_state)
print("\n" + "="*50)
print("📋 最终分析结果:")
print("="*50)
print(result["analysis_result"])
# 清理
mcp_process.terminate()
if __name__ == "__main__":
asyncio.run(main())
更高级的集成模式
动态工具调用
# 动态发现和调用 MCP 工具
class DynamicMCPIntegration:
def __init__(self, mcp_client):
self.mcp_client = mcp_client
self.available_tools = self.discover_tools()
def discover_tools(self):
"""发现 MCP Server 提供的所有工具"""
response = self.mcp_client.send_request("listTools")
return response.get("result", {}).get("tools", [])
def execute_tool(self, tool_name: str, arguments: dict):
"""动态执行工具"""
for tool in self.available_tools:
if tool["name"] == tool_name:
return self.mcp_client.send_request("callTool", {
"name": tool_name,
"arguments": arguments
})
raise ValueError(f"Tool {tool_name} not found")
def create_tool_node(self, tool_name: str):
"""为 LangGraph 创建动态工具节点"""
def tool_node(state: dict):
# 从状态中提取参数
arguments = self.prepare_arguments(tool_name, state)
result = self.execute_tool(tool_name, arguments)
return {f"{tool_name}_result": result}
return tool_node
# 在工作流中动态添加节点
def build_dynamic_workflow(mcp_integration: DynamicMCPIntegration):
workflow = StateGraph(dict)
for tool in mcp_integration.available_tools:
node = mcp_integration.create_tool_node(tool["name"])
workflow.add_node(tool["name"], node)
# 动态构建工作流逻辑...
return workflow.compile()
配置和部署
Docker 部署
# Dockerfile
FROM golang:1.21
WORKDIR /app
COPY mcp_server.go .
COPY go.mod .
COPY go.sum .
RUN go build -o mcp-server
CMD ["./mcp-server"]
客户端配置
// claude_desktop_config.json
{
"mcpServers": {
"data-analytics-server": {
"command": "docker",
"args": ["run", "--rm", "my-mcp-server:latest"]
}
}
}
优势总结
-
工作流编排: LangGraph 管理复杂的多步骤流程
-
能力扩展: MCP Server 提供专业的数据和处理能力
-
灵活组合: 可以动态组合不同的 MCP Server
-
错误处理: LangGraph 提供强大的错误处理和重试机制
-
状态管理: 保持完整的对话状态和历史
这种组合特别适合需要复杂数据处理、多系统集成、和长期对话状态的 AI 应用场景。
- THE END -
最后修改:2025年9月17日
非特殊说明,本博所有文章均为博主原创。