在 FastAPI 中实现轮询(Round Robin)调用有多种方式。以下是几种常见的实现方法:
1. 简单的轮询负载均衡
基本轮询实现
import itertools
from typing import List

import httpx
import requests
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import Response
app = FastAPI()
# Backend servers that incoming requests are rotated across.
SERVERS = [
"http://server1:8000",
"http://server2:8000",
"http://server3:8000"
]
# Endless iterator over SERVERS: each next() yields the following entry and
# wraps around to the first one after the last.
server_cycle = itertools.cycle(SERVERS)
def get_next_server():
    """Return the backend URL whose turn it is in the rotation."""
    upcoming = next(server_cycle)
    return upcoming
# Upstream response headers that must not be copied onto the proxied response.
# httpx returns an already-decompressed body, so the upstream Content-Encoding
# and Content-Length no longer describe the bytes being sent; the others are
# hop-by-hop headers that only apply to the upstream connection.
_STALE_RESPONSE_HEADERS = frozenset(
    {
        "content-encoding",
        "content-length",
        "transfer-encoding",
        "connection",
        "keep-alive",
    }
)


@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def round_robin_proxy(path: str, request: Request):
    """Forward the incoming request to the next backend in the rotation.

    The method, query string, headers (minus ``Host``) and body are relayed
    to ``<backend>/<path>``; the backend's status code, body and headers are
    returned to the caller.

    Args:
        path: Request path captured by the catch-all route.
        request: The incoming request being proxied.

    Returns:
        A ``Response`` mirroring the backend's reply.

    Raises:
        HTTPException: 502 if the backend cannot be reached or the exchange
            with it fails at the HTTP transport level.
    """
    target_server = get_next_server()
    target_url = f"{target_server}/{path}"

    headers = dict(request.headers)
    headers.pop("host", None)  # let the HTTP client set Host for the backend
    body = await request.body()

    try:
        async with httpx.AsyncClient() as client:
            # One code path for every method, so the query string is forwarded
            # for POST/PUT/DELETE as well. multi_items() keeps repeated keys
            # (?tag=a&tag=b) that dict() would collapse to the last value.
            response = await client.request(
                request.method,
                target_url,
                params=request.query_params.multi_items(),
                content=body,
                headers=headers,
            )
    except httpx.HTTPError as exc:
        # A failing upstream is a gateway error, not an internal server error.
        raise HTTPException(
            status_code=502, detail=f"Proxy error: {str(exc)}"
        ) from exc

    forwarded_headers = {
        name: value
        for name, value in response.headers.items()
        if name.lower() not in _STALE_RESPONSE_HEADERS
    }
    return Response(
        content=response.content,
        status_code=response.status_code,
        headers=forwarded_headers,
    )
2. 带健康检查的轮询
import asyncio
from typing import Dict, List
import aiohttp
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import Response
import itertools
# Application instance that the startup hook and the proxy route below are registered on.
app = FastAPI()
class RoundRobinBalancer:
    """Round-robin selector over backend servers with periodic health checks."""

    def __init__(self, servers: List[str]):
        """Create a balancer in which every server starts out marked healthy.

        Args:
            servers: Base URLs of the backends, e.g. ``"http://server1:8000"``.
        """
        # Snapshot the list so later changes to the caller's list cannot
        # desynchronise ``servers`` from ``server_status``.
        self.servers = list(servers)
        # Servers that passed their most recent health check, in input order.
        self.healthy_servers = list(self.servers)
        # Endless iterator that get_next_server() draws from.
        self.cycle = itertools.cycle(self.healthy_servers)
        # Latest health-check result per server.
        self.server_status: Dict[str, bool] = {
            server: True for server in self.servers
        }

    def get_next_server(self) -> str:
        """Return the next server in the rotation.

        Raises:
            RuntimeError: If the balancer was created without any servers.
        """
        if not self.servers:
            # next() on an empty cycle raises StopIteration, which turns into
            # an opaque error once it escapes a coroutine.
            raise RuntimeError("No backend servers configured")
        return next(self.cycle)

    async def check_health(self, server: str) -> bool:
        """Return True if ``GET {server}/health`` answers with HTTP 200.

        Any failure (connection error, timeout, ...) counts as unhealthy.
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{server}/health",
                    timeout=aiohttp.ClientTimeout(total=5),
                ) as response:
                    return response.status == 200
        except Exception:
            # Deliberately broad: whatever went wrong, the server is not
            # usable right now. Unlike a bare ``except:``, this lets
            # asyncio.CancelledError (a BaseException on Python 3.8+)
            # propagate so the health-check task can be cancelled.
            return False

    def _refresh_rotation(self) -> None:
        """Rebuild the rotation from ``server_status`` if the healthy set changed."""
        healthy = [s for s in self.servers if self.server_status[s]]
        if healthy == self.healthy_servers:
            # Nothing changed: keep the current iterator so the rotation
            # continues where it left off instead of restarting at the
            # first server after every check.
            return
        self.healthy_servers = healthy
        # With no healthy server at all, fall back to rotating over every
        # server rather than leaving the balancer with nothing to return.
        self.cycle = itertools.cycle(healthy or self.servers)

    async def health_check_task(self):
        """Check every server, update the rotation, sleep 30 seconds; repeat forever."""
        while True:
            # Probe all servers concurrently so one slow backend does not
            # delay the checks of the others.
            results = await asyncio.gather(
                *(self.check_health(server) for server in self.servers)
            )
            self.server_status.update(zip(self.servers, results))
            self._refresh_rotation()
            await asyncio.sleep(30)
# Initialise the load balancer with the three backend servers.
balancer = RoundRobinBalancer([
"http://server1:8000",
"http://server2:8000",
"http://server3:8000"
])
# Start the periodic health checks when the application starts.
@app.on_event("startup")
async def startup_event():
    """Launch the balancer's health-check loop as a background task."""
    # The event loop keeps only weak references to tasks, so a task whose
    # handle is discarded can be garbage-collected mid-run. Keep a strong
    # reference on the application for as long as it lives.
    app.state.health_check_task = asyncio.create_task(
        balancer.health_check_task()
    )
