在 FastAPI 中处理异步调用同步方法,而同步方法内部又需要调用多个异步方法的情况,有几种解决方案。以下是详细的实现方法:
方法一:使用 asyncio.run() 在同步方法中(不推荐)
python
from fastapi import FastAPI
import asyncio
import time
app = FastAPI()
# 异步方法
async def async_task_1():
await asyncio.sleep(1)
return "任务1完成"
async def async_task_2():
await asyncio.sleep(2)
return "任务2完成"
# 同步方法调用多个异步方法
def sync_method():
# 注意:这在已有事件循环的环境中会报错
result1 = asyncio.run(async_task_1())
result2 = asyncio.run(async_task_2())
return f"{result1}, {result2}"
@app.get("/")
async def root():
result = sync_method() # 这会报错!
return {"result": result}
问题:在 FastAPI 的异步环境中使用 asyncio.run() 会报错,因为已经有一个运行中的事件循环。
方法二:使用 asyncio.create_task() 和异步包装器(推荐)
python
from fastapi import FastAPI
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
app = FastAPI()
# 异步方法
async def async_task_1():
await asyncio.sleep(1)
return "任务1完成"
async def async_task_2():
await asyncio.sleep(2)
return "任务2完成"
# 同步方法(假设这是第三方库或遗留代码)
def sync_method():
# 模拟一些同步工作
time.sleep(0.5)
return "同步方法完成"
# 异步包装器
async def run_sync_method():
# 在线程池中运行同步方法
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
result = await loop.run_in_executor(executor, sync_method)
return result
# 并行运行多个异步任务
async def run_async_tasks_parallel():
# 创建任务并行执行
task1 = asyncio.create_task(async_task_1())
task2 = asyncio.create_task(async_task_2())
# 等待所有任务完成
results = await asyncio.gather(task1, task2)
return results
@app.get("/sequential")
async def sequential_execution():
"""顺序执行:先同步方法,再异步任务"""
sync_result = await run_sync_method()
async_results = await run_async_tasks_parallel()
return {
"sync_result": sync_result,
"async_results": async_results
}
@app.get("/parallel")
async def parallel_execution():
"""并行执行:同步方法和异步任务同时进行"""
# 同时启动同步和异步任务
sync_task = asyncio.create_task(run_sync_method())
async_task = asyncio.create_task(run_async_tasks_parallel())
# 等待所有任务完成
sync_result, async_results = await asyncio.gather(sync_task, async_task)
return {
"sync_result": sync_result,
"async_results": async_results
}
方法三:使用背景任务和依赖注入
python
from fastapi import FastAPI, BackgroundTasks, Depends
import asyncio
import time
from typing import List
app = FastAPI()
class TaskService:
def __init__(self):
self.results = []
# 同步方法
def process_data_sync(self, data: str) -> str:
time.sleep(1) # 模拟同步处理
return f"处理完成: {data}"
# 异步方法
async def process_data_async(self, data: str) -> str:
await asyncio.sleep(1)
return f"异步处理完成: {data}"
# 依赖注入
def get_task_service() -> TaskService:
return TaskService()
async def run_sync_in_thread(func, *args):
"""在线程池中运行同步函数"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, func, *args)
@app.post("/process")
async def process_tasks(
data: List[str],
background_tasks: BackgroundTasks,
service: TaskService = Depends(get_task_service)
):
results = []
for item in data:
# 先运行同步处理
sync_result = await run_sync_in_thread(service.process_data_sync, item)
# 然后运行异步处理
async_result = await service.process_data_async(item)
results.append({
"input": item,
"sync_result": sync_result,
"async_result": async_result
})
return {"results": results}
# 批量处理版本
@app.post("/process-batch")
async def process_batch_tasks(
data: List[str],
service: TaskService = Depends(get_task_service)
):
async def process_single_item(item: str):
# 并行执行同步和异步处理
sync_task = asyncio.create_task(
run_sync_in_thread(service.process_data_sync, item)
)
async_task = asyncio.create_task(
service.process_data_async(item)
)
sync_result, async_result = await asyncio.gather(sync_task, async_task)
return {
"input": item,
"sync_result": sync_result,
"async_result": async_result
}
# 并行处理所有项目
tasks = [process_single_item(item) for item in data]
results = await asyncio.gather(*tasks)
return {"results": results}
方法四:使用高级模式(类封装)
python
from fastapi import FastAPI
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List
app = FastAPI()
class HybridProcessor:
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=10)
# 同步方法(可能来自第三方库)
def heavy_sync_operation(self, data: str) -> str:
"""耗时的同步操作"""
time.sleep(1)
return f"同步处理: {data.upper()}"
# 异步方法
async def light_async_operation(self, data: str) -> str:
"""轻量的异步操作"""
await asyncio.sleep(0.5)
return f"异步处理: {data.lower()}"
# 包装同步方法为异步
async def run_sync_async(self, data: str) -> str:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.executor, self.heavy_sync_operation, data
)
# 组合操作
async def process_hybrid(self, data: List[str]) -> List[dict]:
async def process_item(item: str):
# 并行执行同步和异步操作
sync_task = asyncio.create_task(self.run_sync_async(item))
async_task = asyncio.create_task(self.light_async_operation(item))
sync_result, async_result = await asyncio.gather(sync_task, async_task)
return {
"original": item,
"sync_result": sync_result,
"async_result": async_result
}
# 并行处理所有项目
tasks = [process_item(item) for item in data]
return await asyncio.gather(*tasks)
def __del__(self):
self.executor.shutdown()
# 全局实例
processor = HybridProcessor()
@app.post("/hybrid-process")
async def hybrid_process(data: List[str]):
results = await processor.process_hybrid(data)
return {"results": results}
@app.get("/health")
async def health_check():
return {"status": "healthy"}
关键要点
不要在有事件循环的环境中用 asyncio.run() - 这会引发 RuntimeError
使用 loop.run_in_executor() - 将同步方法放在线程池中执行
利用 asyncio.gather() - 并行执行多个异步任务
合理使用 asyncio.create_task() - 创建并发任务
注意资源管理 - 特别是线程池的管理
性能建议
对于 I/O 密集型操作,优先使用异步
对于 CPU 密集型操作,使用线程池执行同步代码
避免在异步上下文中直接调用阻塞的同步方法
合理设置线程池大小,避免资源耗尽
这种模式可以让你在 FastAPI 的异步环境中灵活地处理同步和异步代码的混合调用。
- THE END -
最后修改:2025年9月23日
非特殊说明,本博所有文章均为博主原创。