FastAPI快速入门-Python现代Web框架

一、为什么选择FastAPI?

FastAPI是一个现代、快速的Python Web框架,用于构建API。它基于Python类型提示,自动生成API文档。

1.1 FastAPI优势

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
┌─────────────────────────────────────────────────────────────────┐
│ FastAPI特点 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ⚡ 快 → 性能接近Node.js和Go │
│ │
│ 📝 方便 → 自动生成API文档 │
│ │
│ 🛡️ 安全 → 自动数据验证 │
│ │
│ 🔄 现代化 → 支持异步、类型提示 │
│ │
│ 📚 文档 → Swagger UI + ReDoc │
│ │
└─────────────────────────────────────────────────────────────────┘

1.2 性能对比

框架 性能
FastAPI ⚡⚡⚡⚡⚡
Starlette ⚡⚡⚡⚡
Flask ⚡⚡⚡
Django ⚡⚡
Express.js ⚡⚡⚡⚡

1.3 学习路线

1
Python基础 → FastAPI入门 → 数据验证 → 数据库 → 认证授权 → 部署

二、安装FastAPI

2.1 安装依赖

1
2
3
4
5
6
7
8
9
10
11
# Create a virtual environment
python -m venv venv
source venv/bin/activate # Linux/macOS
# or
venv\Scripts\activate # Windows

# Install FastAPI and uvicorn
pip install fastapi uvicorn

# Install all optional dependencies (optional).
# Quoted so shells such as zsh do not expand the square brackets as a glob.
pip install "fastapi[all]"

2.2 安装可选依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Data validation
pip install pydantic

# Database
pip install sqlalchemy databases

# Authentication (extras quoted so zsh does not treat [] as a glob)
pip install python-jose "passlib[bcrypt]"

# Email
pip install aiosmtplib

# Async HTTP client
pip install httpx

# Redis
pip install redis

三、快速开始

3.1 第一个API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# main.py
from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def root():
    """Return a static greeting for the root path."""
    return {"message": "Hello, FastAPI!"}


@app.get("/items/{item_id}")
async def read_item(item_id: int):
    """Return a placeholder item built from the ``item_id`` path parameter."""
    return {"item_id": item_id, "name": f"Item {item_id}"}


# Start command
# uvicorn main:app --reload
1
2
3
4
5
6
7
# Run
uvicorn main:app --reload

# Visit
# http://127.0.0.1:8000
# API docs: http://127.0.0.1:8000/docs
# ReDoc: http://127.0.0.1:8000/redoc

3.2 项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
myproject/
├── main.py # 主应用
├── routers/ # 路由目录
│ ├── __init__.py
│ └── items.py
├── models/ # 数据模型
│ └── __init__.py
├── schemas/ # Pydantic模型
│ └── __init__.py
├── crud/ # 业务逻辑
│ └── __init__.py
├── database.py # 数据库配置
├── requirements.txt # 依赖
└── .env # 环境变量

四、路由与参数

4.1 路径参数

1
2
3
4
5
6
7
8
9
10
11
from fastapi import FastAPI

app = FastAPI()


@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """Return a placeholder user for the integer ``user_id`` path parameter."""
    return {"user_id": user_id, "username": f"user_{user_id}"}


@app.get("/posts/{post_id}")
async def get_post(post_id: int):
    """Return a placeholder post for the integer ``post_id`` path parameter."""
    return {"post_id": post_id, "title": f"Post {post_id}"}

4.2 查询参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from fastapi import FastAPI, Query

app = FastAPI()


# Query parameters with defaults (the client may omit them)
@app.get("/items/")
async def read_items(skip: int = 0, limit: int = 10):
    """Echo the ``skip`` and ``limit`` pagination query parameters."""
    return {"skip": skip, "limit": limit}


# Optional parameter
@app.get("/users/")
async def read_users(name: str | None = None):
    """Echo ``name`` when given, otherwise return a fixed user list."""
    if name:
        return {"name": name}
    return {"users": ["user1", "user2"]}


