Clean Code Python: Health Checks, Graceful Shutdown, and Zero-Downtime Deploys

Without health checks, your load balancer sends traffic to dying instances. Without graceful shutdown, every deploy drops requests. Here is how ShelfWise achieves zero-downtime deployments with proper probes, SIGTERM handling, and connection draining.

Tin Dang

ShelfWise deploys three times a day. Each deploy takes about 30 seconds to roll out. At peak load, the platform handles 12 requests per second. That puts roughly 360 requests (12 per second × 30 seconds) inside every rollout window, each one at risk of being dropped: orders that vanish, inventory updates that never land, webhooks that fire but never complete.

For months, nobody noticed. The frontend retried silently, users saw a brief spinner, and support tickets trickled in as “intermittent errors.” Then the team calculated the cost: 12 dropped requests per deploy × 3 deploys per day × 30 days per month = 1,080 lost transactions per month, some of them order placements from enterprise tenants with SLAs.
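The arithmetic is small enough to check directly. This sketch uses only the figures quoted above; the constant names are illustrative and not part of the ShelfWise codebase.

```python
# Figures quoted in the text above
PEAK_REQUESTS_PER_SECOND = 12
ROLLOUT_SECONDS = 30
DROPPED_PER_DEPLOY = 12
DEPLOYS_PER_DAY = 3
DAYS_PER_MONTH = 30

# Requests that arrive while a rollout is in progress
requests_in_rollout_window = PEAK_REQUESTS_PER_SECOND * ROLLOUT_SECONDS

# Requests lost per month at the observed drop rate
lost_per_month = DROPPED_PER_DEPLOY * DEPLOYS_PER_DAY * DAYS_PER_MONTH

print(requests_in_rollout_window)  # 360
print(lost_per_month)  # 1080
```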

The fix is not “deploy less often.” The fix is making deploys invisible to every client, every time.

The Three Health Probes

Kubernetes defines three distinct probes (liveness, readiness, and startup), and other orchestrators and load balancers offer comparable health checks. Each probe answers a different question, and conflating them causes cascading failures.

src/api/health.py
from __future__ import annotations

from datetime import UTC, datetime
from typing import Annotated

from fastapi import APIRouter, Depends, status
from fastapi.responses import JSONResponse
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from src.core.app_state import app_state
from src.db.session import get_session

router = APIRouter(prefix="/health", tags=["health"])


@router.get("/live", status_code=status.HTTP_200_OK)
async def liveness() -> dict[str, str]:
    """Is the process running and not deadlocked?

    This endpoint does zero I/O. If it fails, the process is broken
    and the orchestrator should kill and replace it.
    """
    return {"status": "alive", "timestamp": datetime.now(UTC).isoformat()}


@router.get("/ready")
async def readiness(
    session: Annotated[AsyncSession, Depends(get_session)],
) -> JSONResponse:
    """Can this instance serve traffic?

    Checks every downstream dependency. If any check fails,
    the load balancer stops sending traffic but does NOT kill
    the instance, because it may recover.
    """
    # During shutdown, report "not ready" right away so the load balancer
    # stops routing new traffic to this instance.
    if app_state.shutting_down:
        return JSONResponse(
            content={"status": "shutting_down", "checks": {}},
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        )

    checks: dict[str, bool] = {}
    all_healthy = True

    # Database
    try:
        await session.execute(text("SELECT 1"))
        checks["database"] = True
    except Exception:
        checks["database"] = False
        all_healthy = False

    # Redis
    try:
        from src.cache.redis import get_redis_pool

        redis = get_redis_pool()
        await redis.ping()
        checks["redis"] = True
    except Exception:
        checks["redis"] = False
        all_healthy = False

    status_code = status.HTTP_200_OK if all_healthy else status.HTTP_503_SERVICE_UNAVAILABLE
    return JSONResponse(
        content={"status": "ready" if all_healthy else "degraded", "checks": checks},
        status_code=status_code,
    )


@router.get("/startup")
async def startup_probe() -> JSONResponse:
    """Has the application finished initializing?

    Returns 200 only after migrations are verified, caches are warmed,
    and the application is fully ready to handle its first request.
    Unlike readiness, this runs only during boot, not continuously.
    """
    if not app_state.startup_complete:
        return JSONResponse(
            content={"status": "starting"},
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        )
    return JSONResponse(content={"status": "started"}, status_code=status.HTTP_200_OK)
| Probe | Question | Failure Action | I/O Allowed | Frequency |
|---|---|---|---|---|
| Liveness | Is the process alive? | Kill and restart | No (must be instant) | Every 10s |
| Readiness | Can it serve traffic? | Remove from load balancer | Yes (check dependencies) | Every 5s |
| Startup | Has it finished booting? | Keep waiting (do not kill) | Yes (one-time checks) | Every 3s until ready |

Dependency Health Checks with Timeout

A readiness check that hangs for 30 seconds waiting for a dead Redis is worse than no check at all. Every dependency check needs a timeout shorter than the probe interval.

src/core/health_checks.py
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field


@dataclass(frozen=True, slots=True)
class HealthCheckResult:
    name: str
    healthy: bool
    latency_ms: float
    error: str | None = None


@dataclass
class HealthChecker:
    """Runs dependency checks with individual timeouts."""

    checks: dict[str, Callable[[], Awaitable[None]]] = field(default_factory=dict)
    timeout_seconds: float = 3.0

    def register(self, name: str, check: Callable[[], Awaitable[None]]) -> None:
        self.checks[name] = check

    async def run_all(self) -> list[HealthCheckResult]:
        """Run all checks concurrently with per-check timeouts."""
        loop = asyncio.get_running_loop()

        async def _run_one(name: str, check: Callable[[], Awaitable[None]]) -> HealthCheckResult:
            start = loop.time()
            try:
                # asyncio.timeout and asyncio.TaskGroup require Python 3.11+
                async with asyncio.timeout(self.timeout_seconds):
                    await check()
                elapsed = (loop.time() - start) * 1000
                return HealthCheckResult(name=name, healthy=True, latency_ms=elapsed)
            except TimeoutError:
                elapsed = (loop.time() - start) * 1000
                return HealthCheckResult(
                    name=name, healthy=False, latency_ms=elapsed,
                    error=f"Timeout after {self.timeout_seconds}s",
                )
            except Exception as exc:
                elapsed = (loop.time() - start) * 1000
                return HealthCheckResult(
                    name=name, healthy=False, latency_ms=elapsed, error=str(exc),
                )

        # _run_one never raises, so one failing check cannot cancel the others
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(_run_one(name, check))
                for name, check in self.checks.items()
            ]
        return [t.result() for t in tasks]

Register checks during application startup:

health_checker = HealthChecker(timeout_seconds=2.0)
health_checker.register("postgres", check_postgres)
health_checker.register("redis", check_redis)
health_checker.register("s3", check_s3_bucket)
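The check_postgres, check_redis, and check_s3_bucket callables are not shown here. As a rough sketch of the shape such a check takes, and of how a readiness endpoint might fold the results into a status code, the example below uses stand-in checks. The names fake_postgres, fake_redis, CheckResult, run_check, and to_response are hypothetical, and asyncio.wait_for is swapped in for asyncio.timeout so the sketch also runs on Python versions before 3.11.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass

Check = Callable[[], Awaitable[None]]


@dataclass(frozen=True)
class CheckResult:  # simplified stand-in for HealthCheckResult
    name: str
    healthy: bool
    error: str | None = None


async def fake_postgres() -> None:
    await asyncio.sleep(0.01)  # stands in for a quick "SELECT 1"


async def fake_redis() -> None:
    await asyncio.sleep(10)  # stands in for a dependency that hangs


async def run_check(name: str, check: Check, timeout: float) -> CheckResult:
    """Run one check, converting timeouts and errors into a result."""
    try:
        await asyncio.wait_for(check(), timeout=timeout)
    except asyncio.TimeoutError:
        return CheckResult(name, False, f"Timeout after {timeout}s")
    except Exception as exc:
        return CheckResult(name, False, str(exc))
    return CheckResult(name, True)


def to_response(results: list[CheckResult]) -> tuple[int, dict]:
    """Map check results to (HTTP status code, JSON body)."""
    ok = all(r.healthy for r in results)
    body = {
        "status": "ready" if ok else "degraded",
        "checks": {r.name: r.healthy for r in results},
    }
    return (200 if ok else 503), body


async def main() -> tuple[int, dict]:
    results = await asyncio.gather(
        run_check("postgres", fake_postgres, timeout=0.1),
        run_check("redis", fake_redis, timeout=0.1),
    )
    return to_response(list(results))


status_code, body = asyncio.run(main())
print(status_code, body)
```

The hanging check is cut off after 0.1 seconds instead of holding the probe open, so the whole readiness call finishes well inside the probe interval.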

Graceful Shutdown: The SIGTERM Handler

When the orchestrator decides to replace your instance, it sends SIGTERM. You have a window — typically 30 seconds — to finish in-flight work before SIGKILL arrives. Without a handler, the process dies mid-request.
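To see the mechanism in isolation, here is a minimal sketch that has nothing to do with FastAPI: a process installs a SIGTERM handler on the asyncio loop, sends itself SIGTERM, and still lets a simulated in-flight request finish. It assumes a Unix-like system and that the code runs in the main thread; handle_request and the "order-42" label are made up for illustration.

```python
import asyncio
import os
import signal


async def handle_request(done: list[str]) -> None:
    await asyncio.sleep(0.2)  # simulated in-flight work
    done.append("order-42")


async def main() -> list[str]:
    loop = asyncio.get_running_loop()
    shutdown = asyncio.Event()
    # With a handler installed, SIGTERM sets a flag instead of killing us
    loop.add_signal_handler(signal.SIGTERM, shutdown.set)

    done: list[str] = []
    in_flight = asyncio.create_task(handle_request(done))

    await asyncio.sleep(0.05)
    os.kill(os.getpid(), signal.SIGTERM)  # play the orchestrator's role

    await shutdown.wait()  # shutdown was requested...
    await asyncio.wait_for(in_flight, timeout=5.0)  # ...but work still finishes
    loop.remove_signal_handler(signal.SIGTERM)
    return done


completed = asyncio.run(main())
print(completed)
```

Without the add_signal_handler call, the default SIGTERM action would terminate the process before handle_request appended anything.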

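On SIGTERM, ShelfWise follows this sequence: flag the instance as shutting down so readiness starts returning 503, stop background tasks, let in-flight HTTP requests drain, close the database and Redis connections, and flush logs.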

The implementation uses FastAPI’s lifespan context manager — a single function that handles both startup and shutdown:

src/main.py
from __future__ import annotations

import asyncio
import logging
import signal
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import FastAPI

from src.cache.redis import redis_pool
from src.core.app_state import app_state
from src.db.session import engine
from src.tasks.scheduler import task_scheduler

# verify_migrations_current and warm_tenant_config_cache are project helpers
# whose imports are not shown in this excerpt.


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Manage application lifecycle: startup and shutdown."""
    # --- STARTUP ---
    # 1. Run migrations check
    await verify_migrations_current()
    # 2. Warm critical caches
    await warm_tenant_config_cache()
    # 3. Start background task scheduler
    await task_scheduler.start()
    # 4. Mark startup complete (startup probe begins returning 200)
    app_state.startup_complete = True
    # 5. Register SIGTERM handler for graceful shutdown
    previous_handler = signal.getsignal(signal.SIGTERM)

    def _signal_handler(signum: int, frame: object) -> None:
        app_state.shutting_down = True  # readiness starts returning 503
        # Chain to the handler that was installed before this one (for
        # example by the ASGI server) so its own shutdown still runs.
        # Replacing it outright would leave the server unaware of SIGTERM.
        if callable(previous_handler):
            previous_handler(signum, frame)

    signal.signal(signal.SIGTERM, _signal_handler)

    yield

    # --- SHUTDOWN (runs on SIGTERM or normal exit) ---
    app_state.shutting_down = True
    # 1. Stop accepting new background tasks
    await task_scheduler.stop(timeout=15.0)
    # 2. Wait for in-flight HTTP requests to drain
    #    (Uvicorn handles this via --timeout-graceful-shutdown)
    await asyncio.sleep(0.5)  # Brief pause for request completion
    # 3. Close database connections
    await engine.dispose()
    # 4. Close Redis connections
    await redis_pool.aclose()
    # 5. Flush logs
    for handler in logging.root.handlers:
        handler.flush()


app = FastAPI(lifespan=lifespan)

The app_state.shutting_down flag is checked by the readiness probe. The moment it is set, readiness returns 503, the load balancer stops routing new traffic, and existing requests complete naturally.

src/core/app_state.py
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class AppState:
    """Mutable application state, shared through the module-level instance below."""

    startup_complete: bool = False
    shutting_down: bool = False
    active_requests: int = 0


app_state = AppState()
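Nothing shown so far updates active_requests. One way it could be maintained is a small pure-ASGI middleware that increments the counter while a request is in flight, paired with a helper that waits for the counter to reach zero. This is a sketch under that assumption: InFlightCounter and wait_for_drain are hypothetical names, and the AppState here is a trimmed local copy so the example runs on its own.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class AppState:  # trimmed local copy for the example
    shutting_down: bool = False
    active_requests: int = 0


class InFlightCounter:
    """ASGI middleware that tracks how many HTTP requests are in flight."""

    def __init__(self, app, state: AppState) -> None:
        self.app = app
        self.state = state

    async def __call__(self, scope, receive, send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        self.state.active_requests += 1
        try:
            await self.app(scope, receive, send)
        finally:
            self.state.active_requests -= 1


async def wait_for_drain(state: AppState, timeout: float = 10.0, poll: float = 0.05) -> bool:
    """Return True once no requests are in flight, False if the timeout hits first."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while state.active_requests > 0:
        if loop.time() >= deadline:
            return False
        await asyncio.sleep(poll)
    return True


async def demo() -> tuple[int, bool, int]:
    state = AppState()

    async def slow_app(scope, receive, send) -> None:
        await asyncio.sleep(0.1)  # simulated request handling

    app = InFlightCounter(slow_app, state)
    request = asyncio.create_task(app({"type": "http"}, None, None))
    await asyncio.sleep(0.01)  # let the request start
    during = state.active_requests
    drained = await wait_for_drain(state, timeout=2.0)
    await request
    return during, drained, state.active_requests


result = asyncio.run(demo())
print(result)  # (1, True, 0)
```

A helper like wait_for_drain could stand in for a fixed sleep during shutdown, since it returns as soon as the last request completes and gives up only at the deadline.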

The Real Failure: ShelfWise Before Graceful Shutdown

Here is what happened before the SIGTERM handler existed:

  1. Deploy triggered at 2:14 PM during peak operations.
  2. Orchestrator sent SIGTERM to old instance.
  3. Instance died immediately — 8 in-flight requests killed mid-execution.
  4. Two of those were order placements. The database transaction started but never committed. Inventory was decremented (in a prior query) but the order row was never written.
  5. Result: phantom inventory loss. Two tenants reported missing stock with no corresponding orders.
  6. The team spent 4 hours reconciling inventory manually.

After implementing graceful shutdown:

  1. Deploy triggered at 2:14 PM.
  2. SIGTERM received. Readiness probe returns 503.
  3. Load balancer drains traffic to new instance within 3 seconds.
  4. 8 in-flight requests complete normally, including a 2-second report generation.
  5. Background tasks finish their current batch (inventory sync for 3 tenants).
  6. Connection pools close. Process exits with code 0.
  7. Zero dropped requests. Zero manual intervention. Team does not even notice.

Deployment Strategies

Graceful shutdown handles the instance level. Deployment strategy handles the fleet level — how you roll out new code across multiple instances without downtime.

| Strategy | How It Works | Rollback Speed | Risk | Best For |
|---|---|---|---|---|
| Rolling | Replace instances one at a time | Minutes (re-deploy old) | Brief mixed versions | Stateless services, most deploys |
| Blue-Green | Run two full environments, swap traffic | Seconds (flip DNS/LB) | Double infrastructure cost | Critical releases, database migrations |
| Canary | Route 5% traffic to new version, monitor, expand | Seconds (route 0% to canary) | Lowest risk, highest complexity | High-traffic services, breaking changes |

ShelfWise uses rolling deploys for routine releases and canary deploys for changes that touch the database or payment flow.

Canary Deploy: 5% Traffic, Auto-Rollback

The canary pipeline monitors error rate and p99 latency. If either regresses beyond the threshold, it automatically rolls back — no human intervention required.

deploy/canary_check.py
"""Canary health verification, run every 30s during the canary window."""
from __future__ import annotations

from dataclasses import dataclass

import httpx


@dataclass(frozen=True, slots=True)
class CanaryMetrics:
    error_rate: float  # 0.0 to 1.0
    p99_latency_ms: float
    request_count: int


@dataclass(frozen=True, slots=True)
class CanaryThresholds:
    max_error_rate: float = 0.01  # 1% error budget
    max_p99_latency_ms: float = 500  # 500ms p99 ceiling
    min_requests: int = 100  # Minimum sample size before judging


async def evaluate_canary(
    metrics: CanaryMetrics,
    baseline: CanaryMetrics,
    thresholds: CanaryThresholds,
) -> tuple[bool, str]:
    """Return (ok, reason).

    ok=False means roll back now. ok=True means no regression was detected.
    With fewer than min_requests samples, ok=True only means "keep the canary
    running", so the caller should promote only once the sample is large enough.
    """
    if metrics.request_count < thresholds.min_requests:
        return True, "Insufficient sample size, continuing canary"

    if metrics.error_rate > thresholds.max_error_rate:
        return False, (
            f"Error rate {metrics.error_rate:.2%} exceeds "
            f"threshold {thresholds.max_error_rate:.2%}"
        )

    if metrics.p99_latency_ms > thresholds.max_p99_latency_ms:
        return False, (
            f"p99 latency {metrics.p99_latency_ms:.0f}ms exceeds "
            f"threshold {thresholds.max_p99_latency_ms:.0f}ms"
        )

    # Compare against baseline: reject if p99 is more than 2x the baseline
    if baseline.p99_latency_ms > 0:
        ratio = metrics.p99_latency_ms / baseline.p99_latency_ms
        if ratio > 2.0:
            return False, f"p99 latency {ratio:.1f}x baseline regression"

    return True, "All metrics within thresholds"
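The loop that calls this check on every tick is not shown. A rough sketch of how such a driver might separate "keep waiting", "promote", and "roll back" follows. The Decision enum, Sample, decide, and run_window are hypothetical names, the thresholds reuse the defaults above, and the baseline comparison is left out to keep the example short.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class Decision(Enum):
    CONTINUE = "continue"  # not enough data yet, keep the canary running
    PROMOTE = "promote"
    ROLLBACK = "rollback"


@dataclass(frozen=True)
class Sample:  # simplified stand-in for CanaryMetrics
    error_rate: float
    p99_latency_ms: float
    request_count: int


def decide(
    sample: Sample,
    *,
    max_error_rate: float = 0.01,
    max_p99_latency_ms: float = 500.0,
    min_requests: int = 100,
) -> Decision:
    """Classify one metrics sample."""
    if sample.request_count < min_requests:
        return Decision.CONTINUE
    if sample.error_rate > max_error_rate:
        return Decision.ROLLBACK
    if sample.p99_latency_ms > max_p99_latency_ms:
        return Decision.ROLLBACK
    return Decision.PROMOTE


def run_window(samples: list[Sample]) -> Decision:
    """Evaluate successive samples; stop at the first one that demands rollback."""
    decision = Decision.CONTINUE
    for sample in samples:
        decision = decide(sample)
        if decision is Decision.ROLLBACK:
            return decision
    return decision


healthy = [Sample(0.0, 120, 40), Sample(0.002, 180, 150), Sample(0.004, 210, 400)]
regressed = [Sample(0.0, 120, 40), Sample(0.03, 190, 150)]
quiet = [Sample(0.0, 100, 10)]

print(run_window(healthy))    # Decision.PROMOTE
print(run_window(regressed))  # Decision.ROLLBACK
print(run_window(quiet))      # Decision.CONTINUE
```

Keeping CONTINUE distinct from PROMOTE means a canary that never received enough traffic is not promoted by default.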

Feature Flags: Decoupling Deploy from Release

The safest deploy is one that changes nothing visible. Feature flags let you deploy code to production without activating it — then turn it on for specific tenants, percentages, or environments.

src/core/feature_flags.py
from __future__ import annotations

from uuid import UUID


class FeatureFlags:
    """Simple feature flag evaluation, backed by tenant config from Part 10."""

    def __init__(self, flags: dict[str, bool | list[str]]) -> None:
        self._flags = flags

    def is_enabled(self, flag: str, *, tenant_id: UUID | None = None) -> bool:
        """Check if a feature flag is enabled."""
        value = self._flags.get(flag)
        if value is None:
            return False
        if isinstance(value, bool):
            return value
        # List of tenant IDs: progressive rollout
        if isinstance(value, list) and tenant_id is not None:
            return str(tenant_id) in value
        return False

Usage in the order service:

async def create_order(self, payload: CreateOrderRequest) -> Order:
    order = await self._repo.create(payload)
    if self._feature_flags.is_enabled("new_invoice_engine", tenant_id=order.tenant_id):
        await self._new_invoice_service.generate(order)
    else:
        await self._legacy_invoice_service.generate(order)
    return order

Deploy the new invoice engine on Monday. Enable it for your internal test tenant. Monitor for a week. Enable for 10% of tenants. Monitor. Roll to 100%. Remove the flag. At no point did a deploy carry risk — the flag controlled exposure, not the deployment pipeline.
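The FeatureFlags class above handles booleans and explicit tenant lists, so the "10% of tenants" step would need either a hand-picked list or a percentage rule. One common way to do the latter is to hash the flag name and tenant ID into a stable bucket from 0 to 99. The sketch below assumes that approach; rollout_bucket and in_rollout are hypothetical helpers, not part of the class shown.

```python
import hashlib
from uuid import NAMESPACE_DNS, UUID, uuid5


def rollout_bucket(flag: str, tenant_id: UUID) -> int:
    """Map (flag, tenant) to a stable bucket in the range 0..99."""
    digest = hashlib.sha256(f"{flag}:{tenant_id}".encode()).digest()
    return int.from_bytes(digest[:8], "big") % 100


def in_rollout(flag: str, tenant_id: UUID, percentage: int) -> bool:
    """True if the tenant falls inside the first `percentage` buckets."""
    return rollout_bucket(flag, tenant_id) < percentage


# Made-up tenant IDs, generated deterministically for the demo
tenants = [uuid5(NAMESPACE_DNS, f"tenant-{i}.example") for i in range(1000)]

at_10 = {t for t in tenants if in_rollout("new_invoice_engine", t, 10)}
at_50 = {t for t in tenants if in_rollout("new_invoice_engine", t, 50)}

print(len(at_10), len(at_50))
print(at_10 <= at_50)  # True: raising the percentage never removes a tenant
```

Because the bucket depends only on the flag name and tenant ID, a tenant that sees the new behavior at 10% keeps seeing it at 50% and 100%, and different flags get independent tenant samples.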

Health Check Testing

Health checks are production infrastructure. They need tests like any other feature.

tests/api/test_health.py
from unittest.mock import AsyncMock

import pytest
from httpx import AsyncClient

from src.core.app_state import app_state
from src.db.session import get_session
from src.main import app  # assumes the client fixture wraps this app


class TestHealthEndpoints:
    @pytest.mark.asyncio
    async def test_liveness_always_succeeds(self, client: AsyncClient) -> None:
        resp = await client.get("/health/live")
        assert resp.status_code == 200
        assert resp.json()["status"] == "alive"

    @pytest.mark.asyncio
    async def test_readiness_fails_when_db_down(self, client: AsyncClient) -> None:
        # Depends(get_session) keeps a reference to the original function, so
        # patching the module attribute has no effect. Override the dependency
        # with a session whose execute() fails instead.
        broken_session = AsyncMock()
        broken_session.execute.side_effect = ConnectionRefusedError

        async def _broken_session():
            yield broken_session

        app.dependency_overrides[get_session] = _broken_session
        try:
            resp = await client.get("/health/ready")
        finally:
            app.dependency_overrides.pop(get_session, None)
        assert resp.status_code == 503
        assert resp.json()["checks"]["database"] is False

    @pytest.mark.asyncio
    async def test_readiness_returns_503_during_shutdown(
        self, client: AsyncClient
    ) -> None:
        app_state.shutting_down = True
        try:
            resp = await client.get("/health/ready")
            assert resp.status_code == 503
        finally:
            app_state.shutting_down = False

The Deployment Checklist

Every ShelfWise release follows this checklist. Items 1 through 4 are automated in CI — humans only intervene if something fails.

  1. Feature flag isolation — New behavior behind flags, defaulting to off.
  2. Health probes verified — Startup, readiness, and liveness endpoints tested in staging.
  3. Graceful shutdown tested — Send SIGTERM to staging instance during load test, verify zero dropped requests.
  4. Canary pipeline configured — Error rate and latency thresholds set, auto-rollback enabled.
  5. Rollback plan documented — For database migrations, include the reverse migration script.
  6. Monitoring dashboards open — Error rate, p99 latency, active connections during rollout.

The goal is not zero-risk deploys — that would mean never deploying. The goal is making deploys boring. When every deploy is invisible to users, the team deploys more often, ships smaller changes, and catches problems earlier. That feedback loop is worth more than any single feature.