# Backend response headers that must not be relayed verbatim. aiohttp
# decompresses the body by default, so the backend's Content-Encoding and
# Content-Length no longer match the bytes being returned; the others are
# hop-by-hop headers that only apply to the backend connection.
_UNFORWARDABLE_RESPONSE_HEADERS = frozenset(
    {
        "content-encoding",
        "content-length",
        "transfer-encoding",
        "connection",
        "keep-alive",
    }
)


@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy_request(path: str, request: Request):
    """Forward the request to the balancer's next server and relay the reply.

    Args:
        path: Request path captured by the catch-all route.
        request: The incoming request being proxied.

    Returns:
        A ``Response`` carrying the backend's status code, body and headers.

    Raises:
        HTTPException: 502 if the backend cannot be reached, times out
            (30 seconds total), or the HTTP exchange with it fails.
    """
    target_server = balancer.get_next_server()
    target_url = f"{target_server}/{path}"
    try:
        async with aiohttp.ClientSession() as session:
            # Prepare the outgoing request.
            headers = dict(request.headers)
            headers.pop("host", None)  # let the client set Host for the backend
            data = await request.body() if request.method != "GET" else None
            # Forward it. multi_items() keeps repeated query keys
            # (?tag=a&tag=b) that dict() would collapse to the last value.
            async with session.request(
                method=request.method,
                url=target_url,
                headers=headers,
                data=data,
                params=request.query_params.multi_items(),
                timeout=aiohttp.ClientTimeout(total=30),
            ) as response:
                content = await response.read()
                forwarded_headers = {
                    name: value
                    for name, value in response.headers.items()
                    if name.lower() not in _UNFORWARDABLE_RESPONSE_HEADERS
                }
                return Response(
                    content=content,
                    status_code=response.status,
                    headers=forwarded_headers,
                )
    except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
        raise HTTPException(
            status_code=502, detail=f"Backend server error: {str(exc)}"
        ) from exc
3. 基于权重的轮询
from collections import defaultdict
import random
class WeightedRoundRobin:
    """Weighted round-robin selector.

    Over one full cycle each server is returned as many times as its weight,
    so a server with weight 3 is picked three times as often as one with
    weight 1.
    """

    def __init__(self, servers: Dict[str, int]):
        """Create the selector.

        Args:
            servers: Mapping of server URL to weight, e.g.
                {"http://server1:8000": 3, "http://server2:8000": 2,
                "http://server3:8000": 1}.

        Raises:
            ValueError: If ``servers`` is empty.
        """
        if not servers:
            raise ValueError("servers must contain at least one entry")
        # Snapshot the mapping so later changes to the caller's dict cannot
        # get out of step with ``server_list`` and ``max_weight``.
        self.servers = dict(servers)
        # Position of the most recently examined server in ``server_list``.
        self.current_index = -1
        # Current eligibility threshold; see get_next_server().
        self.current_weight = 0
        self.max_weight = max(self.servers.values())
        self.server_list = list(self.servers.keys())

    def get_next_server(self) -> str:
        """Return the next server according to the weighted rotation.

        The server list is scanned cyclically. Each time the scan wraps back
        to the first server, the threshold ``current_weight`` drops by one,
        restarting at ``max_weight`` once it reaches zero. A server is
        returned only when its weight is at least the current threshold, so
        heavier servers qualify on more passes.
        """
        while True:
            self.current_index = (self.current_index + 1) % len(self.server_list)
            if self.current_index == 0:
                # Completed a pass over all servers: lower the threshold.
                self.current_weight = self.current_weight - 1
                if self.current_weight <= 0:
                    self.current_weight = self.max_weight
            server = self.server_list[self.current_index]
            if self.servers[server] >= self.current_weight:
                return server
