Tutorial

Clean Code Python: Locking, Idempotency, and Sagas

At 1M transactions per day, two requests modifying the same inventory record is not an edge case. Here is how to prevent lost updates with optimistic locking, double charges with idempotency keys, and partial failures with the saga pattern.

Tin Dang

At 1M transactions per day, two requests modifying the same inventory record simultaneously is not a theoretical concern — it happens hundreds of times per hour. Without optimistic locking, you get lost updates: two warehouse workers each “pick” the last copy of a bestseller, and your inventory goes negative. Without idempotency, a network timeout during payment causes the customer to be charged twice. Without sagas, a failed shipment notification leaves an order in a state where the customer paid but nobody knows to ship it.

ShelfWise learned all three lessons the hard way. This post covers the patterns that prevent each failure.

Optimistic Locking: Preventing Lost Updates

The problem is simple. Two concurrent requests read the same row, each makes a decision based on what it read, and both write back. The second write silently overwrites the first.
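To make the failure concrete, here is a deterministic, single-process sketch (plain Python, no database) that interleaves two read-modify-write sequences against one record. The dict stands in for an inventory row, and simulate_lost_update is a hypothetical helper, not ShelfWise code.

```python
def simulate_lost_update(initial: int) -> tuple[int, int]:
    """Interleave two read-modify-write sequences; return (final_quantity, picks)."""
    row = {"quantity": initial}
    picks = 0
    # Both workers read before either one writes.
    seen_by_a = row["quantity"]
    seen_by_b = row["quantity"]
    # Each checks its own snapshot (B's is already stale) and writes back.
    if seen_by_a >= 1:
        row["quantity"] = seen_by_a - 1
        picks += 1
    if seen_by_b >= 1:
        row["quantity"] = seen_by_b - 1  # silently overwrites A's write
        picks += 1
    return row["quantity"], picks


print(simulate_lost_update(1))  # (0, 2): two picks, but stock dropped by only one
```

Both workers succeed, yet only one decrement survives: that is the lost update.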

| | Pessimistic Locking | Optimistic Locking |
|---|---|---|
| Mechanism | SELECT FOR UPDATE; holds a row lock | Version column; check-and-set on write |
| Contention cost | High: blocked requests wait | Low: retry on conflict |
| Deadlock risk | Yes, if lock order varies | Minimal: no locks held between read and write |
| Throughput | Lower under contention | Higher: no waiting |
| Failure mode | Timeout if a lock is held too long | StaleDataError; caller retries |
| Best for | Short, critical sections | Read-heavy workloads with rare conflicts |

For ShelfWise, optimistic locking is the right default. Most requests only read inventory; a small fraction modify it, and conflicts on any single row are rare. Pessimistic locking would force every update of a hot row (a bestseller, say) to queue behind a row lock, cutting throughput exactly when demand peaks.

The Version Column

Every mutable entity gets a version column. On every update, the version is incremented and the version we originally read is included in the WHERE clause. If someone else modified the row between our read and our write, the WHERE clause matches zero rows, and we know our update was rejected instead of silently applied on top of theirs.

src/models/inventory.py
from sqlalchemy import ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column

from src.db.base import Base, TenantMixin, TimestampMixin


class InventoryItem(TenantMixin, TimestampMixin, Base):
    __tablename__ = "inventory_items"

    id: Mapped[int] = mapped_column(primary_key=True)
    book_id: Mapped[int] = mapped_column(ForeignKey("books.id"))
    warehouse_id: Mapped[int] = mapped_column(ForeignKey("warehouses.id"))
    quantity: Mapped[int] = mapped_column(Integer, default=0)
    location: Mapped[str] = mapped_column(String(50))
    # Optimistic locking: SQLAlchemy checks and increments this automatically
    version: Mapped[int] = mapped_column(Integer, default=1)
    __mapper_args__ = {"version_id_col": version}

SQLAlchemy’s version_id_col does the heavy lifting. When you call session.flush(), SQLAlchemy generates:

UPDATE inventory_items
SET quantity = :new_qty, version = :new_version
WHERE id = :id AND version = :expected_version

If the row was modified by another transaction, rowcount is 0 and SQLAlchemy raises StaleDataError. No manual version checking required.
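The same check-and-set can be reproduced by hand. A minimal sketch, using the standard-library sqlite3 module in place of PostgreSQL and a simplified inventory_items table (only id, quantity, version); update_quantity is a hypothetical helper. It shows how rowcount separates a successful write from a stale one, which is the signal SQLAlchemy turns into StaleDataError.

```python
import sqlite3


def update_quantity(conn: sqlite3.Connection, item_id: int, new_qty: int,
                    expected_version: int) -> bool:
    """Check-and-set: apply the update only if the version is unchanged."""
    cur = conn.execute(
        "UPDATE inventory_items SET quantity = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (new_qty, item_id, expected_version),
    )
    # rowcount == 0 means another writer bumped the version first
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE inventory_items (id INTEGER PRIMARY KEY, "
    "quantity INTEGER NOT NULL, version INTEGER NOT NULL)"
)
conn.execute("INSERT INTO inventory_items VALUES (1, 1, 1)")

# Two readers both saw quantity 1 at version 1.
first = update_quantity(conn, 1, 0, expected_version=1)   # applied
second = update_quantity(conn, 1, 0, expected_version=1)  # rejected: stale version
print(first, second)  # True False
```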

Handling StaleDataError

The service layer catches StaleDataError and retries with a fresh read.

src/services/inventory_service.py
from sqlalchemy.orm.exc import StaleDataError
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

from src.models.inventory import InventoryItem
from src.repositories.inventory_repository import InventoryRepositoryProtocol


class InsufficientStockError(Exception):
    def __init__(self, book_id: int, requested: int, available: int) -> None:
        self.book_id = book_id
        self.requested = requested
        self.available = available
        super().__init__(
            f"Book {book_id}: requested {requested}, available {available}"
        )


class InventoryService:
    def __init__(self, repo: InventoryRepositoryProtocol) -> None:
        self._repo = repo

    @retry(
        retry=retry_if_exception_type(StaleDataError),
        stop=stop_after_attempt(3),  # 3 attempts total: first call plus 2 retries
        wait=wait_exponential(multiplier=0.1, max=1.0),
        reraise=True,  # raise StaleDataError, not tenacity.RetryError, at the end
    )
    async def reserve_stock(
        self, book_id: int, warehouse_id: int, quantity: int,
    ) -> InventoryItem:
        """Atomically reduce inventory. Retries on concurrent modification."""
        # For a retry to see fresh data, save() must flush (so StaleDataError is
        # raised inside this method) and the repository must roll back or expire
        # the session on conflict; otherwise the next read may return the stale
        # object cached in the session's identity map.
        item = await self._repo.get_by_book_and_warehouse(book_id, warehouse_id)
        if item is None:
            raise ValueError(f"No inventory record for book {book_id}")
        if item.quantity < quantity:
            raise InsufficientStockError(book_id, quantity, item.quantity)
        item.quantity -= quantity
        await self._repo.save(item)
        return item

The tenacity retry decorator makes up to three attempts (the original call plus two retries), with exponential backoff between them. Each attempt re-reads the row, so in practice the first retry almost always succeeds: the conflicting transaction has already committed.
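Stripped of tenacity and SQLAlchemy, the retry is a loop around read, check, and conditional write. A sketch under stated assumptions: VersionedStore is a hypothetical in-memory stand-in for the repository, its compare_and_set plays the role of the versioned UPDATE, and the interfere_once flag simulates one concurrent buyer committing between our read and our write.

```python
import asyncio
from dataclasses import dataclass


class StaleData(Exception):
    """Stand-in for SQLAlchemy's StaleDataError."""


@dataclass
class Row:
    quantity: int
    version: int


class VersionedStore:
    """Hypothetical in-memory repository with check-and-set writes."""

    def __init__(self, quantity: int) -> None:
        self._row = Row(quantity, 1)
        self.interfere_once = False  # simulate a concurrent writer on the next write

    def read(self) -> Row:
        return Row(self._row.quantity, self._row.version)  # fresh snapshot

    def compare_and_set(self, new_qty: int, expected_version: int) -> None:
        if self.interfere_once:
            # Another buyer commits first: quantity drops, version bumps.
            self.interfere_once = False
            self._row = Row(self._row.quantity - 1, self._row.version + 1)
        if self._row.version != expected_version:
            raise StaleData
        self._row = Row(new_qty, expected_version + 1)


async def reserve_with_retry(store: VersionedStore, qty: int,
                             attempts: int = 3, base_delay: float = 0.01) -> int:
    """Reserve qty units; return how many attempts it took."""
    for attempt in range(attempts):
        snapshot = store.read()  # re-read on every attempt
        if snapshot.quantity < qty:
            raise ValueError("insufficient stock")
        try:
            store.compare_and_set(snapshot.quantity - qty, snapshot.version)
            return attempt + 1
        except StaleData:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the conflict
            await asyncio.sleep(base_delay * 2 ** attempt)  # exponential backoff
    raise ValueError("attempts must be at least 1")


store = VersionedStore(quantity=5)
store.interfere_once = True
print(asyncio.run(reserve_with_retry(store, 2)), store.read())
# 2 Row(quantity=2, version=3)
```

The second attempt succeeds because it starts from the row the other writer committed (4 units at version 2), not from the stale snapshot.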

Idempotency: Preventing Double Execution

Rate limiting from Part 15 stops intentional abuse. Idempotency stops accidental duplication. When a client sends a payment request and the connection drops before they receive the response, they have no way to know if the payment went through. They retry. Without idempotency, the customer is charged twice.

The Idempotency Key Pattern

The client provides an Idempotency-Key header with a unique value (typically a UUID). The server checks if it has already processed a request with that key. If yes, it returns the cached response. If no, it processes the request and caches the response.
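The lookup-or-process flow fits in a few lines. A minimal in-memory sketch: a dict keyed by (tenant_id, key) stands in for Redis and PostgreSQL, and handle_with_idempotency and charge are hypothetical names used only for illustration.

```python
import uuid
from typing import Any, Callable

HttpResult = tuple[int, dict[str, Any]]  # (status_code, body)

_responses: dict[tuple[int, str], HttpResult] = {}


def handle_with_idempotency(tenant_id: int, key: str,
                            process: Callable[[], HttpResult]) -> tuple[HttpResult, bool]:
    """Return (result, replayed). Runs `process` at most once per (tenant, key)."""
    cache_key = (tenant_id, key)
    if cache_key in _responses:
        return _responses[cache_key], True  # replay the stored result
    result = process()
    if 200 <= result[0] < 300:  # only successful responses are stored
        _responses[cache_key] = result
    return result, False


charges: list[int] = []


def charge() -> HttpResult:
    charges.append(4999)  # the side effect that must not repeat
    return 201, {"charge_id": f"ch_{len(charges)}", "amount": 4999}


key = str(uuid.uuid4())  # generated once by the client per logical request
first = handle_with_idempotency(42, key, charge)
retry = handle_with_idempotency(42, key, charge)  # e.g. after a dropped connection
print(len(charges), retry[1])  # 1 True
```

Scoping the key by tenant means two tenants that happen to send the same key never see each other's responses.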

src/infra/idempotency.py
import json
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any

import redis.asyncio as redis
from sqlalchemy import DateTime, Integer, String, Text, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column

from src.db.base import Base

TTL = 86_400  # 24 hours, in seconds


class IdempotencyRecord(Base):
    """Persistent idempotency storage for audit and recovery."""

    __tablename__ = "idempotency_keys"

    # Composite primary key: keys only need to be unique within a tenant
    key: Mapped[str] = mapped_column(String(255), primary_key=True)
    tenant_id: Mapped[int] = mapped_column(Integer, primary_key=True)
    status_code: Mapped[int] = mapped_column(Integer)
    response_body: Mapped[str] = mapped_column(Text)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(UTC),
    )


@dataclass(frozen=True, slots=True)
class CachedResponse:
    status_code: int
    body: dict[str, Any]


class IdempotencyStore:
    """Two-layer idempotency: Redis for speed, PostgreSQL for persistence."""

    def __init__(self, redis_client: redis.Redis, session: AsyncSession) -> None:
        self._redis = redis_client
        self._session = session

    async def get(self, key: str, tenant_id: int) -> CachedResponse | None:
        # Fast path: check Redis
        cached = await self._redis.get(f"idem:{tenant_id}:{key}")
        if cached is not None:
            data = json.loads(cached)
            return CachedResponse(
                status_code=data["status_code"], body=data["body"],
            )
        # Slow path: check PostgreSQL (Redis may have evicted the key)
        stmt = select(IdempotencyRecord).where(
            IdempotencyRecord.key == key,
            IdempotencyRecord.tenant_id == tenant_id,
        )
        record = (await self._session.execute(stmt)).scalar_one_or_none()
        if record is None:
            return None
        response = CachedResponse(
            status_code=record.status_code,
            body=json.loads(record.response_body),
        )
        # Re-populate Redis for subsequent retries
        await self._cache_in_redis(key, tenant_id, response)
        return response

    async def store(
        self, key: str, tenant_id: int, response: CachedResponse,
    ) -> None:
        # Write to both layers
        await self._cache_in_redis(key, tenant_id, response)
        record = IdempotencyRecord(
            key=key,
            tenant_id=tenant_id,
            status_code=response.status_code,
            response_body=json.dumps(response.body),
        )
        self._session.add(record)
        # flush() sends the INSERT; the caller owns the transaction and must commit
        await self._session.flush()

    async def _cache_in_redis(
        self, key: str, tenant_id: int, response: CachedResponse,
    ) -> None:
        payload = json.dumps({
            "status_code": response.status_code,
            "body": response.body,
        })
        await self._redis.setex(
            f"idem:{tenant_id}:{key}", TTL, payload,
        )

Idempotency Middleware

The middleware intercepts mutating requests and checks for the idempotency key before the endpoint runs.

src/api/middleware/idempotency.py
import json

from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import JSONResponse, Response

from src.core.tenant_context import get_current_tenant
from src.infra.idempotency import CachedResponse, IdempotencyStore

MUTATING_METHODS = {"POST", "PUT", "PATCH", "DELETE"}


class IdempotencyMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, store: IdempotencyStore) -> None:  # noqa: ANN001
        super().__init__(app)
        # Caution: IdempotencyStore wraps an AsyncSession, which must not be
        # shared by concurrent requests; in production, build the store per request.
        self._store = store

    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint,
    ) -> Response:
        if request.method not in MUTATING_METHODS:
            return await call_next(request)
        idem_key = request.headers.get("Idempotency-Key")
        if idem_key is None:
            return await call_next(request)
        tenant = get_current_tenant()
        if tenant is None:
            return await call_next(request)

        # Replay the stored response if this key was already processed
        cached = await self._store.get(idem_key, tenant.id)
        if cached is not None:
            return JSONResponse(
                status_code=cached.status_code,
                content=cached.body,
                headers={"X-Idempotent-Replayed": "true"},
            )

        # Process the request; only successful responses are cached
        response = await call_next(request)
        if not 200 <= response.status_code < 300:
            return response
        # The streaming body can be consumed only once, so buffer it
        body = b""
        async for chunk in response.body_iterator:
            body += chunk if isinstance(chunk, bytes) else chunk.encode()
        try:
            parsed = json.loads(body)
        except ValueError:
            parsed = None  # empty or non-JSON body (e.g. 204): nothing to cache
        if isinstance(parsed, dict):
            await self._store.store(
                idem_key,
                tenant.id,
                CachedResponse(status_code=response.status_code, body=parsed),
            )
        # Return the original bytes and headers, so content-length stays valid
        return Response(
            content=body,
            status_code=response.status_code,
            headers=dict(response.headers),
        )

The X-Idempotent-Replayed: true header tells the client this is a cached response from a previous execution, not a new one. This matters for debugging.
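One gap worth knowing about: the middleware reads the store and writes to it in separate steps, so two requests with the same key that arrive at the same moment can both miss the cache and both run the endpoint. A common remedy is to claim the key before processing (with Redis, a SET with the NX option). The sketch below models the idea inside a single process using asyncio.Lock; IdempotencyGate is hypothetical, and a real multi-instance deployment would need the claim to live in shared storage.

```python
import asyncio
from typing import Any, Awaitable, Callable


class IdempotencyGate:
    """Hypothetical in-process gate: one request per key runs; duplicates replay."""

    def __init__(self) -> None:
        self._locks: dict[str, asyncio.Lock] = {}  # grows unbounded; no TTL cleanup here
        self._results: dict[str, Any] = {}

    async def run(self, key: str,
                  handler: Callable[[], Awaitable[Any]]) -> tuple[Any, bool]:
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:  # only one request per key proceeds at a time
            if key in self._results:
                return self._results[key], True  # duplicate: replay
            result = await handler()  # on exception nothing is cached; a retry reruns
            self._results[key] = result
            return result, False


calls = 0


async def charge() -> dict[str, Any]:
    global calls
    calls += 1
    await asyncio.sleep(0.01)  # slow payment call; the duplicate arrives meanwhile
    return {"charge_id": "ch_1"}


async def main() -> list[tuple[Any, bool]]:
    gate = IdempotencyGate()
    # Two identical requests (same key) arrive concurrently.
    return await asyncio.gather(gate.run("key-123", charge), gate.run("key-123", charge))


results = asyncio.run(main())
print(calls, [replayed for _, replayed in results])  # 1 [False, True]
```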

The Saga Pattern: Multi-Step Operations

Some operations span multiple services and cannot be wrapped in a single database transaction. In ShelfWise, creating an order requires four steps: reserve inventory, charge payment, confirm the order, and send a notification. If payment fails after inventory is reserved, we need to release the reservation.

Each forward step has a compensating action. If any step fails, the saga runs compensations in reverse order to undo everything that succeeded.
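The core rule fits in a few lines: run steps in order, remember which ones finished, and on failure undo only those, newest first. A minimal sketch; the step names mirror the order flow above, but run_saga and the stub functions are hypothetical and only record what happened.

```python
from typing import Callable

Step = tuple[str, Callable[[], None], Callable[[], None]]  # (name, execute, compensate)


def run_saga(steps: list[Step], log: list[str]) -> bool:
    completed: list[Step] = []
    for name, execute, compensate in steps:
        try:
            execute()
        except Exception:
            log.append(f"failed:{name}")
            # Undo completed steps in reverse; the failed step itself is not compensated.
            for done_name, _, done_compensate in reversed(completed):
                done_compensate()
                log.append(f"compensated:{done_name}")
            return False
        completed.append((name, execute, compensate))
        log.append(f"done:{name}")
    return True


def noop() -> None:
    """Stand-in for a real forward or compensating action."""


def fail() -> None:
    raise RuntimeError("order service unavailable")


log: list[str] = []
ok = run_saga(
    [
        ("reserve_inventory", noop, noop),
        ("charge_payment", noop, noop),
        ("confirm_order", fail, noop),
    ],
    log,
)
print(ok, log)
```

The log ends with the payment refunded before the reservation is released, the reverse of the order in which they were performed.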

Saga Orchestrator

The orchestrator manages the state machine. Each saga is a sequence of steps, and each step has an execute and a compensate method.

src/sagas/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import StrEnum
from typing import Any

import structlog

logger = structlog.get_logger()


class SagaStepStatus(StrEnum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATED = "compensated"


@dataclass
class SagaStep(ABC):
    name: str
    status: SagaStepStatus = SagaStepStatus.PENDING

    @abstractmethod
    async def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        """Run the forward action. Return data to merge into context."""
        ...

    @abstractmethod
    async def compensate(self, context: dict[str, Any]) -> None:
        """Undo the forward action."""
        ...


@dataclass
class SagaResult:
    success: bool
    context: dict[str, Any]
    failed_step: str | None = None
    error: str | None = None


class SagaOrchestrator:
    """Runs a sequence of steps with automatic compensation on failure."""

    def __init__(self, steps: list[SagaStep]) -> None:
        self._steps = steps

    async def execute(self, initial_context: dict[str, Any]) -> SagaResult:
        context = dict(initial_context)
        completed: list[SagaStep] = []
        for step in self._steps:
            try:
                logger.info("saga_step_executing", step=step.name)
                result = await step.execute(context)
                context.update(result)
                step.status = SagaStepStatus.COMPLETED
                completed.append(step)
                logger.info("saga_step_completed", step=step.name)
            except Exception as exc:
                step.status = SagaStepStatus.FAILED
                logger.error(
                    "saga_step_failed", step=step.name, error=str(exc),
                )
                await self._compensate(completed, context)
                return SagaResult(
                    success=False,
                    context=context,
                    failed_step=step.name,
                    error=str(exc),
                )
        return SagaResult(success=True, context=context)

    async def _compensate(
        self, completed: list[SagaStep], context: dict[str, Any],
    ) -> None:
        # Undo completed steps newest-first; the failed step is not compensated
        for step in reversed(completed):
            try:
                logger.info("saga_compensating", step=step.name)
                await step.compensate(context)
                step.status = SagaStepStatus.COMPENSATED
                logger.info("saga_compensated", step=step.name)
            except Exception as exc:
                logger.error(
                    "saga_compensation_failed",
                    step=step.name,
                    error=str(exc),
                )
                # Compensation failure is a critical alert:
                # it requires manual intervention

Order Creation Saga

Here is the concrete saga for creating an order in ShelfWise. It covers the first three steps; the notification step is not shown in this listing.

src/sagas/create_order.py
from dataclasses import dataclass
from typing import Any

from src.sagas.base import SagaOrchestrator, SagaStep
from src.services.inventory_service import InventoryService
from src.services.order_service import OrderService
from src.services.payment_service import PaymentService


class OrderCreationFailed(Exception):
    def __init__(self, step: str | None, reason: str | None) -> None:
        self.step = step
        self.reason = reason
        super().__init__(f"Order creation failed at {step}: {reason}")


@dataclass
class ReserveInventoryStep(SagaStep):
    name: str = "reserve_inventory"
    inventory_service: InventoryService | None = None

    async def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        assert self.inventory_service is not None
        reservation = await self.inventory_service.reserve_stock(
            book_id=context["book_id"],
            warehouse_id=context["warehouse_id"],
            quantity=context["quantity"],
        )
        return {"reservation_id": reservation.id}

    async def compensate(self, context: dict[str, Any]) -> None:
        assert self.inventory_service is not None
        # release_reservation is not shown in this post
        await self.inventory_service.release_reservation(
            context["reservation_id"],
        )


@dataclass
class ChargePaymentStep(SagaStep):
    name: str = "charge_payment"
    payment_service: PaymentService | None = None

    async def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        assert self.payment_service is not None
        charge = await self.payment_service.charge(
            customer_id=context["customer_id"],
            amount=context["total_amount"],
            idempotency_key=context["idempotency_key"],
        )
        return {"charge_id": charge.id}

    async def compensate(self, context: dict[str, Any]) -> None:
        assert self.payment_service is not None
        await self.payment_service.refund(context["charge_id"])


@dataclass
class ConfirmOrderStep(SagaStep):
    name: str = "confirm_order"
    order_service: OrderService | None = None

    async def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        assert self.order_service is not None
        order = await self.order_service.confirm(
            order_id=context["order_id"],
            charge_id=context["charge_id"],
        )
        return {"confirmed_order_id": order.id}

    async def compensate(self, context: dict[str, Any]) -> None:
        assert self.order_service is not None
        await self.order_service.cancel(context["order_id"])


async def create_order_saga(
    context: dict[str, Any],
    inventory_service: InventoryService,
    payment_service: PaymentService,
    order_service: OrderService,
) -> dict[str, Any]:
    saga = SagaOrchestrator([
        ReserveInventoryStep(inventory_service=inventory_service),
        ChargePaymentStep(payment_service=payment_service),
        ConfirmOrderStep(order_service=order_service),
    ])
    result = await saga.execute(context)
    if not result.success:
        raise OrderCreationFailed(
            step=result.failed_step,
            reason=result.error,
        )
    return result.context

Notice that ChargePaymentStep.execute passes the idempotency_key to the payment service. If the charge request is retried, for example after a timeout, or because the saga is re-run after a crash that happened before charge_id was recorded, the payment provider returns the existing charge instead of creating a new one. Sagas and idempotency work together.
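A toy model of that interplay. FakePaymentProvider is a hypothetical stand-in for a gateway that honors idempotency keys (real providers' APIs differ); the point is only that repeating a charge with the same key hands back the original charge rather than creating a second one.

```python
import itertools
from dataclasses import dataclass


@dataclass(frozen=True)
class Charge:
    id: str
    amount_cents: int


class FakePaymentProvider:
    """Hypothetical gateway: at most one charge per idempotency key."""

    def __init__(self) -> None:
        self._by_key: dict[str, Charge] = {}
        self._ids = itertools.count(1)
        self.charges_created = 0

    def charge(self, amount_cents: int, idempotency_key: str) -> Charge:
        existing = self._by_key.get(idempotency_key)
        if existing is not None:
            return existing  # retry: return the original charge
        self.charges_created += 1
        new_charge = Charge(id=f"ch_{next(self._ids)}", amount_cents=amount_cents)
        self._by_key[idempotency_key] = new_charge
        return new_charge


provider = FakePaymentProvider()
amount, key = 4999, "order-1001-payment"
first = provider.charge(amount, key)
# The worker crashes before recording charge_id, so the step runs again.
second = provider.charge(amount, key)
print(first == second, provider.charges_created)  # True 1
```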

Transactional Outbox

The saga raises a question: how do you reliably publish domain events (like “order confirmed”) when the database transaction and the message broker are two separate systems? If you publish the event first and the transaction rolls back, consumers see an event that never happened. If you commit first and the publish fails, the event is lost.

The transactional outbox pattern solves this by writing events to an outbox table in the same transaction as the business data. A separate poller reads the outbox and publishes to the message broker.
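The guarantee comes entirely from the database transaction. A minimal sketch using the standard-library sqlite3 module (standing in for PostgreSQL) with simplified orders and outbox_events tables; confirm_order is a hypothetical helper. When the transaction rolls back, the status change and the event vanish together; when it commits, both become visible to the poller.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT NOT NULL);
    CREATE TABLE outbox_events (
        id INTEGER PRIMARY KEY,
        event_type TEXT NOT NULL,
        payload TEXT NOT NULL,
        published INTEGER NOT NULL DEFAULT 0
    );
    INSERT INTO orders VALUES (1, 'pending'), (2, 'pending');
""")


def confirm_order(order_id: int, fail_before_commit: bool = False) -> None:
    try:
        with conn:  # one transaction: commit on success, roll back on exception
            conn.execute("UPDATE orders SET status = 'confirmed' WHERE id = ?", (order_id,))
            conn.execute(
                "INSERT INTO outbox_events (event_type, payload) VALUES (?, ?)",
                ("order.confirmed", json.dumps({"order_id": order_id})),
            )
            if fail_before_commit:
                raise RuntimeError("simulated crash before commit")
    except RuntimeError:
        pass  # a real caller would surface this; swallowed for the demo


confirm_order(1)
confirm_order(2, fail_before_commit=True)
print(conn.execute("SELECT id, status FROM orders ORDER BY id").fetchall())
# [(1, 'confirmed'), (2, 'pending')]
print(conn.execute("SELECT payload FROM outbox_events").fetchall())
# [('{"order_id": 1}',)]
```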

src/models/outbox.py
from datetime import UTC, datetime

from sqlalchemy import Boolean, DateTime, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column

from src.db.base import Base


class OutboxEvent(Base):
    __tablename__ = "outbox_events"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    aggregate_type: Mapped[str] = mapped_column(String(100))
    aggregate_id: Mapped[int] = mapped_column(Integer, index=True)
    event_type: Mapped[str] = mapped_column(String(100))
    payload: Mapped[str] = mapped_column(Text)  # JSON
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(UTC),
    )
    published: Mapped[bool] = mapped_column(Boolean, default=False)


# src/services/order_service.py: the confirm method of OrderService
# (json, OutboxEvent, Order, and OrderStatus are imported in that module)
async def confirm(self, order_id: int, charge_id: str) -> Order:
    order = await self._repo.get(order_id)
    order.status = OrderStatus.CONFIRMED
    order.charge_id = charge_id
    # Write the event in the SAME transaction as the order update
    event = OutboxEvent(
        aggregate_type="Order",
        aggregate_id=order.id,
        event_type="order.confirmed",
        payload=json.dumps({
            "order_id": order.id,
            "customer_id": order.customer_id,
            "total": str(order.total),
        }),
    )
    self._session.add(event)
    # The caller's commit persists the order change and the event atomically
    await self._session.flush()
    return order

The outbox poller uses SELECT FOR UPDATE SKIP LOCKED so that several worker instances can run at once without two of them claiming the same event.

src/workers/outbox_poller.py
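from typing import Protocol

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from src.models.outbox import OutboxEvent


class EventPublisher(Protocol):
    """Minimal broker interface this poller assumes."""

    async def publish(self, topic: str, payload: str) -> None: ...


async def poll_outbox(session: AsyncSession, publisher: EventPublisher) -> int:
    """Fetch and publish unpublished outbox events. Returns count published."""
    stmt = (
        select(OutboxEvent)
        .where(OutboxEvent.published.is_(False))
        .order_by(OutboxEvent.created_at)
        .limit(100)
        # Rows already locked by another worker are skipped, not waited on
        .with_for_update(skip_locked=True)
    )
    events = (await session.execute(stmt)).scalars().all()
    for event in events:
        await publisher.publish(
            topic=event.event_type,
            payload=event.payload,
        )
        event.published = True
    # If the worker dies before this commit, the rows stay unpublished and
    # are sent again on the next poll: delivery is at-least-once.
    await session.commit()
    return len(events)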

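SKIP LOCKED means that if another worker instance already holds the lock on an event row, this query skips that row instead of waiting for it. Workers never block each other, and no two workers publish the same row at the same time. Delivery is still at-least-once, though: if a worker publishes an event and crashes before it commits, the row stays unpublished and is sent again, so consumers must tolerate duplicates.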
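Even with SKIP LOCKED, a worker can crash after publishing and before its commit, so the same event can reach the broker twice; consumers should therefore deduplicate. A minimal sketch, assuming each message carries the outbox row's id as an event id (the poller above publishes only topic and payload, so this field is an assumption) and the consumer remembers which ids it has handled. NotificationConsumer is hypothetical; in production the processed ids would live in the consumer's database, ideally written in the same transaction as the side effect.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Message:
    event_id: int  # assumed: the outbox row id, sent along with the payload
    event_type: str
    payload: str


@dataclass
class NotificationConsumer:
    """Hypothetical consumer that sends at most one email per event."""

    processed: set[int] = field(default_factory=set)
    emails_sent: list[str] = field(default_factory=list)

    def handle(self, msg: Message) -> bool:
        """Return True if the message caused a side effect, False if it was a duplicate."""
        if msg.event_id in self.processed:
            return False  # duplicate delivery: skip the side effect
        self.emails_sent.append(msg.payload)
        self.processed.add(msg.event_id)
        return True


consumer = NotificationConsumer()
event = Message(17, "order.confirmed", '{"order_id": 1001}')
consumer.handle(event)  # first delivery
consumer.handle(event)  # redelivery after the poller crashed before commit
print(len(consumer.emails_sent))  # 1
```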

What Happens Without This

In ShelfWise’s first month, a customer placed an order during a brief network glitch. Their browser retried the payment request. Without idempotency, the payment gateway processed both requests — the customer was charged twice. The support ticket turned into a Twitter thread. The thread turned into a Hacker News comment.

We implemented idempotency keys the next day. Every payment request now carries an Idempotency-Key header, and the payment gateway returns the cached result on retry. The customer is charged exactly once, however many times a request with the same key is sent.

Checklist

Before shipping data consistency patterns to production:

  1. Every mutable entity has a version column with version_id_col
  2. StaleDataError is caught and retried with exponential backoff (max 3 attempts)
  3. All mutating endpoints accept an Idempotency-Key header
  4. Idempotency responses are stored in both Redis (speed) and PostgreSQL (durability)
  5. Multi-step operations use the saga pattern with explicit compensating actions
  6. Domain events are written to a transactional outbox in the same transaction as business data
  7. Outbox poller uses SELECT FOR UPDATE SKIP LOCKED for safe concurrent processing
  8. Compensation failures trigger critical alerts for manual intervention

Next in Part 17, we tackle the most dangerous operation in multi-tenant SaaS: schema migrations on live databases with hundreds of tenants.