# With validation
@app.get("/products/")
async def read_products(
    # Default is None rather than "": an empty string would contradict the
    # min_length=3 constraint declared for this same parameter.
    q: str | None = Query(default=None, min_length=3, max_length=50),
    page: int = Query(default=1, ge=1),
):
    """Echo the validated search term ``q`` and the 1-based ``page`` number."""
    return {"q": q, "page": page}

4.3 请求体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


# Define the model
class Item(BaseModel):
    """Request body for an item; ``description`` and ``tax`` may be omitted."""

    name: str
    description: str | None = None
    price: float
    tax: float | None = None


@app.post("/items/")
async def create_item(item: Item):
    """Echo the item, adding ``price_with_tax`` when a tax value was sent."""
    # Compare against None so an explicit tax of 0 still gets the full payload.
    if item.tax is not None:
        price_with_tax = item.price + item.tax
        return {"item": item, "price_with_tax": price_with_tax}
    return {"item": item}


# Nested models
class User(BaseModel):
    """Customer placing an order."""

    username: str
    email: str
    full_name: str | None = None


class Order(BaseModel):
    """Order body that nests a ``User`` and a list of ``Item`` objects."""

    user: User
    items: list[Item]
    status: str = "pending"


@app.post("/orders/")
async def create_order(order: Order):
    """Echo the validated order."""
    return {"order": order}

4.4 表单数据

1
2
3
4
5
6
7
from fastapi import FastAPI, Form

app = FastAPI()


@app.post("/login/")
async def login(username: str = Form(), password: str = Form()):
    """Read ``username`` and ``password`` form fields and echo the username.

    The password is accepted but never checked or returned here.
    """
    return {"username": username}

4.5 文件上传

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from pathlib import Path

from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.responses import FileResponse

app = FastAPI()

# Directory that stores uploaded files and serves downloads
UPLOAD_DIR = Path("uploads")


def _safe_upload_path(filename: str | None) -> Path:
    """Map a client-supplied file name to a path directly inside UPLOAD_DIR.

    Raises:
        HTTPException: 400 when no usable file name remains.
    """
    # Keep only the last path component so "../" or absolute paths sent by the
    # client cannot point outside the upload directory.
    name = Path((filename or "").replace("\\", "/")).name
    if name in {"", ".", ".."}:
        raise HTTPException(status_code=400, detail="Invalid filename")
    return UPLOAD_DIR / name


# Single file upload
@app.post("/upload/")
async def upload_file(file: UploadFile = File()):
    """Save one uploaded file under ``uploads/`` and report its name and size."""
    contents = await file.read()
    target = _safe_upload_path(file.filename)
    # Create the directory on first use instead of failing on a missing folder
    UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
    target.write_bytes(contents)
    return {"filename": target.name, "size": len(contents)}


# Multiple file upload
@app.post("/upload/multiple/")
async def upload_multiple(files: list[UploadFile] = File()):
    """Report the name and size of each uploaded file (nothing is saved)."""
    results = []
    for file in files:
        contents = await file.read()
        results.append({"filename": file.filename, "size": len(contents)})
    return {"files": results}


# File download
@app.get("/download/{filename}")
async def download_file(filename: str):
    """Send a previously uploaded file, or 404 when it does not exist."""
    path = _safe_upload_path(filename)
    if not path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(path, filename=path.name)

五、数据验证

5.1 Pydantic基础

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from pydantic import BaseModel, Field, field_validator
from typing import Optional
from datetime import datetime


class User(BaseModel):
    """User payload with length, range and e-mail checks."""

    username: str = Field(..., min_length=3, max_length=50)
    email: str
    age: int = Field(..., ge=0, le=150)
    is_active: bool = True
    # default_factory is called per instance, so each user gets its own timestamp
    created_at: datetime = Field(default_factory=datetime.now)

    # Pydantic v2 validator API; the v1 ``@validator`` decorator is deprecated.
    @field_validator("email")
    @classmethod
    def validate_email(cls, v: str) -> str:
        """Require an ``@`` in the address and return it lower-cased."""
        if "@" not in v:
            raise ValueError("Invalid email")
        return v.lower()


