Tutorial

Clean Code Python: The Repository Pattern with SQLAlchemy

Wrap your database access behind a clean interface. The Repository pattern decouples business logic from SQLAlchemy, makes testing trivial, and gives you a single place to optimize queries.

Tin Dang avatar
Tin Dang
Organized archive shelving system representing structured data storage and retrieval

Your SQLAlchemy session is leaking into your route handlers. Every db.execute(select(Book).where(...)) in a FastAPI endpoint is a query you cannot test without spinning up a database, a migration, and test fixtures. Add a second endpoint that needs the same query with slightly different filtering, and you have just duplicated a SQL expression across two files.

The Repository pattern fixes this by placing all database access — queries, inserts, updates, deletes — behind a class interface. Route handlers call services. Services call repositories. Repositories call SQLAlchemy. The database is invisible to everything except the repository.

What the Repository Pattern Solves

Two architectures, same functionality:

Without the pattern, every layer that needs data talks directly to SQLAlchemy. A test for the service layer requires a real database connection. With the pattern, the test substitutes a fake repository — the service is isolated from the database entirely.

The Full BookRepository Implementation

Building on the Protocol from Part 2, here is the complete BookRepository with eager loading, a search query, integrity error handling, and a count method:

src/repositories/book_repository.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload
from sqlalchemy.exc import IntegrityError
from src.models.book import Book
from src.models.author import Author
from src.schemas.book import BookCreate, BookResponse, BookWithAuthorResponse
class BookRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def get_by_id(self, book_id: int) -> BookWithAuthorResponse | None:
result = await self._session.execute(
select(Book)
.options(selectinload(Book.author))
.where(Book.id == book_id)
)
book = result.scalar_one_or_none()
return BookWithAuthorResponse.model_validate(book) if book else None
async def list_all(
self,
*,
limit: int = 50,
offset: int = 0,
) -> list[BookResponse]:
result = await self._session.execute(
select(Book).limit(limit).offset(offset)
)
return [BookResponse.model_validate(b) for b in result.scalars()]
async def search(self, query: str, *, limit: int = 20) -> list[BookResponse]:
result = await self._session.execute(
select(Book)
.where(Book.title.ilike(f"%{query}%"))
.limit(limit)
)
return [BookResponse.model_validate(b) for b in result.scalars()]
async def create(self, data: BookCreate) -> BookResponse:
book = Book(**data.model_dump())
self._session.add(book)
try:
await self._session.commit()
except IntegrityError:
await self._session.rollback()
raise ValueError(f"Book with ISBN {data.isbn} already exists")
await self._session.refresh(book)
return BookResponse.model_validate(book)
async def delete(self, book_id: int) -> bool:
book = await self._session.get(Book, book_id)
if not book:
return False
await self._session.delete(book)
await self._session.commit()
return True
async def count(self) -> int:
result = await self._session.execute(
select(func.count()).select_from(Book)
)
return result.scalar_one()

Three patterns worth noting in this implementation:

selectinload for related data. get_by_id uses selectinload(Book.author) to fetch the author in a second query batched by SQLAlchemy — not an N+1 query, but also not a JOIN. Use selectinload when you need the related object on a single entity fetch. Use joinedload for list queries where the JOIN reduces round-trips.

IntegrityError handling. The create method catches IntegrityError — the exception raised when the unique constraint on isbn is violated — rolls back the session, and raises a domain exception (ValueError) that the service layer can catch and convert to a 409 Conflict HTTP response. The database error never leaks to the API layer.

scalar_one_or_none vs scalar_one. scalar_one_or_none() returns None when no row matches. scalar_one() raises NoResultFound when no row matches and MultipleResultsFound when more than one matches. Use scalar_one_or_none for lookup by ID (user may request a missing ID). Use scalar_one for lookups that must exist (e.g., fetching the current user by session token).

Transaction Management

The repository handles single-operation transactions internally (each method commits its own changes). When a service needs to perform multiple operations atomically — create an order and add line items — it manages the transaction externally:

