Clean Code Python: Caching Architecture — Redis, Invalidation, and Tenant Isolation

Without caching at 1M transactions per day, the database becomes the bottleneck within weeks. But caching in a multi-tenant system is treacherous — one missing tenant prefix in a cache key leaks data between tenants.

Tin Dang

Without caching, every request hits the database. At ShelfWise’s scale of 1M transactions per day, that means the catalog endpoint alone generates 500 queries per second for data that changes once a day. The database becomes the bottleneck within weeks, and the fix is not a bigger database — the fix is not asking the database for data it already gave you.

But caching with multi-tenancy is treacherous. A cache key of catalog:fiction without a tenant prefix means Powell’s Books sees Strand’s catalog. That is not a performance bug. That is a data breach. Every cache key in a multi-tenant system must be scoped to a tenant, and the architecture must make it impossible to forget.

Multi-Layer Cache Architecture

A production caching system has three layers: an in-memory cache, Redis, and the database. Each step down trades speed for capacity and durability.

In-memory is fastest but limited to a single process and lost on restart. Redis is shared across all processes but requires a network round-trip. The database is the source of truth but the slowest layer. A cache miss cascades downward; a cache hit returns at the highest available layer.

Tenant-Scoped Cache Keys

The universal pattern for multi-tenant cache keys is {version}:{tenant_id}:{entity}:{identifier}. No exceptions.

src/cache/keys.py
from typing import Final

CACHE_VERSION: Final[str] = "v1"


def cache_key(tenant_id: str, entity: str, identifier: str | int) -> str:
    """Build a tenant-scoped, versioned cache key.

    Format: v1:{tenant_id}:{entity}:{identifier}
    Example: v1:powells:catalog:fiction
    """
    return f"{CACHE_VERSION}:{tenant_id}:{entity}:{identifier}"


def tenant_pattern(tenant_id: str) -> str:
    """Pattern for all keys belonging to a tenant. Used for bulk invalidation."""
    return f"{CACHE_VERSION}:{tenant_id}:*"

The version prefix allows cache-wide invalidation during schema changes. Bump CACHE_VERSION to v2 and all existing keys become orphans that expire naturally — no explicit flush needed during deployments.

In-Memory Cache: TTLCache for Hot Data

Tenant configuration and feature flags (from Part 10) are read on every request and change rarely. These are ideal candidates for in-memory caching with cachetools.TTLCache:

src/cache/memory.py
from typing import Any

from cachetools import TTLCache

from src.cache.keys import CACHE_VERSION, cache_key


class InMemoryCache:
    """Per-process in-memory cache with TTL expiration.

    Best for: tenant config, feature flags, static lookups.
    Not for: user-specific data, frequently changing data.
    """

    def __init__(self, maxsize: int = 2048, ttl: float = 60.0) -> None:
        self._store: TTLCache[str, Any] = TTLCache(maxsize=maxsize, ttl=ttl)

    def get(self, tenant_id: str, entity: str, identifier: str | int) -> Any | None:
        key = cache_key(tenant_id, entity, identifier)
        return self._store.get(key)

    def set(
        self, tenant_id: str, entity: str, identifier: str | int, value: Any
    ) -> None:
        key = cache_key(tenant_id, entity, identifier)
        self._store[key] = value

    def delete(self, tenant_id: str, entity: str, identifier: str | int) -> None:
        key = cache_key(tenant_id, entity, identifier)
        self._store.pop(key, None)

    def flush_tenant(self, tenant_id: str) -> int:
        """Remove all entries for a tenant. Returns count of removed entries."""
        # Derive the prefix from CACHE_VERSION so a version bump cannot
        # leave this method matching stale "v1:" keys only.
        prefix = f"{CACHE_VERSION}:{tenant_id}:"
        keys_to_remove = [k for k in self._store if k.startswith(prefix)]
        for key in keys_to_remove:
            # pop() tolerates entries that expired since the list was built
            self._store.pop(key, None)
        return len(keys_to_remove)

The TTLCache evicts entries after ttl seconds and caps total entries at maxsize. For tenant config that changes via admin API, the 60-second TTL means at most 60 seconds of stale config after a change — acceptable for most use cases.

Redis Cache: Shared State Across Processes

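In-memory cache is per-process. Redis gives you a shared cache that survives process restarts and is accessible to all workers. Use orjson for serialization: in benchmarks it is often several times faster than the standard json module, and it serializes datetime and UUID natively. Decimal is not supported out of the box and needs a default= hook or an explicit conversion before caching.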

src/cache/redis.py
from typing import Any

import orjson
from redis.asyncio import Redis

from src.cache.keys import cache_key, tenant_pattern


class RedisCache:
    """Shared Redis cache with tenant-scoped keys and TTL management."""

    def __init__(self, redis: Redis, default_ttl: int = 300) -> None:
        self._redis = redis
        self._default_ttl = default_ttl

    async def get(
        self, tenant_id: str, entity: str, identifier: str | int
    ) -> Any | None:
        key = cache_key(tenant_id, entity, identifier)
        raw = await self._redis.get(key)
        if raw is None:
            return None
        return orjson.loads(raw)

    async def set(
        self,
        tenant_id: str,
        entity: str,
        identifier: str | int,
        value: Any,
        ttl: int | None = None,
    ) -> None:
        key = cache_key(tenant_id, entity, identifier)
        raw = orjson.dumps(value)
        await self._redis.set(key, raw, ex=ttl or self._default_ttl)

    async def delete(
        self, tenant_id: str, entity: str, identifier: str | int
    ) -> None:
        key = cache_key(tenant_id, entity, identifier)
        await self._redis.delete(key)

    async def flush_tenant(self, tenant_id: str) -> int:
        """Delete all keys for a tenant. Used for GDPR deletion and tenant offboarding."""
        pattern = tenant_pattern(tenant_id)
        cursor, deleted = 0, 0
        while True:
            # redis-py returns the next cursor as an int; 0 means the scan is done.
            # Comparing against b"0" would never match and loop forever.
            cursor, batch = await self._redis.scan(
                cursor=cursor, match=pattern, count=500
            )
            if batch:
                # Delete per batch so no single DEL carries an unbounded key list
                deleted += await self._redis.delete(*batch)
            if cursor == 0:
                break
        return deleted
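The cursor contract is the part of SCAN that is easiest to get wrong: redis-py hands back the next cursor as an integer, and the iteration is complete only when that cursor is 0 again. The sketch below exercises a batch-wise flush loop against `FakeRedis`, a hypothetical in-memory stand-in written for this example (it is not a redis-py class, and its slicing cursor only imitates the real one).

```python
import asyncio
from fnmatch import fnmatchcase


class FakeRedis:
    """Hypothetical stand-in that mimics SCAN's (cursor, keys) contract."""

    def __init__(self, keys: list[str]) -> None:
        self._live = set(keys)
        self._snapshot: list[str] = []

    async def scan(self, cursor: int = 0, match: str = "*", count: int = 10):
        if cursor == 0:
            # Freeze the iteration order at the start of each full scan
            self._snapshot = sorted(self._live)
        end = cursor + count
        window = self._snapshot[cursor:end]
        next_cursor = end if end < len(self._snapshot) else 0
        batch = [k for k in window if k in self._live and fnmatchcase(k, match)]
        return next_cursor, batch

    async def delete(self, *keys: str) -> int:
        removed = self._live.intersection(keys)
        self._live -= removed
        return len(removed)


async def flush_tenant(redis: FakeRedis, pattern: str, batch_size: int = 2) -> int:
    """Delete every key matching `pattern`, one SCAN batch at a time."""
    cursor, deleted = 0, 0
    while True:
        cursor, batch = await redis.scan(cursor=cursor, match=pattern, count=batch_size)
        if batch:
            deleted += await redis.delete(*batch)
        if cursor == 0:  # integer 0 marks the end of the scan
            break
    return deleted


redis = FakeRedis([
    "v1:powells:catalog:fiction",
    "v1:powells:catalog:poetry",
    "v1:powells:config:flags",
    "v1:strand:catalog:fiction",
])
removed = asyncio.run(flush_tenant(redis, "v1:powells:*"))
print(removed)  # 3
```

Only the three Powell's keys are removed; the Strand key survives because the pattern is scoped by tenant.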

Cache-Aside Pattern: The Cached Repository

The cleanest way to add caching is the cached repository decorator. It wraps the base repository from Part 3 and adds caching transparently. The service layer does not know or care whether the repository is cached:

src/repositories/cached_catalog.py
import structlog

from src.cache.memory import InMemoryCache
from src.cache.redis import RedisCache
from src.core.protocols import CatalogRepositoryProtocol
from src.schemas.catalog import CatalogItem

logger = structlog.get_logger()


class CachedCatalogRepository:
    """Cache-aside wrapper around the real catalog repository.

    Lookup order: in-memory -> Redis -> database.
    On DB hit: populate both Redis and in-memory.
    On write: invalidate both caches.
    """

    def __init__(
        self,
        *,
        repo: CatalogRepositoryProtocol,
        memory: InMemoryCache,
        redis: RedisCache,
    ) -> None:
        self._repo = repo
        self._memory = memory
        self._redis = redis

    async def get_by_category(
        self, tenant_id: str, category: str
    ) -> list[CatalogItem]:
        # Layer 1: In-memory
        cached = self._memory.get(tenant_id, "catalog", category)
        if cached is not None:
            logger.debug("cache_hit", layer="memory", entity="catalog")
            return [CatalogItem.model_validate(item) for item in cached]

        # Layer 2: Redis
        cached = await self._redis.get(tenant_id, "catalog", category)
        if cached is not None:
            logger.debug("cache_hit", layer="redis", entity="catalog")
            self._memory.set(tenant_id, "catalog", category, cached)
            return [CatalogItem.model_validate(item) for item in cached]

        # Layer 3: Database
        logger.debug("cache_miss", entity="catalog")
        items = await self._repo.get_by_category(tenant_id, category)

        # Populate both cache layers
        serializable = [item.model_dump(mode="json") for item in items]
        self._memory.set(tenant_id, "catalog", category, serializable)
        await self._redis.set(
            tenant_id, "catalog", category, serializable, ttl=600
        )
        return items

    async def update(self, tenant_id: str, item: CatalogItem) -> CatalogItem:
        result = await self._repo.update(tenant_id, item)
        # Invalidate cache for the affected category.
        # Note: if an update moves an item between categories, the old
        # category's entry stays cached until its TTL expires.
        self._memory.delete(tenant_id, "catalog", item.category)
        await self._redis.delete(tenant_id, "catalog", item.category)
        logger.info("cache_invalidated", entity="catalog", category=item.category)
        return result

The service layer receives a CatalogRepositoryProtocol. In production, dependency injection (Part 5) provides CachedCatalogRepository. In tests, it provides FakeRepository. The caching layer is an infrastructure concern — it never leaks into business logic.
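To make the "service layer does not know" claim concrete, here is a minimal, dependency-free sketch. It swaps the TTLCache and Redis layers for a plain dict, and every name in it (`CatalogRepo`, `FakeRepository`, `DictCachedRepository`, `list_titles`) is a hypothetical stand-in for this example rather than the ShelfWise classes.

```python
import asyncio
from typing import Protocol


class CatalogRepo(Protocol):
    async def get_by_category(self, tenant_id: str, category: str) -> list[str]: ...


class FakeRepository:
    """Test double standing in for the database-backed repository."""

    def __init__(self) -> None:
        self.calls = 0

    async def get_by_category(self, tenant_id: str, category: str) -> list[str]:
        self.calls += 1
        return [f"{tenant_id}-{category}-book"]


class DictCachedRepository:
    """Cache-aside wrapper backed by a plain dict (no TTL, for illustration)."""

    def __init__(self, repo: CatalogRepo) -> None:
        self._repo = repo
        self._cache: dict[tuple[str, str], list[str]] = {}

    async def get_by_category(self, tenant_id: str, category: str) -> list[str]:
        key = (tenant_id, category)  # the tenant is always part of the key
        if key not in self._cache:
            self._cache[key] = await self._repo.get_by_category(tenant_id, category)
        return self._cache[key]


async def list_titles(repo: CatalogRepo, tenant_id: str) -> list[str]:
    """Service-layer function: it only sees the protocol, never the cache."""
    return await repo.get_by_category(tenant_id, "fiction")


async def main() -> tuple[int, list[str], list[str]]:
    db = FakeRepository()
    cached = DictCachedRepository(db)
    first = await list_titles(cached, "powells")
    await list_titles(cached, "powells")         # served from the cache
    other = await list_titles(cached, "strand")  # other tenant, own entry
    return db.calls, first, other


calls, first, other = asyncio.run(main())
print(calls)  # 2
```

Three service calls reach the fake database only twice, once per tenant, and `list_titles` is identical whether it receives the cached wrapper or the bare repository.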

The ShelfWise Impact

Before caching: the catalog endpoint handles 500 requests per second, each executing a database query. That is 500 queries per second for data that changes when a publisher updates their catalog — roughly once per day.

After caching with a 10-minute Redis TTL and a 60-second in-memory TTL: for each tenant and category key, the first request after the Redis entry expires hits the database, and every other request for that key is served from cache until the next expiry. Database queries drop from 500/s to roughly 8/s, which is one query per key per 10-minute window, summed across all tenant and category keys. Latency for cached reads drops from 200ms (database) to 2ms (in-memory) or 5ms (Redis).
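The article does not derive the 8/s figure, so the arithmetic below is one possible reading of it: in steady state each distinct key is reloaded at most once per Redis TTL, so the database rate is the key count divided by the TTL. The key count of 4,800 is an assumption, back-solved so that the result matches the quoted figure.

```python
def steady_state_db_qps(distinct_keys: int, ttl_seconds: float) -> float:
    """Upper bound on reload queries per second: one reload per key per TTL."""
    return distinct_keys / ttl_seconds


def hit_ratio(request_qps: float, db_qps: float) -> float:
    """Fraction of requests that never reach the database."""
    return 1.0 - db_qps / request_qps


REQUEST_QPS = 500      # from the article
REDIS_TTL = 600        # 10-minute Redis TTL, from the article
DISTINCT_KEYS = 4_800  # assumption: back-solved to reproduce roughly 8/s

db_qps = steady_state_db_qps(DISTINCT_KEYS, REDIS_TTL)
ratio = round(hit_ratio(REQUEST_QPS, db_qps), 3)
print(db_qps)  # 8.0
print(ratio)   # 0.984
```

The same formula shows the levers: halving the number of distinct keys or doubling the TTL each halve the database load.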

Cache Stampede Prevention

When a popular cache key expires, hundreds of concurrent requests see a cache miss simultaneously and all hit the database. This is a cache stampede, and it can take down the database during traffic spikes.

Probabilistic Early Expiration

Recompute the cache slightly before it expires. Each request that reads a nearly-expired key has a probability of triggering a background refresh:

src/cache/stampede.py
import math
import random
from typing import Any

import orjson

from src.cache.keys import cache_key
from src.cache.redis import RedisCache


async def get_with_early_expiry(
    cache: RedisCache,
    tenant_id: str,
    entity: str,
    identifier: str | int,
    delta: float = 1.0,
    beta: float = 1.0,
) -> tuple[Any | None, bool]:
    """Return cached value and whether an early refresh should be triggered.

    Uses probabilistic early expiration (XFetch algorithm) to prevent stampedes.
    As the key approaches expiry, the probability of triggering a refresh increases.

    delta is the estimated recompute time in seconds; beta > 1 refreshes earlier.
    """
    key = cache_key(tenant_id, entity, identifier)
    # Reaches into RedisCache's private client to batch GET and TTL in one round-trip
    pipe = cache._redis.pipeline()
    pipe.get(key)
    pipe.ttl(key)
    raw, remaining_ttl = await pipe.execute()
    if raw is None:
        return None, True  # Cache miss: must refresh
    value = orjson.loads(raw)
    if remaining_ttl < 0:
        return value, False  # -1 means no expiry is set, so never refresh early
    # XFetch: refresh when -delta * beta * ln(U) >= remaining, U uniform in (0, 1].
    # The refresh probability is exp(-remaining / (delta * beta)): about 37%
    # with one delta left, under 1% with five deltas left.
    early_by = -delta * beta * math.log(1.0 - random.random())
    return value, early_by >= remaining_ttl

Distributed Lock for Single Recomputation

When a refresh is needed, only one process should recompute. Use a Redis lock to serialize cache rebuilds:

# src/cache/stampede.py (continued)
import asyncio
from collections.abc import Awaitable, Callable

import orjson
from redis.asyncio import Redis
from redis.exceptions import LockError

from src.cache.keys import cache_key


async def refresh_with_lock(
    redis: Redis,
    tenant_id: str,
    entity: str,
    identifier: str | int,
    ttl: int,
    compute_fn: Callable[[], Awaitable[Any]],
) -> Any:
    """Recompute a cache value with distributed locking.

    Only one process acquires the lock and recomputes. Others wait briefly
    and retry from cache. Prevents N processes hitting the database simultaneously.
    """
    lock_key = f"lock:{cache_key(tenant_id, entity, identifier)}"
    # timeout=10 auto-releases the lock if the holder crashes mid-recompute
    lock = redis.lock(lock_key, timeout=10)
    acquired = await lock.acquire(blocking=False)
    if not acquired:
        # Another process is recomputing: wait and read from cache.
        # This can still return None if the recompute has not finished yet.
        await asyncio.sleep(0.1)
        return await RedisCache(redis).get(tenant_id, entity, identifier)
    try:
        value = await compute_fn()
        cache = RedisCache(redis)
        await cache.set(tenant_id, entity, identifier, value, ttl=ttl)
        return value
    finally:
        try:
            await lock.release()
        except LockError:
            # The lock expired during a slow recompute; nothing left to release
            pass

Cache Invalidation Strategies

| Strategy | How It Works | Best For | Drawback |
| --- | --- | --- | --- |
| TTL-based | Key expires after fixed duration | Data that tolerates staleness (catalog, config) | Stale for up to TTL duration after source changes |
| Event-based | Invalidate on write/update event | Data that must be fresh immediately (inventory, pricing) | Requires event propagation infrastructure |
| Version-based | Include version in key; bump version on change | Schema migrations, deployment rollouts | Old versions linger until TTL expires |

Use TTL-based invalidation as the default. Add event-based invalidation only for data where staleness causes business impact — inventory counts, pricing, and account status.

Event-Based Invalidation

When a publisher updates Powell’s catalog, invalidate only Powell’s catalog cache. Other tenants are unaffected:

src/events/catalog_events.py
import structlog

from src.cache.memory import InMemoryCache
from src.cache.redis import RedisCache

logger = structlog.get_logger()


class CatalogCacheInvalidator:
    """Listens for catalog update events and invalidates affected cache entries."""

    def __init__(self, *, memory: InMemoryCache, redis: RedisCache) -> None:
        self._memory = memory
        self._redis = redis

    async def on_catalog_updated(
        self, tenant_id: str, category: str
    ) -> None:
        """Invalidate cache for a specific tenant's category."""
        self._memory.delete(tenant_id, "catalog", category)
        await self._redis.delete(tenant_id, "catalog", category)
        logger.info(
            "catalog_cache_invalidated",
            tenant_id=tenant_id,
            category=category,
        )

    async def on_tenant_deleted(self, tenant_id: str) -> None:
        """GDPR: flush all cached data for a deleted tenant."""
        memory_count = self._memory.flush_tenant(tenant_id)
        redis_count = await self._redis.flush_tenant(tenant_id)
        logger.info(
            "tenant_cache_flushed",
            tenant_id=tenant_id,
            memory_keys=memory_count,
            redis_keys=redis_count,
        )

Graceful Degradation When Redis Is Unavailable

Redis is a cache, not the source of truth. When Redis is down, the application must continue serving requests from the database — slower, but functional. Never let a cache failure become an application failure.

src/cache/resilient_redis.py
from typing import Any

import orjson
import structlog
from redis.asyncio import Redis
from redis.exceptions import RedisError

from src.cache.keys import cache_key

logger = structlog.get_logger()


class ResilientRedisCache:
    """Redis cache that degrades gracefully on connection failures.

    Every Redis operation is wrapped in a try/except. On failure:
    - GET returns None (cache miss -> falls through to database)
    - SET is silently skipped (data is still in the database)
    - DELETE is silently skipped (key will expire via TTL)
    """

    def __init__(self, redis: Redis, default_ttl: int = 300) -> None:
        self._redis = redis
        self._default_ttl = default_ttl

    async def get(
        self, tenant_id: str, entity: str, identifier: str | int
    ) -> Any | None:
        try:
            key = cache_key(tenant_id, entity, identifier)
            raw = await self._redis.get(key)
            if raw is None:
                return None
            return orjson.loads(raw)
        except RedisError:
            logger.warning("redis_unavailable", operation="get")
            return None  # Degrade to database

    async def set(
        self,
        tenant_id: str,
        entity: str,
        identifier: str | int,
        value: Any,
        ttl: int | None = None,
    ) -> None:
        try:
            key = cache_key(tenant_id, entity, identifier)
            raw = orjson.dumps(value)
            await self._redis.set(key, raw, ex=ttl or self._default_ttl)
        except RedisError:
            logger.warning("redis_unavailable", operation="set")
            # Silently skip: data is in the database

    async def delete(
        self, tenant_id: str, entity: str, identifier: str | int
    ) -> None:
        try:
            key = cache_key(tenant_id, entity, identifier)
            await self._redis.delete(key)
        except RedisError:
            logger.warning("redis_unavailable", operation="delete")
            # Key will expire via TTL

Write-Through vs Write-Behind

Not all data uses cache-aside. Some data benefits from writing to the cache at the same time as the database (write-through) or writing to the cache first and syncing to the database later (write-behind):

| Pattern | Write Flow | Best For | Risk |
| --- | --- | --- | --- |
| Cache-aside | Write to DB, invalidate cache | Read-heavy data (catalog, config) | Stale reads between write and next cache miss |
| Write-through | Write to DB and cache simultaneously | Data read immediately after write (user profile) | Write latency increases by cache write time |
| Write-behind | Write to cache, async sync to DB | High-write-frequency data (analytics, counters) | Data loss if cache crashes before sync |

For ShelfWise, catalog data uses cache-aside (read-heavy, changes rarely). User session data uses write-through (read immediately after login). Analytics counters use write-behind (high frequency, eventual consistency is acceptable).
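The three write flows differ only in which store is touched and when. The sketch below models the database and cache as plain dicts to make that ordering visible; `Stores`, the `write_*` functions, and `flush_pending` are hypothetical names for this example, and a real write-behind buffer would be flushed by a background worker rather than by hand.

```python
class Stores:
    """Toy model: dicts standing in for the database and the cache."""

    def __init__(self) -> None:
        self.db: dict[str, int] = {}
        self.cache: dict[str, int] = {}
        self.pending: dict[str, int] = {}  # write-behind buffer, not yet durable


def write_cache_aside(s: Stores, key: str, value: int) -> None:
    s.db[key] = value
    s.cache.pop(key, None)  # invalidate; the next read repopulates


def write_through(s: Stores, key: str, value: int) -> None:
    s.db[key] = value
    s.cache[key] = value  # readable from cache immediately


def write_behind(s: Stores, key: str, value: int) -> None:
    s.cache[key] = value
    s.pending[key] = value  # lost if the process dies before a flush


def flush_pending(s: Stores) -> int:
    """Sync buffered writes to the database; returns how many keys were written."""
    flushed = len(s.pending)
    s.db.update(s.pending)
    s.pending.clear()
    return flushed


s = Stores()
s.cache["catalog"] = 1
write_cache_aside(s, "catalog", 2)
write_through(s, "session", 7)
write_behind(s, "views", 100)
write_behind(s, "views", 101)  # coalesces with the earlier buffered write

views_in_db_before_flush = "views" in s.db
flushed = flush_pending(s)
print(views_in_db_before_flush)  # False
print(flushed)                   # 1
print(s.db)                      # {'catalog': 2, 'session': 7, 'views': 101}
```

Two counter updates became one database write, which is the appeal of write-behind; the `False` printed before the flush is the window in which a crash loses data.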

Key Takeaways

  • Every cache key must include the tenant ID. The cache_key() function enforces the {version}:{tenant_id}:{entity}:{id} pattern. Missing the tenant prefix is a data breach, not a bug.
  • Two cache layers. In-memory TTLCache for hot data at ~2ms. Redis for shared data at ~5ms. Database as the source of truth at ~200ms. On a miss, the value loaded from a lower layer fills the layers above it.
  • Cached repository decorator. Wraps the base repository from Part 3 without modifying business logic. Caching is an infrastructure concern injected via DI from Part 5.
  • Prevent cache stampedes. Probabilistic early expiration and distributed locks prevent hundreds of concurrent cache misses from overwhelming the database.
  • Degrade gracefully. Redis is a cache, not a dependency. When Redis is down, every operation returns a cache miss and the database handles the load. Log at WARNING, not ERROR.
  • Flush tenant data on deletion. GDPR requires removing all tenant data, including cached copies. flush_tenant() with cursor-based SCAN removes tenant keys without blocking Redis.

Next: Part 14 covers background tasks — async queues, worker context propagation, and the critical problem of tenant context disappearing when work moves off the request cycle.