# Usage
user = User(
    username="john",
    email="John@example.com",
    age=25,
)
print(user.username)  # john
print(user.email)  # john@example.com

5.2 嵌套验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from pydantic import BaseModel, Field
from typing import List


class Item(BaseModel):
    """A priced line item."""

    name: str
    price: float


class Order(BaseModel):
    """Order holding a list of items and a running total."""

    user_id: int
    items: List[Item]
    total: float = 0

    # Compute the total on demand
    def calculate_total(self):
        """Store the sum of all item prices in ``total`` and return it."""
        self.total = sum(item.price for item in self.items)
        return self.total


# Test
order = Order(
    user_id=1,
    items=[
        Item(name="Apple", price=5.0),
        Item(name="Banana", price=3.0),
    ],
)
order.calculate_total()
print(order.total)  # 8.0

5.3 响应模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional

# The route below needs an application instance to register on
app = FastAPI()


class UserIn(BaseModel):
    """Incoming registration payload, including the plain password."""

    username: str
    email: str
    password: str


class UserOut(BaseModel):
    """Outgoing user representation."""

    username: str
    email: str
    # password is deliberately not returned


class UserInDB(BaseModel):
    """Stored form of a user, holding a hashed password."""

    username: str
    hashed_password: str


# Exclude sensitive fields from the response
@app.post("/users/", response_model=UserOut)
async def create_user(user: UserIn):
    """Accept a ``UserIn`` and respond using the ``UserOut`` schema."""
    # Store in the database here (the password must be hashed first)
    return user

六、数据库操作

6.1 使用SQLAlchemy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# database.py
from sqlalchemy import create_engine
# declarative_base lives in sqlalchemy.orm since 1.4; the old
# sqlalchemy.ext.declarative location is deprecated.
from sqlalchemy.orm import declarative_base, sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
# PostgreSQL: "postgresql://user:password@postgresserver/db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    # SQLite-specific option; drop it when switching to another database
    connect_args={"check_same_thread": False},
)

# Session factory bound to the engine; commits and flushes are explicit
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Base class that the ORM models inherit from
Base = declarative_base()

# models.py
from sqlalchemy import Column, Integer, String, Float
# engine is imported as well because create_all below binds to it
from database import Base, engine


class Item(Base):
    """ORM model mapped to the ``items`` table."""

    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    price = Column(Float)
    description = Column(String, nullable=True)


# Create the tables
Base.metadata.create_all(bind=engine)

# crud.py
from sqlalchemy.orm import Session
from models import Item
from schemas import ItemCreate


def get_items(db: Session, skip: int = 0, limit: int = 10):
    """Return up to ``limit`` items, skipping the first ``skip`` rows."""
    return db.query(Item).offset(skip).limit(limit).all()


def create_item(db: Session, item: ItemCreate):
    """Insert a new item, commit, and return the refreshed ORM object."""
    # model_dump() is the Pydantic v2 replacement for the deprecated dict()
    db_item = Item(**item.model_dump())
    db.add(db_item)
    db.commit()
    # Reload so database-generated values are present on the returned object
    db.refresh(db_item)
    return db_item

