Clean Code Python: Connection Pooling and Database Resilience Under Load

Your pool_size=5 default just met Black Friday traffic. 200 concurrent requests against SQLAlchemy's default 15 connections (pool_size=5 plus max_overflow=10) leave 185 coroutines waiting on pool checkout, and then timeouts cascade into a full outage. Here is how to configure connection pooling, read replicas, and circuit breakers for production traffic.

Tin Dang

It is Black Friday. ShelfWise traffic spikes to 200 concurrent requests per instance. Your create_async_engine() call uses SQLAlchemy’s defaults: pool_size=5, max_overflow=10. That is 15 connections total. The other 185 coroutines are queued, waiting for a connection to become available. After 30 seconds, pool_timeout fires and they all get TimeoutError. Your load balancer sees 5xx responses, retries them, which adds more requests to the queue. Within 90 seconds, every instance is unresponsive.

This is pool exhaustion, one of the most common causes of database-related outages in async Python applications. It is not a database problem: the database is fine. It is a connection pool problem, and it is largely preventable with correct configuration.

The Connection Pool Lifecycle

Every database operation in an async SQLAlchemy application follows the same lifecycle:

The pool maintains a set of persistent connections (pool_size). When all persistent connections are in use, it creates temporary overflow connections up to max_overflow. When overflow connections are returned, they are closed rather than kept in the pool. If both the pool and overflow are exhausted, the next checkout blocks until a connection is returned or pool_timeout expires, at which point it raises TimeoutError.
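The lifecycle above can be approximated without a database. The sketch below is an illustration only, not SQLAlchemy's implementation: it models the pool as a semaphore with pool_size + max_overflow slots, and the function name and the scaled-down timings are assumptions chosen so the example runs quickly.

```python
import asyncio


async def simulate_checkouts(
    requests: int,
    pool_size: int,
    max_overflow: int,
    pool_timeout: float,
    query_seconds: float,
) -> dict[str, int]:
    """Model a pool as a semaphore with pool_size + max_overflow slots."""
    slots = asyncio.Semaphore(pool_size + max_overflow)
    outcome = {"served": 0, "timed_out": 0}

    async def handle_request() -> None:
        try:
            # Checkout: wait for a free slot, but no longer than pool_timeout.
            await asyncio.wait_for(slots.acquire(), timeout=pool_timeout)
        except asyncio.TimeoutError:
            outcome["timed_out"] += 1
            return
        try:
            await asyncio.sleep(query_seconds)  # stand-in for the query
            outcome["served"] += 1
        finally:
            slots.release()  # checkin: the slot becomes available again

    await asyncio.gather(*(handle_request() for _ in range(requests)))
    return outcome


if __name__ == "__main__":
    # 200 requests, default sizing, and queries slower than the timeout.
    print(asyncio.run(simulate_checkouts(200, 5, 10, 0.05, 0.2)))
```

With queries that outlast the timeout, only the first 15 requests get a slot and the rest time out, which is the Black Friday scenario in miniature. Raising the slot count or shortening the queries changes the split.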

Production Engine Configuration

Here is the create_async_engine configuration for ShelfWise, with every parameter justified:

src/db/engine.py
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from src.core.config import settings


def create_engine() -> AsyncEngine:
    """Create the primary async engine with production pool settings."""
    return create_async_engine(
        settings.database.url.get_secret_value(),
        # ── Pool sizing ──────────────────────────────────────────
        pool_size=20,     # Persistent connections kept open
        max_overflow=30,  # Temporary connections under burst
        # Total max: 20 + 30 = 50 connections
        # Rule of thumb: 1 connection handles ~1 concurrent query
        # 50 connections ≈ 50 concurrent database operations per instance
        # ── Timeouts ─────────────────────────────────────────────
        pool_timeout=10,  # Seconds to wait for a connection before raising
        # Default is 30, which is too long. A user waiting 30s has already left.
        # 10s is enough for burst absorption without hiding pool exhaustion.
        # ── Connection health ────────────────────────────────────
        pool_pre_ping=True,  # Lightweight liveness check on every checkout
        # Adds roughly 1ms latency per checkout. Worth it.
        # Without it, a connection dropped by the database (idle timeout,
        # failover, network blip) causes a hard error on the NEXT query.
        # With it, the dead connection is silently replaced.
        # ── Connection recycling ─────────────────────────────────
        pool_recycle=1800,  # Replace connections older than 30 minutes
        # PgBouncer, proxies, and firewalls often drop idle or long-lived
        # connections (PostgreSQL can too, if an idle timeout is configured).
        # Recycling before those limits prevents "connection reset" errors.
        # Also prevents long-lived connections from accumulating memory.
        # ── Diagnostics ──────────────────────────────────────────
        echo=settings.database.echo,  # SQL logging (off in prod)
        echo_pool="debug" if settings.debug else False,
    )

Traffic level                  pool_size        max_overflow        pool_timeout (s)   Handles (concurrent)
Dev / staging                  5                5                   30                 ~10 requests
Low traffic (< 50 RPS)         10               10                  15                 ~20 requests
Medium traffic (50-200 RPS)    20               30                  10                 ~50 requests
High traffic (200-1000 RPS)    30               50                  5                  ~80 requests
Extreme (1000+ RPS)            Use PgBouncer    Pool in PgBouncer   3                  Thousands

The numbers follow a rule: pool_size + max_overflow should not exceed your PostgreSQL max_connections divided by the number of application instances (count every worker process, since each process has its own pool). If PostgreSQL allows 200 connections and you run 4 instances, each instance gets at most 50 connections, so pool_size=20, max_overflow=30 is the ceiling. In practice, leave a few connections spare for migrations, admin sessions, and monitoring.
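The sizing rule is simple arithmetic, and it can be checked at deploy time. The helper names and the `reserved` parameter below are assumptions introduced for illustration; they are not part of SQLAlchemy or PostgreSQL.

```python
def per_instance_budget(max_connections: int, instances: int, reserved: int = 0) -> int:
    """Connections each instance may open, after setting `reserved` aside."""
    if instances < 1:
        raise ValueError("instances must be at least 1")
    return (max_connections - reserved) // instances


def fits_budget(
    pool_size: int,
    max_overflow: int,
    max_connections: int,
    instances: int,
    reserved: int = 0,
) -> bool:
    """True when pool_size + max_overflow stays within the per-instance budget."""
    budget = per_instance_budget(max_connections, instances, reserved)
    return pool_size + max_overflow <= budget


if __name__ == "__main__":
    print(per_instance_budget(200, 4))              # 50
    print(fits_budget(20, 30, 200, 4))              # True
    print(fits_budget(20, 30, 200, 4, reserved=10)) # False: budget drops to 47
```

Setting a few connections aside with `reserved` shows why running exactly at the ceiling is fragile: one admin session or migration job is enough to push an instance over.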

Request-Scoped Session Lifecycle

Every request gets its own session. The session is created at the start of the request, used for all database operations during that request, and closed when the request ends. This is not optional — sharing sessions across requests is a race condition.

src/db/session.py
from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
)

from src.db.engine import create_engine

engine = create_engine()

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Prevent lazy loads after commit
)


async def get_session() -> AsyncIterator[AsyncSession]:
    """FastAPI dependency that provides a request-scoped session.

    The session is committed on success, rolled back on exception,
    and always closed, guaranteeing the connection returns to the pool.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

The FastAPI dependency wires it into route handlers:

src/api/deps.py
from typing import Annotated

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from src.db.session import get_session

SessionDep = Annotated[AsyncSession, Depends(get_session)]

src/api/v1/books.py
from fastapi import APIRouter
from sqlalchemy import select

from src.api.deps import SessionDep
# Book is the ORM model; import it from wherever your models live.

router = APIRouter()


@router.get("/books/{book_id}")
async def get_book(book_id: int, session: SessionDep):
    # session is created for this request, closed after response
    result = await session.execute(
        select(Book).where(Book.id == book_id)
    )
    return result.scalar_one_or_none()

The critical guarantee: the async with AsyncSessionLocal() as session block ensures the session is closed even if the route handler raises an exception. A closed session returns its connection to the pool. If sessions leak (not closed), connections leak, and pool exhaustion follows.
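The difference between a scoped and a leaked session can be shown with a stand-in pool. Everything here (FakePool, scoped, leaky) is hypothetical scaffolding rather than SQLAlchemy code; it only counts checkouts to make the guarantee visible.

```python
import asyncio
from contextlib import asynccontextmanager


class FakePool:
    """Counts checked-out connections; a stand-in for a real pool."""

    def __init__(self) -> None:
        self.checked_out = 0

    @asynccontextmanager
    async def session(self):
        self.checked_out += 1      # checkout
        try:
            yield self
        finally:
            self.checked_out -= 1  # checkin runs even when the body raises


async def scoped(pool: FakePool, fail: bool) -> int:
    """One request inside the context manager; returns checked_out afterwards."""
    try:
        async with pool.session():
            if fail:
                raise RuntimeError("handler blew up")
    except RuntimeError:
        pass
    return pool.checked_out


async def leaky(pool: FakePool, fail: bool) -> int:
    """Same request with manual checkout/checkin: cleanup is skipped on error."""
    pool.checked_out += 1
    try:
        if fail:
            raise RuntimeError("handler blew up")
        pool.checked_out -= 1  # never reached when the handler raises
    except RuntimeError:
        pass
    return pool.checked_out


if __name__ == "__main__":
    print(asyncio.run(scoped(FakePool(), fail=True)))  # 0: connection returned
    print(asyncio.run(leaky(FakePool(), fail=True)))   # 1: connection leaked
```

Every failing request in the leaky variant removes one connection from circulation for good, so a pool of 50 is gone after 50 errors.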

Read/Write Splitting

ShelfWise’s read traffic outweighs writes 10:1. Catalog browsing, search results, book detail pages — all reads. Writes happen on order placement, inventory updates, and admin operations. Sending all traffic to a single primary database wastes the read replicas you are paying for.

# src/db/engine.py (extended)
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from src.core.config import settings


def create_primary_engine() -> AsyncEngine:
    """Engine for write operations; connects to the primary."""
    return create_async_engine(
        settings.database.primary_url.get_secret_value(),
        pool_size=10,
        max_overflow=15,
        pool_timeout=10,
        pool_pre_ping=True,
        pool_recycle=1800,
    )


def create_replica_engine() -> AsyncEngine:
    """Engine for read operations; connects to a read replica.

    Larger pool because reads dominate traffic.
    Shorter recycle because replicas may rotate during maintenance.
    """
    return create_async_engine(
        settings.database.replica_url.get_secret_value(),
        pool_size=25,
        max_overflow=40,
        pool_timeout=10,
        pool_pre_ping=True,
        pool_recycle=900,  # 15 minutes: replicas rotate more often
    )

The session factory routes reads and writes to different engines:

# src/db/session.py (extended)
from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from src.db.engine import create_primary_engine, create_replica_engine

primary_engine = create_primary_engine()
replica_engine = create_replica_engine()

PrimarySession = async_sessionmaker(primary_engine, class_=AsyncSession, expire_on_commit=False)
ReplicaSession = async_sessionmaker(replica_engine, class_=AsyncSession, expire_on_commit=False)


async def get_write_session() -> AsyncIterator[AsyncSession]:
    """Session routed to the primary database (reads + writes)."""
    async with PrimarySession() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


async def get_read_session() -> AsyncIterator[AsyncSession]:
    """Session routed to the read replica (reads only).

    Do NOT use this session for writes. They will either fail
    (read-only replica) or create split-brain inconsistency.
    """
    async with ReplicaSession() as session:
        yield session
        # No commit: reads do not need it

Route handlers declare their intent:

# Read endpoint, routed to replica
@router.get("/books/")
async def list_books(
    session: Annotated[AsyncSession, Depends(get_read_session)],
):
    result = await session.execute(select(Book).limit(50))
    return result.scalars().all()


# Write endpoint, routed to primary
@router.post("/books/")
async def create_book(
    data: BookCreate,
    session: Annotated[AsyncSession, Depends(get_write_session)],
):
    book = Book(**data.model_dump())
    session.add(book)
    await session.flush()
    return book

Circuit Breaker on the Connection Pool

When the database goes down, every request tries to connect, waits for pool_timeout, and fails. Meanwhile, the application is accumulating requests in the queue, consuming memory, and providing no useful service. A circuit breaker detects the failure pattern and fails fast — immediately returning an error without attempting a connection.

src/db/circuit_breaker.py
import logging
import time
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from enum import StrEnum  # Python 3.11+

from sqlalchemy.exc import InterfaceError, OperationalError
from sqlalchemy.exc import TimeoutError as PoolTimeoutError
from sqlalchemy.ext.asyncio import AsyncSession

logger = logging.getLogger("shelfwise.db")

# Only infrastructure errors should trip the breaker. Application errors
# raised inside a handler (validation, 404s, integrity violations) say
# nothing about database health and must not count as failures.
DB_FAILURES = (OperationalError, InterfaceError, PoolTimeoutError, OSError)


class CircuitState(StrEnum):
    CLOSED = "closed"        # Normal operation: requests flow through
    OPEN = "open"            # Failing: reject immediately
    HALF_OPEN = "half_open"  # Testing: let requests through, first result decides


class DatabaseCircuitBreaker:
    """Circuit breaker for database connections.

    Transitions:
    - CLOSED → OPEN: after `failure_threshold` consecutive failures
    - OPEN → HALF_OPEN: after `recovery_timeout` seconds
    - HALF_OPEN → CLOSED: on first success
    - HALF_OPEN → OPEN: on first failure
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
    ):
        self._failure_threshold = failure_threshold
        self._recovery_timeout = recovery_timeout
        self._failure_count = 0
        self._last_failure_time = 0.0
        self._state = CircuitState.CLOSED

    @property
    def state(self) -> CircuitState:
        if self._state == CircuitState.OPEN:
            # Check if recovery timeout has elapsed
            if time.monotonic() - self._last_failure_time >= self._recovery_timeout:
                self._state = CircuitState.HALF_OPEN
                logger.info("Circuit breaker → HALF_OPEN (testing recovery)")
        return self._state

    def record_success(self) -> None:
        self._failure_count = 0
        if self._state == CircuitState.HALF_OPEN:
            self._state = CircuitState.CLOSED
            logger.info("Circuit breaker → CLOSED (recovered)")

    def record_failure(self) -> None:
        # The count is not reset on HALF_OPEN, so one failure there reopens.
        self._failure_count += 1
        self._last_failure_time = time.monotonic()
        if self._failure_count >= self._failure_threshold:
            self._state = CircuitState.OPEN
            logger.warning(
                "Circuit breaker → OPEN after %d failures (blocking requests for %ds)",
                self._failure_count,
                self._recovery_timeout,
            )

    @asynccontextmanager
    async def protect(
        self, session_factory
    ) -> AsyncIterator[AsyncSession]:
        """Wrap session creation with circuit breaker logic."""
        if self.state == CircuitState.OPEN:
            raise ConnectionError(
                "Database circuit breaker is OPEN. "
                "Failing fast to prevent cascade."
            )
        try:
            # The session connects lazily, so connection errors surface
            # inside the body, on the first query.
            async with session_factory() as session:
                yield session
        except DB_FAILURES:
            self.record_failure()
            raise
        else:
            self.record_success()


# Global instance
db_circuit_breaker = DatabaseCircuitBreaker(
    failure_threshold=5,
    recovery_timeout=30.0,
)

Integrate with the session dependency:

# src/db/session.py (with circuit breaker)
from src.db.circuit_breaker import db_circuit_breaker


async def get_session() -> AsyncIterator[AsyncSession]:
    async with db_circuit_breaker.protect(AsyncSessionLocal) as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

When the database is down, the first five failures are slow: each one waits for a connection error or for pool_timeout, and requests already in flight when the breaker opens still fail the slow way. Once the breaker is open, every new request fails immediately with ConnectionError, with no waiting and no queue buildup. After 30 seconds, the breaker enters HALF_OPEN and lets traffic through again as a probe. The first success closes the breaker and traffic resumes; the first failure reopens it for another 30 seconds.
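The state transitions are easier to follow with a controllable clock. The class below is a condensed variant written for this walk-through, not the DatabaseCircuitBreaker itself: TinyBreaker, its injectable `clock`, and walk_through are hypothetical names, and success always closes the breaker here for brevity.

```python
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class TinyBreaker:
    """Condensed breaker with an injectable clock, for illustration only."""

    def __init__(self, failure_threshold: int, recovery_timeout: float, clock):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock  # callable returning the current time in seconds
        self.failures = 0
        self.opened_at = 0.0
        self._state = State.CLOSED

    @property
    def state(self) -> State:
        # OPEN decays to HALF_OPEN once the recovery timeout has elapsed.
        elapsed = self.clock() - self.opened_at
        if self._state is State.OPEN and elapsed >= self.recovery_timeout:
            self._state = State.HALF_OPEN
        return self._state

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self._state = State.OPEN
            self.opened_at = self.clock()

    def record_success(self) -> None:
        self.failures = 0
        self._state = State.CLOSED


def walk_through() -> list[str]:
    """Replay: 5 failures, wait, failed probe, wait again, successful probe."""
    now = [0.0]
    breaker = TinyBreaker(5, 30.0, clock=lambda: now[0])
    seen = []
    for _ in range(5):
        breaker.record_failure()
    seen.append(breaker.state.value)  # threshold reached
    now[0] = 29.0
    seen.append(breaker.state.value)  # still inside the recovery window
    now[0] = 30.0
    seen.append(breaker.state.value)  # window elapsed, probing allowed
    breaker.record_failure()
    seen.append(breaker.state.value)  # probe failed, window restarts at t=30
    now[0] = 60.0
    seen.append(breaker.state.value)  # second window elapsed
    breaker.record_success()
    seen.append(breaker.state.value)  # probe succeeded
    return seen


if __name__ == "__main__":
    print(walk_through())
    # ['open', 'open', 'half_open', 'open', 'half_open', 'closed']
```

Injecting the clock keeps the example deterministic, and the same approach makes the real breaker testable without sleeping for 30 seconds.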

PgBouncer: When Application Pooling Is Not Enough

At extreme scale — thousands of requests per second across dozens of application instances — application-level pooling hits a wall. Each instance maintains its own pool, and the aggregate connection count can exceed PostgreSQL’s max_connections (default: 100). PgBouncer sits between your application and PostgreSQL, multiplexing thousands of application connections onto a smaller set of database connections.

When using PgBouncer in transaction mode (the recommended mode for modern applications), each application connection is mapped to a PostgreSQL connection only for the duration of a transaction. Between transactions, the PostgreSQL connection is returned to PgBouncer’s pool and can be used by other application connections.

pgbouncer.ini
[databases]
shelfwise = host=postgres port=5432 dbname=shelfwise

[pgbouncer]
; PgBouncer only treats ; and # as comment markers at the start of a line,
; so each comment sits on its own line above the setting it describes.
listen_port = 6432
; connection released after each transaction
pool_mode = transaction
; total client connections PgBouncer accepts
max_client_conn = 1000
; PostgreSQL connections per user/database pair
default_pool_size = 50
; extra connections for burst
reserve_pool_size = 10
; seconds a client waits before the reserve pool is used
reserve_pool_timeout = 3
; close idle server connections after 5 min
server_idle_timeout = 300

With PgBouncer in the path, your application pool settings change:

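# With PgBouncer: the application pool is now "client connections to PgBouncer"
engine = create_async_engine(
    "postgresql+asyncpg://pgbouncer:6432/shelfwise",
    pool_size=5,        # Small: PgBouncer does the real pooling
    max_overflow=10,
    pool_timeout=5,     # Short: PgBouncer queues are faster
    pool_pre_ping=True,
    pool_recycle=600,   # PgBouncer recycles too, so keep this shorter
    # Transaction mode can hand each transaction to a different server
    # connection, so asyncpg's prepared-statement cache is disabled here.
    # PgBouncer versions before 1.21 do not track prepared statements at all;
    # check the SQLAlchemy asyncpg dialect notes for the full set of options.
    connect_args={"statement_cache_size": 0},
)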

Schema-Per-Tenant: Connection Pool Interactions

For ShelfWise enterprise tenants using schema-per-tenant isolation (from Part 9), the pool needs to set search_path on every session checkout. The session event from Part 9 extends naturally:

src/db/tenant_schema.py
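import re

from sqlalchemy import event
from sqlalchemy.pool import Pool

from src.core.context import get_current_tenant, is_system_context

# SET does not accept bind parameters, so the slug is validated against a
# strict allowlist before it is interpolated. The pattern is an assumption:
# adjust it to whatever your tenant slugs are allowed to contain.
_SLUG_RE = re.compile(r"^[a-z0-9_]+$")


def _set_search_path(dbapi_conn, search_path: str) -> None:
    # DBAPI connections execute through a cursor, not directly.
    cursor = dbapi_conn.cursor()
    try:
        cursor.execute(f"SET search_path TO {search_path}")
    finally:
        cursor.close()


@event.listens_for(Pool, "checkout")
def set_tenant_search_path(dbapi_conn, connection_record, connection_proxy):
    """Set PostgreSQL search_path to the tenant's schema on checkout.

    This runs every time a connection is checked out from the pool,
    ensuring the correct schema regardless of which connection we get.
    """
    if is_system_context():
        # System context uses the public schema
        _set_search_path(dbapi_conn, "public")
        return
    try:
        tenant = get_current_tenant()
    except RuntimeError:
        # No tenant context (health check, startup): use public
        _set_search_path(dbapi_conn, "public")
        return
    if not _SLUG_RE.fullmatch(tenant.slug):
        raise ValueError(f"Invalid tenant slug: {tenant.slug!r}")
    _set_search_path(dbapi_conn, f'"tenant_{tenant.slug}", public')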

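This interacts with PgBouncer's transaction mode. Because PgBouncer can hand each transaction to a different server connection, search_path must be set at the start of every transaction, not once per physical connection. The checkout event covers that as long as each transaction begins with a fresh checkout, which is how the request-scoped sessions above behave. Two caveats apply. A session-level SET persists on the server connection after the transaction ends, so any client that skips the event inherits the previous tenant's search_path. And depending on the driver's autocommit behavior, the SET may run in its own transaction, on a different server connection than the queries that follow. SET LOCAL inside the transaction is the safer choice under transaction pooling; verify the behavior against your own PgBouncer setup.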

Monitoring Pool Health

A pool that silently exhausts is worse than one that fails loudly. Instrument the pool to expose metrics:

src/db/pool_metrics.py
import logging

from sqlalchemy import event
from sqlalchemy.pool import Pool

logger = logging.getLogger("shelfwise.pool")


@event.listens_for(Pool, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
    pool = connection_proxy._pool  # private attribute; may change between releases
    logger.debug(
        "Pool checkout",
        extra={
            "pool_size": pool.size(),
            "checked_in": pool.checkedin(),
            "checked_out": pool.checkedout(),
            "overflow": pool.overflow(),
        },
    )


@event.listens_for(Pool, "checkin")
def on_checkin(dbapi_conn, connection_record):
    logger.debug("Pool checkin")


def get_pool_stats(engine) -> dict:
    """Expose pool stats for health check endpoints and metrics."""
    pool = engine.pool
    # overflow() counts connections currently open beyond pool_size and is
    # negative until the pool has filled, so it cannot serve as capacity.
    # _max_overflow is a private QueuePool attribute; pass the configured
    # value in explicitly if you prefer not to rely on it.
    capacity = pool.size() + max(getattr(pool, "_max_overflow", 0), 0)
    return {
        "pool_size": pool.size(),
        "checked_in": pool.checkedin(),
        "checked_out": pool.checkedout(),
        "overflow": pool.overflow(),
        "utilization_pct": round(
            pool.checkedout() / capacity * 100 if capacity > 0 else 0,
            1,
        ),
    }

Expose it in the health check endpoint:

@router.get("/health")
async def health():
    stats = get_pool_stats(engine)
    status = "healthy" if stats["utilization_pct"] < 80 else "degraded"
    return {"status": status, "pool": stats}

Set alerts at 70% pool utilization. At 80%, you are one traffic spike away from exhaustion. At 90%, you are already in the incident.
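Those three thresholds map naturally onto alert levels. The function name and the level labels below are assumptions for illustration; the 70/80/90 cut-offs come from the paragraph above.

```python
def classify_utilization(checked_out: int, capacity: int) -> str:
    """Map pool usage to an alert level using the 70/80/90 thresholds."""
    if capacity <= 0:
        raise ValueError("capacity must be positive")
    # Integer comparisons avoid floating-point edge cases at the boundaries.
    used = checked_out * 100
    if used >= 90 * capacity:
        return "critical"  # already in the incident
    if used >= 80 * capacity:
        return "degraded"  # one traffic spike away from exhaustion
    if used >= 70 * capacity:
        return "warning"   # time to alert
    return "healthy"


if __name__ == "__main__":
    for checked_out in (34, 35, 40, 45):
        print(checked_out, classify_utilization(checked_out, capacity=50))
```

Here `capacity` is pool_size + max_overflow, so with the medium-traffic settings (20 + 30) the warning fires at 35 checked-out connections.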

Key Takeaways

  • pool_size=5 default is a time bomb. Any non-trivial production workload needs explicit pool configuration. Start with pool_size=20, max_overflow=30 for medium traffic and tune from there.
  • pool_pre_ping=True is non-negotiable. The 1ms cost per checkout prevents hard errors from stale connections. Without it, database failovers and network blips cause immediate application errors.
  • pool_timeout=10 fails fast. The default 30 seconds means users wait half a minute before seeing an error. 10 seconds absorbs short bursts without hiding pool exhaustion.
  • Request-scoped sessions guarantee connection return. The async with block ensures the session is closed and the connection returns to the pool even if the route handler raises.
  • Read/write splitting offloads roughly 90% of queries to read replicas. Route catalog browsing and search to replicas, orders and updates to the primary.
  • Circuit breakers prevent cascade failures. When the database is down, fail fast instead of queuing requests that will all time out. Five failures trigger the breaker; 30 seconds later it tests recovery.
  • PgBouncer handles extreme scale. When aggregate application connections exceed PostgreSQL’s limits, PgBouncer multiplexes thousands of clients onto dozens of database connections.
  • Monitor pool utilization, alert at 70%. A pool at 80% is one traffic spike from exhaustion. Metrics and alerts are cheaper than incidents.

ShelfWise now has a database layer that handles Black Friday traffic without cascading failures. The connection pool is sized for production, reads are split to replicas, the circuit breaker prevents cascade, and PgBouncer handles the multiplexing at extreme scale. The next post in the series tackles the API layer: rate limiting, pagination, and versioning for a multi-tenant SaaS.
