langgraph 调用 golang mcp server

TwoAdmin 2025-9-17 127 9/17

LangGraph 可以很好地与 MCP Server 集成。这种组合非常强大,LangGraph 作为工作流编排器,MCP Server 提供具体的能力,共同构建复杂的 AI 应用。

集成架构概述

安装依赖

bash

pip install langgraph langchain modelcontextprotocol

完整示例:数据分析工作流

1. 首先创建 MCP Server

go

// mcp_server.go
package main

import (
	"context"
	"fmt"
	"log"

	mcp "github.com/modelcontextprotocol/sdk-go"
)

func main() {
	server := mcp.NewServer("data-analytics-server", "1.0.0")

	// 提供销售数据资源
	server.AddResourceHandler("sales-data", func(ctx context.Context, method string, params map[string]any) (mcp.ResourceResult, error) {
		if method == "list" {
			return mcp.ResourceResult{
				Contents: []mcp.ResourceContent{
					{URI: "sales:///2024/q1.csv"},
					{URI: "sales:///2024/q2.csv"},
				},
			}, nil
		}

		// 模拟销售数据
		salesData := `日期,产品,销售额,数量
2024-01-10,产品A,12000,24
2024-01-15,产品B,8500,17
2024-02-08,产品A,15000,30
2024-02-20,产品C,9200,18
2024-03-05,产品B,11000,22
2024-03-18,产品A,13500,27`

		return mcp.ResourceResult{
			Contents: []mcp.ResourceContent{
				{
					URI:      "sales:///2024/q1.csv",
					Content:  salesData,
					MimeType: "text/csv",
				},
			},
		}, nil
	})

	// 添加数据分析工具
	server.AddToolHandler("calculate-metrics", func(ctx context.Context, params map[string]any) (mcp.ToolResult, error) {
		data, ok := params["data"].(string)
		if !ok {
			return mcp.ToolResult{}, fmt.Errorf("data parameter is required")
		}

		// 简单的指标计算(实际中会更复杂)
		totalSales, avgSale, productCount := calculateSalesMetrics(data)

		result := fmt.Sprintf("总销售额: $%.2f\n平均销售额: $%.2f\n产品种类数: %d", 
			totalSales, avgSale, productCount)

		return mcp.ToolResult{
			Content: []mcp.Content{
				{
					Type: "text",
					Text: result,
				},
			},
		}, nil
	}, mcp.ParamSchema{
		Type: mcp.JSONSchemaObject,
		Properties: map[string]mcp.ParamSchema{
			"data": {
				Type:        mcp.JSONSchemaString,
				Description: "CSV格式的销售数据",
			},
		},
		Required: []string{"data"},
	})

	if err := server.Run(); err != nil {
		log.Fatalf("Server error: %v", err)
	}
}

func calculateSalesMetrics(data string) (float64, float64, int) {
	// 简化的计算逻辑
	lines := strings.Split(data, "\n")
	total := 0.0
	count := 0
	products := make(map[string]bool)
	
	for i, line := range lines {
		if i == 0 { // 跳过标题行
			continue
		}
		fields := strings.Split(line, ",")
		if len(fields) >= 3 {
			sales, _ := strconv.ParseFloat(fields[2], 64)
			total += sales
			count++
			products[fields[1]] = true
		}
	}
	
	avg := 0.0
	if count > 0 {
		avg = total / float64(count)
	}
	
	return total, avg, len(products)
}

2. LangGraph 工作流集成

python

# langgraph_mcp_integration.py
from langgraph.graph import StateGraph, END
from typing import TypedDict, List
import requests
import json
import subprocess
import asyncio

# 状态定义
class AnalysisState(TypedDict):
    user_query: str
    raw_data: str
    metrics: str
    analysis_result: str
    next_step: str

