langgraph同时使用tool和mcp
定义智能体的状态 (State)
from typing import TypedDict, Annotated, List
from langgraph.graph import add_messages
class AgentState(TypedDict):
"""
定义图的状态。它会作为数据载体在所有节点之间传递。
"""
userID: str
messages: Annotated[List,add_messages]
error: str # 记录错误信息(可选)
# 每个工具的执行结果
job_info:str
web_search_query:str
web_search_result: str
# 下一步要执行的动作
next_step: str
iteration_count: int # 新增:迭代计数器
max_iterations: int # 新增:最大迭代次数
should_continue: bool # 新增:是否继续执行
mcp 客户端
import json
import httpx
import subprocess
import os
from fastapi import FastAPI, HTTPException
from typing import Dict, Any, Optional
import logging
from app.config import comet_config
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="MCP客户端", description="用于与MCP服务器通信的客户端")
MCP_SSE_URL = comet_config['mcp_uri']
MCP_SERVER_PATH = os.getenv("MCP_SERVER_PATH", "/Users/wangye/data/go/mcp_youself/mcp")
@app.get("/")
async def root():
return {"message": "MCP客户端服务已启动"}
async def call_stdio_tool(mcp_request: Dict, env: Dict):
"""通过子进程调用Stdio模式的MCP服务器"""
try:
if not os.path.exists(MCP_SERVER_PATH):
raise FileNotFoundError(f"MCP服务器可执行文件不存在: {MCP_SERVER_PATH}")
logger.info(f"启动Stdio模式MCP服务器: {MCP_SERVER_PATH}")
# 启动MCP服务器进程
process = subprocess.Popen(
[MCP_SERVER_PATH, "-t", "stdio"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
env=env,
bufsize=1
)
# 发送请求
request_json = json.dumps(mcp_request)
stdout, stderr = process.communicate(input=request_json + "\n")
if stderr:
logger.warning(f"MCP服务器标准错误: {stderr}")
if stdout:
try:
response = json.loads(stdout.strip())
if "result" in response:
return response["result"]
elif "error" in response:
raise HTTPException(status_code=400, detail=response["error"])
return {"response": response}
except json.JSONDecodeError:
return {"result": stdout.strip()}
else:
raise HTTPException(status_code=500, detail="MCP服务器无响应")
except Exception as e:
logger.error(f"Stdio模式调用失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"Stdio模式调用失败: {str(e)}")
async def call_sse_tool(payload: Dict, auth_token: Optional[str] = None):
"""通过SSE模式调用MCP服务器"""
try:
# 设置请求头
headers = {
"Content-Type": "application/json",
"Accept": "application/json"
}
mcp_request = {
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params":payload
}
if auth_token:
headers["Authorization"] = auth_token
# 发送SSE请求
async with httpx.AsyncClient(timeout=30.0) as client:
# 首先检查SSE服务器是否健康
'''try:
health_response = await client.get(f"{MCP_SSE_URL}/health", timeout=50.0)
if health_response.status_code != 200:
raise HTTPException(status_code=500, detail="MCP SSE服务器不可用")
except httpx.RequestError:
raise HTTPException(status_code=500, detail="无法连接到MCP SSE服务器")'''
print("mcp_request", mcp_request)
# 发送工具调用请求
response = await client.post(
f"{MCP_SSE_URL}/tools/call",
json=mcp_request,
headers=headers
)
print("response", response)
if response.status_code == 200:
# 解析SSE响应(可能是多个事件)
return response.json()
else:
error_detail = f"SSE请求失败: {response.status_code}"
try:
error_data = response.json()
error_detail = error_data.get("error", error_detail)
except:
pass
raise HTTPException(status_code=response.status_code, detail=error_detail)
except httpx.TimeoutException:
raise HTTPException(status_code=504, detail="MCP服务器响应超时")
except httpx.RequestError as e:
raise HTTPException(status_code=503, detail=f"无法连接到MCP服务器: {str(e)}")
except Exception as e:
logger.error(f"SSE模式调用失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"SSE模式调用失败: {str(e)}")
async def parse_sse_response(response: httpx.Response) -> Dict:
"""解析SSE响应"""
content = response.text
logger.debug(f"SSE响应内容: {content}")
# SSE格式通常是多行文本,我们需要找到包含JSON的数据行
lines = content.strip().split('\n')
for line in lines:
if line.startswith('data:'):
try:
json_data = json.loads(line[5:].strip()) # 去掉"data:"前缀
return json_data
except json.JSONDecodeError:
continue
# 如果没有找到有效的JSON数据,尝试直接解析整个响应
try:
return json.loads(content)
except json.JSONDecodeError:
return {"raw_response": content}
@app.get("/health")
async def health_check():
"""健康检查端点"""
return {
"status": "healthy",
"message": "MCP客户端运行正常"
}
@app.get("/test-sse")
async def test_sse():
"""测试SSE连接"""
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(f"{MCP_SSE_URL}/health")
return {
"status": "success",
"sse_status": response.status_code,
"sse_response": response.json() if response.status_code == 200 else None
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
@app.get("/server-info")
async def server_info():
"""获取服务器信息"""
return {
"sse_url": MCP_SSE_URL,
"environment": dict(os.environ)
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
- THE END -
最后修改:2025年10月21日
非特殊说明,本博所有文章均为博主原创。