Your SQLAlchemy session is leaking into your route handlers. Every db.execute(select(Book).where(...)) in a FastAPI endpoint is a query you cannot test without spinning up a database, a migration, and test fixtures. Add a second endpoint that needs the same query with slightly different filtering, and you have just duplicated a SQL expression across two files.
The Repository pattern fixes this by placing all database access — queries, inserts, updates, deletes — behind a class interface. Route handlers call services. Services call repositories. Repositories call SQLAlchemy. The database is invisible to everything except the repository.
What the Repository Pattern Solves
Compare two architectures that deliver the same functionality.
Without the pattern, every layer that needs data talks directly to SQLAlchemy. A test for the service layer requires a real database connection. With the pattern, the test substitutes a fake repository — the service is isolated from the database entirely.
The Full BookRepository Implementation
Building on the Protocol from Part 2, here is the complete BookRepository with eager loading, a search query, integrity error handling, and a count method:
```python
from sqlalchemy import func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from src.models.author import Author
from src.models.book import Book
from src.schemas.book import BookCreate, BookResponse, BookWithAuthorResponse


class BookRepository:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def get_by_id(self, book_id: int) -> BookWithAuthorResponse | None:
        # Eager-load the author: implicit lazy loads raise errors under AsyncSession
        result = await self._session.execute(
            select(Book)
            .options(selectinload(Book.author))
            .where(Book.id == book_id)
        )
        book = result.scalar_one_or_none()
        return BookWithAuthorResponse.model_validate(book) if book else None

    async def list_all(
        self,
        *,
        limit: int = 50,
        offset: int = 0,
    ) -> list[BookResponse]:
        # A stable ORDER BY keeps limit/offset pages from overlapping or skipping rows
        result = await self._session.execute(
            select(Book).order_by(Book.id).limit(limit).offset(offset)
        )
        return [BookResponse.model_validate(b) for b in result.scalars()]

    async def search(self, query: str, *, limit: int = 20) -> list[BookResponse]:
        # Case-insensitive substring match; the pattern is sent as a bound parameter
        result = await self._session.execute(
            select(Book)
            .where(Book.title.ilike(f"%{query}%"))
            .limit(limit)
        )
        return [BookResponse.model_validate(b) for b in result.scalars()]

    async def create(self, data: BookCreate) -> BookResponse:
        book = Book(**data.model_dump())
        self._session.add(book)
        try:
            await self._session.commit()
        except IntegrityError as exc:
            # Unique constraint on isbn violated: undo, then translate to a domain error
            await self._session.rollback()
            raise ValueError(f"Book with ISBN {data.isbn} already exists") from exc
        await self._session.refresh(book)
        return BookResponse.model_validate(book)

    async def delete(self, book_id: int) -> bool:
        book = await self._session.get(Book, book_id)
        if book is None:
            return False
        await self._session.delete(book)
        await self._session.commit()
        return True

    async def count(self) -> int:
        result = await self._session.execute(
            select(func.count()).select_from(Book)
        )
        return result.scalar_one()
```

Three patterns worth noting in this implementation:
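selectinload for related data. get_by_id uses selectinload(Book.author), which loads the author with a second SELECT ... WHERE ... IN (...) statement instead of a JOIN. That one extra statement covers every book in the result, so selectinload avoids the N+1 problem for lists as well as for single fetches. joinedload instead adds a LEFT OUTER JOIN to the main query, which saves a round trip. It suits many-to-one relationships such as Book.author. selectinload is usually the better choice for one-to-many collections, where a JOIN repeats the parent row for every child.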
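To make the loading strategies concrete, here is a stdlib sqlite3 sketch. It hand-writes roughly the SQL shape each strategy produces for three books, alongside the lazy N+1 loop that both strategies avoid. This is not the SQL SQLAlchemy actually emits, and the schema, data, and CountingDB helper are hypothetical. The point is the number of round trips.

```python
import sqlite3


class CountingDB:
    """Hypothetical wrapper that counts round trips to the database."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        self.queries = 0

    def fetch(self, sql: str, params: tuple = ()) -> list[tuple]:
        self.queries += 1
        return self.conn.execute(sql, params).fetchall()


def make_db() -> CountingDB:
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE author (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
        CREATE TABLE book (id INTEGER PRIMARY KEY, title TEXT NOT NULL,
                           author_id INTEGER REFERENCES author(id));
        INSERT INTO author VALUES (1, 'Frank Herbert'), (2, 'Ursula K. Le Guin');
        INSERT INTO book VALUES (1, 'Dune', 1), (2, 'Children of Dune', 1),
                                (3, 'The Dispossessed', 2);
        """
    )
    return CountingDB(conn)


def load_lazy(db: CountingDB) -> list[tuple]:
    # N+1: one query for the books, then one more per book for its author
    books = db.fetch("SELECT title, author_id FROM book ORDER BY id")
    return [
        (title, db.fetch("SELECT name FROM author WHERE id = ?", (author_id,))[0][0])
        for title, author_id in books
    ]


def load_selectin(db: CountingDB) -> list[tuple]:
    # selectinload shape: books first, then ONE "IN" query for all their authors
    books = db.fetch("SELECT title, author_id FROM book ORDER BY id")
    ids = sorted({author_id for _, author_id in books})
    marks = ", ".join("?" for _ in ids)
    names = dict(db.fetch(f"SELECT id, name FROM author WHERE id IN ({marks})", tuple(ids)))
    return [(title, names[author_id]) for title, author_id in books]


def load_joined(db: CountingDB) -> list[tuple]:
    # joinedload shape: a single query with a LEFT OUTER JOIN
    return db.fetch(
        "SELECT book.title, author.name FROM book "
        "LEFT OUTER JOIN author ON author.id = book.author_id ORDER BY book.id"
    )


if __name__ == "__main__":
    for loader in (load_lazy, load_selectin, load_joined):
        db = make_db()
        loader(db)
        print(loader.__name__, db.queries)  # load_lazy 4, load_selectin 2, load_joined 1
```

All three loaders return the same rows. Only the lazy loop's query count grows with the number of books.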
IntegrityError handling. The create method catches IntegrityError — the exception raised when the unique constraint on isbn is violated — rolls back the session, and raises a domain exception (ValueError) that the service layer can catch and convert to a 409 Conflict HTTP response. The database error never leaks to the API layer.
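The catch, roll back, translate sequence does not depend on SQLAlchemy. Here is a minimal sketch using the stdlib sqlite3 module, whose sqlite3.IntegrityError plays the role of sqlalchemy.exc.IntegrityError. SqliteBookRepository and DuplicateISBNError are hypothetical names. Subclassing ValueError keeps callers that already catch ValueError working, while giving the service a more specific type to match.

```python
import sqlite3


class DuplicateISBNError(ValueError):
    """Hypothetical domain exception for a unique-ISBN violation."""


class SqliteBookRepository:
    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS book ("
            "id INTEGER PRIMARY KEY, title TEXT NOT NULL, isbn TEXT NOT NULL UNIQUE)"
        )

    def create(self, title: str, isbn: str) -> int:
        try:
            cur = self._conn.execute(
                "INSERT INTO book (title, isbn) VALUES (?, ?)", (title, isbn)
            )
            self._conn.commit()
        except sqlite3.IntegrityError as exc:
            # Undo the failed transaction, then hide the driver error behind a domain error
            self._conn.rollback()
            raise DuplicateISBNError(f"Book with ISBN {isbn} already exists") from exc
        return cur.lastrowid


if __name__ == "__main__":
    repo = SqliteBookRepository(sqlite3.connect(":memory:"))
    repo.create("Dune", "978-0441013593")
    try:
        repo.create("Dune (reprint)", "978-0441013593")
    except ValueError as exc:
        # The caller (service or route) decides this maps to 409 Conflict
        print(type(exc).__name__, exc)
```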
scalar_one_or_none vs scalar_one. scalar_one_or_none() returns None when no row matches. scalar_one() raises NoResultFound when no row matches. Both raise MultipleResultsFound when more than one row matches. Use scalar_one_or_none for lookups by ID, where a client may request an ID that does not exist. Use scalar_one for lookups that must succeed (e.g., fetching the current user by session token).
Transaction Management
The repository handles single-operation transactions internally (each method commits its own changes). When a service needs to perform multiple operations atomically — create an order and add line items — it manages the transaction externally:
```python
from sqlalchemy.ext.asyncio import AsyncSession

from src.repositories.book_repository import BookRepository
from src.repositories.order_repository import OrderRepository
from src.schemas.order import OrderCreate, OrderResponse


class OrderService:
    def __init__(
        self,
        session: AsyncSession,
        order_repo: OrderRepository,
        book_repo: BookRepository,
    ) -> None:
        self._session = session
        self._order_repo = order_repo
        self._book_repo = book_repo

    async def place_order(
        self,
        user_id: int,
        book_ids: list[int],
    ) -> OrderResponse:
        async with self._session.begin():
            # Both operations share the same transaction
            order = await self._order_repo.create(user_id=user_id)
            for book_id in book_ids:
                await self._order_repo.add_item(order.id, book_id)
        # Commit happens automatically when the block exits without error;
        # rollback happens automatically if an exception is raised inside it
        return order
```

When the async with self._session.begin() block completes without an exception, SQLAlchemy commits automatically. If any statement inside the block raises (say, add_item finds that a book does not exist), SQLAlchemy rolls back every change made in that transaction, including the order inserted by order_repo.create. This guarantee holds only if the repository methods called inside the block do not commit on their own. A commit inside the block ends the transaction early, so those methods should call await session.flush(), which sends the INSERT and populates order.id, instead of commit().
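The commit-or-rollback behavior of the session.begin() block can be reproduced with the stdlib sqlite3 module. There, a connection used as a context manager commits on success and rolls back when an exception escapes. This is a minimal sketch in which the tables and the create_order, add_item, and place_order functions are hypothetical stand-ins for the repository and service methods. The repository-style functions deliberately never commit, so the service's block owns the whole transaction.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE book (id INTEGER PRIMARY KEY, title TEXT NOT NULL);
        CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER NOT NULL);
        CREATE TABLE order_item (order_id INTEGER NOT NULL, book_id INTEGER NOT NULL);
        INSERT INTO book VALUES (1, 'Dune'), (2, 'Emma');
        """
    )
    return conn


def create_order(conn: sqlite3.Connection, user_id: int) -> int:
    # Repository-style step: writes inside the caller's transaction, never commits
    cur = conn.execute("INSERT INTO orders (user_id) VALUES (?)", (user_id,))
    return cur.lastrowid


def add_item(conn: sqlite3.Connection, order_id: int, book_id: int) -> None:
    # Repository-style step: validate, write, never commit
    if conn.execute("SELECT 1 FROM book WHERE id = ?", (book_id,)).fetchone() is None:
        raise LookupError(f"Book {book_id} does not exist")
    conn.execute("INSERT INTO order_item VALUES (?, ?)", (order_id, book_id))


def place_order(conn: sqlite3.Connection, user_id: int, book_ids: list[int]) -> int:
    # Service-owned boundary: `with conn` commits on success, rolls back on any exception
    with conn:
        order_id = create_order(conn, user_id)
        for book_id in book_ids:
            add_item(conn, order_id, book_id)
    return order_id


if __name__ == "__main__":
    db = make_db()
    place_order(db, user_id=7, book_ids=[1, 2])
    try:
        place_order(db, user_id=7, book_ids=[1, 99])
    except LookupError as exc:
        print(exc)  # Book 99 does not exist
    # Only the first order and its two items survive; the failed order left nothing behind
    print(db.execute("SELECT COUNT(*) FROM orders").fetchone()[0])  # 1
    print(db.execute("SELECT COUNT(*) FROM order_item").fetchone()[0])  # 2
```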
Connection Pooling Configuration
The default create_async_engine configuration is fine for development but too conservative for production. Here is a production-ready async engine setup:
```python
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

from src.core.config import settings

engine = create_async_engine(
    settings.DATABASE_URL,
    pool_size=10,  # connections kept open in the pool
    max_overflow=20,  # extra connections allowed during bursts
    pool_timeout=30,  # seconds to wait for a free connection before raising
    pool_recycle=1800,  # recycle connections every 30 minutes
    pool_pre_ping=True,  # verify connection health before use
    echo=settings.DEBUG,
)

# expire_on_commit=False keeps loaded attributes usable after commit
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
```

pool_pre_ping=True makes the pool send a lightweight ping (typically SELECT 1) each time it hands out a connection. This catches connections that the database server has already dropped (e.g., after an idle timeout) before your application tries to use them. Without it, the first request after a quiet period can fail with OperationalError: server closed the connection unexpectedly.
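The idea behind pool_pre_ping fits in a toy pool: test each connection on checkout and replace it if the test fails. This minimal sketch uses the stdlib sqlite3 module, where a closed connection stands in for one the server has dropped. PrePingPool is hypothetical. SQLAlchemy's real pool pings through the database dialect and handles many more failure modes.

```python
import sqlite3
from collections import deque
from collections.abc import Callable


class PrePingPool:
    """Hypothetical toy pool that health-checks connections on checkout."""

    def __init__(self, connect: Callable[[], sqlite3.Connection], size: int) -> None:
        self._connect = connect
        self._idle = deque(connect() for _ in range(size))
        self.replaced = 0

    def checkout(self) -> sqlite3.Connection:
        conn = self._idle.popleft() if self._idle else self._connect()
        try:
            conn.execute("SELECT 1")  # the "ping"
        except sqlite3.Error:
            # Dead connection (here: closed): discard it and open a fresh one
            self.replaced += 1
            conn = self._connect()
        return conn

    def checkin(self, conn: sqlite3.Connection) -> None:
        self._idle.append(conn)


if __name__ == "__main__":
    pool = PrePingPool(lambda: sqlite3.connect(":memory:"), size=1)
    stale = pool.checkout()
    stale.close()  # simulate the server dropping an idle connection
    pool.checkin(stale)
    fresh = pool.checkout()  # the ping fails, so the pool swaps in a new connection
    print(fresh.execute("SELECT 1").fetchone(), pool.replaced)
```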
Here is a guide to sizing the pool for different environments:
| Environment | pool_size | max_overflow | pool_recycle |
|---|---|---|---|
| Development (local) | 2 | 5 | 3600 (1 hour) |
| Staging (1-2 replicas) | 5 | 10 | 1800 (30 min) |
| Production (load balanced) | 10 | 20 | 1800 (30 min) |
| Production (high throughput) | 20 | 40 | 900 (15 min) |
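The formula: pool_size + max_overflow is the maximum number of concurrent database connections from a single process. Multiply it by the number of processes that each hold their own engine, which is the number of replicas times the worker processes per replica. For a PostgreSQL server with max_connections = 100 and three application replicas running one worker process each: (10 + 20) * 3 = 90. That stays under the limit and leaves 10 connections for admin sessions and migrations.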
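The sizing rule is simple arithmetic, and writing it down makes it easy to re-check when replicas or worker processes change. This is a small sketch. The function names, the 10 reserved connections, and the scenario with two worker processes per replica are assumptions for illustration.

```python
def total_connections(pool_size: int, max_overflow: int, processes: int) -> int:
    """Worst case: every process has its pool full and all overflow slots open."""
    return (pool_size + max_overflow) * processes


def per_process_budget(server_max_connections: int, reserved: int, processes: int) -> int:
    """Largest pool_size + max_overflow each process may use without exceeding the limit."""
    return (server_max_connections - reserved) // processes


if __name__ == "__main__":
    # Production (load balanced) row: 3 replicas, one process each
    print(total_connections(10, 20, processes=3))  # 90
    # Same settings with a hypothetical 2 worker processes per replica
    print(total_connections(10, 20, processes=6))  # 180, over a 100-connection limit
    print(per_process_budget(100, reserved=10, processes=6))  # 15
```

If more processes are added, the per-process budget shrinks. With six processes, pool_size + max_overflow would have to drop to 15 to stay within the same server limit.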
The FastAPI Dependency for Sessions
FastAPI’s dependency injection system wires the session and repository into route handlers without manual instantiation. This is the bridge between the HTTP layer and the repository layer:
```python
from collections.abc import AsyncGenerator

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from src.db.session import AsyncSessionLocal
from src.repositories.book_repository import BookRepository


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    # One session per request; the context manager closes it when the request ends
    async with AsyncSessionLocal() as session:
        yield session


async def get_book_repo(
    session: AsyncSession = Depends(get_db),
) -> BookRepository:
    return BookRepository(session)
```

Usage in a route handler:
```python
from fastapi import APIRouter, Depends, HTTPException

from src.api.deps import get_book_repo
from src.repositories.book_repository import BookRepository
from src.schemas.book import BookWithAuthorResponse

router = APIRouter(prefix="/books", tags=["books"])


# get_by_id returns the book with its author, so the response model includes it
@router.get("/{book_id}", response_model=BookWithAuthorResponse)
async def get_book(
    book_id: int,
    repo: BookRepository = Depends(get_book_repo),
) -> BookWithAuthorResponse:
    book = await repo.get_by_id(book_id)
    if book is None:
        raise HTTPException(status_code=404, detail="Book not found")
    return book
```

The route handler does not know the session exists. It receives a BookRepository and calls get_by_id. The async with AsyncSessionLocal() as session block in get_db closes the session after every request, even if the handler raises an exception.
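You can check the claim that get_db closes the session even when the handler raises without FastAPI or a database. In this minimal sketch, FakeSession and session_factory are hypothetical stand-ins for AsyncSession and AsyncSessionLocal. Wrapping get_db with contextlib.asynccontextmanager only approximates how FastAPI drives a dependency that uses yield.

```python
import asyncio
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    async def __aenter__(self) -> "FakeSession":
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        # Mirrors the real session closing itself when its async with block exits
        self.closed = True


created: list[FakeSession] = []


def session_factory() -> FakeSession:
    # Hypothetical replacement for AsyncSessionLocal
    session = FakeSession()
    created.append(session)
    return session


async def get_db() -> AsyncGenerator[FakeSession, None]:
    async with session_factory() as session:
        yield session


async def handle_request(fail: bool) -> None:
    # Roughly what the framework does: run get_db up to yield, call the handler, resume it
    async with asynccontextmanager(get_db)() as session:
        assert not session.closed  # the handler sees an open session
        if fail:
            raise RuntimeError("handler failed")


async def main() -> None:
    await handle_request(fail=False)
    try:
        await handle_request(fail=True)
    except RuntimeError:
        pass
    print([s.closed for s in created])  # [True, True]


if __name__ == "__main__":
    asyncio.run(main())
```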
Keeping the Repository Focused
A repository should have one responsibility: database access for a single entity. When a repository method grows beyond 20-25 lines, it is usually doing too much. Common signals:
- The method fetches an entity, applies business logic, and updates a different entity — split the logic into the service
- The method builds a complex query conditionally based on many optional parameters — extract the query builder to a separate function
- The method calls another repository — services coordinate multiple repositories, not repositories themselves
A well-structured repository is boring. It is a thin layer of SQLAlchemy calls with consistent error handling. The interesting code lives in the service layer, not here.
Key Takeaways
- Repository pattern isolates database access: only repositories build and execute queries; services and route handlers never call session.execute() directly.
- selectinload vs joinedload: choose an eager-loading strategy explicitly (selectinload issues a second IN query, joinedload folds the relationship into a JOIN); never rely on lazy loading in an async context.
- IntegrityError handling belongs in the repository: catch it, roll back, and raise a domain exception; let the service layer decide the HTTP status code.
- Transaction boundaries belong in the service: repositories commit their own single-operation transactions; services manage multi-operation transactions with session.begin().
- pool_pre_ping=True in production: eliminates stale-connection errors under low traffic at the cost of one lightweight round trip per connection checkout.