At 1M transactions per day, you cannot process everything in the request cycle. Order confirmation emails, webhook delivery, invoice PDF generation, and analytics aggregation must happen asynchronously. The API returns a 201 in 200ms. The background worker sends the email, fires the webhook, and generates the invoice over the next 30 seconds.
But background tasks are where tenant context silently disappears. The worker has no HTTP request, no middleware, no headers. The TenantContext contextvar from Part 9 is empty. If your task code calls TenantContext.get(), it returns None — and every operation runs without tenant scoping. One team learned this the hard way: six months of background analytics jobs ran without tenant context, attributing every tenant’s metrics to a single default account. The fix required reprocessing 180 days of data.
The solution is explicit: every task carries its tenant ID in the payload, and the worker reconstructs the TenantContext before executing any business logic.
Why ARQ Over Celery
| | ARQ | Celery | Dramatiq |
|---|---|---|---|
| Async native | Yes — built on asyncio from the ground up | No — async support is bolted on via eventlet/gevent | No — sync workers with optional async |
| Broker | Redis only | Redis, RabbitMQ, SQS | Redis, RabbitMQ |
| Dependencies | 1 (redis) | 12+ transitive dependencies | 3-5 depending on broker |
| Task definition | Plain async functions | Decorated functions with Celery app import | Decorated functions with actor import |
| Startup overhead | Minimal — single event loop | Heavy — worker prefork, broker connection pool | Moderate |
| Python 3.12+ compat | Full | Frequent breakage on new Python releases | Good |
| Best for | Async Python backends (FastAPI, Starlette) | Django projects, mixed sync/async | Sync Python backends |
ARQ is the natural choice for a FastAPI backend. The entire ShelfWise stack is async: FastAPI, SQLAlchemy async, httpx, Redis async. Celery’s sync-first architecture forces you to choose between running async code inside sync workers (defeating the purpose) or bolting on eventlet (adding complexity and subtle bugs). ARQ workers run on an asyncio event loop of their own, so task code can reuse the same async clients and repositories as the rest of the application.
Task Definition: The TenantTask Base
Every background task must carry the tenant ID. Forgetting it is the single most common source of multi-tenant bugs in background processing. The TenantTask pattern makes the tenant ID mandatory at the type level:
```python
from dataclasses import dataclass
from typing import Any

from src.core.tenant import TenantContext


@dataclass(frozen=True, slots=True, kw_only=True)
class TenantTask:
    """Base payload for all background tasks.

    tenant_id is required: not optional, not defaulted.
    The worker MUST reconstruct TenantContext from this field
    before executing any business logic.
    """

    tenant_id: str


@dataclass(frozen=True, slots=True, kw_only=True)
class SendOrderConfirmationTask(TenantTask):
    order_id: int
    customer_email: str


@dataclass(frozen=True, slots=True, kw_only=True)
class DeliverWebhookTask(TenantTask):
    order_id: int
    webhook_url: str
    event_type: str


@dataclass(frozen=True, slots=True, kw_only=True)
class GenerateInvoiceTask(TenantTask):
    order_id: int


@dataclass(frozen=True, slots=True, kw_only=True)
class UpdateTrendingTask(TenantTask):
    book_ids: list[int]


@dataclass(frozen=True, slots=True, kw_only=True)
class UpdateInventoryTask(TenantTask):
    items: list[dict[str, Any]]
```

Using frozen=True dataclasses makes task payloads immutable, so they cannot be accidentally modified between enqueue and execution. Using kw_only=True forces explicit field names at the call site, preventing positional argument mistakes.
Worker Setup: Reconstructing Context
The worker has no HTTP middleware. It must reconstruct the tenant context from the task payload before executing any business logic:
```python
import structlog
from arq import cron
from arq.connections import RedisSettings
from opentelemetry import trace

from src.core.tenant import TenantContext
from src.tasks.handlers import (
    handle_send_order_confirmation,
    handle_deliver_webhook,
    handle_generate_invoice,
    handle_update_trending,
    handle_update_inventory,
)

logger = structlog.get_logger()
tracer = trace.get_tracer(__name__)


async def execute_task(ctx: dict, task_name: str, payload: dict) -> None:
    """Universal task executor that reconstructs tenant context.

    Every task goes through this function. It:
    1. Clears any logging context left over from a previous task
    2. Extracts tenant_id from the payload (mandatory)
    3. Sets TenantContext for the duration of the task
    4. Binds structlog context (tenant_id, task_name, task_id)
    5. Creates an OpenTelemetry span for the task
    6. Dispatches to the appropriate handler
    """
    # Clear first, so even the error log below carries no stale context
    structlog.contextvars.clear_contextvars()

    tenant_id = payload.get("tenant_id")
    if not tenant_id:
        logger.error("task_missing_tenant_id", task_name=task_name)
        raise ValueError(f"Task {task_name} missing required tenant_id")

    # Reconstruct the context that middleware would normally provide
    TenantContext.set(tenant_id)
    structlog.contextvars.bind_contextvars(
        tenant_id=tenant_id,
        task_name=task_name,
        task_id=ctx.get("job_id", "unknown"),
    )

    with tracer.start_as_current_span(f"task.{task_name}") as span:
        span.set_attribute("tenant_id", tenant_id)
        span.set_attribute("task_name", task_name)

        handlers = {
            "send_order_confirmation": handle_send_order_confirmation,
            "deliver_webhook": handle_deliver_webhook,
            "generate_invoice": handle_generate_invoice,
            "update_trending": handle_update_trending,
            "update_inventory": handle_update_inventory,
        }

        handler = handlers.get(task_name)
        if not handler:
            logger.error("unknown_task", task_name=task_name)
            raise ValueError(f"Unknown task: {task_name}")

        logger.info("task_started")
        await handler(ctx, payload)
        logger.info("task_completed")
```

The execute_task function is the background equivalent of the ObservabilityMiddleware from Part 12. It sets up TenantContext, binds structlog context variables, and creates an OpenTelemetry span, all before any business logic runs. Every log line emitted by the handler includes tenant_id and task_name automatically.
Enqueuing Tasks from the Request Cycle
The API handler creates the order synchronously (within the request) and enqueues background tasks for everything else. The response returns immediately:
```python
# src/services/order_service.py (extended)
from dataclasses import asdict

import structlog
from arq.connections import ArqRedis

from src.core.tenant import TenantContext
from src.tasks.base import (
    SendOrderConfirmationTask,
    DeliverWebhookTask,
    GenerateInvoiceTask,
    UpdateTrendingTask,
    UpdateInventoryTask,
)

logger = structlog.get_logger()


class OrderService:
    def __init__(
        self,
        *,
        order_repo: OrderRepositoryProtocol,
        task_queue: ArqRedis,
    ) -> None:
        self._order_repo = order_repo
        self._queue = task_queue

    async def create_order(self, data: OrderCreate) -> OrderResponse:
        tenant_id = TenantContext.get()

        # Synchronous: must complete before response
        order = await self._order_repo.create(data)
        logger.info("order_persisted", order_id=order.id)

        # Asynchronous: enqueue and return immediately
        tasks = [
            ("send_order_confirmation", SendOrderConfirmationTask(
                tenant_id=tenant_id,
                order_id=order.id,
                customer_email=data.customer_email,
            )),
            ("deliver_webhook", DeliverWebhookTask(
                tenant_id=tenant_id,
                order_id=order.id,
                webhook_url=data.webhook_url,
                event_type="order.created",
            )),
            ("generate_invoice", GenerateInvoiceTask(
                tenant_id=tenant_id,
                order_id=order.id,
            )),
            ("update_trending", UpdateTrendingTask(
                tenant_id=tenant_id,
                book_ids=[item.book_id for item in data.items],
            )),
            ("update_inventory", UpdateInventoryTask(
                tenant_id=tenant_id,
                items=[
                    {"book_id": item.book_id, "quantity": -item.quantity}
                    for item in data.items
                ],
            )),
        ]

        for task_name, payload in tasks:
            # slots=True dataclasses have no __dict__, so convert with asdict()
            await self._queue.enqueue_job(
                "execute_task",
                task_name,
                asdict(payload),
            )

        logger.info("tasks_enqueued", count=len(tasks), order_id=order.id)
        return order
```

A large order with 50 items returns in ~200ms. The 5 background tasks execute over the next 30 seconds in the worker process. The customer sees their order confirmation page immediately; the email arrives shortly after.
Task Lifecycle
Retry Strategies: Exponential Backoff with Jitter
Transient failures — network timeouts, temporary rate limits, brief database unavailability — resolve themselves if you wait and retry. But retrying immediately floods the failing service. Exponential backoff with jitter spreads retries over time:
```python
import random
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

import structlog

logger = structlog.get_logger()


async def with_retry(
    fn: Callable[..., Awaitable[Any]],
    *args: Any,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    # ConnectionError and TimeoutError are OSError subclasses, so this default
    # retries on any OSError; pass a narrower tuple where that is too broad.
    retryable_exceptions: tuple[type[Exception], ...] = (
        ConnectionError,
        TimeoutError,
        OSError,
    ),
    **kwargs: Any,
) -> Any:
    """Execute an async function with exponential backoff and jitter.

    Delay formula: min(max_delay, base_delay * 2^attempt) + jitter,
    where jitter is uniform in [0, 0.5 * delay].
    With the defaults: retry 1 waits 1-1.5s, retry 2 waits 2-3s,
    retry 3 waits 4-6s.
    """
    last_exception: Exception | None = None

    for attempt in range(max_retries + 1):
        try:
            return await fn(*args, **kwargs)
        except retryable_exceptions as exc:
            last_exception = exc
            if attempt == max_retries:
                logger.error(
                    "task_retries_exhausted",
                    attempt=attempt,
                    error=str(exc),
                )
                break

            delay = min(max_delay, base_delay * (2 ** attempt))
            jitter = random.uniform(0, delay * 0.5)
            total_delay = delay + jitter

            logger.warning(
                "task_retry_scheduled",
                attempt=attempt + 1,
                max_retries=max_retries,
                delay_seconds=round(total_delay, 2),
                error=str(exc),
            )
            await asyncio.sleep(total_delay)

    raise last_exception  # type: ignore[misc]
```

The jitter prevents a thundering herd: if 100 tasks fail at the same time, they retry at slightly different times instead of all hitting the failing service again at the same instant.
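As a worked check of the delay formula, the sketch below computes the smallest and largest possible wait for each retry, using the same defaults as with_retry (base_delay of 1.0, max_delay of 60.0, jitter of up to half the delay). The helper name `backoff_bounds` is hypothetical.

```python
def backoff_bounds(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> tuple[float, float]:
    """Return (min, max) wait in seconds before the retry that follows `attempt`."""
    delay = min(max_delay, base_delay * (2 ** attempt))
    # Jitter is drawn uniformly from [0, 0.5 * delay] and added on top.
    return delay, delay * 1.5


schedule = [backoff_bounds(attempt) for attempt in range(3)]
capped = backoff_bounds(10)
print(schedule, capped)
```

Because the jitter is added after the cap is applied, the longest possible wait is 1.5 times max_delay rather than max_delay itself. If a hard ceiling matters, apply the cap after adding the jitter instead.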
Task Handlers with Idempotency
A task may execute more than once. The worker might crash after completing the work but before acknowledging the task. The queue redelivers it. If the handler is not idempotent, the customer receives two confirmation emails:
```python
import structlog

from src.tasks.retry import with_retry

logger = structlog.get_logger()

# _send_email and _post_webhook are assumed to be defined elsewhere
# in this module (not shown).


async def handle_send_order_confirmation(ctx: dict, payload: dict) -> None:
    """Send order confirmation email. Idempotent via idempotency key."""
    order_id = payload["order_id"]
    email = payload["customer_email"]
    tenant_id = payload["tenant_id"]

    # Idempotency check: has another execution already claimed this email?
    idempotency_key = f"email:order_confirm:{tenant_id}:{order_id}"
    redis = ctx["redis"]

    # SET NX returns True only for the first caller; later callers get None
    claimed = await redis.set(idempotency_key, "1", nx=True, ex=86400)
    if not claimed:
        logger.info("task_skipped_duplicate", idempotency_key=idempotency_key)
        return

    await with_retry(
        _send_email,
        to=email,
        template="order_confirmation",
        context={"order_id": order_id, "tenant_id": tenant_id},
    )
    logger.info("order_confirmation_sent", order_id=order_id)


async def handle_deliver_webhook(ctx: dict, payload: dict) -> None:
    """Deliver webhook to tenant's configured endpoint. Idempotent via delivery log."""
    order_id = payload["order_id"]
    webhook_url = payload["webhook_url"]
    event_type = payload["event_type"]
    tenant_id = payload["tenant_id"]

    idempotency_key = f"webhook:{event_type}:{tenant_id}:{order_id}"
    redis = ctx["redis"]

    claimed = await redis.set(idempotency_key, "1", nx=True, ex=86400)
    if not claimed:
        logger.info("task_skipped_duplicate", idempotency_key=idempotency_key)
        return

    await with_retry(
        _post_webhook,
        url=webhook_url,
        event_type=event_type,
        order_id=order_id,
        tenant_id=tenant_id,
        retryable_exceptions=(ConnectionError, TimeoutError),
    )
    logger.info("webhook_delivered", order_id=order_id, event_type=event_type)
```

The idempotency key is {action}:{entity}:{tenant_id}:{id}. The SET NX command (set if not exists) is atomic: if two workers pick up the same task, only one succeeds in setting the key. The other gets None back, so claimed is falsy and it skips execution. The key expires after 24 hours to prevent unbounded growth. Because the key is set before the side effect runs, a task whose delivery ultimately fails stays marked as done until the key expires; delete the key before retrying such a task by hand.
Dead Letter Queue
Once max_retries is exhausted, the task moves to a dead letter queue for manual investigation. It is not silently dropped:
```python
import orjson
import structlog
from datetime import UTC, datetime  # datetime.UTC requires Python 3.11+
from redis.asyncio import Redis

logger = structlog.get_logger()


async def move_to_dead_letter(
    redis: Redis,
    *,
    task_name: str,
    payload: dict,
    error: str,
    attempts: int,
) -> None:
    """Move a permanently failed task to the dead letter queue.

    Tasks in the DLQ are stored in a Redis sorted set, scored by failure
    timestamp. Operations can review, retry, or discard them via an admin
    endpoint.
    """
    # Take one timestamp so the stored entry and its score agree
    failed_at = datetime.now(UTC)
    entry = {
        "task_name": task_name,
        "payload": payload,
        "error": error,
        "attempts": attempts,
        "failed_at": failed_at.isoformat(),
        "tenant_id": payload.get("tenant_id", "unknown"),
    }

    await redis.zadd(
        "dead_letter_queue",
        {orjson.dumps(entry): failed_at.timestamp()},
    )

    logger.error(
        "task_moved_to_dead_letter",
        task_name=task_name,
        tenant_id=payload.get("tenant_id"),
        error=error,
        attempts=attempts,
    )
```

The dead letter queue is a Redis sorted set ordered by failure time. Operations can list recent failures, inspect payloads, and selectively retry via an admin API. The alert threshold from Part 12 fires when any task enters the DLQ.
Priority Queues
Not all tasks are equal. Payment capture must happen before report generation. ARQ does not rank jobs within a queue by priority, but it supports multiple named queues, and giving each queue its own workers achieves the same effect:
```python
from arq.connections import ArqRedis


class QueueConfig:
    """Queue configuration with priority levels."""

    CRITICAL = "arq:critical"      # Payment, inventory: processed first
    DEFAULT = "arq:default"        # Email, webhook: standard priority
    BACKGROUND = "arq:background"  # Reports, analytics: processed when idle


# Enqueue with priority
async def enqueue_critical(queue: ArqRedis, task_name: str, payload: dict) -> None:
    """Enqueue to the critical queue. Payment, inventory operations."""
    await queue.enqueue_job(
        "execute_task",
        task_name,
        payload,
        _queue_name=QueueConfig.CRITICAL,
    )


async def enqueue_background(queue: ArqRedis, task_name: str, payload: dict) -> None:
    """Enqueue to the background queue. Reports, analytics."""
    await queue.enqueue_job(
        "execute_task",
        task_name,
        payload,
        _queue_name=QueueConfig.BACKGROUND,
    )
```

Run separate worker processes for each priority level. Critical workers process payment tasks within seconds. Background workers handle report generation during off-peak hours. If the background queue backs up, critical tasks are unaffected.
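A sketch of what "separate worker processes" could look like in configuration: one settings class per queue, each naming the queue it consumes through ARQ's queue_name worker setting. The class names and max_jobs values are assumptions for illustration, and `execute_task` here is only a placeholder for the dispatcher shown earlier.

```python
async def execute_task(ctx: dict, task_name: str, payload: dict) -> None:
    """Placeholder for the real dispatcher defined in the worker module."""


class CriticalWorkerSettings:
    functions = [execute_task]
    queue_name = "arq:critical"    # this worker only consumes critical jobs
    max_jobs = 10                  # illustrative concurrency limit


class BackgroundWorkerSettings:
    functions = [execute_task]
    queue_name = "arq:background"  # reports and analytics
    max_jobs = 5                   # illustrative concurrency limit


print(CriticalWorkerSettings.queue_name, BackgroundWorkerSettings.queue_name)
```

Each class would then be started as its own process with the arq command line, passing the dotted path to the settings class. A job enqueued with a _queue_name that no worker consumes simply waits in Redis, so every queue name used at enqueue time needs a matching worker.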
Graceful Worker Shutdown
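When deploying a new version, workers must finish their current task before shutting down. Killing a worker mid-task leaves the work half-finished: some side effects may already have happened, and depending on the queue's delivery guarantees the task is either lost or run again from the start: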
```python
# src/tasks/worker.py (extended)
import signal
import asyncio
import structlog

logger = structlog.get_logger()


class GracefulWorker:
    """Worker wrapper that handles SIGTERM for graceful shutdown."""

    def __init__(self, drain_timeout: float = 30.0) -> None:
        self._shutting_down = False
        self._drain_timeout = drain_timeout

    def install_signal_handlers(self) -> None:
        """Register handlers; call this from inside the running event loop."""
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._handle_shutdown)

    def _handle_shutdown(self) -> None:
        if self._shutting_down:
            # Second signal: stop waiting and exit immediately
            logger.warning("forced_shutdown")
            raise SystemExit(1)
        self._shutting_down = True
        logger.info(
            "graceful_shutdown_initiated",
            drain_timeout=self._drain_timeout,
        )

    @property
    def should_stop(self) -> bool:
        return self._shutting_down
```

The first SIGTERM sets _shutting_down = True. The worker finishes its current task and exits cleanly, provided its main loop checks should_stop before picking up the next task. A second SIGTERM forces immediate shutdown; use this only if the drain timeout is exceeded.
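The flag only helps if something checks it. The following sketch shows one possible shape for the loop that does: it finishes the task in progress, refuses to start new ones once the flag is set, and bounds the whole drain with a timeout. `StopFlag` is a hypothetical stand-in for GracefulWorker, and the signal is simulated by flipping the flag from inside a task.

```python
import asyncio
from collections.abc import Awaitable, Callable


class StopFlag:
    """Minimal stand-in exposing the same should_stop attribute."""

    def __init__(self) -> None:
        self.should_stop = False


async def worker_loop(
    flag: StopFlag,
    jobs: list[str],
    handler: Callable[[str], Awaitable[None]],
) -> list[str]:
    completed: list[str] = []
    for job in jobs:
        if flag.should_stop:
            break  # shutdown requested: do not start new work
        await handler(job)  # the in-flight task always runs to completion
        completed.append(job)
    return completed


async def main() -> list[str]:
    flag = StopFlag()

    async def handler(job: str) -> None:
        if job == "job-2":
            flag.should_stop = True  # simulate SIGTERM arriving mid-task
        await asyncio.sleep(0)       # stand-in for real work

    # The timeout plays the role of drain_timeout
    return await asyncio.wait_for(
        worker_loop(flag, ["job-1", "job-2", "job-3"], handler),
        timeout=30.0,
    )


completed = asyncio.run(main())
print(completed)
```

Here job-2 completes even though shutdown was requested while it ran, and job-3 is left in the queue for the next worker to pick up.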
Periodic Tasks: Scheduled Jobs with Tenant Iteration
Some tasks run on a schedule rather than being enqueued by a request. Daily report generation, weekly analytics aggregation, and hourly cache warming all need to iterate over all tenants:
```python
import structlog
from arq import cron
from arq.connections import RedisSettings

from src.core.tenant import TenantContext
from src.repositories.tenant_repo import TenantRepository
from src.tasks.worker import execute_task  # assumed module path for the dispatcher

logger = structlog.get_logger()


async def daily_report_generation(ctx: dict) -> None:
    """Generate daily reports for all active tenants.

    Runs at 02:00 UTC. Iterates tenants and enqueues one report task
    per tenant. Each task carries its own tenant_id, so no state is
    shared between tenants.
    """
    # ctx["db_session"] and ctx["today"] are assumed to be populated by a
    # worker startup hook (not shown); ARQ itself provides ctx["redis"].
    tenant_repo = TenantRepository(ctx["db_session"])
    tenants = await tenant_repo.list_active()

    logger.info("periodic_report_started", tenant_count=len(tenants))

    for tenant in tenants:
        await ctx["redis"].enqueue_job(
            "execute_task",
            "generate_daily_report",
            {"tenant_id": tenant.id, "report_date": str(ctx["today"])},
            _queue_name="arq:background",
        )

    logger.info("periodic_report_enqueued", tenant_count=len(tenants))


# ARQ worker configuration
class WorkerSettings:
    functions = [execute_task]
    cron_jobs = [
        cron(daily_report_generation, hour=2, minute=0),
    ]
    redis_settings = RedisSettings(host="redis", port=6379)
    max_jobs = 20
    job_timeout = 300
    # No queue_name is set, so this worker consumes ARQ's default queue.
    # The report jobs above go to "arq:background" and need a worker
    # configured with that queue name.
```

The periodic task does not process reports directly. It enqueues one task per tenant, each with its own tenant_id. This means each report task gets its own retry logic, its own deadline, and its own entry in the dead letter queue if it fails. A failure for one tenant does not block reports for other tenants.
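The fan-out step itself is small enough to isolate. The sketch below builds one payload per tenant, plus a deterministic job ID that could be passed as the _job_id argument of enqueue_job so that an accidental second run of the cron job does not enqueue duplicates. `build_report_jobs`, the ID format, and the sample tenant IDs are hypothetical.

```python
from datetime import date


def build_report_jobs(tenant_ids: list[str], report_date: date) -> list[tuple[str, dict]]:
    """Return (job_id, payload) pairs: one independent job per tenant."""
    day = report_date.isoformat()
    return [
        (
            f"daily_report:{tenant_id}:{day}",  # stable ID per tenant and day
            {"tenant_id": tenant_id, "report_date": day},
        )
        for tenant_id in tenant_ids
    ]


jobs = build_report_jobs(["tenant-1", "tenant-2"], date(2024, 1, 15))
for job_id, payload in jobs:
    print(job_id, payload)
```

Keeping the payload construction in a pure function like this also makes it easy to unit test that every job carries its own tenant_id before anything touches Redis.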
The ShelfWise Flow: Large Order Processing
A customer at Powell’s Books places a large order (50 items). Here is the complete flow:
The API responds in 200ms. The email sends successfully. The webhook delivery fails three times and moves to the dead letter queue. The alert from Part 12 fires, and on-call sees exactly which tenant, which order, and which webhook URL failed — including the full error trace.
The Context Propagation Failure Story
A ShelfWise engineering team added background tasks for analytics aggregation. The tasks worked correctly in development — every test passed, every metric looked right. Six months later, a product manager noticed that one tenant had 10x more “orders processed” than their actual order volume, while newer tenants showed zero.
The root cause: the analytics task did not carry tenant_id in its payload. In development, the TenantContext contextvar still held the value from the test setup. In production, the worker’s contextvar was empty on the first task and retained the value from the previous task on subsequent runs.
The first task that happened to execute after worker startup ran with tenant_id=None, which the repository silently treated as the default tenant. Every subsequent task inherited the tenant ID from whatever task ran before it — a random, non-deterministic assignment.
The fix was the TenantTask base class shown earlier: make tenant_id a required field so that constructing a task without it raises a TypeError (and a static type checker flags the call), and call clear_contextvars() at the start of every task execution so context never bleeds between tasks.
Key Takeaways
- ARQ for async-native stacks. If your backend is FastAPI + SQLAlchemy async + httpx, your task queue should be async too. ARQ workers run on asyncio, so tasks reuse the same async code as the API.
- TenantTask base class with mandatory tenant_id. The dataclass constructor rejects a task built without tenant_id, and a static type checker flags the call before it ships. This is not a convention; it is enforced in code.
- Reconstruct context in the worker. TenantContext.set(), bind_contextvars(), and tracer.start_as_current_span() at the start of every task. The worker is the middleware for background processing.
- Idempotency keys prevent duplicate execution. SET NX in Redis ensures at-most-once semantics for side effects. Include tenant ID in the key.
- Dead letter queue for permanent failures. Never silently drop a failed task. Alert on DLQ entries and provide admin tooling for inspection and retry.
- Clear contextvars between tasks. Without clearing, tenant context from the previous task contaminates the current one. This is the background-task equivalent of a cross-tenant data leak.
Next: Part 15 covers API versioning and backward compatibility — how to evolve your API contract without breaking existing clients.