# main.py
from fastapi import Depends, FastAPI
from sqlalchemy.orm import Session
from database import SessionLocal, engine, Base
from routers import items

Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency injection
def get_db():
    """Yield a database session and always close it afterwards."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


app.include_router(items.router, prefix="/items", tags=["items"])

6.2 使用异步SQLAlchemy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# async_database.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# NOTE(review): assumes Item is the ORM model from models.py shown in 6.1
from models import Item

SQLALCHEMY_DATABASE_URL = "sqlite+aiosqlite:///./test.db"
# PostgreSQL: "postgresql+asyncpg://user:password@localhost/db"

engine = create_async_engine(SQLALCHEMY_DATABASE_URL, echo=True)
async_session = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)


async def get_db():
    """Yield an ``AsyncSession`` that is closed when the context exits."""
    async with async_session() as session:
        yield session


# Async CRUD
async def get_items(db: AsyncSession):
    """Return every ``Item`` row."""
    result = await db.execute(select(Item))
    return result.scalars().all()

七、认证与授权

7.1 OAuth2密码模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import os

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt

# Prefer a key from the environment; the literal is only a development
# fallback and must be replaced before deploying.
SECRET_KEY = os.environ.get("SECRET_KEY", "your-secret-key")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()

# Placeholder user store so the login route has something to query.
# Expected shape: {username: {"username": ..., "hashed_password": ...}}.
# Replace with a real database lookup.
fake_users_db: dict[str, dict[str, str]] = {}


# Verify a password
def verify_password(plain_password: str, hashed_password: str):
    """Return whether ``plain_password`` matches ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)


# Hash a password
def get_password_hash(password: str):
    """Return the hash of ``password`` produced by ``pwd_context``."""
    return pwd_context.hash(password)


# Create a token
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Return a signed JWT holding ``data`` plus an ``exp`` claim.

    ``expires_delta`` defaults to 15 minutes when omitted.
    """
    to_encode = data.copy()
    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


# Get the current user
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Decode the bearer token and return its ``sub`` claim.

    Raises:
        HTTPException: 401 when the token is invalid or has no ``sub``.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception
    username: str = payload.get("sub")
    if username is None:
        raise credentials_exception
    return username


# Login endpoint
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Check the submitted credentials and return a bearer token.

    Raises:
        HTTPException: 400 when the user is unknown or the password is wrong.
    """
    # Verify username and password
    user = fake_users_db.get(form_data.username)
    if not user or not verify_password(form_data.password, user["hashed_password"]):
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    access_token = create_access_token(
        data={"sub": user["username"]},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": access_token, "token_type": "bearer"}


# Protected route
@app.get("/users/me")
async def read_users_me(current_user: str = Depends(get_current_user)):
    """Return the username carried by the caller's token."""
    return {"username": current_user}

7.2 JWT认证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Full JWT implementation
from pydantic import BaseModel


class Token(BaseModel):
    """Response schema of the login endpoint."""

    access_token: str
    token_type: str


class TokenData(BaseModel):
    """Data carried inside a token."""

    username: str | None = None


# Return a token after login
@app.post("/login", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """Issue a bearer token whose ``sub`` claim is the submitted username."""
    # ...verification logic goes here. As written, no credentials are
    # checked, so a token is issued for any username.
    access_token = create_access_token(data={"sub": form_data.username})
    return {"access_token": access_token, "token_type": "bearer"}

八、中间件

8.1 CORS中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# Configure CORS
cors_options = {
    "allow_origins": ["http://localhost:3000"],  # allowed origins
    "allow_credentials": True,
    "allow_methods": ["*"],  # allowed methods
    "allow_headers": ["*"],  # allowed headers
}
app.add_middleware(CORSMiddleware, **cors_options)

8.2 自定义中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
import time

# The middleware is registered on this application instance below
app = FastAPI()


class TimingMiddleware(BaseHTTPMiddleware):
    """Add an ``X-Process-Time`` header with the request duration in seconds."""

    async def dispatch(self, request: Request, call_next):
        """Time the downstream handler and attach the duration to the response."""
        # perf_counter is monotonic, so system clock adjustments cannot skew it
        start_time = time.perf_counter()
        response = await call_next(request)
        process_time = time.perf_counter() - start_time
        response.headers["X-Process-Time"] = str(process_time)
        return response


app.add_middleware(TimingMiddleware)

九、错误处理

9.1 自定义异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse

app = FastAPI()


@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Render an ``HTTPException`` as JSON with its own status code."""
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.detail},
    )


@app.get("/items/{item_id}")
async def read_item(item_id: int):
    """Return the item id, or raise 404 for id 0 to exercise the handler."""
    if item_id == 0:
        raise HTTPException(status_code=404, detail="Item not found")
    return {"item_id": item_id}

9.2 全局异常处理

1
2
3
4
5
6
7
8
9
10
11
import logging

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

logger = logging.getLogger(__name__)

app = FastAPI()


@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Turn any unhandled exception into a generic 500 JSON response."""
    # Log the traceback: the client only receives a generic message, so
    # without this the failure would leave no trace.
    logger.error("Unhandled exception", exc_info=exc)
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal server error"},
    )

十、依赖注入

10.1 基础依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from fastapi import FastAPI, Depends

app = FastAPI()

# 简单依赖
def common_parameters(q: str | None = None, limit: int = 100):
return {"q": q, "limit": limit}

@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):
    """Return the dict produced by the ``common_parameters`` dependency."""
    return commons