# 启动 MCP Server
def start_mcp_server():
    """启动 Go MCP Server"""
    process = subprocess.Popen(
        ["./mcp_server"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )
    return process

# MCP 客户端类
class MCPClient:
    def __init__(self, process):
        self.process = process
    
    def send_request(self, method: str, params: dict = None):
        """发送 JSON-RPC 请求到 MCP Server"""
        request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": method,
            "params": params or {}
        }
        
        # 发送请求
        self.process.stdin.write(json.dumps(request) + "\n")
        self.process.stdin.flush()
        
        # 读取响应
        response_line = self.process.stdout.readline()
        return json.loads(response_line)
    
    def get_sales_data(self):
        """获取销售数据"""
        response = self.send_request("readResource", {
            "name": "sales-data",
            "arguments": {"name": "sales:///2024/q1.csv"}
        })
        return response.get("result", {}).get("contents", [{}])[0].get("content", "")
    
    def calculate_metrics(self, data: str):
        """计算指标"""
        response = self.send_request("callTool", {
            "name": "calculate-metrics",
            "arguments": {"data": data}
        })
        return response.get("result", {}).get("content", [{}])[0].get("text", "")

# 定义工作流节点
def retrieve_data(state: AnalysisState, mcp_client: MCPClient):
    """从 MCP Server 获取数据"""
    print("📊 从 MCP Server 获取数据...")
    raw_data = mcp_client.get_sales_data()
    return {"raw_data": raw_data}

def calculate_metrics(state: AnalysisState, mcp_client: MCPClient):
    """使用 MCP Server 计算指标"""
    print("🧮 计算销售指标...")
    metrics = mcp_client.calculate_metrics(state["raw_data"])
    return {"metrics": metrics}

def analyze_results(state: AnalysisState, llm):
    """使用大模型分析结果"""
    print("🤖 AI 分析数据...")
    
    prompt = f"""
    根据以下销售数据和指标,为用户的问题提供详细分析:
    
    用户问题: {state['user_query']}
    
    原始数据:
    {state['raw_data']}
    
    计算指标:
    {state['metrics']}
    
    请提供:
    1. 关键洞察和趋势分析
    2. 业务建议
    3. 潜在的风险和机会
    
    用中文回复,保持专业但易于理解。
    """
    
    # 这里使用 LangChain 的 LLM,实际中可以是 OpenAI、Anthropic 等
    response = llm.invoke(prompt)
    return {"analysis_result": response.content}

def decide_next_step(state: AnalysisState):
    """决定下一步操作"""
    print("🔍 决定下一步...")
    
    # 简单的决策逻辑
    if "详细" in state["user_query"] or "深入" in state["user_query"]:
        return {"next_step": "detailed_analysis"}
    elif "预测" in state["user_query"] or "未来" in state["user_query"]:
        return {"next_step": "forecast"}
    else:
        return {"next_step": "complete"}

def detailed_analysis(state: AnalysisState, llm):
    """详细分析(如果需要)"""
    print("🔎 进行详细分析...")
    
    prompt = f"""
    基于之前的分析,进行更深入的研究:
    
    {state['analysis_result']}
    
    请提供更详细的数据洞察和具体的行动建议。
    """
    
    response = llm.invoke(prompt)
    return {"analysis_result": state["analysis_result"] + "\n\n详细分析:\n" + response.content}

# 构建工作流
def create_workflow(llm):
    """创建 LangGraph 工作流"""
    workflow = StateGraph(AnalysisState)
    
    # 添加节点
    workflow.add_node("retrieve_data", lambda state: retrieve_data(state, mcp_client))
    workflow.add_node("calculate_metrics", lambda state: calculate_metrics(state, mcp_client))
    workflow.add_node("analyze_results", lambda state: analyze_results(state, llm))
    workflow.add_node("decide_next_step", decide_next_step)
    workflow.add_node("detailed_analysis", lambda state: detailed_analysis(state, llm))
    
    # 设置入口点
    workflow.set_entry_point("retrieve_data")
    
    # 添加边
    workflow.add_edge("retrieve_data", "calculate_metrics")
    workflow.add_edge("calculate_metrics", "analyze_results")
    workflow.add_edge("analyze_results", "decide_next_step")
    
    # 条件边
    workflow.add_conditional_edges(
        "decide_next_step",
        lambda state: state["next_step"],
        {
            "detailed_analysis": "detailed_analysis",
            "forecast": "detailed_analysis",  # 简化处理
            "complete": END
        }
    )
    workflow.add_edge("detailed_analysis", END)
    
    return workflow.compile()

