At 1M transactions per day, two requests modifying the same inventory record simultaneously is not a theoretical concern — it happens hundreds of times per hour. Without optimistic locking, you get lost updates: two warehouse workers each “pick” the last copy of a bestseller, and your inventory goes negative. Without idempotency, a network timeout during payment causes the customer to be charged twice. Without sagas, a failed shipment notification leaves an order in a state where the customer paid but nobody knows to ship it.
ShelfWise learned all three lessons the hard way. This post covers the patterns that prevent each failure.
Optimistic Locking: Preventing Lost Updates
The problem is simple. Two concurrent requests read the same row, each makes a decision based on what they read, and both write back. The second write silently overwrites the first.
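The read-decide-write race described above can be reproduced in a few lines. This is a minimal sketch using the standard library's `sqlite3` module and a hypothetical one-row `inventory` table (not ShelfWise's schema); the two "requests" are interleaved by hand rather than run on real threads.

```python
import sqlite3

# Hypothetical single-row inventory table; names are illustrative only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE inventory (id INTEGER PRIMARY KEY, quantity INTEGER)")
conn.execute("INSERT INTO inventory (id, quantity) VALUES (1, 1)")


def read_quantity(item_id: int) -> int:
    row = conn.execute(
        "SELECT quantity FROM inventory WHERE id = ?", (item_id,)
    ).fetchone()
    return row[0]


def write_quantity(item_id: int, quantity: int) -> None:
    # Blind write: nothing checks whether the row changed since it was read.
    conn.execute(
        "UPDATE inventory SET quantity = ? WHERE id = ?", (quantity, item_id)
    )


# Both "requests" read before either one writes.
seen_by_a = read_quantity(1)
seen_by_b = read_quantity(1)

# Each decides independently that one copy is available and picks it.
picks = 0
if seen_by_a >= 1:
    write_quantity(1, seen_by_a - 1)
    picks += 1
if seen_by_b >= 1:
    write_quantity(1, seen_by_b - 1)  # silently overwrites A's write
    picks += 1

final_quantity = read_quantity(1)
```

Two picks succeed against a single copy, and the stored quantity gives no hint that anything went wrong.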
| | Pessimistic Locking | Optimistic Locking |
|---|---|---|
| Mechanism | SELECT FOR UPDATE — holds row lock | Version column — check-and-set on write |
| Contention cost | High — blocked requests wait | Low — retry on conflict |
| Deadlock risk | Yes — if lock order varies | None — no locks held |
| Throughput | Lower under contention | Higher — no waiting |
| Failure mode | Timeout if lock held too long | StaleDataError — caller retries |
| Best for | Short, critical sections | Read-heavy with rare conflicts |
For ShelfWise, optimistic locking is the right default. Most requests read inventory; only a fraction modify it. Pessimistic locking would serialize every inventory update behind a row lock, killing throughput.
The Version Column
Every mutable entity gets a version column. On every update, the application increments the version and includes the expected version in the WHERE clause. If someone else modified the row between our read and write, the WHERE clause matches zero rows and we know the update was lost.
```python
from sqlalchemy import Integer, String, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column

from src.db.base import Base, TenantMixin, TimestampMixin


class InventoryItem(TenantMixin, TimestampMixin, Base):
    __tablename__ = "inventory_items"

    id: Mapped[int] = mapped_column(primary_key=True)
    book_id: Mapped[int] = mapped_column(ForeignKey("books.id"))
    warehouse_id: Mapped[int] = mapped_column(ForeignKey("warehouses.id"))
    quantity: Mapped[int] = mapped_column(Integer, default=0)
    location: Mapped[str] = mapped_column(String(50))

    # Optimistic locking: SQLAlchemy checks this automatically
    version: Mapped[int] = mapped_column(Integer, default=1)

    __mapper_args__ = {"version_id_col": version}
```

SQLAlchemy’s version_id_col does the heavy lifting. When you call session.flush(), SQLAlchemy generates:

```sql
UPDATE inventory_items
SET quantity = :new_qty, version = :new_version
WHERE id = :id AND version = :expected_version
```

If the row was modified by another transaction, rowcount is 0 and SQLAlchemy raises StaleDataError. No manual version checking required.
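To see the check-and-set on its own, without the ORM, here is a minimal sketch using the standard library's `sqlite3` module. The table mirrors the columns above, but the `update_if_unchanged` helper is hypothetical and written only for this illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE inventory_items ("
    "id INTEGER PRIMARY KEY, quantity INTEGER NOT NULL, version INTEGER NOT NULL)"
)
conn.execute("INSERT INTO inventory_items (id, quantity, version) VALUES (1, 5, 1)")


def update_if_unchanged(item_id: int, new_qty: int, expected_version: int) -> bool:
    """Write only if the row still carries the version we read earlier."""
    cursor = conn.execute(
        "UPDATE inventory_items "
        "SET quantity = ?, version = ? "
        "WHERE id = ? AND version = ?",
        (new_qty, expected_version + 1, item_id, expected_version),
    )
    # rowcount is 0 when another writer bumped the version first.
    return cursor.rowcount == 1


# Two writers both read version 1; only the first write can match.
first_ok = update_if_unchanged(1, new_qty=4, expected_version=1)
second_ok = update_if_unchanged(1, new_qty=3, expected_version=1)

row = conn.execute(
    "SELECT quantity, version FROM inventory_items WHERE id = 1"
).fetchone()
```

The second writer gets `False` back instead of overwriting the first, which is the signal SQLAlchemy turns into StaleDataError.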
Handling StaleDataError
The service layer catches StaleDataError and retries with a fresh read.
```python
from sqlalchemy.orm.exc import StaleDataError
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

from src.models.inventory import InventoryItem
from src.repositories.inventory_repository import InventoryRepositoryProtocol


class InsufficientStockError(Exception):
    def __init__(self, book_id: int, requested: int, available: int) -> None:
        self.book_id = book_id
        self.requested = requested
        self.available = available
        super().__init__(
            f"Book {book_id}: requested {requested}, available {available}"
        )


class InventoryService:
    def __init__(self, repo: InventoryRepositoryProtocol) -> None:
        self._repo = repo

    @retry(
        retry=retry_if_exception_type(StaleDataError),
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=0.1, max=1.0),
    )
    async def reserve_stock(
        self,
        book_id: int,
        warehouse_id: int,
        quantity: int,
    ) -> InventoryItem:
        """Atomically reduce inventory. Retries on concurrent modification."""
        # Re-read on every attempt so the stock check uses current data
        item = await self._repo.get_by_book_and_warehouse(book_id, warehouse_id)
        if item is None:
            raise ValueError(f"No inventory record for book {book_id}")

        if item.quantity < quantity:
            raise InsufficientStockError(book_id, quantity, item.quantity)

        item.quantity -= quantity
        await self._repo.save(item)
        return item
```

The tenacity retry decorator allows up to three attempts in total (the initial call plus two retries), with exponential backoff between them. In practice, the first retry almost always succeeds because the conflicting transaction has already committed.
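The reason each attempt has to start with a fresh read is easier to see with the retry loop written out by hand. The sketch below is not tenacity and not the ShelfWise repository: `FakeRepo`, `StaleWrite`, and `reserve` are hypothetical stand-ins, and the conflict is injected deliberately to simulate a concurrent writer.

```python
import time
from dataclasses import dataclass


class StaleWrite(Exception):
    """Stand-in for StaleDataError in this self-contained sketch."""


@dataclass
class Row:
    quantity: int
    version: int


class FakeRepo:
    """In-memory row that rejects writes carrying an outdated version."""

    def __init__(self, quantity: int) -> None:
        self._row = Row(quantity=quantity, version=1)
        self.conflicts_to_inject = 0

    def get(self) -> Row:
        return Row(self._row.quantity, self._row.version)  # snapshot copy

    def save(self, new_quantity: int, expected_version: int) -> None:
        if self.conflicts_to_inject > 0:
            # Simulate a concurrent writer committing just before us.
            self.conflicts_to_inject -= 1
            self._row = Row(self._row.quantity - 1, self._row.version + 1)
        if expected_version != self._row.version:
            raise StaleWrite
        self._row = Row(new_quantity, expected_version + 1)


def reserve(repo: FakeRepo, quantity: int, max_attempts: int = 3) -> int:
    """Return the number of attempts used; re-read the row on every attempt."""
    for attempt in range(1, max_attempts + 1):
        row = repo.get()  # fresh read, so the stock check uses current data
        if row.quantity < quantity:
            raise ValueError("insufficient stock")
        try:
            repo.save(row.quantity - quantity, row.version)
            return attempt
        except StaleWrite:
            if attempt == max_attempts:
                raise
            time.sleep(0.01 * 2 ** (attempt - 1))  # short exponential backoff
    raise AssertionError("unreachable")


repo = FakeRepo(quantity=5)
repo.conflicts_to_inject = 1  # the first write loses the race
attempts_used = reserve(repo, quantity=2)
final = repo.get()
```

The second attempt sees the competitor's decrement and subtracts from the updated quantity, so neither change is lost.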
Idempotency: Preventing Double Execution
Rate limiting from Part 15 stops intentional abuse. Idempotency stops accidental duplication. When a client sends a payment request and the connection drops before they receive the response, they have no way to know if the payment went through. They retry. Without idempotency, the customer is charged twice.
The Idempotency Key Pattern
The client provides an Idempotency-Key header with a unique value (typically a UUID). The server checks if it has already processed a request with that key. If yes, it returns the cached response. If no, it processes the request and caches the response.
```python
import json
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from typing import Any

import redis.asyncio as redis
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, String, Integer, DateTime, Text
from sqlalchemy.orm import Mapped, mapped_column

from src.db.base import Base


class IdempotencyRecord(Base):
    """Persistent idempotency storage for audit and recovery."""

    __tablename__ = "idempotency_keys"

    key: Mapped[str] = mapped_column(String(255), primary_key=True)
    tenant_id: Mapped[int] = mapped_column(Integer, index=True)
    status_code: Mapped[int] = mapped_column(Integer)
    response_body: Mapped[str] = mapped_column(Text)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(UTC),
    )


@dataclass(frozen=True, slots=True)
class CachedResponse:
    status_code: int
    body: dict[str, Any]


TTL: int = 86_400  # 24 hours


class IdempotencyStore:
    """Two-layer idempotency: Redis for speed, PostgreSQL for persistence."""

    def __init__(self, redis_client: redis.Redis, session: AsyncSession) -> None:
        self._redis = redis_client
        self._session = session

    async def get(self, key: str, tenant_id: int) -> CachedResponse | None:
        # Fast path: check Redis
        cached = await self._redis.get(f"idem:{tenant_id}:{key}")
        if cached is not None:
            data = json.loads(cached)
            return CachedResponse(
                status_code=data["status_code"],
                body=data["body"],
            )

        # Slow path: check PostgreSQL (Redis may have evicted)
        stmt = select(IdempotencyRecord).where(
            IdempotencyRecord.key == key,
            IdempotencyRecord.tenant_id == tenant_id,
        )
        record = (await self._session.execute(stmt)).scalar_one_or_none()
        if record is None:
            return None

        response = CachedResponse(
            status_code=record.status_code,
            body=json.loads(record.response_body),
        )
        # Re-populate Redis for subsequent retries
        await self._cache_in_redis(key, tenant_id, response)
        return response

    async def store(
        self,
        key: str,
        tenant_id: int,
        response: CachedResponse,
    ) -> None:
        # Write to both layers
        await self._cache_in_redis(key, tenant_id, response)

        record = IdempotencyRecord(
            key=key,
            tenant_id=tenant_id,
            status_code=response.status_code,
            response_body=json.dumps(response.body),
        )
        self._session.add(record)
        await self._session.flush()

    async def _cache_in_redis(
        self,
        key: str,
        tenant_id: int,
        response: CachedResponse,
    ) -> None:
        payload = json.dumps({
            "status_code": response.status_code,
            "body": response.body,
        })
        await self._redis.setex(
            f"idem:{tenant_id}:{key}",
            TTL,
            payload,
        )
```

Idempotency Middleware
The middleware intercepts mutating requests and checks for the idempotency key before the endpoint runs.
```python
import json

from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import JSONResponse, Response

from src.core.tenant_context import get_current_tenant
from src.infra.idempotency import IdempotencyStore, CachedResponse

MUTATING_METHODS = {"POST", "PUT", "PATCH", "DELETE"}


class IdempotencyMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, store: IdempotencyStore) -> None:  # noqa: ANN001
        super().__init__(app)
        self._store = store

    async def dispatch(
        self,
        request: Request,
        call_next: RequestResponseEndpoint,
    ) -> Response:
        if request.method not in MUTATING_METHODS:
            return await call_next(request)

        idem_key = request.headers.get("Idempotency-Key")
        if idem_key is None:
            return await call_next(request)

        tenant = get_current_tenant()
        if tenant is None:
            return await call_next(request)

        # Check for cached response
        cached = await self._store.get(idem_key, tenant.id)
        if cached is not None:
            return JSONResponse(
                status_code=cached.status_code,
                content=cached.body,
                headers={"X-Idempotent-Replayed": "true"},
            )

        # Process request and cache response
        response = await call_next(request)

        if 200 <= response.status_code < 300:
            # The body is a stream: drain it so it can be cached and re-sent
            body = b""
            async for chunk in response.body_iterator:
                body += chunk if isinstance(chunk, bytes) else chunk.encode()

            cached_response = CachedResponse(
                status_code=response.status_code,
                body=json.loads(body),
            )
            await self._store.store(idem_key, tenant.id, cached_response)

            # Re-send the original bytes so the copied Content-Length
            # header still matches the body
            return Response(
                content=body,
                status_code=response.status_code,
                headers=dict(response.headers),
            )

        return response
```

The X-Idempotent-Replayed: true header tells the client this is a cached response from a previous execution, not a new one. This matters for debugging.
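The behaviour the store and middleware aim for can be shown in miniature. In this sketch a plain dict stands in for Redis and PostgreSQL, and `PaymentEndpoint` is a hypothetical handler invented for the example; only the key-lookup logic mirrors the code above.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any


@dataclass
class PaymentEndpoint:
    """Toy handler: a dict stands in for the Redis/PostgreSQL store."""

    charges: list[int] = field(default_factory=list)
    _responses: dict[tuple[int, str], dict[str, Any]] = field(default_factory=dict)

    def handle(
        self, tenant_id: int, idem_key: str, amount: int
    ) -> tuple[dict[str, Any], bool]:
        """Return (response_body, replayed)."""
        cache_key = (tenant_id, idem_key)
        if cache_key in self._responses:
            return self._responses[cache_key], True  # replay, no side effect

        self.charges.append(amount)  # the side effect we must not repeat
        body = {"charge_number": len(self.charges), "amount": amount}
        self._responses[cache_key] = body
        return body, False


endpoint = PaymentEndpoint()
key = str(uuid.uuid4())  # generated once by the client, reused on every retry

first_body, first_replayed = endpoint.handle(tenant_id=7, idem_key=key, amount=1999)
# The response is lost in transit, so the client retries with the same key.
retry_body, retry_replayed = endpoint.handle(tenant_id=7, idem_key=key, amount=1999)
```

The retry receives the same body and only one charge is recorded. The sketch does not cover a retry that arrives while the first request is still in flight; that case needs the key to be claimed atomically before processing starts.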
The Saga Pattern: Multi-Step Operations
Some operations span multiple services and cannot be wrapped in a single database transaction. In ShelfWise, creating an order requires four steps: reserve inventory, charge payment, confirm the order, and send a notification. If payment fails after inventory is reserved, we need to release the reservation.
Each forward step has a compensating action. If any step fails, the saga runs compensations in reverse order to undo everything that succeeded.
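The reverse-order rule is the part that is easiest to get wrong, so here it is stripped down to plain functions. This is an illustrative sketch only: `run_saga`, `record`, and the step names are made up for the example, and the last step fails on purpose.

```python
from collections.abc import Callable

# (name, forward action, compensating action)
Step = tuple[str, Callable[[], None], Callable[[], None]]

log: list[str] = []


def record(message: str) -> Callable[[], None]:
    """Build an action that just notes that it ran."""
    return lambda: log.append(message)


def failing_confirm() -> None:
    raise RuntimeError("order service unavailable")


def run_saga(steps: list[Step]) -> bool:
    done: list[Step] = []
    for step in steps:
        name, action, _undo = step
        try:
            action()
        except Exception:
            log.append(f"{name} failed")
            # Undo only what succeeded, newest first.
            for _name, _action, undo in reversed(done):
                undo()
            return False
        done.append(step)
    return True


succeeded = run_saga([
    ("reserve_inventory", record("reserved"), record("released")),
    ("charge_payment", record("charged"), record("refunded")),
    ("confirm_order", failing_confirm, record("cancelled")),
])
```

The refund runs before the inventory release, and the failed step's own compensation never runs because that step did not complete.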
Saga Orchestrator
The orchestrator manages the state machine. Each saga is a sequence of steps, and each step has an execute and a compensate method.
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Any

import structlog

logger = structlog.get_logger()


class SagaStepStatus(StrEnum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATED = "compensated"


@dataclass
class SagaStep(ABC):
    name: str
    status: SagaStepStatus = SagaStepStatus.PENDING

    @abstractmethod
    async def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        """Run the forward action. Return data to merge into context."""
        ...

    @abstractmethod
    async def compensate(self, context: dict[str, Any]) -> None:
        """Undo the forward action."""
        ...


@dataclass
class SagaResult:
    success: bool
    context: dict[str, Any]
    failed_step: str | None = None
    error: str | None = None


class SagaOrchestrator:
    """Runs a sequence of steps with automatic compensation on failure."""

    def __init__(self, steps: list[SagaStep]) -> None:
        self._steps = steps

    async def execute(self, initial_context: dict[str, Any]) -> SagaResult:
        context = dict(initial_context)
        completed: list[SagaStep] = []

        for step in self._steps:
            try:
                logger.info("saga_step_executing", step=step.name)
                result = await step.execute(context)
                context.update(result)
                step.status = SagaStepStatus.COMPLETED
                completed.append(step)
                logger.info("saga_step_completed", step=step.name)
            except Exception as exc:
                step.status = SagaStepStatus.FAILED
                logger.error(
                    "saga_step_failed",
                    step=step.name,
                    error=str(exc),
                )
                await self._compensate(completed, context)
                return SagaResult(
                    success=False,
                    context=context,
                    failed_step=step.name,
                    error=str(exc),
                )

        return SagaResult(success=True, context=context)

    async def _compensate(
        self,
        completed: list[SagaStep],
        context: dict[str, Any],
    ) -> None:
        for step in reversed(completed):
            try:
                logger.info("saga_compensating", step=step.name)
                await step.compensate(context)
                step.status = SagaStepStatus.COMPENSATED
                logger.info("saga_compensated", step=step.name)
            except Exception as exc:
                logger.error(
                    "saga_compensation_failed",
                    step=step.name,
                    error=str(exc),
                )
                # Compensation failure is a critical alert that
                # requires manual intervention
```

Order Creation Saga
Here is the concrete saga for creating an order in ShelfWise.
```python
from dataclasses import dataclass
from typing import Any

from src.sagas.base import SagaStep, SagaOrchestrator
from src.services.inventory_service import InventoryService
from src.services.payment_service import PaymentService
from src.services.order_service import OrderService
from src.services.notification_service import NotificationService


@dataclass
class ReserveInventoryStep(SagaStep):
    name: str = "reserve_inventory"
    inventory_service: InventoryService | None = None

    async def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        assert self.inventory_service is not None
        reservation = await self.inventory_service.reserve_stock(
            book_id=context["book_id"],
            warehouse_id=context["warehouse_id"],
            quantity=context["quantity"],
        )
        return {"reservation_id": reservation.id}

    async def compensate(self, context: dict[str, Any]) -> None:
        assert self.inventory_service is not None
        await self.inventory_service.release_reservation(
            context["reservation_id"],
        )


@dataclass
class ChargePaymentStep(SagaStep):
    name: str = "charge_payment"
    payment_service: PaymentService | None = None

    async def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        assert self.payment_service is not None
        charge = await self.payment_service.charge(
            customer_id=context["customer_id"],
            amount=context["total_amount"],
            idempotency_key=context["idempotency_key"],
        )
        return {"charge_id": charge.id}

    async def compensate(self, context: dict[str, Any]) -> None:
        assert self.payment_service is not None
        await self.payment_service.refund(context["charge_id"])


@dataclass
class ConfirmOrderStep(SagaStep):
    name: str = "confirm_order"
    order_service: OrderService | None = None

    async def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        assert self.order_service is not None
        order = await self.order_service.confirm(
            order_id=context["order_id"],
            charge_id=context["charge_id"],
        )
        return {"confirmed_order_id": order.id}

    async def compensate(self, context: dict[str, Any]) -> None:
        assert self.order_service is not None
        await self.order_service.cancel(context["order_id"])


async def create_order_saga(
    context: dict[str, Any],
    inventory_service: InventoryService,
    payment_service: PaymentService,
    order_service: OrderService,
) -> dict[str, Any]:
    saga = SagaOrchestrator([
        ReserveInventoryStep(inventory_service=inventory_service),
        ChargePaymentStep(payment_service=payment_service),
        ConfirmOrderStep(order_service=order_service),
    ])
    result = await saga.execute(context)
    if not result.success:
        raise OrderCreationFailed(
            step=result.failed_step,
            reason=result.error,
        )
    return result.context


class OrderCreationFailed(Exception):
    def __init__(self, step: str | None, reason: str | None) -> None:
        self.step = step
        self.reason = reason
        super().__init__(f"Order creation failed at {step}: {reason}")
```

Notice that ChargePaymentStep.execute passes the idempotency_key to the payment service. If the saga fails after payment and restarts from scratch, the payment provider returns the cached charge instead of creating a new one. Sagas and idempotency work together.
Transactional Outbox
The saga raises a question: how do you reliably publish domain events (like “order confirmed”) when the database transaction and the message broker are two separate systems? If you publish the event first and the transaction rolls back, consumers see an event that never happened. If you commit first and the publish fails, the event is lost.
The transactional outbox pattern solves this by writing events to an outbox table in the same transaction as the business data. A separate poller reads the outbox and publishes to the message broker.
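The property that matters is that the business row and the event row commit or roll back together. A minimal sketch with the standard library's `sqlite3` module shows it; the two tables and the `confirm_order` helper are simplified, hypothetical stand-ins for the real models.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT NOT NULL);
    CREATE TABLE outbox_events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        event_type TEXT NOT NULL,
        payload TEXT NOT NULL,
        published INTEGER NOT NULL DEFAULT 0
    );
    INSERT INTO orders (id, status) VALUES (1, 'pending'), (2, 'pending');
    """
)


def confirm_order(order_id: int, fail_before_commit: bool = False) -> None:
    """Update the order and record its event in one transaction."""
    try:
        with conn:  # commits on success, rolls back if the block raises
            conn.execute(
                "UPDATE orders SET status = 'confirmed' WHERE id = ?", (order_id,)
            )
            conn.execute(
                "INSERT INTO outbox_events (event_type, payload) VALUES (?, ?)",
                ("order.confirmed", json.dumps({"order_id": order_id})),
            )
            if fail_before_commit:
                raise RuntimeError("simulated failure before commit")
    except RuntimeError:
        pass  # both writes were rolled back together


confirm_order(1)
confirm_order(2, fail_before_commit=True)

statuses = dict(conn.execute("SELECT id, status FROM orders"))
events = [json.loads(p) for (p,) in conn.execute("SELECT payload FROM outbox_events")]
```

Order 2 stays pending and has no event, so a poller reading the outbox can never announce a confirmation that was rolled back.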
```python
from datetime import UTC, datetime
from sqlalchemy import Integer, String, Text, DateTime, Boolean
from sqlalchemy.orm import Mapped, mapped_column

from src.db.base import Base


class OutboxEvent(Base):
    __tablename__ = "outbox_events"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    aggregate_type: Mapped[str] = mapped_column(String(100))
    aggregate_id: Mapped[int] = mapped_column(Integer, index=True)
    event_type: Mapped[str] = mapped_column(String(100))
    payload: Mapped[str] = mapped_column(Text)  # JSON
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(UTC),
    )
    published: Mapped[bool] = mapped_column(Boolean, default=False)
```

```python
# src/services/order_service.py (inside the confirm method)
# Requires `import json` and the OutboxEvent model at module level.
async def confirm(self, order_id: int, charge_id: str) -> Order:
    order = await self._repo.get(order_id)
    order.status = OrderStatus.CONFIRMED
    order.charge_id = charge_id

    # Write event in the SAME transaction as the order update
    event = OutboxEvent(
        aggregate_type="Order",
        aggregate_id=order.id,
        event_type="order.confirmed",
        payload=json.dumps({
            "order_id": order.id,
            "customer_id": order.customer_id,
            "total": str(order.total),
        }),
    )
    self._session.add(event)
    await self._session.flush()
    return order
```

The outbox poller uses SELECT FOR UPDATE SKIP LOCKED so that multiple worker instances can process events concurrently without claiming the same rows.
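```python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# OutboxEvent and EventPublisher come from the application's own modules.


async def poll_outbox(session: AsyncSession, publisher: EventPublisher) -> int:
    """Fetch and publish unpublished outbox events. Returns count published."""
    stmt = (
        select(OutboxEvent)
        .where(OutboxEvent.published == False)  # noqa: E712
        .order_by(OutboxEvent.created_at)
        .limit(100)
        .with_for_update(skip_locked=True)
    )
    events = (await session.execute(stmt)).scalars().all()

    for event in events:
        await publisher.publish(
            topic=event.event_type,
            payload=event.payload,
        )
        event.published = True

    # The row locks are held until this commit
    await session.commit()
    return len(events)
```

SKIP LOCKED means that if another worker instance has already locked an event row, this query skips it instead of waiting, so workers neither block each other nor pick up the same batch. Delivery is still at-least-once: if a worker crashes after publishing but before the commit, those events are published again on the next poll, so consumers should tolerate duplicates.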
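One case worth guarding against on the consumer side is a crash between publish() and commit() in the poller, after which the same rows are picked up and published again. A minimal sketch of a consumer that tolerates this follows. It assumes the outbox row id travels with the message (the poller above does not send it), and `OrderConfirmedConsumer` is a hypothetical class that keeps its seen-set in memory rather than in durable storage.

```python
from dataclasses import dataclass, field


@dataclass
class OrderConfirmedConsumer:
    """Hypothetical consumer that ignores outbox events it has already handled."""

    emails_sent: list[int] = field(default_factory=list)
    _seen_event_ids: set[int] = field(default_factory=set)

    def handle(self, event_id: int, order_id: int) -> bool:
        """Return True if the event was processed, False if it was a duplicate."""
        if event_id in self._seen_event_ids:
            return False
        self.emails_sent.append(order_id)  # stand-in for the real side effect
        self._seen_event_ids.add(event_id)
        return True


consumer = OrderConfirmedConsumer()
# Event 11 is delivered twice, as it would be after a crash between
# publish() and commit() in the poller.
deliveries = [(11, 501), (11, 501), (12, 502)]
results = [consumer.handle(event_id, order_id) for event_id, order_id in deliveries]
```

In a real consumer the seen-set would live in the same database transaction as the side effect, for the same reason the outbox does.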
What Happens Without This
In ShelfWise’s first month, a customer placed an order during a brief network glitch. Their browser retried the payment request. Without idempotency, the payment gateway processed both requests — the customer was charged twice. The support ticket turned into a Twitter thread. The thread turned into a Hacker News comment.
We implemented idempotency keys the next day. Every payment request now carries an Idempotency-Key header. The payment gateway returns the cached result on retry. The customer is charged exactly once, regardless of how many times the request is sent.
Checklist
Before shipping data consistency patterns to production:
- Every mutable entity has a version column with version_id_col
- StaleDataError is caught and retried with exponential backoff (max 3 attempts)
- All mutating endpoints accept an Idempotency-Key header
- Idempotency responses are stored in both Redis (speed) and PostgreSQL (durability)
- Multi-step operations use the saga pattern with explicit compensating actions
- Domain events are written to a transactional outbox in the same transaction as business data
- Outbox poller uses SELECT FOR UPDATE SKIP LOCKED for safe concurrent processing
- Compensation failures trigger critical alerts for manual intervention
Next in Part 17, we tackle the most dangerous operation in multi-tenant SaaS: schema migrations on live databases with hundreds of tenants.