# 类依赖
class CommonQueryParams:
def __init__(self, q: str | None = None, limit: int = 100):
self.q = q
self.limit = limit

@app.get("/users/")
async def read_users(params: CommonQueryParams = Depends(CommonQueryParams)):
    """Return the ``q`` and ``limit`` values held by the injected params."""
    return {"q": params.q, "limit": params.limit}

10.2 多个依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
# Dependency chain
def get_username():
    """Return the fixed demo username."""
    return "admin"

def get_password():
    """Return the fixed demo password."""
    return "secret"

def authenticate(username: str = Depends(get_username), password: str = Depends(get_password)):
    """Combine the two sub-dependencies into an auth result dict.

    The password is injected to show chaining but is never checked here, so
    ``authenticated`` is always True.
    """
    return {"username": username, "authenticated": True}

@app.get("/protected")
async def protected_route(auth: dict = Depends(authenticate)):
    """Return the dict produced by the ``authenticate`` dependency."""
    return auth

十一、实战案例:TODO应用

11.1 项目结构

1
2
3
4
5
6
7
todoapp/
├── main.py
├── models.py
├── schemas.py
├── crud.py
├── database.py
└── requirements.txt

11.2 完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from database import engine, SessionLocal, Base
from models import Todo
from schemas import TodoCreate, TodoResponse
from crud import (
get_todos,
get_todo,
create_todo,
update_todo,
delete_todo
)

Base.metadata.create_all(bind=engine)

app = FastAPI(title="TODO API")

def get_db():
    """Yield a database session and always close it afterwards."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.get("/todos", response_model=List[TodoResponse])
def read_todos(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return a page of todos selected by ``skip`` and ``limit``."""
    todos = get_todos(db, skip=skip, limit=limit)
    return todos

@app.get("/todos/{todo_id}", response_model=TodoResponse)
def read_todo(todo_id: int, db: Session = Depends(get_db)):
    """Return one todo by id.

    Raises:
        HTTPException: 404 when no todo has this id.
    """
    todo = get_todo(db, todo_id)
    if todo is None:
        raise HTTPException(status_code=404, detail="Todo not found")
    return todo

@app.post("/todos", response_model=TodoResponse)
def create_new_todo(todo: TodoCreate, db: Session = Depends(get_db)):
    """Create a todo from the request body and return the result of ``create_todo``."""
    return create_todo(db, todo)

@app.put("/todos/{todo_id}", response_model=TodoResponse)
def update_existing_todo(todo_id: int, todo: TodoCreate, db: Session = Depends(get_db)):
    """Overwrite a todo's fields and return the updated record.

    Raises:
        HTTPException: 404 when no todo has this id.
    """
    updated = update_todo(db, todo_id, todo)
    # update_todo returns None for an unknown id; answer 404 instead of
    # letting response_model validation fail on None.
    if updated is None:
        raise HTTPException(status_code=404, detail="Todo not found")
    return updated

