It is Black Friday. ShelfWise traffic spikes to 200 concurrent requests per instance. Your create_async_engine() call uses SQLAlchemy’s defaults: pool_size=5, max_overflow=10. That is 15 connections total. The other 185 coroutines are queued, waiting for a connection to become available. After 30 seconds, pool_timeout fires and they all get TimeoutError. Your load balancer sees 5xx responses, retries them, which adds more requests to the queue. Within 90 seconds, every instance is unresponsive.
This is pool exhaustion, one of the most common ways async Python services fail under load. It is not a database problem; the database is fine. It is a connection pool problem, and it is preventable with correct configuration.
The Connection Pool Lifecycle
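Every database operation in an async SQLAlchemy application follows the same lifecycle: check a connection out of the pool, run the statements, and check the connection back in when the transaction ends or the session closes.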
The pool maintains a set of persistent connections (pool_size). When all persistent connections are in use, it opens temporary overflow connections, up to max_overflow of them. When a connection is returned and the pool already holds pool_size idle connections, the returned connection is closed rather than kept. If both the pool and the overflow are exhausted, the next checkout waits for a connection to come back; if none does within pool_timeout seconds, it raises TimeoutError.
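To make those rules concrete, here is a toy model in plain asyncio. It is not SQLAlchemy's QueuePool implementation; `ToyPool`, `open_connections`, and `demo()` are hypothetical names that mimic the three behaviors described above: persistent slots, overflow connections that are closed when returned to a full pool, and a checkout that gives up after a timeout.

```python
import asyncio


class ToyPool:
    """Toy model of the checkout rules above (not SQLAlchemy's QueuePool)."""

    def __init__(self, pool_size: int, max_overflow: int, timeout: float) -> None:
        self.pool_size = pool_size
        self.timeout = timeout
        # At most pool_size + max_overflow connections may be checked out at once.
        self._slots = asyncio.Semaphore(pool_size + max_overflow)
        self.open_connections = 0
        self.checked_out = 0

    async def checkout(self) -> None:
        try:
            await asyncio.wait_for(self._slots.acquire(), self.timeout)
        except asyncio.TimeoutError:
            raise TimeoutError("pool exhausted: no connection within timeout") from None
        if self.open_connections == self.checked_out:
            self.open_connections += 1  # No idle connection: open a new one
        self.checked_out += 1

    def checkin(self) -> None:
        self.checked_out -= 1
        idle = self.open_connections - self.checked_out
        if idle > self.pool_size:
            self.open_connections -= 1  # Pool already holds pool_size idle: close this one
        self._slots.release()


async def demo() -> tuple[int, int, int]:
    pool = ToyPool(pool_size=2, max_overflow=1, timeout=0.05)
    outcomes: list[str] = []

    async def request() -> None:
        try:
            await pool.checkout()
        except TimeoutError:
            outcomes.append("timeout")
            return
        await asyncio.sleep(0.2)  # Hold the connection while "querying"
        pool.checkin()
        outcomes.append("ok")

    await asyncio.gather(*(request() for _ in range(4)))
    return outcomes.count("ok"), outcomes.count("timeout"), pool.open_connections


print(asyncio.run(demo()))  # (3, 1, 2)
```

With two persistent slots and one overflow slot, three requests succeed, the fourth times out, and once everything is returned only two connections stay open because the overflow connection was closed.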
Production Engine Configuration
Here is the create_async_engine configuration for ShelfWise, with every parameter justified:
```python
# src/db/engine.py
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from src.core.config import settings


def create_engine() -> AsyncEngine:
    """Create the primary async engine with production pool settings."""
    return create_async_engine(
        settings.database.url.get_secret_value(),
        # ── Pool sizing ──────────────────────────────────────────
        pool_size=20,       # Persistent connections kept open
        max_overflow=30,    # Temporary connections under burst
        # Total max: 20 + 30 = 50 connections.
        # Rule of thumb: 1 connection handles ~1 concurrent query, so
        # 50 connections ≈ 50 concurrent database operations per process.
        # ── Timeouts ─────────────────────────────────────────────
        pool_timeout=10,    # Seconds to wait for a connection before raising
        # The default is 30, which is too long: a user waiting 30 s has already left.
        # 10 s is enough to absorb a burst without hiding pool exhaustion.
        # ── Connection health ────────────────────────────────────
        pool_pre_ping=True, # Ping (SELECT 1 or the driver's equivalent) on every checkout
        # Adds one round trip (~1 ms on a local network) per checkout. Worth it.
        # Without it, a connection dropped by the database (idle timeout,
        # failover, network blip) causes a hard error on the NEXT query.
        # With it, the dead connection is silently replaced.
        # ── Connection recycling ─────────────────────────────────
        pool_recycle=1800,  # Replace connections older than 30 minutes
        # PgBouncer, firewalls, and PostgreSQL idle timeouts (when configured)
        # can close long-lived connections. Recycling first prevents
        # "connection reset" errors and limits memory growth in long-lived backends.
        # ── Diagnostics ──────────────────────────────────────────
        echo=settings.database.echo,  # SQL logging (off in prod)
        echo_pool="debug" if settings.debug else False,
    )
```

Suggested starting points by traffic level:

| Traffic level | pool_size | max_overflow | pool_timeout (s) | Max concurrent DB operations |
|---|---|---|---|---|
| Dev / staging | 5 | 5 | 30 | ~10 |
| Low traffic (< 50 RPS) | 10 | 10 | 15 | ~20 |
| Medium traffic (50-200 RPS) | 20 | 30 | 10 | ~50 |
| High traffic (200-1000 RPS) | 30 | 50 | 5 | ~80 |
| Extreme (1000+ RPS) | Use PgBouncer | Pool in PgBouncer | 3 | Thousands |
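The numbers follow a formula: pool_size + max_overflow, summed over every process that connects to the database, must not exceed PostgreSQL's max_connections. Count processes, not hosts: each worker process (for example, each Uvicorn worker) has its own pool. If PostgreSQL allows 200 connections and you run 4 single-process instances, each instance gets at most 50 connections, so pool_size=20, max_overflow=30 is the ceiling. In practice, stay below that ceiling to leave room for superuser_reserved_connections, migrations, and admin sessions.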
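The budget arithmetic is easy to get wrong once worker processes multiply, so it can help to compute it. This is a minimal sketch; `per_process_budget` and `split_budget` are hypothetical helpers, `reserved=3` mirrors PostgreSQL's default superuser_reserved_connections, and the 40/60 persistent/overflow split simply mirrors the 20/30 example above.

```python
def per_process_budget(max_connections: int, processes: int, reserved: int = 3) -> int:
    """Largest pool_size + max_overflow each process may use.

    reserved defaults to 3, PostgreSQL's default superuser_reserved_connections;
    raise it to leave room for migrations and admin sessions.
    """
    if processes <= 0:
        raise ValueError("processes must be positive")
    return (max_connections - reserved) // processes


def split_budget(budget: int, persistent_share: float = 0.4) -> tuple[int, int]:
    """Split a budget into (pool_size, max_overflow); 0.4 mirrors the 20/30 example."""
    pool_size = max(1, round(budget * persistent_share))
    return pool_size, budget - pool_size


# 4 instances, each running 2 worker processes, against max_connections=200
budget = per_process_budget(max_connections=200, processes=4 * 2)
print(budget, split_budget(budget))  # 24 (10, 14)
```

Doubling the workers per instance halves each pool. With read/write splitting, run the calculation separately for the primary and for each replica, since each server has its own max_connections.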
Request-Scoped Session Lifecycle
Every request gets its own session. The session is created at the start of the request, used for all database operations during that request, and closed when the request ends. This is not optional: an AsyncSession is not safe for concurrent use, so sharing one across requests is a race condition.
```python
# src/db/session.py
from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from src.db.engine import create_engine

engine = create_engine()
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    # Keep attributes loaded after commit; otherwise the next attribute access
    # triggers an implicit lazy load, which fails under asyncio.
    expire_on_commit=False,
)


async def get_session() -> AsyncIterator[AsyncSession]:
    """FastAPI dependency that provides a request-scoped session.

    The session is committed on success, rolled back on exception, and
    always closed, guaranteeing the connection returns to the pool.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
```

The FastAPI dependency wires it into route handlers:
```python
# src/api/deps.py
from typing import Annotated

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from src.db.session import get_session

SessionDep = Annotated[AsyncSession, Depends(get_session)]
```

```python
# Route module
from fastapi import APIRouter
from sqlalchemy import select

from src.api.deps import SessionDep
# Book is the ShelfWise ORM model, imported from its models module (not shown)

router = APIRouter()


@router.get("/books/{book_id}")
async def get_book(book_id: int, session: SessionDep):
    # session is created for this request and closed when the request finishes
    result = await session.execute(select(Book).where(Book.id == book_id))
    return result.scalar_one_or_none()
```

The critical guarantee: the async with AsyncSessionLocal() as session block ensures the session is closed even if the route handler raises an exception. A closed session returns its connection to the pool. If sessions leak (are never closed), connections leak, and pool exhaustion follows.
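The commit/rollback/close guarantee can be checked without a database. This sketch drives a copy of get_session() through contextlib.asynccontextmanager, which approximates how FastAPI runs a yield dependency (FastAPI's exact mechanics vary by version). `FakeSession`, `make_dependency`, and `handle_request` are hypothetical test doubles, not SQLAlchemy or FastAPI APIs.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records lifecycle calls."""

    def __init__(self, log: list[str]) -> None:
        self.log = log

    async def __aenter__(self) -> "FakeSession":
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        self.log.append("close")  # The real close() returns the connection to the pool

    async def commit(self) -> None:
        self.log.append("commit")

    async def rollback(self) -> None:
        self.log.append("rollback")


def make_dependency(log: list[str]):
    # Same shape as get_session(), with FakeSession in place of AsyncSessionLocal()
    async def get_session() -> AsyncIterator[FakeSession]:
        async with FakeSession(log) as session:
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise

    return get_session


async def handle_request(fail: bool) -> list[str]:
    log: list[str] = []
    dependency = asynccontextmanager(make_dependency(log))
    try:
        async with dependency():
            if fail:
                raise ValueError("route handler failed")
    except ValueError:
        pass
    return log


print(asyncio.run(handle_request(fail=False)))  # ['commit', 'close']
print(asyncio.run(handle_request(fail=True)))   # ['rollback', 'close']
```

In both paths the session ends with "close", which is the property that keeps connections flowing back to the pool.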
Read/Write Splitting
ShelfWise’s read traffic outweighs writes 10:1. Catalog browsing, search results, book detail pages — all reads. Writes happen on order placement, inventory updates, and admin operations. Sending all traffic to a single primary database wastes the read replicas you are paying for.
```python
# src/db/engine.py (extended)
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from src.core.config import settings


def create_primary_engine() -> AsyncEngine:
    """Engine for write operations: connects to the primary."""
    return create_async_engine(
        settings.database.primary_url.get_secret_value(),
        pool_size=10,
        max_overflow=15,
        pool_timeout=10,
        pool_pre_ping=True,
        pool_recycle=1800,
    )


def create_replica_engine() -> AsyncEngine:
    """Engine for read operations: connects to a read replica.

    Larger pool because reads dominate traffic. Shorter recycle because
    replicas may rotate during maintenance.
    """
    return create_async_engine(
        settings.database.replica_url.get_secret_value(),
        pool_size=25,
        max_overflow=40,
        pool_timeout=10,
        pool_pre_ping=True,
        pool_recycle=900,  # 15 minutes: replicas rotate more often
    )
```

The session factory routes reads and writes to different engines:
```python
# src/db/session.py (extended)
from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from src.db.engine import create_primary_engine, create_replica_engine

primary_engine = create_primary_engine()
replica_engine = create_replica_engine()

PrimarySession = async_sessionmaker(primary_engine, class_=AsyncSession, expire_on_commit=False)
ReplicaSession = async_sessionmaker(replica_engine, class_=AsyncSession, expire_on_commit=False)


async def get_write_session() -> AsyncIterator[AsyncSession]:
    """Session routed to the primary database (reads + writes)."""
    async with PrimarySession() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


async def get_read_session() -> AsyncIterator[AsyncSession]:
    """Session routed to the read replica (reads only).

    Do NOT use this session for writes: a PostgreSQL streaming replica is a
    read-only hot standby and rejects them. Replicas also lag the primary, so
    a read that must see a write made moments earlier belongs on the write session.
    """
    async with ReplicaSession() as session:
        yield session  # No commit: reads do not need one, and close() rolls back
```

Route handlers declare their intent:
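```python
from typing import Annotated

from fastapi import APIRouter, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from src.db.session import get_read_session, get_write_session
# Book (ORM model) and BookCreate (Pydantic schema) come from ShelfWise's
# model and schema modules (not shown)

router = APIRouter()


# Read endpoint: routed to the replica
@router.get("/books/")
async def list_books(
    session: Annotated[AsyncSession, Depends(get_read_session)],
):
    result = await session.execute(select(Book).limit(50))
    return result.scalars().all()


# Write endpoint: routed to the primary
@router.post("/books/")
async def create_book(
    data: BookCreate,
    session: Annotated[AsyncSession, Depends(get_write_session)],
):
    book = Book(**data.model_dump())
    session.add(book)
    await session.flush()  # Assigns book.id; get_write_session commits afterwards
    return book
```

Circuit Breaker on the Connection Pool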
When the database goes down, every request tries to connect, waits for pool_timeout, and fails. Meanwhile, the application is accumulating requests in the queue, consuming memory, and providing no useful service. A circuit breaker detects the failure pattern and fails fast — immediately returning an error without attempting a connection.
```python
# src/db/circuit_breaker.py
import logging
import time
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from enum import StrEnum

from sqlalchemy import exc as sa_exc
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

logger = logging.getLogger("shelfwise.db")

# Errors that indicate the database is unreachable or unhealthy. Business errors
# raised by route handlers (validation failures, 404s) must not trip the breaker.
DB_FAILURES = (
    sa_exc.OperationalError,
    sa_exc.InterfaceError,
    sa_exc.TimeoutError,  # pool_timeout expired
    OSError,              # e.g. connection refused, raised by the driver on connect
)


class CircuitState(StrEnum):
    CLOSED = "closed"        # Normal operation: requests flow through
    OPEN = "open"            # Failing: reject immediately
    HALF_OPEN = "half_open"  # Testing: allow one request through


class DatabaseCircuitBreaker:
    """Circuit breaker for database connections.

    Transitions:
    - CLOSED → OPEN: after `failure_threshold` consecutive failures
    - OPEN → HALF_OPEN: after `recovery_timeout` seconds
    - HALF_OPEN → CLOSED: on first success
    - HALF_OPEN → OPEN: on first failure
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0):
        self._failure_threshold = failure_threshold
        self._recovery_timeout = recovery_timeout
        self._failure_count = 0
        self._last_failure_time = 0.0
        self._state = CircuitState.CLOSED
        self._probe_in_flight = False

    @property
    def state(self) -> CircuitState:
        if self._state == CircuitState.OPEN:
            # Check whether the recovery timeout has elapsed
            if time.monotonic() - self._last_failure_time >= self._recovery_timeout:
                self._state = CircuitState.HALF_OPEN
                logger.info("Circuit breaker → HALF_OPEN (testing recovery)")
        return self._state

    def record_success(self) -> None:
        self._failure_count = 0
        if self._state == CircuitState.HALF_OPEN:
            self._state = CircuitState.CLOSED
            logger.info("Circuit breaker → CLOSED (recovered)")

    def record_failure(self) -> None:
        self._failure_count += 1
        self._last_failure_time = time.monotonic()
        if (
            self._state == CircuitState.HALF_OPEN
            or self._failure_count >= self._failure_threshold
        ):
            self._state = CircuitState.OPEN
            logger.warning(
                "Circuit breaker → OPEN after %d failures (blocking requests for %ds)",
                self._failure_count,
                self._recovery_timeout,
            )

    @asynccontextmanager
    async def protect(
        self, session_factory: async_sessionmaker[AsyncSession]
    ) -> AsyncIterator[AsyncSession]:
        """Wrap session usage with circuit breaker logic."""
        state = self.state
        # In HALF_OPEN, only one probe request may run at a time.
        if state == CircuitState.OPEN or (
            state == CircuitState.HALF_OPEN and self._probe_in_flight
        ):
            raise ConnectionError(
                f"Database circuit breaker is {state}. Failing fast to prevent cascade."
            )
        is_probe = state == CircuitState.HALF_OPEN
        if is_probe:
            self._probe_in_flight = True
        try:
            # The session connects lazily, so failures surface inside this block.
            async with session_factory() as session:
                yield session
        except DB_FAILURES:
            self.record_failure()
            raise
        else:
            self.record_success()
        finally:
            if is_probe:
                self._probe_in_flight = False


# Global instance
db_circuit_breaker = DatabaseCircuitBreaker(failure_threshold=5, recovery_timeout=30.0)
```

Integrate with the session dependency:
```python
# src/db/session.py (with circuit breaker)
from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncSession

from src.db.circuit_breaker import db_circuit_breaker

# AsyncSessionLocal is the async_sessionmaker defined earlier in this module.


async def get_session() -> AsyncIterator[AsyncSession]:
    async with db_circuit_breaker.protect(AsyncSessionLocal) as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
```

When the database is down, the first five requests fail normally, each paying the full cost of a failed connection attempt (a refused connection, a connect timeout, or pool_timeout if the pool is already saturated). Once five consecutive failures are recorded, every new request fails immediately with ConnectionError: no waiting, no queue buildup. After 30 seconds, the circuit breaker enters HALF_OPEN and allows one request through. If it succeeds, the breaker closes and traffic resumes. If it fails, the breaker reopens for another 30 seconds.
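The timeline described above can be verified without a database by injecting a controllable clock. This is a stripped-down sketch: `Breaker`, `State`, and `FakeClock` are hypothetical test helpers that reproduce the same transitions, not the ShelfWise classes, and they omit logging and the single-probe guard.

```python
from enum import Enum


class State(str, Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class FakeClock:
    """Manually advanced clock so a test controls time."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


class Breaker:
    """Same transitions as the breaker above, with an injectable clock and no I/O."""

    def __init__(self, threshold: int, recovery_timeout: float, clock: FakeClock) -> None:
        self.threshold = threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = 0.0
        self._state = State.CLOSED

    @property
    def state(self) -> State:
        if self._state is State.OPEN and self.clock() - self.opened_at >= self.recovery_timeout:
            self._state = State.HALF_OPEN
        return self._state

    def record(self, ok: bool) -> None:
        if ok:
            self.failures = 0
            if self._state is State.HALF_OPEN:
                self._state = State.CLOSED
            return
        self.failures += 1
        if self._state is State.HALF_OPEN or self.failures >= self.threshold:
            self._state = State.OPEN
            self.opened_at = self.clock()


def timeline() -> list[str]:
    clock = FakeClock()
    breaker = Breaker(threshold=5, recovery_timeout=30.0, clock=clock)
    seen = []
    for _ in range(5):
        breaker.record(ok=False)
    seen.append(breaker.state.value)  # 5 consecutive failures: fail fast
    clock.now = 31.0
    seen.append(breaker.state.value)  # Recovery timeout elapsed: probe allowed
    breaker.record(ok=False)
    seen.append(breaker.state.value)  # Probe failed: reopen for another 30 s
    clock.now = 61.0
    seen.append(breaker.state.value)
    breaker.record(ok=True)
    seen.append(breaker.state.value)  # Probe succeeded: traffic resumes
    return seen


print(timeline())  # ['open', 'half_open', 'open', 'half_open', 'closed']
```

Injecting the clock keeps the test instant; with time.monotonic() the same assertions would need 60 real seconds.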
PgBouncer: When Application Pooling Is Not Enough
At extreme scale — thousands of requests per second across dozens of application instances — application-level pooling hits a wall. Each instance maintains its own pool, and the aggregate connection count can exceed PostgreSQL’s max_connections (default: 100). PgBouncer sits between your application and PostgreSQL, multiplexing thousands of application connections onto a smaller set of database connections.
When using PgBouncer in transaction mode (the recommended mode for modern applications), each application connection is mapped to a PostgreSQL connection only for the duration of a transaction. Between transactions, the PostgreSQL connection is returned to PgBouncer’s pool and can be used by other application connections.
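A toy simulation shows why transaction mode lets many clients share few server connections. This is not how PgBouncer is implemented; `simulate()` is a hypothetical model in which an asyncio.Semaphore stands in for default_pool_size and each client borrows a server connection only while a transaction runs.

```python
import asyncio


async def simulate(clients: int, server_pool: int, txns_per_client: int) -> tuple[int, int]:
    """Toy model of transaction pooling: every client stays connected, but it
    borrows a server connection only while a transaction is running.
    Returns (peak server connections in use, transactions completed)."""
    servers = asyncio.Semaphore(server_pool)  # Stands in for default_pool_size
    in_use = peak = completed = 0

    async def client() -> None:
        nonlocal in_use, peak, completed
        for _ in range(txns_per_client):
            async with servers:              # BEGIN: borrow a server connection
                in_use += 1
                peak = max(peak, in_use)
                await asyncio.sleep(0.001)   # Statements inside the transaction
                in_use -= 1
                completed += 1               # COMMIT: server connection goes back
            await asyncio.sleep(0.001)       # Idle between transactions, holding nothing

    await asyncio.gather(*(client() for _ in range(clients)))
    return peak, completed


print(asyncio.run(simulate(clients=200, server_pool=20, txns_per_client=3)))  # (20, 600)
```

All 600 transactions from 200 clients complete while no more than 20 server connections are ever in use at once.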
```ini
[databases]
shelfwise = host=postgres port=5432 dbname=shelfwise

[pgbouncer]
; Accept TCP connections (when unset, PgBouncer listens only on a Unix socket)
listen_addr = 0.0.0.0
listen_port = 6432
; Server connection is released back to the pool after each transaction
pool_mode = transaction
; Total client connections PgBouncer accepts
max_client_conn = 1000
; PostgreSQL connections per user/database pair
default_pool_size = 50
; Extra server connections for bursts
reserve_pool_size = 10
; Seconds a client waits before the reserve pool is used
reserve_pool_timeout = 3
; Close server connections idle for more than 5 minutes
server_idle_timeout = 300
```

With PgBouncer in the path, your application pool settings change:
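```python
# With PgBouncer: the application pool now holds client connections to PgBouncer
from sqlalchemy.ext.asyncio import create_async_engine

# Caveat: asyncpg uses prepared statements, which transaction-mode PgBouncer does
# not track across server connections (PgBouncer 1.21+ can, via
# max_prepared_statements). Check the SQLAlchemy asyncpg dialect notes on
# PgBouncer before deploying this configuration.
engine = create_async_engine(
    "postgresql+asyncpg://pgbouncer:6432/shelfwise",
    pool_size=5,        # Small: PgBouncer does the real pooling
    max_overflow=10,
    pool_timeout=5,     # Short: PgBouncer queues are faster
    pool_pre_ping=True,
    pool_recycle=600,   # PgBouncer recycles too, so keep this shorter
)
```

Schema-Per-Tenant: Connection Pool Interactions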
For ShelfWise enterprise tenants using schema-per-tenant isolation (from Part 9), the pool needs to set search_path on every session checkout. The session event from Part 9 extends naturally:
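```python
import re

from sqlalchemy import event
from sqlalchemy.pool import Pool

from src.core.context import get_current_tenant, is_system_context

# SET cannot take bind parameters, so the slug is validated instead.
# Assumes slugs are lowercase letters, digits, "_" or "-".
_SAFE_SLUG = re.compile(r"[a-z0-9_-]+")


def _set_search_path(dbapi_conn, search_path: str) -> None:
    # Pool events receive the DBAPI-level connection; with asyncpg this is
    # SQLAlchemy's adapted connection, which exposes a sync-style cursor.
    cursor = dbapi_conn.cursor()
    try:
        cursor.execute(f"SET search_path TO {search_path}")
    finally:
        cursor.close()


@event.listens_for(Pool, "checkout")
def set_tenant_search_path(dbapi_conn, connection_record, connection_proxy):
    """Set PostgreSQL search_path to the tenant's schema on checkout.

    This runs every time a connection is checked out from the pool,
    ensuring the correct schema regardless of which connection we get.
    """
    if is_system_context():
        # System context uses the public schema
        _set_search_path(dbapi_conn, "public")
        return

    try:
        tenant = get_current_tenant()
    except RuntimeError:
        # No tenant context (health check, startup): use public
        _set_search_path(dbapi_conn, "public")
        return

    if not _SAFE_SLUG.fullmatch(tenant.slug):
        raise ValueError(f"Unsafe tenant slug: {tenant.slug!r}")
    # Quoted identifier built from a validated slug, so it cannot inject SQL
    _set_search_path(dbapi_conn, f'"tenant_{tenant.slug}", public')
```

This interacts with PgBouncer's transaction mode. PgBouncer assigns a server connection per transaction, not per client connection, so a SET reaches the server connection that runs your queries only if it executes inside the same transaction. In the usual setup it does: an AsyncSession checks a connection out of the pool when its transaction begins and returns it when the transaction ends, and the SQLAlchemy asyncpg adapter begins the DBAPI transaction on the first statement, so the checkout SET and the session's queries share one transaction. The catch is that a plain SET is session-level: after the transaction commits, the setting stays on that PostgreSQL backend, and PgBouncer may hand the backend to a different client, which then sees the previous tenant's search_path unless it sets its own. SET LOCAL (or set_config('search_path', ..., true)) scopes the setting to the current transaction and avoids that leak, at the cost of re-issuing it in every transaction.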
Monitoring Pool Health
A pool that silently exhausts is worse than one that fails loudly. Instrument the pool to expose metrics:
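```python
import logging

from sqlalchemy import event
from sqlalchemy.ext.asyncio import AsyncEngine

logger = logging.getLogger("shelfwise.pool")


def instrument_pool(engine: AsyncEngine) -> None:
    """Attach checkout/checkin logging to one engine's pool."""
    pool = engine.pool  # Captured here instead of reading private proxy attributes

    @event.listens_for(pool, "checkout")
    def on_checkout(dbapi_conn, connection_record, connection_proxy):
        logger.debug(
            "Pool checkout",
            extra={
                "pool_size": pool.size(),
                "checked_in": pool.checkedin(),
                "checked_out": pool.checkedout(),
                "overflow": pool.overflow(),
            },
        )

    @event.listens_for(pool, "checkin")
    def on_checkin(dbapi_conn, connection_record):
        logger.debug("Pool checkin")


def get_pool_stats(engine: AsyncEngine, max_overflow: int = 30) -> dict:
    """Expose pool stats for health check endpoints and metrics.

    max_overflow must match the value passed to create_async_engine
    (30 in the production configuration above).
    """
    pool = engine.pool
    capacity = pool.size() + max_overflow
    checked_out = pool.checkedout()
    return {
        "pool_size": pool.size(),
        "checked_in": pool.checkedin(),
        "checked_out": checked_out,
        # overflow() starts at -pool_size and turns positive only once
        # overflow connections are open
        "overflow": max(pool.overflow(), 0),
        # Measured against configured capacity, not currently open connections
        "utilization_pct": round(checked_out / capacity * 100, 1) if capacity else 0.0,
    }
```

Call instrument_pool(engine) once at startup, and expose the stats in the health check endpoint: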
```python
@router.get("/health")
async def health():
    stats = get_pool_stats(engine)
    status = "healthy" if stats["utilization_pct"] < 80 else "degraded"
    return {"status": status, "pool": stats}
```

Set alerts at 70% pool utilization. At 80%, you are one traffic spike away from exhaustion. At 90%, you are already in the incident.
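Those three bands map naturally onto a small classifier that an alerting job could call. This is a sketch; `pool_alert_level` and the labels "alert" and "incident" are hypothetical, and capacity is taken from the configured limits so that a pool that has opened only a few connections is not reported as full.

```python
def pool_alert_level(checked_out: int, pool_size: int, max_overflow: int) -> str:
    """Map pool utilization to the 70/80/90% bands described above."""
    capacity = pool_size + max_overflow
    if capacity <= 0:
        raise ValueError("pool capacity must be positive")
    # Integer comparison avoids float rounding right at the thresholds.
    for threshold, level in ((90, "incident"), (80, "degraded"), (70, "alert")):
        if checked_out * 100 >= threshold * capacity:
            return level
    return "ok"


for checked_out in (30, 35, 40, 45):
    print(checked_out, pool_alert_level(checked_out, pool_size=20, max_overflow=30))
# 30 ok
# 35 alert
# 40 degraded
# 45 incident
```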
Key Takeaways
- The pool_size=5 default is a time bomb. Any non-trivial production workload needs explicit pool configuration. Start with pool_size=20, max_overflow=30 for medium traffic and tune from there.
- pool_pre_ping=True is non-negotiable. The 1 ms cost per checkout prevents hard errors from stale connections. Without it, database failovers and network blips cause immediate application errors.
- pool_timeout=10 fails fast. The default 30 seconds means users wait half a minute before seeing an error. 10 seconds absorbs short bursts without hiding pool exhaustion.
- Request-scoped sessions guarantee connection return. The async with block ensures the session is closed and the connection returns to the pool even if the route handler raises.
- Read/write splitting offloads 90% of queries to read replicas. Route catalog browsing and search to replicas, orders and updates to the primary.
- Circuit breakers prevent cascade failures. When the database is down, fail fast instead of queuing requests that will all time out. Five failures trigger the breaker; 30 seconds later it tests recovery.
- PgBouncer handles extreme scale. When aggregate application connections exceed PostgreSQL’s limits, PgBouncer multiplexes thousands of clients onto dozens of database connections.
- Monitor pool utilization, alert at 70%. A pool at 80% is one traffic spike from exhaustion. Metrics and alerts are cheaper than incidents.
ShelfWise now has a database layer that handles Black Friday traffic without cascading failures. The connection pool is sized for production, reads are split to replicas, the circuit breaker fails fast when the database is down, and PgBouncer handles the multiplexing at extreme scale. The next post in the series tackles the API layer: rate limiting, pagination, and versioning for a multi-tenant SaaS.