Async SQLAlchemy (Async Engine, Async Sessions, FastAPI Integration)
Introduction
Modern Python backends demand high performance and concurrency. That’s where Async SQLAlchemy comes in.
With async, you can:
- Handle multiple requests concurrently
- Improve API throughput under I/O-bound load
- Run database operations without blocking the event loop
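To make the non-blocking idea concrete, here is a minimal sketch that uses only the standard library. The fake_query function is hypothetical, and asyncio.sleep stands in for a real database round trip; nothing here touches SQLAlchemy yet.

```python
import asyncio
import time


async def fake_query(name: str, delay: float) -> str:
    # asyncio.sleep stands in for a non-blocking database call (assumption).
    await asyncio.sleep(delay)
    return f"{name} done"


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # All three "queries" wait at the same time on a single event loop.
    results = await asyncio.gather(
        fake_query("a", 0.2),
        fake_query("b", 0.2),
        fake_query("c", 0.2),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

Because the waits overlap, the total time should be close to the longest single delay rather than the sum of all three.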
What is Async SQLAlchemy?
Async SQLAlchemy allows you to:
- Use async/await syntax
- Run DB queries without blocking the server
- Integrate seamlessly with FastAPI
Important Note
Async support (the sqlalchemy.ext.asyncio extension) is available from SQLAlchemy 1.4 onward.
Step 1: Install Dependencies
pip install sqlalchemy aiosqlite asyncpg
For FastAPI:
pip install fastapi uvicorn
Project Structure
app/
├── main.py
├── database.py
├── models.py
├── schemas.py
└── crud.py
Step 2: Async Database Setup
database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base

DATABASE_URL = "sqlite+aiosqlite:///./test.db"

engine = create_async_engine(DATABASE_URL, echo=True)

AsyncSessionLocal = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

Base = declarative_base()
Step 3: Models (Same as Sync)
models.py
from sqlalchemy import Column, Integer, String
from .database import Base

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String)
    email = Column(String, unique=True, index=True)
Step 4: Schemas (Pydantic)
schemas.py
from pydantic import BaseModel

class UserCreate(BaseModel):
    name: str
    email: str

class UserResponse(BaseModel):
    id: int
    name: str
    email: str

    class Config:
        # Pydantic v1 setting; Pydantic v2 renames it to from_attributes
        orm_mode = True

Step 5: Async CRUD Operations
crud.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from . import models

async def create_user(db: AsyncSession, user):
    db_user = models.User(name=user.name, email=user.email)
    db.add(db_user)
    await db.commit()
    await db.refresh(db_user)
    return db_user

async def get_users(db: AsyncSession):
    result = await db.execute(select(models.User))
    return result.scalars().all()

async def get_user(db: AsyncSession, user_id: int):
    result = await db.execute(
        select(models.User).where(models.User.id == user_id)
    )
    return result.scalar_one_or_none()

async def delete_user(db: AsyncSession, user_id: int):
    user = await get_user(db, user_id)
    if user:
        await db.delete(user)
        await db.commit()
    return user

Step 6: FastAPI Integration
main.py
API Endpoints (Async)
Create User
@app.post("/users", response_model=schemas.UserResponse)
async def create_user(user: schemas.UserCreate, db: AsyncSession = Depends(get_db)):
    return await crud.create_user(db, user)

Get All Users
@app.get("/users", response_model=list[schemas.UserResponse])
async def read_users(db: AsyncSession = Depends(get_db)):
    return await crud.get_users(db)

Get Single User
@app.get("/users/{user_id}", response_model=schemas.UserResponse)
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    user = await crud.get_user(db, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

Delete User
@app.delete("/users/{user_id}")
async def delete_user(user_id: int, db: AsyncSession = Depends(get_db)):
    user = await crud.delete_user(db, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return {"message": "User deleted"}

Sync vs Async Comparison
Feature | Sync | Async
Execution | Blocking | Non-blocking
Performance | Good | Better (high load)
Complexity | Simple | Moderate
Best For | Small apps | Scalable apps

Common Mistakes
- Forgetting await
- Mixing sync & async sessions
- Using blocking DB drivers
- Not using async with

Real-World Tips
- Use PostgreSQL + asyncpg in production
- Avoid heavy CPU tasks in async routes
- Use connection pooling
- Monitor query performance
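As an illustration of the tip about CPU-heavy work, one option is to push the work off the event loop with asyncio.to_thread (Python 3.9+). This is only a sketch: expensive_hash and handler are hypothetical names, and the hashing loop stands in for whatever heavy computation a route might do.

```python
import asyncio
import hashlib


def expensive_hash(data: bytes, rounds: int) -> str:
    # Plain synchronous, CPU-bound function (stand-in for real work).
    digest = data
    for _ in range(rounds):
        digest = hashlib.sha256(digest).digest()
    return digest.hex()


async def handler() -> str:
    # Runs the function in a worker thread so the event loop can keep
    # serving other requests while the computation is in progress.
    return await asyncio.to_thread(expensive_hash, b"payload", 1000)


digest = asyncio.run(handler())
print(len(digest))
```

Threads still share the interpreter lock, so for long pure-Python computations a process pool or a separate worker service may be a better fit.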
Summary
- Async SQLAlchemy improves scalability
- Uses async/await syntax
- Integrates perfectly with FastAPI
- Ideal for high-performance APIs