@app.delete("/todos/{todo_id}")
def delete_existing_todo(todo_id: int, db: Session = Depends(get_db)):
    """Delete a todo by id and confirm the deletion.

    Raises:
        HTTPException: 404 when no todo has this id.
    """
    # delete_todo returns None when nothing matched; do not report success then
    if delete_todo(db, todo_id) is None:
        raise HTTPException(status_code=404, detail="Todo not found")
    return {"message": "Deleted successfully"}

# schemas.py
from pydantic import BaseModel, ConfigDict


class TodoCreate(BaseModel):
    """Fields a client supplies when creating or updating a todo."""

    title: str
    description: str | None = None
    completed: bool = False


class TodoResponse(TodoCreate):
    """Todo as returned by the API: the input fields plus ``id``."""

    id: int

    # Pydantic v2 configuration; replaces the deprecated nested ``class Config``.
    # from_attributes lets the schema be populated from object attributes.
    model_config = ConfigDict(from_attributes=True)

# crud.py
from sqlalchemy.orm import Session
from models import Todo
from schemas import TodoCreate


def get_todos(db: Session, skip: int = 0, limit: int = 100):
    """Return up to ``limit`` todos, skipping the first ``skip`` rows."""
    return db.query(Todo).offset(skip).limit(limit).all()


def get_todo(db: Session, todo_id: int):
    """Return the todo with this id, or None when it does not exist."""
    return db.query(Todo).filter(Todo.id == todo_id).first()


def create_todo(db: Session, todo: TodoCreate):
    """Insert a new todo, commit, and return the refreshed ORM object."""
    # model_dump() is the Pydantic v2 replacement for the deprecated dict()
    db_todo = Todo(**todo.model_dump())
    db.add(db_todo)
    db.commit()
    db.refresh(db_todo)
    return db_todo


def update_todo(db: Session, todo_id: int, todo: TodoCreate):
    """Overwrite title, description and completed on an existing todo.

    Returns the updated object, or None when the id is unknown.
    """
    db_todo = db.query(Todo).filter(Todo.id == todo_id).first()
    if db_todo:
        db_todo.title = todo.title
        db_todo.description = todo.description
        db_todo.completed = todo.completed
        db.commit()
        db.refresh(db_todo)
    return db_todo


def delete_todo(db: Session, todo_id: int):
    """Delete the todo with this id.

    Returns the deleted object, or None when the id is unknown.
    """
    db_todo = db.query(Todo).filter(Todo.id == todo_id).first()
    if db_todo:
        db.delete(db_todo)
        db.commit()
    return db_todo

十二、部署

12.1 使用Uvicorn

1
2
3
4
5
6
7
8
# Basic run
uvicorn main:app

# Production mode
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

# HTTPS
uvicorn main:app --ssl-keyfile=key.pem --ssl-certfile=cert.pem

12.2 使用Gunicorn

1
2
3
4
5
# Install
pip install gunicorn

# Run
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker

12.3 Dockerfile

1
2
3
4
5
6
7
8
9
10
11
12
# Python 3.11 slim base image
FROM python:3.11-slim

# Relative paths in the following instructions resolve against /app
WORKDIR /app

# Install dependencies before copying the rest of the source
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application source
COPY . .

# Same port that the uvicorn command below listens on
EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

常用命令速查

命令 说明
uvicorn main:app --reload 开发模式启动
uvicorn main:app --host 0.0.0.0 监听所有网络接口(允许外部访问)
uvicorn main:app --workers 4 多进程启动
python -m uvicorn main:app 另一种启动方式

参考资料


持续更新中…欢迎收藏!

#FastAPI #Python #Web框架 #API #异步


FastAPI快速入门-Python现代Web框架
https://r0f2.my/post/33-fastapi-quickstart/
作者
JA
发布于
2026年2月13日
许可协议