# Example: a weighted round-robin balancer over three servers.
weighted_balancer = WeightedRoundRobin({
"http://server1:8000": 3, # weight 3
"http://server2:8000": 2, # weight 2
"http://server3:8000": 1 # weight 1
})
4. 微服务场景下基于服务发现的轮询
from consul import Consul
import consul.aio
class ConsulRoundRobin:
    """Round-robin selection over service instances discovered through Consul."""

    def __init__(self, consul_host: str = "localhost", consul_port: int = 8500):
        """Connect to the Consul agent at ``consul_host:consul_port``."""
        self.consul = consul.aio.Consul(host=consul_host, port=consul_port)
        # Service name -> endless iterator over that service's instance URLs.
        self.service_cycles = {}

    @staticmethod
    def _instance_url(instance: dict) -> str:
        """Build the base URL for one entry of a Consul health-service result."""
        service = instance["Service"]
        # Consul leaves Service.Address empty when the service was registered
        # without an explicit address; the node's address applies in that case.
        address = service.get("Address") or instance.get("Node", {}).get("Address", "")
        return f"http://{address}:{service['Port']}"

    async def get_service_instance(self, service_name: str) -> str:
        """Return the base URL of the next instance of ``service_name``.

        The instance list is fetched from Consul the first time a service is
        requested and cached afterwards; call refresh_services() to pick up
        membership changes.

        Raises:
            LookupError: If Consul reports no passing instance for the service.
        """
        if service_name not in self.service_cycles:
            await self.refresh_services(service_name)
        cycle = self.service_cycles.get(service_name)
        if cycle is None:
            raise LookupError(f"No instances found for service: {service_name}")
        return next(cycle)

    async def refresh_services(self, service_name: str):
        """Reload the instance list for ``service_name`` from Consul.

        Only instances whose health checks are passing are kept. If none are
        returned, any cached rotation for the service is dropped so that
        callers get an explicit error instead of being routed to instances
        Consul no longer reports as healthy.
        """
        _, instances = await self.consul.health.service(service_name, passing=True)
        if not instances:
            self.service_cycles.pop(service_name, None)
            return
        instance_urls = [self._instance_url(instance) for instance in instances]
        self.service_cycles[service_name] = itertools.cycle(instance_urls)
# Balancer backed by Consul service discovery (default agent: localhost:8500).
consul_balancer = ConsulRoundRobin()
@app.get("/api/{service_name}/{path:path}")
async def service_proxy(service_name: str, path: str, request: Request):
    """Pick an instance of ``service_name`` via Consul and build the target URL.

    The forwarding step itself is elided in this example: as written, the
    handler computes ``target_url`` and then returns None.

    Raises:
        HTTPException: 502 carrying the error text if instance lookup (or
            anything else in the body) raises.
    """
    try:
        target_base = await consul_balancer.get_service_instance(service_name)
        target_url = f"{target_base}/{path}"
        # Request forwarding logic goes here...
    except Exception as e:
        raise HTTPException(status_code=502, detail=str(e))
5. Docker Compose 配置示例
# Load balancer in front of three identical backend containers.
version: '3.8'
services:
  load-balancer:
    build: .
    ports:
      - "8000:8000"
    depends_on:
      - server1
      - server2
      - server3
  server1:
    image: your-backend-image
    environment:
      - PORT=8000
  server2:
    image: your-backend-image
    environment:
      - PORT=8000
  server3:
    image: your-backend-image
    environment:
      - PORT=8000
关键注意事项
- 连接池管理:使用异步 HTTP 客户端(如 httpx.AsyncClient 或 aiohttp)
- 超时设置:合理设置请求超时时间
- 错误处理:处理后端服务不可用的情况
- 健康检查:定期检查后端服务状态
- 性能监控:监控各个后端服务的响应时间和错误率
选择适合你场景的实现方式。对于简单的应用,第一种基本轮询就足够了;对于生产环境,建议使用带健康检查的版本。
- THE END -
最后修改:2025年9月19日
非特殊说明,本博所有文章均为博主原创。