USD ($)
$
United States Dollar
Euro Member Countries
India Rupee

SQLAlchemy ORM with Async Support for PostgreSQL/MySQL

Lesson 10/30 | Study Time: 26 Min

SQLAlchemy ORM with async support allows Python applications to interact with relational databases like PostgreSQL and MySQL in a non-blocking, asynchronous manner. Introduced in SQLAlchemy 1.4 and refined in later versions, this approach is designed to work with Python’s asyncio ecosystem and modern async web frameworks.

By using asynchronous database drivers, developers can efficiently manage multiple concurrent database operations, making it well suited for scalable APIs, microservices, and high-performance backend systems.

Prerequisites and Installation

Start with Python 3.9+ and async database drivers to unlock SQLAlchemy's full async capabilities. This setup ensures compatibility with frameworks like FastAPI, central to our web API development course.

Install core packages using pip


1. sqlalchemy[asyncio] for ORM and async engine support

2. asyncpg for high-performance PostgreSQL connections

3. aiomysql for MySQL async operations

bash
pip install sqlalchemy[asyncio] asyncpg aiomysql fastapi

Key benefit: asyncpg's C-based implementation delivers up to 3x faster queries than synchronous drivers, ideal for API endpoints under load.

Async Engine Setup

The async engine replaces traditional create_engine() with create_async_engine() from sqlalchemy.ext.asyncio. Connection strings use dialect prefixes like postgresql+asyncpg:// or mysql+aiomysql:// to enable non-blocking I/O.

Here's a complete configuration example

python
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import DeclarativeBase

# PostgreSQL example
pg_engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost:5432/mydb",
echo=True, # Logs SQL for debugging
future=True # Enables SQLAlchemy 2.0 features
)

# MySQL example
mysql_engine = create_async_engine(
"mysql+aiomysql://user:password@localhost:3306/mydb",
pool_pre_ping=True # Validates connections
)

class Base(DeclarativeBase):
pass

Always call await engine.dispose() after use to release resources cleanly. Set pool_size=20 and max_overflow=10 for production to manage connection pools efficiently.

Async Session Management

Sessions handle ORM transactions asynchronously via async_sessionmaker. Use context managers (async with) to ensure proper rollback and closure, preventing leaks in concurrent API routes.

Create a session factory

python
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncAttrs
from sqlalchemy.orm import sessionmaker

async_session = async_sessionmaker(pg_engine, class_=AsyncSession, expire_on_commit=False)

Session Patterns

This approach integrates seamlessly with FastAPI's Depends() for per-request sessions.

Defining ORM Models

Models inherit from a Base class with AsyncAttrs for lazy-loading compatibility. Use SQLAlchemy 2.0's Mapped annotations for type safety and IDE autocompletion.

Example User model for a web API

python
from sqlalchemy import String, Integer, DateTime
from sqlalchemy.orm import Mapped, mapped_column
from datetime import datetime

class User(Base, AsyncAttrs):
__tablename__ = "users"

id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
name: Mapped[str] = mapped_column(String(100), nullable=False)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)


1. Supports relationships: posts: Mapped[List["Post"]] = relationship(back_populates="user")

2. Indexes on frequent query fields boost async performance

3. AsyncAttrs enables await session.refresh(user) for updated data

Migrate schemas with Alembic: alembic revision --autogenerate -m "add users" then alembic upgrade head.

CRUD Operations

Async CRUD mirrors sync patterns but requires await on all session methods. This non-blocking behavior shines in FastAPI endpoints handling multiple requests.

Create

python
async def create_user(session: AsyncSession, name: str, email: str):
user = User(name=name, email=email)
session.add(user)
await session.commit()
await session.refresh(user) # Reloads with generated ID
return user


Read

Use select() for modern queries:


1. Fetch all: result = await session.execute(select(User)); users = result.scalars().all()

2. Filter: result = await session.execute(select(User).where(User.email == "test@example.com"))

