FastAPI + SQLModel 的配置和 CRUD 操作。
1. 环境配置和安装
首先安装必要的依赖:
pip install fastapi sqlmodel uvicorn
database.py
from contextlib import contextmanager
from sqlmodel import create_engine, Session
from app.config import db_config
config_data = db_config
DATABASE_URL=f'mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['database']}?charset=utf8mb4'
# 创建引擎配置
engine = create_engine(
DATABASE_URL,
echo=True, # 显示 SQL 语句
pool_size=10,
max_overflow=20,
pool_pre_ping=True # 连接池预检查
)
# 依赖注入 - 获取数据库会话
@contextmanager
def get_session():
"""数据库会话上下文管理器"""
session = Session(engine)
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
models __init__.py
from .interview_report import InterviewReport
# 确保所有模型都被导出
__all__ = ["InterviewReport"]
interview_report.py
import datetime
from typing import Optional
from sqlmodel import SQLModel, Field
class InterviewReport(SQLModel, table=True):
__tablename__ = "interview_report"
id: Optional[int] = Field(default=None, primary_key=True)
interview_id: Optional[int] = Field(default=None, description='面试ID')
content: str = Field(description='报告内容', default='')
is_receive: Optional[int] = Field(default=0, description='是否获取')
is_delete: Optional[int] = Field(default=0, description='是否删除')
created_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
receive_at: datetime.datetime = Field(default=None)
deleted_at: datetime.datetime = Field(default=None)
查询
# 先查询是否已存在
with get_session() as session:
existing_report = session.exec(
select(InterviewReport).where(InterviewReport.interview_id == interview_id)
).first()
增加
content_json = json.dumps(final_report, ensure_ascii=False)
try:
with get_session() as session: # 使用 with 语句,不要用 next()
db_report = InterviewReport(
interview_id=interview_id,
content=content_json,
is_receive=0,
is_delete=0,
receive_at=None,
deleted_at=None
)
session.add(db_report)
session.flush()
logging.info(f"-- 面试ID :{interview_id}-入库成功,ID: {db_report.id}")
except Exception as e:
logging.info(f"-- 面试ID :{interview_id}-入库失败,错误: {str(e)}")
main.py
import logging
import os
from contextlib import asynccontextmanager
import uvicorn
from fastapi import FastAPI,Request
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import JSONResponse
from app.loger import setup_logger
from sdk.limiter import limiter
from sdk.middleware.auth_middleware import auth_middleware
from routers import portrait_router, questions_router, search_router, interview_router, interview_report, comet_router
# 生命周期管理
@asynccontextmanager
async def lifespan(app: FastAPI):
yield
app = FastAPI(
lifespan=lifespan
)
# Add CORS middleware to allow cross-origin requests
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.middleware("http")(auth_middleware)
# 自定义限流响应
@app.exception_handler(RateLimitExceeded)
async def custom_rate_limit_handler(request: Request, exc: RateLimitExceeded):
return JSONResponse(
status_code=429,
content={
"code": 429,
"message": "您的操作过于频繁,6秒内只允许一次请求,请稍后再试",
"retry_after": f"{6}秒",
}
)
app.state.limiter = limiter
app.add_middleware(SlowAPIMiddleware) # 必须添加
def configure_routers(app:FastAPI, prefix=''):
app.include_router(portrait_router.router, prefix=prefix)
app.include_router(questions_router.router, prefix=prefix)
app.include_router(search_router.router, prefix=prefix)
app.include_router(interview_router.router, prefix=prefix)
app.include_router(interview_report.router, prefix=prefix)
app.include_router(comet_router.router, prefix=prefix)
configure_routers(app)
current_dir = os.path.dirname(os.path.abspath(__file__))
setup_logger(current_dir)
# 主函数入口
if __name__ == '__main__':
# 启动FastAPI应用
from app.config import app_config
config = app_config
uvicorn.run(app, host='0.0.0.0', port=config['port'], workers=1)
- THE END -
最后修改:2025年10月27日
非特殊说明,本博所有文章均为博主原创。