fastapi 轮训调用

TwoAdmin 2025-9-18 91 9/18

FastAPI 中实现轮训(Round Robin)调用有多种方式。以下是几种常见的实现方法:

1. 简单的轮训负载均衡

基本轮训实现

python

from fastapi import FastAPI, HTTPException
import requests
from typing import List
import itertools

app = FastAPI()

# 后端服务列表
SERVERS = [
    "http://server1:8000",
    "http://server2:8000", 
    "http://server3:8000"
]

# 创建轮训迭代器
server_cycle = itertools.cycle(SERVERS)

def get_next_server():
    """获取下一个服务器"""
    return next(server_cycle)

@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def round_robin_proxy(path: str, request: Request):
    """轮训代理所有请求"""
    try:
        # 获取下一个服务器
        target_server = get_next_server()
        target_url = f"{target_server}/{path}"
        
        # 转发请求
        async with httpx.AsyncClient() as client:
            # 复制请求头和体
            headers = dict(request.headers)
            headers.pop("host", None)  # 移除原始host头
            
            if request.method == "GET":
                response = await client.get(
                    target_url,
                    params=dict(request.query_params),
                    headers=headers
                )
            else:
                body = await request.body()
                response = await client.request(
                    request.method,
                    target_url,
                    content=body,
                    headers=headers
                )
        
        # 返回响应
        return Response(
            content=response.content,
            status_code=response.status_code,
            headers=dict(response.headers)
        )
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Proxy error: {str(e)}")

2. 带健康检查的轮训

python

import asyncio
from typing import Dict, List
import aiohttp
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import Response
import itertools

app = FastAPI()

class RoundRobinBalancer:
    def __init__(self, servers: List[str]):
        self.servers = servers
        self.healthy_servers = servers.copy()
        self.cycle = itertools.cycle(self.healthy_servers)
        self.server_status: Dict[str, bool] = {server: True for server in servers}
        
    def get_next_server(self) -> str:
        """获取下一个健康服务器"""
        return next(self.cycle)
    
    async def check_health(self, server: str) -> bool:
        """检查服务器健康状态"""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(f"{server}/health", timeout=5) as response:
                    return response.status == 200
        except:
            return False
    
    async def health_check_task(self):
        """定期健康检查任务"""
        while True:
            for server in self.servers:
                is_healthy = await self.check_health(server)
                self.server_status[server] = is_healthy
                
                # 更新健康服务器列表
                self.healthy_servers = [
                    s for s in self.servers if self.server_status[s]
                ]
                
                if self.healthy_servers:
                    self.cycle = itertools.cycle(self.healthy_servers)
                else:
                    # 如果没有健康服务器,使用所有服务器
                    self.cycle = itertools.cycle(self.servers)
            
            await asyncio.sleep(30)  # 每30秒检查一次

# 初始化负载均衡器
balancer = RoundRobinBalancer([
    "http://server1:8000",
    "http://server2:8000",
    "http://server3:8000"
])

# 启动健康检查任务
@app.on_event("startup")
async def startup_event():
    asyncio.create_task(balancer.health_check_task())

@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy_request(path: str, request: Request):
    target_server = balancer.get_next_server()
    target_url = f"{target_server}/{path}"
    
    try:
        async with aiohttp.ClientSession() as session:
            # 准备请求数据
            headers = dict(request.headers)
            headers.pop("host", None)
            
            data = await request.body() if request.method != "GET" else None
            
            # 转发请求
            async with session.request(
                method=request.method,
                url=target_url,
                headers=headers,
                data=data,
                params=dict(request.query_params),
                timeout=30
            ) as response:
                
                content = await response.read()
                return Response(
                    content=content,
                    status_code=response.status,
                    headers=dict(response.headers)
                )
                
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"Backend server error: {str(e)}")

3. 基于权重的轮训

python

from collections import defaultdict
import random

class WeightedRoundRobin:
    def __init__(self, servers: Dict[str, int]):
        """
        servers: {"http://server1:8000": 3, "http://server2:8000": 2, "http://server3:8000": 1}
        """
        self.servers = servers
        self.current_index = -1
        self.current_weight = 0
        self.max_weight = max(servers.values())
        self.server_list = list(servers.keys())
        
    def get_next_server(self) -> str:
        """获取下一个服务器(加权轮训算法)"""
        while True:
            self.current_index = (self.current_index + 1) % len(self.server_list)
            if self.current_index == 0:
                self.current_weight = self.current_weight - 1
                if self.current_weight <= 0: self.current_weight = self.max_weight server = self.server_list[self.current_index] if self.servers[server] >= self.current_weight:
                return server

# 使用加权轮训
weighted_balancer = WeightedRoundRobin({
    "http://server1:8000": 3,  # 权重3
    "http://server2:8000": 2,  # 权重2  
    "http://server3:8000": 1   # 权重1
})

4. 微服务场景的轮训发现

python

from consul import Consul
import consul.aio

class ConsulRoundRobin:
    def __init__(self, consul_host: str = "localhost", consul_port: int = 8500):
        self.consul = consul.aio.Consul(host=consul_host, port=consul_port)
        self.service_cycles = {}
        
    async def get_service_instance(self, service_name: str) -> str:
        """通过Consul获取服务实例(轮训)"""
        if service_name not in self.service_cycles:
            await self.refresh_services(service_name)
            
        if service_name in self.service_cycles and self.service_cycles[service_name]:
            return next(self.service_cycles[service_name])
        else:
            raise Exception(f"No instances found for service: {service_name}")
    
    async def refresh_services(self, service_name: str):
        """从Consul刷新服务列表"""
        index, instances = await self.consul.health.service(service_name)
        if instances:
            instances_urls = [
                f"http://{instance['Service']['Address']}:{instance['Service']['Port']}"
                for instance in instances
            ]
            self.service_cycles[service_name] = itertools.cycle(instances_urls)

# 使用Consul服务发现
consul_balancer = ConsulRoundRobin()

@app.get("/api/{service_name}/{path:path}")
async def service_proxy(service_name: str, path: str, request: Request):
    try:
        target_base = await consul_balancer.get_service_instance(service_name)
        target_url = f"{target_base}/{path}"
        
        # 转发请求逻辑...
        
    except Exception as e:
        raise HTTPException(status_code=502, detail=str(e))

5. Docker Compose 配置示例

yaml
version: '3.8'
services:
  load-balancer:
    build: .
    ports:
      - "8000:8000"
    depends_on:
      - server1
      - server2
      - server3
  
  server1:
    image: your-backend-image
    environment:
      - PORT=8000
  
  server2:
    image: your-backend-image  
    environment:
      - PORT=8000
  
  server3:
    image: your-backend-image
    environment:
      - PORT=8000

关键注意事项

  1. 连接池管理:使用异步HTTP客户端(如 httpx.AsyncClientaiohttp

  2. 超时设置:合理设置请求超时时间

  3. 错误处理:处理后端服务不可用的情况

  4. 健康检查:定期检查后端服务状态

  5. 性能监控:监控各个后端服务的响应时间和错误率

选择适合你场景的实现方式。对于简单的应用,第一种基本轮训就足够了;对于生产环境,建议使用带健康检查的版本。

- THE END -
Tag:

TwoAdmin

9月19日19:18

最后修改:2025年9月19日
0

非特殊说明,本博所有文章均为博主原创。