3. Single: user = await session.get(User, 1) or user = (await session.execute(select(User).where(User.id == 1))).scalar_one_or_none()


Update

python
async def update_user(session: AsyncSession, user_id: int, name: str):
user = await session.get(User, user_id)
if user:
user.name = name
await session.commit()


Delete

await session.delete(user); await session.commit()

Performance tip: Batch operations with session.add_all([user1, user2]) for bulk inserts.

Relationships and Joins

Async handles eager/lazy loading transparently. Use selectinload() for efficient N+1 query avoidance in APIs.


Example with Post model

python
class Post(Base, AsyncAttrs):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str]
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
user: Mapped["User"] = relationship(back_populates="posts")

# Query with join
from sqlalchemy.orm import selectinload
stmt = select(Post).options(selectinload(Post.user)).where(Post.user_id == 1)
result = await session.execute(stmt)


1. selectinload: Single round-trip for collections

2. joinedload: Eager Cartesian product for one-to-one

3. Avoid in loops to prevent connection exhaustion

FastAPI Integration

Combine with FastAPI for production web APIs. Use dependencies for scoped sessions.

python
from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager

app = FastAPI()

@asynccontextmanager
async def get_db():
async_session = async_sessionmaker(pg_engine)
async with async_session() as session:
try:
yield session
await session.commit()
except:
await session.rollback()
raise
finally:
await session.close()

@app.get("/users/")
async db: AsyncSession = Depends(get_db):
result = await db.execute(select(User))
return result.scalars().all()

Sync vs Async SQLAlchemy

Error Handling and Best Practices

Wrap operations in try/except for SQLAlchemyError. Monitor pool usage with engine.pool.status().

1. Limit session lifespan to request scope

2. Use expire_on_commit=False for detached instances

3. Enable query logging: echo=True

4. Test with pytest-asyncio for async endpoints

5. Scale pools: pool_size=50 for high traffic


Common pitfalls


1. Forgetting await (blocks sync)

2. Nested sessions (use single dependency)

3. Ignoring dispose() (memory leaks)

himanshu singh

himanshu singh

Product Designer
Profile

Class Sessions

1- HTTP Methods and REST Principles 2- Status Codes, Headers, and Request/Response cycles 3- JSON and XML Data Formats for API Payloads 4- Resource Naming Conventions and URI Design Best Practices 5- Statelessness, HATEOAS, and API Versioning Strategies 6- Rate Limiting, Caching, and Idempotency for Scalability 7- FastAPI Setup, Pydantic Models, and Async Endpoint Creation 8- Path/Query Parameters, Request/Response Validation 9- Dependency Injection and Middleware for Authentication/Authorization 10- SQLAlchemy ORM with Async Support for PostgreSQL/MySQL 11- CRUD Operations via API Endpoints with Relationships 12- Database Migrations Using Alembic and Connection Pooling 13- JWT/OAuth2 Implementation with FastAPI Security Utilities 14- File Uploads, Pagination, and Real-Time WebSockets 15- Input Sanitization, CORS, and OWASP Top 10 Defenses 16- Unit/integration testing with Pytest and FastAPI TestClient 17- API Documentation Generation with OpenAPI/Swagger 18- Mocking External Services and Load Testing with Locust 19- Containerization with Docker and Orchestration via Docker Compose 20- Deployment to Cloud Platforms 21- CI/CD Pipelines Using GitHub Actions and Monitoring with Prometheus 22- Consuming APIs in React/Vue.js with Axios/Fetch 23- State Management (Redux/Zustand) for API Data Flows 24- Error Handling, Optimistic Updates, and Frontend Caching Strategies 25- Async Processing with Celery/Redis for Background Tasks 26- Caching Layers (Redis) and Database Query Optimization 27- Microservices Patterns and API Gateways 28- Building a Full-Stack CRUD App with User Auth and File Handling 29- API Analytics, Logging (Structlog), and Error Tracking 30- Code Reviews, Maintainability, and Evolving APIs in Production