langgraph同时使用tool和mcp(一)

TwoAdmin 2025-9-25 70 9/25

langgraph同时使用tool和mcp

定义智能体的状态 (State)

from typing import TypedDict, Annotated, List

from langgraph.graph import add_messages

class AgentState(TypedDict):
    """
    定义图的状态。它会作为数据载体在所有节点之间传递。
    """
    
    userID: str
    messages: Annotated[List,add_messages]
    error: str # 记录错误信息(可选)
    # 每个工具的执行结果
    job_info:str
    web_search_query:str
    web_search_result: str
    
    # 下一步要执行的动作
    next_step: str
    iteration_count: int # 新增:迭代计数器
    max_iterations: int # 新增:最大迭代次数
    should_continue: bool # 新增:是否继续执行

mcp 客户端


import json
import httpx
import subprocess
import os
from fastapi import FastAPI, HTTPException
from typing import Dict, Any, Optional
import logging

from app.config import comet_config

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="MCP客户端", description="用于与MCP服务器通信的客户端")
MCP_SSE_URL =  comet_config['mcp_uri']
MCP_SERVER_PATH = os.getenv("MCP_SERVER_PATH", "/Users/wangye/data/go/mcp_youself/mcp")





@app.get("/")
async def root():
    return {"message": "MCP客户端服务已启动"}




async def call_stdio_tool(mcp_request: Dict, env: Dict):
    """通过子进程调用Stdio模式的MCP服务器"""
    try:
        if not os.path.exists(MCP_SERVER_PATH):
            raise FileNotFoundError(f"MCP服务器可执行文件不存在: {MCP_SERVER_PATH}")

        logger.info(f"启动Stdio模式MCP服务器: {MCP_SERVER_PATH}")

        # 启动MCP服务器进程
        process = subprocess.Popen(
            [MCP_SERVER_PATH, "-t", "stdio"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            env=env,
            bufsize=1
        )

        # 发送请求
        request_json = json.dumps(mcp_request)
        stdout, stderr = process.communicate(input=request_json + "\n")

        if stderr:
            logger.warning(f"MCP服务器标准错误: {stderr}")

        if stdout:
            try:
                response = json.loads(stdout.strip())
                if "result" in response:
                    return response["result"]
                elif "error" in response:
                    raise HTTPException(status_code=400, detail=response["error"])
                return {"response": response}
            except json.JSONDecodeError:
                return {"result": stdout.strip()}
        else:
            raise HTTPException(status_code=500, detail="MCP服务器无响应")

    except Exception as e:
        logger.error(f"Stdio模式调用失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Stdio模式调用失败: {str(e)}")


async def call_sse_tool(payload: Dict, auth_token: Optional[str] = None):
    """通过SSE模式调用MCP服务器"""
    try:
        # 设置请求头
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
        mcp_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/call",
            "params":payload
            }
        if auth_token:
            headers["Authorization"] = auth_token

        # 发送SSE请求
        async with httpx.AsyncClient(timeout=30.0) as client:
            # 首先检查SSE服务器是否健康
            '''try:
                health_response = await client.get(f"{MCP_SSE_URL}/health", timeout=50.0)

                if health_response.status_code != 200:
                    raise HTTPException(status_code=500, detail="MCP SSE服务器不可用")
            except httpx.RequestError:
                raise HTTPException(status_code=500, detail="无法连接到MCP SSE服务器")'''
            print("mcp_request", mcp_request)
            # 发送工具调用请求
            response = await client.post(
                f"{MCP_SSE_URL}/tools/call",
                json=mcp_request,
                headers=headers
            )
            print("response", response)
            if response.status_code == 200:
                # 解析SSE响应(可能是多个事件)
                return response.json()
            else:
                error_detail = f"SSE请求失败: {response.status_code}"
                try:
                    error_data = response.json()
                    error_detail = error_data.get("error", error_detail)
                except:
                    pass
                raise HTTPException(status_code=response.status_code, detail=error_detail)

    except httpx.TimeoutException:
        raise HTTPException(status_code=504, detail="MCP服务器响应超时")
    except httpx.RequestError as e:
        raise HTTPException(status_code=503, detail=f"无法连接到MCP服务器: {str(e)}")
    except Exception as e:
        logger.error(f"SSE模式调用失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"SSE模式调用失败: {str(e)}")


async def parse_sse_response(response: httpx.Response) -> Dict:
    """解析SSE响应"""
    content = response.text
    logger.debug(f"SSE响应内容: {content}")

    # SSE格式通常是多行文本,我们需要找到包含JSON的数据行
    lines = content.strip().split('\n')
    for line in lines:
        if line.startswith('data:'):
            try:
                json_data = json.loads(line[5:].strip())  # 去掉"data:"前缀
                return json_data
            except json.JSONDecodeError:
                continue

    # 如果没有找到有效的JSON数据,尝试直接解析整个响应
    try:
        return json.loads(content)
    except json.JSONDecodeError:
        return {"raw_response": content}
@app.get("/health")
async def health_check():
    """健康检查端点"""
    return {
        "status": "healthy",
        "message": "MCP客户端运行正常"
    }

@app.get("/test-sse")
async def test_sse():
    """测试SSE连接"""
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(f"{MCP_SSE_URL}/health")
            return {
                "status": "success",
                "sse_status": response.status_code,
                "sse_response": response.json() if response.status_code == 200 else None
            }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e)
        }

@app.get("/server-info")
async def server_info():
    """获取服务器信息"""
    return {
        "sse_url": MCP_SSE_URL,
        "environment": dict(os.environ)
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
- THE END -
Tag:

TwoAdmin

10月21日13:39

最后修改:2025年10月21日
0

非特殊说明,本博所有文章均为博主原创。