Clean Code Python: Background Tasks — Queues, Workers, and Context Propagation

At 1M transactions per day, you cannot process everything in the request cycle. But background tasks are where tenant context silently disappears — the worker has no request, no middleware, no headers. Here is how to propagate context safely.

Tin Dang

At 1M transactions per day, you cannot process everything in the request cycle. Order confirmation emails, webhook delivery, invoice PDF generation, and analytics aggregation must happen asynchronously. The API returns a 201 in 200ms. The background worker sends the email, fires the webhook, and generates the invoice over the next 30 seconds.

But background tasks are where tenant context silently disappears. The worker has no HTTP request, no middleware, no headers. The TenantContext contextvar from Part 9 is empty. If your task code calls TenantContext.get(), it returns None — and every operation runs without tenant scoping. One team learned this the hard way: six months of background analytics jobs ran without tenant context, attributing every tenant’s metrics to a single default account. The fix required reprocessing 180 days of data.

The solution is explicit: every task carries its tenant ID in the payload, and the worker reconstructs the TenantContext before executing any business logic.

Why ARQ Over Celery

| | ARQ | Celery | Dramatiq |
| --- | --- | --- | --- |
| Async native | Yes, built on asyncio from the ground up | No, async support is bolted on via eventlet/gevent | No, sync workers with optional async |
| Broker | Redis only | Redis, RabbitMQ, SQS | Redis, RabbitMQ |
| Dependencies | 1 (redis) | 12+ transitive dependencies | 3-5 depending on broker |
| Task definition | Plain async functions | Decorated functions with Celery app import | Decorated functions with actor import |
| Startup overhead | Minimal, single event loop | Heavy: worker prefork, broker connection pool | Moderate |
| Python 3.12+ compat | Full | Frequent breakage on new Python releases | Good |
| Best for | Async Python backends (FastAPI, Starlette) | Django projects, mixed sync/async | Sync Python backends |

ARQ is the natural choice for a FastAPI backend. The entire ShelfWise stack is async — FastAPI, SQLAlchemy async, httpx, Redis async. Celery’s sync-first architecture forces you to choose between running async code inside sync workers (defeating the purpose) or bolting on eventlet (adding complexity and subtle bugs). ARQ runs on the same event loop as the rest of the application.

Task Definition: The TenantTask Base

Every background task must carry the tenant ID. Forgetting it is the single most common source of multi-tenant bugs in background processing. The TenantTask pattern makes the tenant ID mandatory at the type level:

# src/tasks/base.py
from dataclasses import dataclass
from typing import Any

from src.core.tenant import TenantContext


@dataclass(frozen=True, slots=True, kw_only=True)
class TenantTask:
    """Base payload for all background tasks.

    tenant_id is required: not optional, not defaulted.
    The worker MUST reconstruct TenantContext from this field
    before executing any business logic.
    """

    tenant_id: str


@dataclass(frozen=True, slots=True, kw_only=True)
class SendOrderConfirmationTask(TenantTask):
    order_id: int
    customer_email: str


@dataclass(frozen=True, slots=True, kw_only=True)
class DeliverWebhookTask(TenantTask):
    order_id: int
    webhook_url: str
    event_type: str


@dataclass(frozen=True, slots=True, kw_only=True)
class GenerateInvoiceTask(TenantTask):
    order_id: int


@dataclass(frozen=True, slots=True, kw_only=True)
class UpdateTrendingTask(TenantTask):
    book_ids: list[int]


@dataclass(frozen=True, slots=True, kw_only=True)
class UpdateInventoryTask(TenantTask):
    items: list[dict[str, Any]]

Using frozen=True dataclasses makes task payloads immutable — they cannot be accidentally modified between enqueue and execution. Using kw_only=True forces explicit field names at the call site, preventing positional argument mistakes.

Worker Setup: Reconstructing Context

The worker has no HTTP middleware. It must reconstruct the tenant context from the task payload before executing any business logic:

# src/tasks/worker.py
import structlog
from arq import cron
from arq.connections import RedisSettings
from opentelemetry import trace

from src.core.tenant import TenantContext
from src.tasks.handlers import (
    handle_send_order_confirmation,
    handle_deliver_webhook,
    handle_generate_invoice,
    handle_update_trending,
    handle_update_inventory,
)

logger = structlog.get_logger()
tracer = trace.get_tracer(__name__)


async def execute_task(ctx: dict, task_name: str, payload: dict) -> None:
    """Universal task executor that reconstructs tenant context.

    Every task goes through this function. It:
    1. Extracts tenant_id from the payload (mandatory)
    2. Sets TenantContext for the duration of the task
    3. Binds structlog context (tenant_id, task_name, task_id)
    4. Creates an OpenTelemetry span for the task
    5. Dispatches to the appropriate handler
    """
    tenant_id = payload.get("tenant_id")
    if not tenant_id:
        logger.error("task_missing_tenant_id", task_name=task_name)
        raise ValueError(f"Task {task_name} missing required tenant_id")

    # Reconstruct the context that middleware would normally provide
    TenantContext.set(tenant_id)
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        tenant_id=tenant_id,
        task_name=task_name,
        task_id=ctx.get("job_id", "unknown"),
    )

    with tracer.start_as_current_span(f"task.{task_name}") as span:
        span.set_attribute("tenant_id", tenant_id)
        span.set_attribute("task_name", task_name)

        handlers = {
            "send_order_confirmation": handle_send_order_confirmation,
            "deliver_webhook": handle_deliver_webhook,
            "generate_invoice": handle_generate_invoice,
            "update_trending": handle_update_trending,
            "update_inventory": handle_update_inventory,
        }
        handler = handlers.get(task_name)
        if not handler:
            logger.error("unknown_task", task_name=task_name)
            raise ValueError(f"Unknown task: {task_name}")

        logger.info("task_started")
        await handler(ctx, payload)
        logger.info("task_completed")

The execute_task function is the background equivalent of the ObservabilityMiddleware from Part 12. It sets up TenantContext, binds structlog context variables, and creates an OpenTelemetry span — all before any business logic runs. Every log line emitted by the handler includes tenant_id and task_name automatically.

Enqueuing Tasks from the Request Cycle

The API handler creates the order synchronously (within the request) and enqueues background tasks for everything else. The response returns immediately:

# src/services/order_service.py (extended)
from dataclasses import asdict

import structlog
from arq.connections import ArqRedis

from src.core.tenant import TenantContext
from src.tasks.base import (
    SendOrderConfirmationTask,
    DeliverWebhookTask,
    GenerateInvoiceTask,
    UpdateTrendingTask,
    UpdateInventoryTask,
)

# OrderRepositoryProtocol, OrderCreate, and OrderResponse are assumed to be
# imported as in the earlier version of this module (imports not shown).

logger = structlog.get_logger()


class OrderService:
    def __init__(
        self,
        *,
        order_repo: OrderRepositoryProtocol,
        task_queue: ArqRedis,
    ) -> None:
        self._order_repo = order_repo
        self._queue = task_queue

    async def create_order(self, data: OrderCreate) -> OrderResponse:
        tenant_id = TenantContext.get()

        # Synchronous: must complete before response
        order = await self._order_repo.create(data)
        logger.info("order_persisted", order_id=order.id)

        # Asynchronous: enqueue and return immediately
        tasks = [
            ("send_order_confirmation", SendOrderConfirmationTask(
                tenant_id=tenant_id,
                order_id=order.id,
                customer_email=data.customer_email,
            )),
            ("deliver_webhook", DeliverWebhookTask(
                tenant_id=tenant_id,
                order_id=order.id,
                webhook_url=data.webhook_url,
                event_type="order.created",
            )),
            ("generate_invoice", GenerateInvoiceTask(
                tenant_id=tenant_id,
                order_id=order.id,
            )),
            ("update_trending", UpdateTrendingTask(
                tenant_id=tenant_id,
                book_ids=[item.book_id for item in data.items],
            )),
            ("update_inventory", UpdateInventoryTask(
                tenant_id=tenant_id,
                items=[
                    {"book_id": item.book_id, "quantity": -item.quantity}
                    for item in data.items
                ],
            )),
        ]
        for task_name, payload in tasks:
            await self._queue.enqueue_job(
                "execute_task",
                task_name,
                # slots=True dataclasses have no __dict__, so use asdict()
                asdict(payload),
            )

        logger.info("tasks_enqueued", count=len(tasks), order_id=order.id)
        return order

A large order with 50 items returns in ~200ms. The 5 background tasks execute over the next 30 seconds in the worker process. The customer sees their order confirmation page immediately; the email arrives shortly after.

Retry Strategies: Exponential Backoff with Jitter

Transient failures — network timeouts, temporary rate limits, brief database unavailability — resolve themselves if you wait and retry. But retrying immediately floods the failing service. Exponential backoff with jitter spreads retries over time:

# src/tasks/retry.py
import asyncio
import random
from collections.abc import Awaitable, Callable
from typing import Any

import structlog

logger = structlog.get_logger()


async def with_retry(
    fn: Callable[..., Awaitable[Any]],
    *args: Any,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    retryable_exceptions: tuple[type[Exception], ...] = (
        ConnectionError,
        TimeoutError,
        OSError,
    ),
    **kwargs: Any,
) -> Any:
    """Execute an async function with exponential backoff and jitter.

    Delay formula: min(max_delay, base_delay * 2^attempt) plus up to 50% jitter.
    With the defaults: retry 1 waits 1-1.5s, retry 2 waits 2-3s, retry 3 waits 4-6s.
    """
    last_exception: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            return await fn(*args, **kwargs)
        except retryable_exceptions as exc:
            last_exception = exc
            if attempt == max_retries:
                logger.error(
                    "task_retries_exhausted",
                    attempt=attempt,
                    error=str(exc),
                )
                break

            # Exponential delay, capped, then spread out with random jitter
            delay = min(max_delay, base_delay * (2 ** attempt))
            jitter = random.uniform(0, delay * 0.5)
            total_delay = delay + jitter
            logger.warning(
                "task_retry_scheduled",
                attempt=attempt + 1,
                max_retries=max_retries,
                delay_seconds=round(total_delay, 2),
                error=str(exc),
            )
            await asyncio.sleep(total_delay)

    # Only reached after the final retryable failure
    raise last_exception  # type: ignore[misc]

The jitter prevents a thundering herd: if 100 tasks fail at the same time, their retries are spread across a window (up to 50% beyond the base delay) instead of all hitting the failing service at the same instant.
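To see the schedule this produces, the delay arithmetic can be pulled into a pure function. This is a small sketch; `backoff_bounds` and `jitter_ratio` are hypothetical names that mirror the constants used in `with_retry`.

```python
def backoff_bounds(
    attempt: int,
    *,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter_ratio: float = 0.5,
) -> tuple[float, float]:
    """Return the (min, max) sleep in seconds after a failed attempt.

    attempt is zero-based, matching the loop variable in with_retry.
    """
    delay = min(max_delay, base_delay * (2 ** attempt))
    return delay, delay * (1 + jitter_ratio)


# Bounds for the three retries allowed by the default max_retries=3
schedule = [backoff_bounds(attempt) for attempt in range(3)]
capped = backoff_bounds(10)
```

With the defaults, `schedule` works out to 1 to 1.5s, 2 to 3s, and 4 to 6s. Because the jitter is added after the cap is applied, a capped delay can still reach 1.5 times `max_delay` (60 to 90s for `capped`).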

Task Handlers with Idempotency

A task may execute more than once. The worker might crash after completing the work but before acknowledging the task. The queue redelivers it. If the handler is not idempotent, the customer receives two confirmation emails:

# src/tasks/handlers.py
import structlog

from src.tasks.retry import with_retry

logger = structlog.get_logger()

# _send_email and _post_webhook are assumed to be defined elsewhere in this
# module (not shown).


async def handle_send_order_confirmation(ctx: dict, payload: dict) -> None:
    """Send order confirmation email. Idempotent via idempotency key."""
    order_id = payload["order_id"]
    email = payload["customer_email"]
    tenant_id = payload["tenant_id"]

    # Idempotency check: has this email already been sent?
    # SET NX returns a truthy value only for the first caller and None when
    # the key already exists, so a falsy result means "duplicate".
    idempotency_key = f"email:order_confirm:{tenant_id}:{order_id}"
    redis = ctx["redis"]
    already_sent = await redis.set(idempotency_key, "1", nx=True, ex=86400)
    if not already_sent:
        logger.info("task_skipped_duplicate", idempotency_key=idempotency_key)
        return

    await with_retry(
        _send_email,
        to=email,
        template="order_confirmation",
        context={"order_id": order_id, "tenant_id": tenant_id},
    )
    logger.info("order_confirmation_sent", order_id=order_id)


async def handle_deliver_webhook(ctx: dict, payload: dict) -> None:
    """Deliver webhook to tenant's configured endpoint. Idempotent via idempotency key."""
    order_id = payload["order_id"]
    webhook_url = payload["webhook_url"]
    event_type = payload["event_type"]
    tenant_id = payload["tenant_id"]

    # Same claim-the-key pattern as the email handler above
    idempotency_key = f"webhook:{event_type}:{tenant_id}:{order_id}"
    redis = ctx["redis"]
    already_delivered = await redis.set(idempotency_key, "1", nx=True, ex=86400)
    if not already_delivered:
        logger.info("task_skipped_duplicate", idempotency_key=idempotency_key)
        return

    await with_retry(
        _post_webhook,
        url=webhook_url,
        event_type=event_type,
        order_id=order_id,
        tenant_id=tenant_id,
        retryable_exceptions=(ConnectionError, TimeoutError),
    )
    logger.info("webhook_delivered", order_id=order_id, event_type=event_type)

The idempotency key follows the pattern {action}:{entity}:{tenant_id}:{id}. The SET NX command (set if not exists) is atomic: if two workers pick up the same task, only one succeeds in setting the key. The other gets None back from redis.set(), so the falsy check skips execution. Despite its name, already_sent is truthy only for the worker that claimed the key. The key expires after 24 hours to prevent unbounded growth.

Dead Letter Queue

After max_retries is exhausted, the task moves to a dead letter queue for manual investigation. It is not silently dropped:

# src/tasks/dead_letter.py
from datetime import UTC, datetime

import orjson
import structlog
from redis.asyncio import Redis

logger = structlog.get_logger()


async def move_to_dead_letter(
    redis: Redis,
    *,
    task_name: str,
    payload: dict,
    error: str,
    attempts: int,
) -> None:
    """Move a permanently failed task to the dead letter queue.

    Tasks in the DLQ are stored in a Redis sorted set scored by failure timestamp.
    Operations can review, retry, or discard them via an admin endpoint.
    """
    # Capture the time once so the stored field and the score agree
    failed_at = datetime.now(UTC)
    entry = {
        "task_name": task_name,
        "payload": payload,
        "error": error,
        "attempts": attempts,
        "failed_at": failed_at.isoformat(),
        "tenant_id": payload.get("tenant_id", "unknown"),
    }
    await redis.zadd(
        "dead_letter_queue",
        {orjson.dumps(entry): failed_at.timestamp()},
    )
    logger.error(
        "task_moved_to_dead_letter",
        task_name=task_name,
        tenant_id=payload.get("tenant_id"),
        error=error,
        attempts=attempts,
    )

The dead letter queue is a Redis sorted set ordered by failure time. Operations can list recent failures, inspect payloads, and selectively retry via an admin API. The alert threshold from Part 12 fires when any task enters the DLQ.

Priority Queues

Not all tasks are equal. Payment capture must happen before report generation. ARQ supports multiple named queues, and giving each queue its own workers turns those names into priority levels:

# src/tasks/queues.py
from arq.connections import RedisSettings


class QueueConfig:
    """Queue configuration with priority levels."""

    CRITICAL = "arq:critical"      # Payment, inventory: processed first
    DEFAULT = "arq:default"        # Email, webhook: standard priority
    BACKGROUND = "arq:background"  # Reports, analytics: processed when idle


# Enqueue with priority
async def enqueue_critical(queue, task_name: str, payload: dict) -> None:
    """Enqueue to the critical queue. Payment, inventory operations."""
    await queue.enqueue_job(
        "execute_task",
        task_name,
        payload,
        _queue_name=QueueConfig.CRITICAL,
    )


async def enqueue_background(queue, task_name: str, payload: dict) -> None:
    """Enqueue to the background queue. Reports, analytics."""
    await queue.enqueue_job(
        "execute_task",
        task_name,
        payload,
        _queue_name=QueueConfig.BACKGROUND,
    )

Run separate worker processes for each priority level. Critical workers process payment tasks within seconds. Background workers handle report generation during off-peak hours. If the background queue backs up, critical tasks are unaffected.
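On the worker side, each process needs a settings class whose `queue_name` matches the `_queue_name` used at enqueue time. The sketch below shows what that might look like. The class names and `max_jobs` values are illustrative assumptions, `execute_task` is a placeholder for the real executor, and `redis_settings` is omitted for brevity.

```python
async def execute_task(ctx: dict, task_name: str, payload: dict) -> None:
    """Placeholder for the universal executor defined in src/tasks/worker.py."""


class CriticalWorkerSettings:
    functions = [execute_task]
    queue_name = "arq:critical"  # must match _queue_name at enqueue time
    max_jobs = 10  # illustrative value


class BackgroundWorkerSettings:
    functions = [execute_task]
    queue_name = "arq:background"
    max_jobs = 5  # illustrative value
```

Each class would then be started as its own process with the arq CLI, for example `arq src.tasks.worker.CriticalWorkerSettings` (the module path here is an assumption). A worker only polls the queue it is configured for, which is what keeps a backlog in one queue from delaying another.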

Graceful Worker Shutdown

When deploying a new version, workers must finish their current task before shutting down. Killing a worker mid-task means the task is neither completed nor re-queued — it is lost:

# src/tasks/worker.py (extended)
import asyncio
import signal

import structlog

logger = structlog.get_logger()


class GracefulWorker:
    """Worker wrapper that handles SIGTERM for graceful shutdown."""

    def __init__(self, drain_timeout: float = 30.0) -> None:
        self._shutting_down = False
        self._drain_timeout = drain_timeout

    def install_signal_handlers(self) -> None:
        # Call from inside the running event loop (for example, a startup hook)
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._handle_shutdown)

    def _handle_shutdown(self) -> None:
        if self._shutting_down:
            # Second signal: stop waiting and exit immediately
            logger.warning("forced_shutdown")
            raise SystemExit(1)
        self._shutting_down = True
        logger.info(
            "graceful_shutdown_initiated",
            drain_timeout=self._drain_timeout,
        )

    @property
    def should_stop(self) -> bool:
        return self._shutting_down

The first SIGTERM sets _shutting_down = True. The worker finishes its current task and exits cleanly. A second SIGTERM forces immediate shutdown — use this only if the drain timeout is exceeded.
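The flag only helps if the processing loop checks it between jobs. The sketch below shows that idea in isolation with a generic `asyncio.Queue` loop; it is not how ARQ schedules jobs internally, and `ShutdownFlag` and `run_until_stopped` are hypothetical names.

```python
import asyncio


class ShutdownFlag:
    """Minimal stand-in for GracefulWorker.should_stop."""

    def __init__(self) -> None:
        self.should_stop = False


async def run_until_stopped(flag, jobs: asyncio.Queue, handle) -> int:
    """Process jobs one at a time; stop picking up new ones once flagged."""
    processed = 0
    while not flag.should_stop:
        try:
            job = await asyncio.wait_for(jobs.get(), timeout=0.05)
        except asyncio.TimeoutError:
            continue  # queue idle: loop around and re-check the flag
        await handle(job)  # the in-flight job always runs to completion
        processed += 1
    return processed


async def demo() -> tuple[int, int]:
    flag = ShutdownFlag()
    jobs: asyncio.Queue = asyncio.Queue()
    for n in range(3):
        jobs.put_nowait(n)

    async def handle(job: int) -> None:
        if job == 1:
            flag.should_stop = True  # simulate SIGTERM arriving mid-job

    processed = await run_until_stopped(flag, jobs, handle)
    return processed, jobs.qsize()
```

In `demo`, the signal arrives while the second job is running: that job finishes, the third stays in the queue for another worker, and the loop exits.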

Periodic Tasks: Scheduled Jobs with Tenant Iteration

Some tasks run on a schedule rather than being enqueued by a request. Daily report generation, weekly analytics aggregation, and hourly cache warming all need to iterate over all tenants:

# src/tasks/periodic.py
import structlog
from arq import cron
from arq.connections import RedisSettings

from src.core.tenant import TenantContext
from src.repositories.tenant_repo import TenantRepository
from src.tasks.worker import execute_task

logger = structlog.get_logger()


async def daily_report_generation(ctx: dict) -> None:
    """Generate daily reports for all active tenants.

    Runs at 02:00 UTC. Iterates tenants and enqueues one report task per tenant.
    Each task carries its own tenant_id, so there is no shared state between tenants.
    """
    # ctx["db_session"] and ctx["today"] are assumed to be populated by the
    # worker's startup hook (not shown).
    tenant_repo = TenantRepository(ctx["db_session"])
    tenants = await tenant_repo.list_active()
    logger.info("periodic_report_started", tenant_count=len(tenants))

    for tenant in tenants:
        await ctx["redis"].enqueue_job(
            "execute_task",
            "generate_daily_report",
            {"tenant_id": tenant.id, "report_date": str(ctx["today"])},
            _queue_name="arq:background",
        )

    logger.info("periodic_report_enqueued", tenant_count=len(tenants))


# ARQ worker configuration
class WorkerSettings:
    functions = [execute_task]
    cron_jobs = [
        cron(daily_report_generation, hour=2, minute=0),
    ]
    redis_settings = RedisSettings(host="redis", port=6379)
    max_jobs = 20
    job_timeout = 300

The periodic task does not process reports directly. It enqueues one task per tenant, each with its own tenant_id. This means each report task gets its own retry logic, its own deadline, and its own entry in the dead letter queue if it fails. A failure for one tenant does not block reports for other tenants.

The ShelfWise Flow: Large Order Processing

A customer at Powell’s Books places a large order (50 items), and the pieces described above come together as follows.

The API responds in 200ms. The email sends successfully. The webhook delivery fails three times and moves to the dead letter queue. The alert from Part 12 fires, and on-call sees exactly which tenant, which order, and which webhook URL failed — including the full error trace.

The Context Propagation Failure Story

A ShelfWise engineering team added background tasks for analytics aggregation. The tasks worked correctly in development — every test passed, every metric looked right. Six months later, a product manager noticed that one tenant had 10x more “orders processed” than their actual order volume, while newer tenants showed zero.

The root cause: the analytics task did not carry tenant_id in its payload. In development, the TenantContext contextvar still held the value from the test setup. In production, the worker’s contextvar was empty on the first task and retained the value from the previous task on subsequent runs.

The first task that happened to execute after worker startup ran with tenant_id=None, which the repository silently treated as the default tenant. Every subsequent task inherited the tenant ID from whatever task ran before it — a random, non-deterministic assignment.

The fix was the TenantTask base class shown earlier: make tenant_id a required field so that constructing a payload without it raises a TypeError (and a type checker flags the call before it ever runs), set TenantContext from the payload at the start of every task, and call clear_contextvars() so logging context never bleeds between tasks.

Key Takeaways

  • ARQ for async-native stacks. If your backend is FastAPI + SQLAlchemy async + httpx, your task queue should be async too. ARQ runs on the same event loop.
  • TenantTask base class with mandatory tenant_id. The dataclass refuses to construct a payload without tenant context, and a type checker flags the omission before the code runs. This is not a convention; it is enforced.
  • Reconstruct context in the worker. TenantContext.set(), bind_contextvars(), and tracer.start_as_current_span() at the start of every task. The worker is the middleware for background processing.
  • Idempotency keys prevent duplicate execution. SET NX in Redis ensures at-most-once semantics for side effects. Include tenant ID in the key.
  • Dead letter queue for permanent failures. Never silently drop a failed task. Alert on DLQ entries and provide admin tooling for inspection and retry.
  • Clear contextvars between tasks. Without clearing, tenant context from the previous task contaminates the current one. This is the background-task equivalent of a cross-tenant data leak.

Next: Part 15 covers API versioning and backward compatibility — how to evolve your API contract without breaking existing clients.
