Tutorial

Clean Code Python: Performance — Profiling, N+1 Detection, and Load Testing

An N+1 query that takes 50ms with 10 rows takes 5 seconds with 1000. A memory leak that grows 1MB/hour OOMs in 3 days. Here is how to profile, detect, and load test a multi-tenant Python backend before production tells you first.

Tin Dang

An N+1 query that takes 50ms with 10 results takes 5 seconds with 1,000. A memory leak that grows 1MB per hour kills your server in 3 days. A JSON serialization bottleneck that is invisible at 100 items dominates your response time at 50,000.

You cannot optimize what you cannot measure, and you cannot trust an optimization that has not been verified under realistic multi-tenant load. This post covers the full performance lifecycle: profiling in production, detecting N+1 queries automatically, optimizing the critical path, and load testing under conditions that match what your tenants actually do.

The Performance Investigation Workflow

Before optimizing anything, you need a repeatable process for finding what is actually slow. Guessing is how you spend a week optimizing JSON serialization when the real bottleneck is a missing index.

Profiling in Production

Development profiling lies to you. Your dev database has 50 rows. Production has 50,000. Your dev machine has no memory pressure. Production shares resources with 200 tenants. Profile where it matters.

CPU Profiling with py-spy

py-spy attaches to a running Python process with zero code changes and near-zero overhead. It samples the call stack and produces flame graphs that show exactly where CPU time is spent.

Terminal window
# Attach to running process — no restart, no code changes
py-spy record -o flamegraph.svg --pid $(pgrep -f uvicorn)
# Profile for 30 seconds during peak traffic
py-spy record -o flamegraph.svg --pid $(pgrep -f uvicorn) --duration 30
# Top-like live view for quick triage
py-spy top --pid $(pgrep -f uvicorn)

Memory Profiling with memray

memray tracks every allocation and produces a timeline showing where memory grows. It catches leaks that only manifest after hours of production traffic.

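Terminal window
# Run a script under memray and record its allocations
memray run -o output.bin your_script.py
# Or watch allocations in a live terminal view while the script runs
memray run --live your_script.py
# For a server that is already running, attach by PID instead
# (memray attach needs gdb or lldb available on the host)
memray attach -o output.bin <pid>
# Generate a flame graph of allocations
memray flamegraph output.bin -o memory_flamegraph.html

# Programmatic tracking for specific code paths
import memray
from starlette.requests import Request
from starlette.responses import JSONResponse, Response

async def suspect_endpoint(request: Request) -> Response:
    # Tracker will not overwrite an existing file, so use a fresh path per run
    with memray.Tracker("suspect_endpoint.bin"):
        # This block's allocations are recorded
        result = await heavy_computation()
    return JSONResponse(result)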

Automatic N+1 Detection

N+1 queries are the single most common performance problem in ORM-backed applications. They are invisible during development because small datasets mask the linear growth. A query that runs once to fetch 10 orders, then 10 more times to fetch each order’s items, looks fine. The same pattern with 1,000 orders runs 1,001 queries.
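
The growth is easy to reproduce without an ORM. The sketch below uses an in-memory SQLite database and a hypothetical `CountingDB` wrapper (not part of ShelfWise) to compare one query per order against a single batched `IN` query.

```python
import sqlite3


class CountingDB:
    """Hypothetical wrapper that counts every SELECT it runs."""

    def __init__(self, n_orders: int) -> None:
        self.conn = sqlite3.connect(":memory:")
        self.query_count = 0
        self.conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY)")
        self.conn.execute("CREATE TABLE items (order_id INTEGER, sku TEXT)")
        ids = [(i,) for i in range(1, n_orders + 1)]
        self.conn.executemany("INSERT INTO orders VALUES (?)", ids)
        self.conn.executemany(
            "INSERT INTO items VALUES (?, ?)",
            [(i, f"sku-{i}") for (i,) in ids],
        )

    def query(self, sql: str, params: tuple = ()) -> list[tuple]:
        self.query_count += 1
        return self.conn.execute(sql, params).fetchall()


def load_lazily(db: CountingDB) -> dict[int, list[str]]:
    """N+1 pattern: one query for the orders, then one per order."""
    orders = db.query("SELECT id FROM orders")
    return {
        oid: [
            sku
            for (sku,) in db.query(
                "SELECT sku FROM items WHERE order_id = ?", (oid,)
            )
        ]
        for (oid,) in orders
    }


def load_batched(db: CountingDB) -> dict[int, list[str]]:
    """Two queries in total, regardless of how many orders there are."""
    orders = db.query("SELECT id FROM orders")
    ids = tuple(oid for (oid,) in orders)
    placeholders = ", ".join("?" for _ in ids)
    rows = db.query(
        f"SELECT order_id, sku FROM items WHERE order_id IN ({placeholders})",
        ids,
    )
    items: dict[int, list[str]] = {oid: [] for oid in ids}
    for order_id, sku in rows:
        items[order_id].append(sku)
    return items


lazy_db, batched_db = CountingDB(100), CountingDB(100)
lazy_result = load_lazily(lazy_db)
batched_result = load_batched(batched_db)
print(lazy_db.query_count, batched_db.query_count)  # 101 2
```

Both functions return the same data. Only the query count differs, and it is the count that scales with the size of the result set.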

SQLAlchemy Event Listener

This listener counts queries per request and logs a warning once the count exceeds a threshold. In development you can raise an exception at that point instead of logging. In production, the middleware in the next section emits a metric.

src/db/query_counter.py
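from contextvars import ContextVar
from typing import Any

import structlog
from sqlalchemy import event
from sqlalchemy.engine import Connection, Engine

logger = structlog.get_logger()


class _QueryCounter:
    """Mutable per-request counter."""

    __slots__ = ("value",)

    def __init__(self) -> None:
        self.value = 0


# The ContextVar holds a mutable object rather than an int. Increments made
# in a child task or greenlet mutate the same object, so the code that called
# reset_query_count() still sees them. A plain int updated with set() inside
# a copied context would not propagate back to the caller.
_query_counter: ContextVar[_QueryCounter | None] = ContextVar(
    "query_counter", default=None
)
_query_threshold: int = 10


def reset_query_count() -> None:
    """Reset at the start of each request."""
    _query_counter.set(_QueryCounter())


def get_query_count() -> int:
    """Return current query count for the request."""
    counter = _query_counter.get()
    return counter.value if counter is not None else 0


def register_query_counter(engine: Engine) -> None:
    """Register event listener that counts queries per request.

    For an AsyncEngine, pass engine.sync_engine.
    """

    @event.listens_for(engine, "before_cursor_execute")
    def _count_query(
        conn: Connection,
        cursor: Any,
        statement: str,
        parameters: Any,
        context: Any,
        executemany: bool,
    ) -> None:
        counter = _query_counter.get()
        if counter is None:
            return  # Outside a request, e.g. a startup or migration query
        counter.value += 1
        if counter.value > _query_threshold:
            logger.warning(
                "high_query_count",
                query_number=counter.value,
                threshold=_query_threshold,
                statement=statement[:200],
            )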

Request-Level Query Counting Middleware

Wire the counter into the request lifecycle. Every request starts at zero, and the response includes the query count as a header in development.

src/api/middleware/query_count.py
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import Response

from src.core.config import settings
from src.db.query_counter import get_query_count, reset_query_count


class QueryCountMiddleware(BaseHTTPMiddleware):
    """Track and expose query counts per request."""

    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        reset_query_count()
        response = await call_next(request)
        count = get_query_count()
        if settings.debug:
            response.headers["X-Query-Count"] = str(count)
        if count > 20:
            # Emit metric for monitoring dashboards
            from src.metrics import high_query_count_total

            high_query_count_total.labels(
                endpoint=request.url.path,
                method=request.method,
            ).inc()
        return response
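
One caveat worth verifying before trusting the count read in the middleware: Starlette's documentation notes that BaseHTTPMiddleware prevents ContextVar changes made further down the stack from propagating back up. The sketch below reproduces the underlying asyncio behaviour with the standard library only. `handler` and `middleware` are hypothetical stand-ins, not Starlette objects.

```python
import asyncio
from contextvars import ContextVar


class Counter:
    """Mutable holder, so a change is visible to every context sharing it."""

    def __init__(self) -> None:
        self.value = 0


plain_count: ContextVar[int] = ContextVar("plain_count", default=0)
shared_counter: ContextVar[Counter] = ContextVar("shared_counter")


async def handler() -> None:
    """Stand-in for the endpoint, where the queries actually run."""
    # set() only changes this task's copy of the context
    plain_count.set(plain_count.get() + 1)
    # Mutating the object changes the one instance both contexts point to
    shared_counter.get().value += 1


async def middleware() -> tuple[int, int]:
    """Stand-in for dispatch(): reset, run the handler in a task, read back."""
    plain_count.set(0)
    shared_counter.set(Counter())
    # create_task() runs the coroutine in a copy of the current context
    await asyncio.create_task(handler())
    return plain_count.get(), shared_counter.get().value


result = asyncio.run(middleware())
print(result)  # (0, 1)
```

The bare integer reads back as 0 in the caller while the shared object reads 1. If the counter in your middleware always reports zero, storing a mutable object in the ContextVar is one way around it.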

Eager Loading Strategies

SQLAlchemy provides three eager loading strategies. Choosing the wrong one is almost as bad as lazy loading.

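| Strategy | SQL generated | Best for | Watch out |
| --- | --- | --- | --- |
| joinedload | Single query with LEFT JOIN | Many-to-one, one-to-one, small one-to-many | Cartesian explosion with large collections or multiple joined collections |
| selectinload | Separate SELECT ... WHERE id IN (...) | One-to-many, many-to-many | Two queries instead of one, but avoids row multiplication |
| subqueryload | Second SELECT that joins against the original query wrapped as a subquery | Legacy option for cases where an IN list is not usable, such as composite keys on backends without tuple IN | Complex SQL, harder to debug, re-runs the original query, usually slower than selectin |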
src/repositories/catalog_repository.py
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload, selectinload

from src.models.author import Author
from src.models.book import Book


class CatalogRepository:
    """Repository with explicit eager loading strategies."""

    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def list_books_with_authors(
        self,
        tenant_id: UUID,
        limit: int = 50,
        offset: int = 0,
    ) -> list[Book]:
        """Fetch a page of books with authors in exactly 2 queries.

        selectinload issues one extra SELECT ... WHERE id IN (...) for the
        authors on this page. That keeps the paginated book query free of
        JOINs and avoids one lazy load per book.
        """
        stmt = (
            select(Book)
            .options(selectinload(Book.author))
            .where(Book.tenant_id == tenant_id)
            .order_by(Book.title)
            .limit(limit)
            .offset(offset)
        )
        result = await self._session.execute(stmt)
        return list(result.scalars().all())

    async def get_book_with_full_details(
        self, book_id: UUID
    ) -> Book | None:
        """Single book with all relations. joinedload is fine here.

        One book = one author = one publisher. No cartesian risk.
        """
        stmt = (
            select(Book)
            .options(
                joinedload(Book.author),
                joinedload(Book.publisher),
            )
            .where(Book.id == book_id)
        )
        result = await self._session.execute(stmt)
        return result.scalar_one_or_none()
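
The row multiplication that makes joined loading risky for collections can be seen without an ORM. The sketch below uses in-memory SQLite and hypothetical `tags` and `reviews` tables (ShelfWise's real schema is not shown in this post) to compare one query with two LEFT JOINs against two separate IN queries, which is roughly what joinedload and selectinload emit for collections.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE books (id INTEGER PRIMARY KEY, title TEXT);
    CREATE TABLE tags (book_id INTEGER, name TEXT);
    CREATE TABLE reviews (book_id INTEGER, body TEXT);
    INSERT INTO books VALUES (1, 'Example Book');
    """
)
# One book with 3 tags and 4 reviews
conn.executemany(
    "INSERT INTO tags VALUES (1, ?)", [(f"tag-{i}",) for i in range(3)]
)
conn.executemany(
    "INSERT INTO reviews VALUES (1, ?)", [(f"review-{i}",) for i in range(4)]
)

# Joining both collections in one query pairs every tag with every review
joined_rows = conn.execute(
    """
    SELECT b.id, t.name, r.body
    FROM books b
    LEFT JOIN tags t ON t.book_id = b.id
    LEFT JOIN reviews r ON r.book_id = b.id
    """
).fetchall()

# Loading each collection with its own IN query returns each row once
tag_rows = conn.execute(
    "SELECT book_id, name FROM tags WHERE book_id IN (?)", (1,)
).fetchall()
review_rows = conn.execute(
    "SELECT book_id, body FROM reviews WHERE book_id IN (?)", (1,)
).fetchall()

print(len(joined_rows), len(tag_rows) + len(review_rows))  # 12 7
```

With 3 tags and 4 reviews the joined query returns 3 x 4 = 12 rows for a single book, while the separate queries return 7 rows in total. The gap widens multiplicatively as collections grow.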

Database Query Optimization

Index Strategy for Multi-Tenant Queries

Every query in ShelfWise filters by tenant_id. Without a composite index, PostgreSQL scans the full table and then filters. With a composite index, it jumps directly to the tenant’s rows.

-- Composite index: tenant_id first (equality), then sort column
CREATE INDEX ix_books_tenant_title
ON books (tenant_id, title);
-- Partial index for active orders only (most queries filter by status)
CREATE INDEX ix_orders_tenant_active
ON orders (tenant_id, created_at DESC)
WHERE status NOT IN ('cancelled', 'archived');
-- Covering index: includes columns needed by the query
-- PostgreSQL can answer the query from the index alone (index-only scan)
CREATE INDEX ix_books_tenant_title_covering
ON books (tenant_id, title)
INCLUDE (id, isbn, price);

EXPLAIN ANALYZE

Never guess at query performance. EXPLAIN ANALYZE shows what PostgreSQL actually does.

EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT id, title, price
FROM books
WHERE tenant_id = '550e8400-e29b-41d4-a716-446655440000'
ORDER BY title
LIMIT 50;
-- Look for:
-- Index Scan (good) vs Seq Scan (bad on large tables)
-- Rows Removed by Filter (high = missing index)
-- Sort Method: external merge (the sort spilled to disk; consider raising work_mem)
-- Buffers: shared hit vs shared read (hit = cached, read = disk)
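
The same before-and-after check can be scripted. The sketch below is an assumption-heavy stand-in: it uses SQLite's EXPLAIN QUERY PLAN rather than PostgreSQL's EXPLAIN ANALYZE, a simplified `books` table, and made-up tenant IDs, so the plan text will not match what PostgreSQL prints. It only illustrates how the planner's choice changes once the composite index exists.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE books ("
    "id INTEGER PRIMARY KEY, tenant_id TEXT, title TEXT, price REAL)"
)
# 2,000 books spread across 20 hypothetical tenants
conn.executemany(
    "INSERT INTO books (tenant_id, title, price) VALUES (?, ?, ?)",
    [(f"tenant-{i % 20}", f"Title {i:05d}", 9.99) for i in range(2_000)],
)

QUERY = (
    "SELECT id, title, price FROM books "
    "WHERE tenant_id = ? ORDER BY title LIMIT 50"
)


def query_plan(connection: sqlite3.Connection) -> str:
    """Return SQLite's plan for QUERY as a single string."""
    rows = connection.execute(
        f"EXPLAIN QUERY PLAN {QUERY}", ("tenant-3",)
    ).fetchall()
    # Each row is (id, parent, notused, detail); detail is the readable part
    return " | ".join(row[3] for row in rows)


plan_before = query_plan(conn)
conn.execute("CREATE INDEX ix_books_tenant_title ON books (tenant_id, title)")
plan_after = query_plan(conn)

print("before:", plan_before)
print("after: ", plan_after)
```

Before the index exists the plan has no index to name. Afterwards it references `ix_books_tenant_title`. Against PostgreSQL, the equivalent check is reading the EXPLAIN output above for Seq Scan versus Index Scan.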

Response Serialization: orjson

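Python's built-in json module is slow on large payloads. Pydantic's default JSON serialization is better, but high-throughput endpoints can still benefit from orjson, a Rust-backed JSON library. Expect something in the range of 4-10x faster than the standard library, and measure on your own payloads before relying on that figure.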

src/core/serialization.py
from fastapi.responses import ORJSONResponse


class ShelfWiseResponse(ORJSONResponse):
    """Default response class using orjson for serialization.

    orjson natively handles:
    - UUID (no str() conversion needed)
    - datetime (ISO 8601, no isoformat() call)
    - dataclasses (no dict() conversion)

    Decimal is not supported natively: orjson raises TypeError unless a
    default= hook converts it (for example to str).
    """

    media_type = "application/json"

src/main.py
from fastapi import FastAPI

from src.core.serialization import ShelfWiseResponse

app = FastAPI(default_response_class=ShelfWiseResponse)

Combine with Pydantic’s model configuration for maximum serialization performance:

from decimal import Decimal
from uuid import UUID

from pydantic import BaseModel, ConfigDict


class BookResponse(BaseModel):
    """Response model optimized for fast serialization."""

    model_config = ConfigDict(
        frozen=True,
        from_attributes=True,  # Direct ORM → Pydantic without dict()
        ser_json_bytes="base64",
        ser_json_timedelta="float",
    )

    id: UUID
    title: str
    price: Decimal
    author_name: str

Connection Reuse for External APIs

ShelfWise integrates with payment providers, shipping APIs, and ISBN lookup services. Creating a new TCP connection per request adds 50-200ms of latency.

src/clients/http_pool.py
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

import httpx


@asynccontextmanager
async def managed_http_client() -> AsyncIterator[httpx.AsyncClient]:
    """Shared HTTP client with connection pooling.

    Enter this once (for example at application startup) and pass the
    client around. Creating a client per request defeats the pooling.
    Reuses TCP connections across requests.
    Limits concurrent connections to prevent exhaustion.
    """
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(10.0, connect=5.0),
        limits=httpx.Limits(
            max_connections=100,
            max_keepalive_connections=20,
            keepalive_expiry=30.0,
        ),
        # Multiplexing over a single connection; requires the httpx[http2] extra
        http2=True,
    ) as client:
        yield client

Load Testing with Locust

Unit tests verify correctness. Load tests verify that correctness holds under pressure. Locust simulates realistic multi-tenant traffic patterns.

tests/load/locustfile.py
import random
from uuid import uuid4

from locust import HttpUser, between, task

# Simulate 5 tenants with different catalog sizes
TENANTS = [
    {"id": str(uuid4()), "token": "jwt_tenant_a", "catalog_size": 500},
    {"id": str(uuid4()), "token": "jwt_tenant_b", "catalog_size": 5_000},
    {"id": str(uuid4()), "token": "jwt_tenant_c", "catalog_size": 50_000},
    {"id": str(uuid4()), "token": "jwt_tenant_d", "catalog_size": 100},
    {"id": str(uuid4()), "token": "jwt_tenant_e", "catalog_size": 10_000},
]


class ShelfWiseUser(HttpUser):
    """Simulates a tenant user browsing the catalog and placing orders."""

    wait_time = between(0.5, 2.0)

    def on_start(self) -> None:
        self.tenant = random.choice(TENANTS)
        self.client.headers.update({
            "Authorization": f"Bearer {self.tenant['token']}",
        })

    @task(10)
    def browse_catalog(self) -> None:
        """Most common operation: paginated catalog browse."""
        page = random.randint(1, 10)
        self.client.get(f"/api/v2/books?page={page}&limit=50")

    @task(5)
    def view_book(self) -> None:
        """View a single book's details."""
        book_id = str(uuid4())  # Use seeded IDs in real tests
        self.client.get(f"/api/v2/books/{book_id}")

    @task(2)
    def search_catalog(self) -> None:
        """Full-text search: an expensive operation."""
        queries = ["python", "machine learning", "distributed", "design"]
        self.client.get(f"/api/v2/books/search?q={random.choice(queries)}")

    @task(1)
    def place_order(self) -> None:
        """Least common but most expensive operation."""
        self.client.post(
            "/api/v2/orders",
            json={
                "items": [
                    {"book_id": str(uuid4()), "quantity": random.randint(1, 3)}
                    for _ in range(random.randint(1, 5))
                ],
            },
        )
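
The task weights and wait time determine the traffic mix before any request is sent, so it is worth working them out. The sketch below is a rough estimate only: it copies the weights from the locustfile above and assumes the mean wait is the midpoint of `between(0.5, 2.0)`. `expected_share` and `max_requests_per_second` are hypothetical helpers, not Locust functions.

```python
# Weights copied from the @task decorators in the locustfile
TASK_WEIGHTS = {
    "browse_catalog": 10,
    "view_book": 5,
    "search_catalog": 2,
    "place_order": 1,
}


def expected_share(weights: dict[str, int]) -> dict[str, float]:
    """Fraction of requests each task should receive on average."""
    total = sum(weights.values())
    return {name: weight / total for name, weight in weights.items()}


def max_requests_per_second(users: int, mean_wait_s: float) -> float:
    """Upper bound on throughput, ignoring response time entirely.

    Each simulated user sends one request and then waits, so it cannot
    issue more than 1 / mean_wait_s requests per second.
    """
    return users / mean_wait_s


shares = expected_share(TASK_WEIGHTS)
mean_wait = (0.5 + 2.0) / 2  # midpoint of between(0.5, 2.0)
upper_bound = max_requests_per_second(users=50, mean_wait_s=mean_wait)

for name, share in shares.items():
    print(f"{name}: {share:.1%}")
print(f"upper bound at 50 users: {upper_bound:.0f} req/s")
```

Under these assumptions, 50 users cannot generate more than about 40 requests per second, and only about one request in eighteen places an order. If production sees a different mix, adjust the weights rather than the user count.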

Load Test Scenarios

Run multiple scenarios that match real traffic patterns:

Terminal window
# Normal traffic: 50 concurrent users
locust -f tests/load/locustfile.py --headless \
-u 50 -r 5 --run-time 5m \
--host http://staging.shelfwise.io
# Peak traffic (3x normal): 150 concurrent users
locust -f tests/load/locustfile.py --headless \
-u 150 -r 15 --run-time 10m \
--host http://staging.shelfwise.io
# Tenant onboarding spike: bulk import during normal traffic
locust -f tests/load/locustfile.py,tests/load/bulk_import.py \
--headless -u 50 -r 10 --run-time 15m \
--host http://staging.shelfwise.io

Performance Regression CI

Load tests are useless if you only run them before launches. Integrate p99 latency comparison into CI so regressions are caught before merge.

tests/performance/test_p99_regression.py
import json
from pathlib import Path

import pytest

# Each stats file maps "METHOD /path" to a dict of latency figures in ms
Stats = dict[str, dict[str, float]]

REGRESSION_THRESHOLD = 1.2  # 20% regression is actionable


@pytest.fixture
def baseline() -> Stats:
    """Load baseline p99 latencies from previous successful run."""
    baseline_path = Path("tests/performance/baseline.json")
    return json.loads(baseline_path.read_text())


@pytest.fixture
def current_results() -> Stats:
    """Load current run results from locust stats."""
    results_path = Path("tests/performance/current_stats.json")
    return json.loads(results_path.read_text())


def test_catalog_p99_no_regression(
    baseline: Stats,
    current_results: Stats,
) -> None:
    """Catalog endpoint p99 must not regress more than 20%."""
    baseline_p99 = baseline["GET /api/v2/books"]["p99"]
    current_p99 = current_results["GET /api/v2/books"]["p99"]
    assert current_p99 < baseline_p99 * REGRESSION_THRESHOLD, (
        f"p99 regression: {baseline_p99:.0f}ms → {current_p99:.0f}ms "
        f"({current_p99 / baseline_p99:.1%} of baseline)"
    )
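
For readers who have raw latency samples rather than a precomputed p99, the comparison can be sketched as follows. This assumes the nearest-rank definition of a percentile, which is one of several in common use and may not match what your load testing tool reports. `p99` and `is_regression` are hypothetical helper names.

```python
def p99(samples_ms: list[float]) -> float:
    """99th percentile by the nearest-rank method."""
    if not samples_ms:
        raise ValueError("need at least one sample")
    ordered = sorted(samples_ms)
    # ceil(0.99 * n) computed with integers to avoid float rounding
    rank = (99 * len(ordered) + 99) // 100
    return ordered[rank - 1]


def is_regression(
    baseline_ms: float, current_ms: float, threshold: float = 1.2
) -> bool:
    """True when current latency is at or above threshold x baseline."""
    return current_ms >= baseline_ms * threshold


# 100 requests: 98 fast ones and two slow outliers
samples = [20.0] * 98 + [250.0, 900.0]
current = p99(samples)
print(current)  # 250.0
print(is_regression(baseline_ms=200.0, current_ms=current))  # True
```

With 100 samples the nearest-rank p99 is the 99th value in sorted order, so the single worst request is excluded while the second worst sets the figure. That is why p99 needs a reasonably large sample count before it is stable enough to gate a merge.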

The ShelfWise Performance Story

Here is the real scenario. The catalog endpoint has been running fine for months. Then a distributor tenant onboards with 50,000 books. Their first catalog request takes 2 seconds. By the time they paginate to page 10, the response time is 5 seconds. Their integration team opens a P1 ticket.

Investigation:

  1. Pull the distributed trace from OpenTelemetry. The database span is 1.8 seconds out of a 2-second response.
  2. Check the query counter. The request executed 50,001 queries — one for the book list, one per book to load the author relationship.
  3. Run EXPLAIN ANALYZE on the catalog query. Sequential scan on the books table. No index on (tenant_id, title).
  4. Profile JSON serialization with py-spy. The json.dumps() call takes 200ms for 50,000 books — Python’s json module is the bottleneck after the database.

Fixes applied:

# Fix 1: selectinload eliminates N+1
# Before: 50,001 queries (lazy loading)
# After: 2 queries (one for books, one for authors)
stmt = (
    select(Book)
    .options(selectinload(Book.author))
    .where(Book.tenant_id == tenant_id)
    .order_by(Book.title)
    .limit(50)
    .offset(offset)
)
# Fix 2: composite index eliminates sequential scan
# Before: Seq Scan on books (cost=0.00..18234.00 rows=50000)
# After: Index Scan using ix_books_tenant_title (cost=0.42..156.00 rows=50)
# SQL: CREATE INDEX ix_books_tenant_title ON books (tenant_id, title);
# Fix 3: orjson eliminates serialization bottleneck
# Before: json.dumps() — 200ms for 50K books
# After: orjson.dumps() — 48ms for 50K books
app = FastAPI(default_response_class=ORJSONResponse)

Results:

| Metric | Before | After | Improvement |
| --- | --- | --- | --- |
| Queries per request | 50,001 | 2 | 25,000x |
| Database time | 1,800ms | 18ms | 100x |
| Serialization time | 200ms | 48ms | 4x |
| Total response time | 2,100ms | 80ms | 26x |

The critical lesson: all three problems were invisible at development scale. 50 books, 50 queries, 2ms serialization — everything looked fine. The query counter, composite index, and orjson were not premature optimization. They were the baseline configuration that should have been in place from day one.

Key Takeaways

  • Profile in production, not development. py-spy for CPU (low overhead, no code changes), memray for memory leaks. Development datasets lie about performance characteristics.
  • Count queries per request automatically. A SQLAlchemy event listener that increments a ContextVar counter catches N+1 queries before they reach production. Set the threshold at 10, alert at 20, page at 50.
  • Choose eager loading deliberately. selectinload for one-to-many (avoids cartesian explosion), joinedload for one-to-one or single-entity fetches. Never rely on lazy loading in production code.
  • Index for your actual query patterns. Multi-tenant queries always filter by tenant_id. Put tenant_id first in composite indexes. Use partial indexes for status-filtered queries. Use covering indexes for index-only scans.
  • Serialize with orjson. It can be 4-10x faster than stdlib json and natively handles UUIDs, datetimes, and dataclasses (Decimal needs a default hook). The migration is a one-line change to your FastAPI app.
  • Load test with realistic tenant distributions. One tenant with 100 books and one with 50,000 expose different bottlenecks than uniform test data. Simulate your actual tenant mix.
  • Gate merges on p99 regression. Compare current performance against a baseline in CI. A 20% regression threshold catches real problems without blocking normal variance.

You can profile, detect, and fix performance problems. You can prove your fixes hold under load. But none of this matters if the code never leaves your laptop. In the next post — the final post — we take ShelfWise from git init to production traffic, assembling every pattern from this series into a deployed, monitored, incident-ready system.