# 主执行函数
async def main():
    # 启动 MCP Server
    mcp_process = start_mcp_server()
    mcp_client = MCPClient(mcp_process)
    
    # 初始化 LLM (这里用假定的 LangChain LLM)
    from langchain.llms import FakeLLM  # 实际中使用真实的 LLM
    llm = FakeLLM()
    
    # 创建和工作流
    workflow = create_workflow(llm)
    
    # 执行工作流
    user_query = "请分析2024年第一季度的销售表现,并提供详细建议"
    
    initial_state = {
        "user_query": user_query,
        "raw_data": "",
        "metrics": "",
        "analysis_result": "",
        "next_step": ""
    }
    
    print("🚀 开始执行工作流...")
    result = await workflow.ainvoke(initial_state)
    
    print("\n" + "="*50)
    print("📋 最终分析结果:")
    print("="*50)
    print(result["analysis_result"])
    
    # 清理
    mcp_process.terminate()

if __name__ == "__main__":
    asyncio.run(main())

更高级的集成模式

动态工具调用

python

# 动态发现和调用 MCP 工具
class DynamicMCPIntegration:
    def __init__(self, mcp_client):
        self.mcp_client = mcp_client
        self.available_tools = self.discover_tools()
    
    def discover_tools(self):
        """发现 MCP Server 提供的所有工具"""
        response = self.mcp_client.send_request("listTools")
        return response.get("result", {}).get("tools", [])
    
    def execute_tool(self, tool_name: str, arguments: dict):
        """动态执行工具"""
        for tool in self.available_tools:
            if tool["name"] == tool_name:
                return self.mcp_client.send_request("callTool", {
                    "name": tool_name,
                    "arguments": arguments
                })
        raise ValueError(f"Tool {tool_name} not found")
    
    def create_tool_node(self, tool_name: str):
        """为 LangGraph 创建动态工具节点"""
        def tool_node(state: dict):
            # 从状态中提取参数
            arguments = self.prepare_arguments(tool_name, state)
            result = self.execute_tool(tool_name, arguments)
            return {f"{tool_name}_result": result}
        
        return tool_node

# 在工作流中动态添加节点
def build_dynamic_workflow(mcp_integration: DynamicMCPIntegration):
    workflow = StateGraph(dict)
    
    for tool in mcp_integration.available_tools:
        node = mcp_integration.create_tool_node(tool["name"])
        workflow.add_node(tool["name"], node)
    
    # 动态构建工作流逻辑...
    return workflow.compile()

配置和部署

Docker 部署

dockerfile

# Dockerfile
FROM golang:1.21

WORKDIR /app
COPY mcp_server.go .
COPY go.mod .
COPY go.sum .

RUN go build -o mcp-server

CMD ["./mcp-server"]

客户端配置

json

// claude_desktop_config.json
{
  "mcpServers": {
    "data-analytics-server": {
      "command": "docker",
      "args": ["run", "--rm", "my-mcp-server:latest"]
    }
  }
}

优势总结

  1. 工作流编排: LangGraph 管理复杂的多步骤流程

  2. 能力扩展: MCP Server 提供专业的数据和处理能力

  3. 灵活组合: 可以动态组合不同的 MCP Server

  4. 错误处理: LangGraph 提供强大的错误处理和重试机制

  5. 状态管理: 保持完整的对话状态和历史

这种组合特别适合需要复杂数据处理、多系统集成、和长期对话状态的 AI 应用场景。

- THE END -

TwoAdmin

9月17日17:53

最后修改:2025年9月17日
0

非特殊说明,本博所有文章均为博主原创。