Tutorial

Clean Code Python: Event-Driven Architecture — Domain Events and CQRS Lite

When order placement triggers inventory, email, webhooks, analytics, and distributor notifications — doing it synchronously makes the endpoint slow and couples everything. Domain events decouple 'what happened' from 'what should happen next.'

Tin Dang

A customer places an order on ShelfWise. The system must reserve inventory, capture payment, send a confirmation email, fire a webhook to the distributor’s ERP, update the analytics dashboard, and log an audit trail. The naive implementation does all six things inside create_order().

That endpoint now takes 3.2 seconds. If the email provider is slow, the customer waits. If the webhook times out, the entire order fails, even though the payment has already been captured. If you need to add a seventh side effect next quarter, you modify the same function, retest everything, and pray nothing breaks.

Events fix this by separating the fact (“an order was placed”) from the reactions (“send email,” “notify distributor”). The order service publishes OrderPlaced. Five handlers subscribe and react independently. If the email handler fails, the order remains valid. The distributor webhook retries on its own schedule. The endpoint returns in 200ms.

Domain Events as First-Class Objects

A domain event is an immutable record of something that happened. Not a command (“place this order”), not a query (“get this order”) — a fact in past tense.

src/domain/events.py
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import UTC, datetime  # datetime.UTC requires Python 3.11+
from uuid import UUID, uuid4

# OrderItem and Reservation are domain value objects defined elsewhere
# in the domain layer (not shown in this post).


@dataclass(frozen=True, slots=True, kw_only=True)
class DomainEvent:
    """Base class for all domain events."""

    # kw_only=True lets fields without defaults (tenant_id, and the
    # subclass fields) follow inherited fields that have defaults.
    event_id: UUID = field(default_factory=uuid4)
    occurred_at: datetime = field(default_factory=lambda: datetime.now(UTC))
    tenant_id: UUID
    version: int = 1


@dataclass(frozen=True, slots=True, kw_only=True)
class OrderPlaced(DomainEvent):
    order_id: UUID
    customer_id: UUID
    items: tuple[OrderItem, ...]
    total_amount: float
    currency: str = "USD"


@dataclass(frozen=True, slots=True, kw_only=True)
class InventoryReserved(DomainEvent):
    order_id: UUID
    reservations: tuple[Reservation, ...]


@dataclass(frozen=True, slots=True, kw_only=True)
class PaymentCaptured(DomainEvent):
    order_id: UUID
    payment_id: UUID
    amount: float
    currency: str


@dataclass(frozen=True, slots=True, kw_only=True)
class OrderFulfilled(DomainEvent):
    order_id: UUID
    shipped_at: datetime
    tracking_number: str

Three design decisions matter here. First, events are frozen dataclasses — immutable after creation. You cannot change what happened. Second, every event carries tenant_id — handlers never need to look it up. Third, version enables schema evolution (more on this below).

| | Synchronous Side Effects | Event-Driven |
|---|---|---|
| Coupling | Order service depends on email, webhook, analytics, inventory | Order service depends on nothing; it publishes an event |
| Failure blast radius | Any side-effect failure fails the order | Side effects fail independently |
| Latency | Sum of all side-effect latencies | Only the core operation |
| Adding new reactions | Modify order service, retest everything | Add a new handler, zero changes to order service |
| Testing | Mock 6 dependencies per test | Test order service and each handler in isolation |
| Debugging | One giant stack trace | One trace per handler, correlated by event_id |

The In-Process Event Bus

You do not need Kafka, RabbitMQ, or any external infrastructure to start with events. An in-process mediator is enough for a single-instance application, and it is the right starting point. Move to external infrastructure when you need multi-instance fan-out or guaranteed delivery across process restarts.

src/core/event_bus.py
from __future__ import annotations

import asyncio
import logging
from collections import defaultdict
from collections.abc import Awaitable, Callable
from typing import Any

logger = logging.getLogger(__name__)

# Type alias for event handlers: an async callable that receives the event
EventHandler = Callable[[Any], Awaitable[None]]


class EventBus:
    """In-process pub/sub event mediator.

    Handlers run concurrently via asyncio.gather. Each handler is wrapped
    in its own try/except, so a failing handler does not prevent the
    other handlers from executing.
    """

    def __init__(self) -> None:
        self._handlers: dict[type, list[EventHandler]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: EventHandler) -> None:
        """Register a handler for an event type."""
        self._handlers[event_type].append(handler)

    async def publish(self, event: Any) -> list[Exception]:
        """Publish an event to all registered handlers.

        Returns a list of exceptions from failed handlers.
        Successful handlers are unaffected by failures in others.
        """
        # Exact-type lookup: handlers registered for a base class are not
        # called for its subclasses.
        handlers = self._handlers.get(type(event), [])
        if not handlers:
            return []
        exceptions: list[Exception] = []

        async def _safe_handle(handler: EventHandler) -> None:
            try:
                await handler(event)
            except Exception as exc:
                logger.error(
                    "Handler %s failed for event %s: %s",
                    getattr(handler, "__qualname__", repr(handler)),
                    type(event).__name__,
                    exc,
                    exc_info=True,
                )
                exceptions.append(exc)

        # Run all handlers concurrently; publish() returns once the
        # slowest handler has finished.
        await asyncio.gather(*(_safe_handle(h) for h in handlers))
        return exceptions

Handler registration is explicit and happens at startup:

src/core/bootstrap.py
from src.core.event_bus import EventBus
from src.handlers.inventory import reserve_inventory
from src.handlers.notifications import send_order_confirmation
from src.handlers.webhooks import fire_distributor_webhook
from src.handlers.analytics import update_order_analytics
from src.handlers.audit import log_order_audit_trail
from src.domain.events import OrderPlaced


def configure_event_bus() -> EventBus:
    bus = EventBus()
    bus.subscribe(OrderPlaced, reserve_inventory)
    bus.subscribe(OrderPlaced, send_order_confirmation)
    bus.subscribe(OrderPlaced, fire_distributor_webhook)
    bus.subscribe(OrderPlaced, update_order_analytics)
    bus.subscribe(OrderPlaced, log_order_audit_trail)
    return bus

Now the order service saves the order and publishes a single event; it no longer needs to know who reacts to it:

src/services/order_service.py
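from __future__ import annotations

import logging
from uuid import UUID

from sqlalchemy.ext.asyncio import AsyncSession

from src.core.event_bus import EventBus
from src.domain.events import OrderPlaced

# Order, OrderRepositoryProtocol, and PlaceOrderRequest come from the
# application's model, repository, and schema modules (not shown here).

logger = logging.getLogger(__name__)


class OrderService:
    def __init__(
        self,
        repo: OrderRepositoryProtocol,
        event_bus: EventBus,
        session: AsyncSession,
    ) -> None:
        self._repo = repo
        self._bus = event_bus
        self._session = session

    async def place_order(
        self, payload: PlaceOrderRequest, tenant_id: UUID, customer_id: UUID
    ) -> Order:
        # Core operation: the only thing that MUST succeed
        order = Order(
            tenant_id=tenant_id,
            customer_id=customer_id,
            items=payload.items,
            total_amount=payload.total_amount,
        )
        await self._repo.save(order)
        await self._session.flush()  # populate order.id before the event uses it
        # Publish event: handlers run concurrently, so this waits for the
        # slowest handler rather than the sum of all of them
        event = OrderPlaced(
            tenant_id=tenant_id,
            order_id=order.id,
            customer_id=customer_id,
            items=tuple(payload.items),
            total_amount=payload.total_amount,
        )
        errors = await self._bus.publish(event)
        if errors:
            # Log failures but do NOT fail the order
            logger.warning(
                "Order %s placed successfully, but %d handler(s) failed",
                order.id,
                len(errors),
            )
        return order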

The Transactional Outbox

The in-process bus has a gap: if the application crashes after committing the order but before publishing the event, the event is lost. The transactional outbox pattern closes this gap by writing the event to an outbox table in the same database transaction as the aggregate change.

The outbox table stores serialized events:

src/models/outbox.py
from __future__ import annotations

from datetime import UTC, datetime
from uuid import UUID, uuid4

from sqlalchemy import DateTime, String, Text
from sqlalchemy.orm import Mapped, mapped_column

from src.db.base import Base


class OutboxEntry(Base):
    __tablename__ = "outbox"

    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    event_type: Mapped[str] = mapped_column(String(200), index=True)
    event_id: Mapped[UUID] = mapped_column(index=True)
    tenant_id: Mapped[UUID] = mapped_column(index=True)
    payload: Mapped[str] = mapped_column(Text)  # JSON-serialized event
    # timezone=True because the application writes timezone-aware UTC datetimes
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(UTC), index=True
    )
    processed_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True), default=None
    )
    error: Mapped[str | None] = mapped_column(Text, default=None)

Writing to the outbox is part of the service transaction — same session, same commit:

# src/services/order_service.py (updated)
# Also needs: import json; from dataclasses import asdict;
# from src.models.outbox import OutboxEntry
async def place_order(self, payload: PlaceOrderRequest, ...) -> Order:
    order = Order(...)
    await self._repo.save(order)
    await self._session.flush()  # populate order.id before the event uses it
    event = OrderPlaced(
        tenant_id=tenant_id,
        order_id=order.id,
        customer_id=customer_id,
        items=tuple(payload.items),
        total_amount=payload.total_amount,
    )
    # Write event to outbox in the SAME transaction
    outbox_entry = OutboxEntry(
        event_type="OrderPlaced",
        event_id=event.event_id,
        tenant_id=event.tenant_id,
        payload=json.dumps(asdict(event), default=str),
    )
    self._session.add(outbox_entry)
    await self._session.commit()
    # If commit fails, both the order AND the event are rolled back.
    # If commit succeeds, the event is guaranteed to be in the outbox.
    return order
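The all-or-nothing guarantee in those closing comments is ordinary database atomicity. The sketch below reproduces it with the standard-library `sqlite3` module instead of SQLAlchemy, using simplified, hypothetical `orders` and `outbox` tables: a failure before commit rolls back both rows, and a successful commit persists both.

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, total REAL NOT NULL)")
conn.execute(
    "CREATE TABLE outbox (event_id TEXT PRIMARY KEY, event_type TEXT, payload TEXT)"
)
conn.commit()


def place_order(total: float, fail_before_commit: bool = False) -> None:
    order_id = str(uuid.uuid4())
    try:
        # Both INSERTs run inside the same implicit transaction.
        conn.execute("INSERT INTO orders VALUES (?, ?)", (order_id, total))
        conn.execute(
            "INSERT INTO outbox VALUES (?, ?, ?)",
            (
                str(uuid.uuid4()),
                "OrderPlaced",
                json.dumps({"order_id": order_id, "total_amount": total}),
            ),
        )
        if fail_before_commit:
            raise RuntimeError("simulated crash before commit")
        conn.commit()
    except Exception:
        conn.rollback()  # undoes BOTH inserts
        raise


def count(table: str) -> int:
    # Table names here are trusted constants, not user input.
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


place_order(19.99)
try:
    place_order(5.00, fail_before_commit=True)
except RuntimeError:
    pass
```

After both calls, each table holds exactly one row: the committed order has its event, and the failed order left neither an order nor an orphaned event behind.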

The outbox processor runs as a background task (from Part 14), polling for unprocessed entries:

src/tasks/outbox_processor.py
from __future__ import annotations

import json
from datetime import UTC, datetime

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from src.core.event_bus import EventBus
from src.domain.events import REGISTRY
from src.models.outbox import OutboxEntry


class OutboxProcessor:
    """Process outbox entries and dispatch to event handlers."""

    def __init__(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        event_bus: EventBus,
        batch_size: int = 50,
    ) -> None:
        self._session_factory = session_factory
        self._bus = event_bus
        self._batch_size = batch_size

    async def process_batch(self) -> int:
        """Process a batch of unprocessed outbox entries. Returns count processed."""
        async with self._session_factory() as session:
            stmt = (
                select(OutboxEntry)
                .where(OutboxEntry.processed_at.is_(None))
                .order_by(OutboxEntry.created_at)
                .limit(self._batch_size)
                # SKIP LOCKED lets several processors share the table without
                # claiming the same rows (PostgreSQL and MySQL 8.0+ support it)
                .with_for_update(skip_locked=True)
            )
            entries = (await session.scalars(stmt)).all()
            processed = 0
            for entry in entries:
                try:
                    event_cls = REGISTRY[entry.event_type]
                    # Caveat: json.loads returns UUIDs and datetimes as plain
                    # strings and nested items as dicts; production code should
                    # convert them back (and apply upcasters) before this call.
                    event = event_cls(**json.loads(entry.payload))
                    # Handler failures are returned, not raised, so the entry is
                    # still marked processed; per-handler retries own those errors.
                    await self._bus.publish(event)
                    entry.processed_at = datetime.now(UTC)
                    processed += 1
                except Exception as exc:
                    # processed_at stays NULL, so the entry is retried next batch
                    entry.error = str(exc)
            await session.commit()
            return processed
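The processor class handles one batch; something still has to call it on a schedule. Part 14's background-task setup isn't shown here, so the following is only a minimal asyncio sketch under that assumption: a hypothetical `run_outbox_loop` that drains batches back to back while there is work, sleeps for `interval` seconds when the outbox is empty, and exits cleanly when an `asyncio.Event` is set.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def run_outbox_loop(
    process_batch: Callable[[], Awaitable[int]],
    stop: asyncio.Event,
    interval: float = 0.5,
) -> None:
    """Call process_batch until stop is set; back off only when idle."""
    while not stop.is_set():
        try:
            processed = await process_batch()
        except Exception:
            processed = 0  # a real loop would log this; keep polling regardless
        if processed == 0:
            # Sleep for the interval, but wake immediately if stop is set.
            try:
                await asyncio.wait_for(stop.wait(), timeout=interval)
            except asyncio.TimeoutError:
                pass


async def busy_demo() -> list[int]:
    pending = [50, 50, 7]  # pretend batch sizes waiting in the outbox
    calls: list[int] = []
    stop = asyncio.Event()

    async def fake_batch() -> int:
        n = pending.pop(0) if pending else 0
        calls.append(n)
        if not pending:
            stop.set()
        return n

    await run_outbox_loop(fake_batch, stop, interval=0.01)
    return calls


async def idle_demo() -> int:
    stop = asyncio.Event()
    polls = 0

    async def empty_batch() -> int:
        nonlocal polls
        polls += 1
        if polls == 3:
            stop.set()
        return 0

    await run_outbox_loop(empty_batch, stop, interval=0.01)
    return polls


busy_calls = asyncio.run(busy_demo())
idle_polls = asyncio.run(idle_demo())
```

Skipping the sleep while batches are full keeps a backlog from draining at only one batch per interval.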

The event registry maps type names back to classes:

# src/domain/events.py (addition)
REGISTRY: dict[str, type[DomainEvent]] = {
    "OrderPlaced": OrderPlaced,
    "InventoryReserved": InventoryReserved,
    "PaymentCaptured": PaymentCaptured,
    "OrderFulfilled": OrderFulfilled,
}

CQRS-Lite: Separate Read and Write Models

ShelfWise’s dashboard needs to show “orders this month, total revenue, top-selling books, orders pending fulfillment” — all for the current tenant. Querying this from normalized tables requires joining orders, order_items, books, payments, and shipments. That query is expensive, and it runs on every dashboard load.

CQRS-lite separates the concern: writes go to normalized tables (the write model), and event handlers project data into denormalized read tables (the read model).

src/models/read_models.py
from __future__ import annotations

from datetime import UTC, datetime
from uuid import UUID

from sqlalchemy import DateTime, String
from sqlalchemy.orm import Mapped, mapped_column

from src.db.base import Base


class OrderSummary(Base):
    """Denormalized read model: updated by event handlers, never by services."""

    __tablename__ = "order_summaries"

    order_id: Mapped[UUID] = mapped_column(primary_key=True)
    tenant_id: Mapped[UUID] = mapped_column(index=True)
    customer_name: Mapped[str] = mapped_column(String(200))
    item_count: Mapped[int]
    total_amount: Mapped[float]
    currency: Mapped[str] = mapped_column(String(3))
    status: Mapped[str] = mapped_column(String(50))  # "placed", "fulfilled", "cancelled"
    placed_at: Mapped[datetime] = mapped_column(DateTime(timezone=True))
    fulfilled_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True), default=None
    )
    top_item_title: Mapped[str] = mapped_column(String(300))  # Most expensive item


class TenantDashboard(Base):
    """Aggregate read model per tenant: one row, updated incrementally."""

    __tablename__ = "tenant_dashboards"

    tenant_id: Mapped[UUID] = mapped_column(primary_key=True)
    orders_this_month: Mapped[int] = mapped_column(default=0)
    revenue_this_month: Mapped[float] = mapped_column(default=0.0)
    pending_fulfillment: Mapped[int] = mapped_column(default=0)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(UTC)
    )

The projection handler listens for OrderPlaced and OrderFulfilled and updates the read models:

src/handlers/projections.py
from __future__ import annotations

from datetime import UTC, datetime

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from src.domain.events import OrderFulfilled, OrderPlaced
from src.models.read_models import OrderSummary, TenantDashboard


class OrderProjectionHandler:
    def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None:
        self._session_factory = session_factory

    async def on_order_placed(self, event: OrderPlaced) -> None:
        async with self._session_factory() as session:
            # 1. Insert order summary (the most expensive item is the "top" item)
            top_item = max(event.items, key=lambda i: i.price)
            summary = OrderSummary(
                order_id=event.order_id,
                tenant_id=event.tenant_id,
                # _resolve_customer_name (not shown) looks up the display name
                customer_name=await self._resolve_customer_name(
                    session, event.customer_id
                ),
                item_count=len(event.items),
                total_amount=event.total_amount,
                currency=event.currency,
                status="placed",
                placed_at=event.occurred_at,
                top_item_title=top_item.title,
            )
            session.add(summary)
            # 2. Update dashboard counters, creating the row on first use.
            # Column defaults only apply at INSERT, so set zeros explicitly.
            dashboard = await session.get(TenantDashboard, event.tenant_id)
            if dashboard is None:
                dashboard = TenantDashboard(
                    tenant_id=event.tenant_id,
                    orders_this_month=0,
                    revenue_this_month=0.0,
                    pending_fulfillment=0,
                )
                session.add(dashboard)
            dashboard.orders_this_month += 1
            dashboard.revenue_this_month += event.total_amount
            dashboard.pending_fulfillment += 1
            dashboard.updated_at = datetime.now(UTC)
            await session.commit()

    async def on_order_fulfilled(self, event: OrderFulfilled) -> None:
        async with self._session_factory() as session:
            summary = await session.get(OrderSummary, event.order_id)
            if summary:
                summary.status = "fulfilled"
                summary.fulfilled_at = event.shipped_at
            dashboard = await session.get(TenantDashboard, event.tenant_id)
            if dashboard:
                dashboard.pending_fulfillment -= 1
                dashboard.updated_at = datetime.now(UTC)
            await session.commit()

The dashboard endpoint reads from the denormalized table — a single-row lookup, no joins:

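# Requires: fastapi (APIRouter, Depends, HTTPException) and typing.Annotated;
# TenantDashboardResponse needs model_config = ConfigDict(from_attributes=True)
@router.get("/dashboard")
async def get_dashboard(
    tenant: Annotated[Tenant, Depends(get_current_tenant)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> TenantDashboardResponse:
    dashboard = await session.get(TenantDashboard, tenant.id)
    if dashboard is None:  # no events projected for this tenant yet
        raise HTTPException(status_code=404, detail="Dashboard not available yet")
    return TenantDashboardResponse.model_validate(dashboard)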

Event Versioning and Upcasting

Events are stored in the outbox (and potentially in an event store). Schemas evolve. Six months from now, OrderPlaced needs a discount_code field. You cannot change the serialized events already in the outbox. Upcasting transforms old event versions to the current schema at read time.

src/domain/upcasters.py
from __future__ import annotations

from collections.abc import Callable
from typing import Any


def upcast_order_placed(data: dict[str, Any], from_version: int) -> dict[str, Any]:
    """Upcast OrderPlaced from any version to current (v3 in this example)."""
    data = dict(data)  # don't mutate the caller's dict
    if from_version < 2:
        # v1 → v2: added discount_code (default: no discount)
        data.setdefault("discount_code", None)
    if from_version < 3:
        # v2 → v3: renamed total_amount to total_cents (converted to int cents).
        # round(), not int(): int(19.99 * 100) truncates to 1998.
        if "total_amount" in data and "total_cents" not in data:
            data["total_cents"] = round(data.pop("total_amount") * 100)
    data["version"] = 3  # payload now matches the current schema
    return data


UPCASTERS: dict[str, Callable[[dict[str, Any], int], dict[str, Any]]] = {
    "OrderPlaced": upcast_order_placed,
}
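The single cumulative function above works, but every new version adds another `if` branch to it. A common alternative, sketched below as an assumption rather than the post's method, registers one small step function per version transition and applies them in sequence until the payload reaches the current version. The sketch also uses `round()` for the cents conversion, because truncating with `int()` loses a cent on values such as 19.99, whose binary floating-point product with 100 lands just below 1999.

```python
from collections.abc import Callable
from typing import Any

Payload = dict[str, Any]
Step = Callable[[Payload], Payload]

CURRENT_VERSION = 3  # assumption: OrderPlaced is currently at v3


def _v1_to_v2(data: Payload) -> Payload:
    # v2 added discount_code; v1 events had no discount.
    return {**data, "discount_code": data.get("discount_code")}


def _v2_to_v3(data: Payload) -> Payload:
    # v3 replaced total_amount (float) with total_cents (int).
    out = dict(data)
    if "total_amount" in out and "total_cents" not in out:
        out["total_cents"] = round(out.pop("total_amount") * 100)
    return out


# Maps "from version N" to the step that produces version N + 1.
ORDER_PLACED_STEPS: dict[int, Step] = {1: _v1_to_v2, 2: _v2_to_v3}


def upcast(data: Payload, steps: dict[int, Step], current: int) -> Payload:
    """Apply each missing step in order, then stamp the final version."""
    version = data.get("version", 1)
    while version < current:
        data = steps[version](data)
        version += 1
    return {**data, "version": version}


v1_payload = {"order_id": "o-1", "total_amount": 19.99, "version": 1}
v3_payload = upcast(v1_payload, ORDER_PLACED_STEPS, CURRENT_VERSION)
```

Adding v4 later means writing `_v3_to_v4`, adding it under key `3`, and bumping `CURRENT_VERSION`; the earlier steps never change.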

Event Replay: Rebuilding Read Models

Because events are stored in the outbox, you can rebuild any read model from scratch. Corrupted dashboard? Truncate the table and replay all OrderPlaced and OrderFulfilled events. This is a management command, not a runtime operation.

src/management/rebuild_projections.py
from __future__ import annotations

from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from src.domain.events import OrderFulfilled, OrderPlaced
from src.handlers.projections import OrderProjectionHandler
from src.models.outbox import OutboxEntry
from src.models.read_models import OrderSummary, TenantDashboard

# deserialize_with_upcast is assumed to rebuild a typed event from an outbox
# row, applying the matching UPCASTERS entry first.


async def rebuild_order_projections(
    session_factory: async_sessionmaker[AsyncSession],
) -> int:
    """Replay all order events to rebuild read models."""
    handler = OrderProjectionHandler(session_factory)
    replayed = 0
    async with session_factory() as session:
        # Clear existing read models
        await session.execute(delete(OrderSummary))
        await session.execute(delete(TenantDashboard))
        await session.commit()
    async with session_factory() as session:
        stmt = (
            select(OutboxEntry)
            .where(
                OutboxEntry.event_type.in_(["OrderPlaced", "OrderFulfilled"]),
                OutboxEntry.processed_at.is_not(None),
            )
            .order_by(OutboxEntry.created_at)
        )
        entries = (await session.scalars(stmt)).all()
    # The projection handler opens its own sessions, so replay outside this one
    for entry in entries:
        event = deserialize_with_upcast(entry)
        match event:
            case OrderPlaced():
                await handler.on_order_placed(event)
            case OrderFulfilled():
                await handler.on_order_fulfilled(event)
        replayed += 1
    return replayed

The ShelfWise Scenario: End to End

A customer at Powell’s Books places an order for 3 titles. Here is what happens:

  1. POST /api/v1/orders — Order service creates the order row and writes an OrderPlaced event to the outbox. Single transaction. Endpoint returns in 180ms.

  2. Outbox processor (running every 500ms) picks up the event and publishes it to the in-process bus.

  3. Five handlers fire concurrently:

    • reserve_inventory — decrements stock for 3 books. Succeeds.
    • send_order_confirmation — calls the email provider. Provider returns 500. Handler logs the error and schedules a retry.
    • fire_distributor_webhook — POSTs to Powell’s ERP endpoint. Succeeds in 1.2s.
    • update_order_analytics — increments counters. Succeeds.
    • log_order_audit_trail — writes audit entry. Succeeds.
  4. The email failure does not affect the order. It does not affect inventory. It does not affect the webhook. The retry mechanism (from Part 14) picks it up 60 seconds later and succeeds.

  5. The dashboard at GET /api/v1/dashboard reads from TenantDashboard — a single row, no joins. It shows the new order within 500ms of placement.
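The fan-out in step 3 can be simulated in a few lines of plain asyncio. The handler names mirror the post's, but their bodies, delays, and the hypothetical `dispatch` helper are stand-ins: one handler raises to imitate the email provider's 500, and the sketch checks that the other four still complete and that the whole fan-out takes roughly as long as the slowest handler rather than the sum of all five.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable
from typing import Any

Handler = Callable[[dict[str, Any]], Awaitable[None]]
completed: list[str] = []


def make_handler(name: str, delay: float, fail: bool = False) -> Handler:
    """Build a stand-in handler that sleeps, then succeeds or raises."""

    async def handler(event: dict[str, Any]) -> None:
        await asyncio.sleep(delay)  # simulated I/O latency
        if fail:
            raise RuntimeError(f"{name}: provider returned 500")
        completed.append(name)

    return handler


HANDLERS: list[Handler] = [
    make_handler("reserve_inventory", 0.05),
    make_handler("send_order_confirmation", 0.05, fail=True),
    make_handler("fire_distributor_webhook", 0.10),  # slowest handler
    make_handler("update_order_analytics", 0.05),
    make_handler("log_order_audit_trail", 0.05),
]


async def dispatch(event: dict[str, Any]) -> list[Exception]:
    """Fan one event out to every handler; collect failures instead of raising."""
    errors: list[Exception] = []

    async def safe(handler: Handler) -> None:
        try:
            await handler(event)
        except Exception as exc:
            errors.append(exc)  # a retry job would pick these up later

    await asyncio.gather(*(safe(h) for h in HANDLERS))
    return errors


start = time.perf_counter()
errors = asyncio.run(dispatch({"order_id": "o-1", "title_count": 3}))
elapsed = time.perf_counter() - start  # roughly 0.1 s, not the 0.30 s sum
```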

This is the power of event-driven architecture: the order service has zero knowledge of email, webhooks, analytics, or dashboards. Adding a sixth handler next quarter requires zero changes to the order service. The system is decoupled in code, in failure modes, and in time.

When to Graduate to External Infrastructure

The in-process bus works until it does not. Here are the signals that you need Kafka, RabbitMQ, or SQS:

  • Multiple instances — events published on instance A must reach handlers on instance B.
  • Guaranteed delivery across restarts — the outbox handles crash recovery, but an external queue handles consumer restarts.
  • Event replay at scale — replaying millions of events from a database outbox is slow. Kafka retains events natively.
  • Cross-service communication — separate services need to react to your events.

The architecture in this post is designed for that migration. Events are already serialized in the outbox. Point the outbox processor at a Kafka producer instead of the in-process bus, replace the in-process bus subscriptions with Kafka consumer groups, and every handler works unchanged. The domain events, the handler signatures, and the read model projections stay exactly the same.
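That migration claim rests on the processor depending only on something with a `publish(event)` coroutine. Making that seam explicit with a `typing.Protocol` is one way to keep the swap mechanical. In the sketch below, `EventSink`, `InMemorySink`, and `forward_pending` are hypothetical names; a Kafka-backed sink would implement the same method by serializing the event and handing it to a producer client, which is deliberately omitted here rather than guessed at.

```python
import asyncio
from typing import Any, Protocol


class EventSink(Protocol):
    """Anything the outbox processor can hand a deserialized event to."""

    async def publish(self, event: Any) -> object: ...


class InMemorySink:
    """Stand-in for the in-process EventBus (or a test double)."""

    def __init__(self) -> None:
        self.received: list[Any] = []

    async def publish(self, event: Any) -> list[Exception]:
        self.received.append(event)
        return []  # same shape as EventBus.publish: no handler failures


async def forward_pending(events: list[Any], sink: EventSink) -> int:
    """Forward events in order; this logic never names a concrete sink type."""
    for event in events:
        await sink.publish(event)
    return len(events)


sink = InMemorySink()
forwarded = asyncio.run(forward_pending(["OrderPlaced#1", "OrderFulfilled#1"], sink))
```

Because `EventSink` is structural, the existing `EventBus` already satisfies it without inheriting from anything, and a broker-backed sink only has to match the one method.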

That is the point of clean architecture: the infrastructure changes, the domain stays.


Next in this series

Clean Code Python: API Versioning and Backward-Compatible Evolution