src/services/order_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from src.repositories.book_repository import BookRepository
from src.repositories.order_repository import OrderRepository
from src.schemas.order import OrderCreate, OrderResponse
class OrderService:
def __init__(
self,
session: AsyncSession,
order_repo: OrderRepository,
book_repo: BookRepository,
) -> None:
self._session = session
self._order_repo = order_repo
self._book_repo = book_repo
async def place_order(
self,
user_id: int,
book_ids: list[int],
) -> OrderResponse:
async with self._session.begin():
# Both operations share the same transaction
order = await self._order_repo.create(user_id=user_id)
for book_id in book_ids:
await self._order_repo.add_item(order.id, book_id)
# Commit happens automatically when the context exits without error
# Rollback happens automatically if an exception is raised
return order

When the async with self._session.begin() block completes without an exception, SQLAlchemy commits automatically. If any statement inside the block raises — say, add_item finds the book does not exist — SQLAlchemy rolls back all changes from that transaction, including the create_order call.

Connection Pooling Configuration

The default create_async_engine configuration is fine for development but too conservative for production. Here is a production-ready async engine setup:

src/db/session.py
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from src.core.config import settings
engine = create_async_engine(
settings.DATABASE_URL,
pool_size=10,
max_overflow=20,
pool_timeout=30,
pool_recycle=1800, # recycle connections every 30 minutes
pool_pre_ping=True, # verify connection health before use
echo=settings.DEBUG,
)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

pool_pre_ping=True sends a cheap SELECT 1 before returning a connection from the pool. This catches connections that were dropped by the database server (e.g., after a 30-minute idle timeout) before your application tries to use them. Without this, you get OperationalError: server closed the connection unexpectedly under low traffic.

Here is a guide to sizing the pool for different environments:

Environmentpool_sizemax_overflowpool_recycle
Development (local) 2 5 3600 (1 hour)
Staging (1-2 replicas) 5 10 1800 (30 min)
Production (load balanced) 10 20 1800 (30 min)
Production (high throughput) 20 40 900 (15 min)

The formula: pool_size + max_overflow is the maximum number of concurrent database connections from a single process. For a PostgreSQL server with a max_connections = 100 limit and three application replicas: (10 + 20) * 3 = 90 — comfortably under the limit with headroom for admin connections.

The FastAPI Dependency for Sessions

FastAPI’s dependency injection system wires the session and repository into route handlers without manual instantiation. This is the bridge between the HTTP layer and the repository layer:

src/api/deps.py
from typing import AsyncGenerator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from src.db.session import AsyncSessionLocal
from src.repositories.book_repository import BookRepository
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
yield session
async def get_book_repo(
session: AsyncSession = Depends(get_db),
) -> BookRepository:
return BookRepository(session)

Usage in a route handler:

src/api/v1/books.py
from fastapi import APIRouter, Depends, HTTPException
from src.api.deps import get_book_repo
from src.repositories.book_repository import BookRepository
from src.schemas.book import BookResponse
router = APIRouter(prefix="/books", tags=["books"])
@router.get("/{book_id}", response_model=BookResponse)
async def get_book(
book_id: int,
repo: BookRepository = Depends(get_book_repo),
) -> BookResponse:
book = await repo.get_by_id(book_id)
if not book:
raise HTTPException(status_code=404, detail="Book not found")
return book

The route handler does not know the session exists. It receives a BookRepository and calls get_by_id. The async with AsyncSessionLocal() as session in get_db ensures the session is closed after every request — even if the request raises an exception.

Keeping the Repository Focused

A repository should have one responsibility: database access for a single entity. When a repository method grows beyond 20-25 lines, it is usually doing too much. Common signals:

  • The method fetches an entity, applies business logic, and updates a different entity — split the logic into the service
  • The method builds a complex query conditionally based on many optional parameters — extract the query builder to a separate function
  • The method calls another repository — services coordinate multiple repositories, not repositories themselves

A well-structured repository is boring. It is a thin layer of SQLAlchemy calls with consistent error handling. The interesting code lives in the service layer, not here.

Key Takeaways

  • Repository pattern isolates database access: only repositories import from SQLAlchemy; services and route handlers never call session.execute() directly
  • selectinload vs joinedload: use selectinload for single-entity eager loading, joinedload for list queries; never rely on lazy loading in an async context
  • IntegrityError handling belongs in the repository: catch it, rollback, raise a domain exception; let the service layer decide the HTTP status code
  • Transaction boundaries belong in the service: repositories commit their own single-operation transactions; services manage multi-operation transactions with session.begin()
  • pool_pre_ping=True in production: eliminates stale connection errors under low traffic; the overhead is negligible
0

Next in this series

Clean Code Python: The Service Layer — Where Business Logic Lives

Continue reading