Without caching, every request hits the database. At ShelfWise’s scale (1M transactions per day), the catalog endpoint alone generates 500 queries per second for data that changes once a day. The database becomes the bottleneck within weeks, and the fix is not a bigger database. The fix is to stop asking the database for data it already gave you.
But caching with multi-tenancy is treacherous. A cache key of catalog:fiction without a tenant prefix means Powell’s Books sees Strand’s catalog. That is not a performance bug. That is a data breach. Every cache key in a multi-tenant system must be scoped to a tenant, and the architecture must make it impossible to forget.
Multi-Layer Cache Architecture
A production caching system has three layers, each trading capacity for speed:

- In-memory: fastest, but limited to a single process and lost on restart.
- Redis: shared across all processes, but every access costs a network round-trip.
- Database: the source of truth, and the slowest layer.

A cache miss cascades downward; a hit returns from the fastest layer that holds the value.
Tenant-Scoped Cache Keys
The universal pattern for multi-tenant cache keys is {tenant_id}:{entity}:{identifier}, and ShelfWise prefixes it with a cache version: {version}:{tenant_id}:{entity}:{identifier}. No exceptions.
```python
# src/cache/keys.py
from typing import Final

CACHE_VERSION: Final[str] = "v1"


def cache_key(tenant_id: str, entity: str, identifier: str | int) -> str:
    """Build a tenant-scoped, versioned cache key.

    Format: v1:{tenant_id}:{entity}:{identifier}
    Example: v1:powells:catalog:fiction
    """
    return f"{CACHE_VERSION}:{tenant_id}:{entity}:{identifier}"


def tenant_pattern(tenant_id: str) -> str:
    """Pattern for all keys belonging to a tenant. Used for bulk invalidation."""
    return f"{CACHE_VERSION}:{tenant_id}:*"
```

The version prefix allows cache-wide invalidation during schema changes. Bump CACHE_VERSION to v2 and all existing keys become orphans that expire naturally, because every key is written with a TTL. No explicit flush is needed during deployments.
In-Memory Cache: TTLCache for Hot Data
Tenant configuration and feature flags (from Part 10) are read on every request and change rarely. These are ideal candidates for in-memory caching with cachetools.TTLCache:
```python
# src/cache/memory.py
from typing import Any

from cachetools import TTLCache

from src.cache.keys import CACHE_VERSION, cache_key


class InMemoryCache:
    """Per-process in-memory cache with TTL expiration.

    Best for: tenant config, feature flags, static lookups.
    Not for: user-specific data, frequently changing data.
    """

    def __init__(self, maxsize: int = 2048, ttl: float = 60.0) -> None:
        self._store: TTLCache[str, Any] = TTLCache(maxsize=maxsize, ttl=ttl)

    def get(self, tenant_id: str, entity: str, identifier: str | int) -> Any | None:
        key = cache_key(tenant_id, entity, identifier)
        return self._store.get(key)

    def set(
        self, tenant_id: str, entity: str, identifier: str | int, value: Any
    ) -> None:
        key = cache_key(tenant_id, entity, identifier)
        self._store[key] = value

    def delete(self, tenant_id: str, entity: str, identifier: str | int) -> None:
        key = cache_key(tenant_id, entity, identifier)
        self._store.pop(key, None)

    def flush_tenant(self, tenant_id: str) -> int:
        """Remove all entries for a tenant. Returns count of removed entries."""
        # Derive the prefix from CACHE_VERSION so a version bump keeps flushing correct
        prefix = f"{CACHE_VERSION}:{tenant_id}:"
        # Snapshot matching keys first: the cache cannot shrink mid-iteration
        keys_to_remove = [k for k in self._store if k.startswith(prefix)]
        for key in keys_to_remove:
            # pop() tolerates an entry that expired between the scan and the delete
            self._store.pop(key, None)
        return len(keys_to_remove)
```

TTLCache expires each entry ttl seconds after it was written and caps the cache at maxsize entries, discarding the least recently used entry when full. For tenant config that changes via the admin API, the 60-second TTL means at most 60 seconds of stale config per process after a change, which is acceptable for most use cases.
Redis Cache: Shared State Across Processes
In-memory cache is per-process. Redis gives you a shared cache that survives application restarts and is accessible to all workers. Use orjson for serialization: it is substantially faster than the standard json module and natively handles datetime, UUID, and dataclasses (Decimal still needs a default= hook).
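One consequence of JSON caching is easy to miss: the round trip does not restore Python types. Datetimes, UUIDs, and Decimals come back as plain strings, which is why the cached repository re-validates cached dicts through CatalogItem.model_validate instead of trusting them. The sketch below uses the stdlib json module so it runs anywhere; to_jsonable is an illustrative fallback hook, and orjson accepts a similar default= callable for types it does not serialize natively, such as Decimal.

```python
import json
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any
from uuid import UUID


def to_jsonable(obj: Any) -> Any:
    """Fallback encoder for types the stdlib json module rejects."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, (UUID, Decimal)):
        return str(obj)  # str keeps Decimal exact; float() could lose precision
    raise TypeError(f"not JSON serializable: {type(obj).__name__}")


item = {
    "id": UUID("12345678-1234-5678-1234-567812345678"),
    "price": Decimal("19.99"),
    "updated_at": datetime(2024, 1, 15, 9, 30, tzinfo=timezone.utc),
}

raw = json.dumps(item, default=to_jsonable).encode()  # bytes, like orjson.dumps
restored = json.loads(raw)
print(restored["price"], type(restored["price"]).__name__)
```

It prints 19.99 str: the cached copy stays a string until a model parses it back.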
```python
# src/cache/redis.py
from typing import Any

import orjson
from redis.asyncio import Redis

from src.cache.keys import cache_key, tenant_pattern


class RedisCache:
    """Shared Redis cache with tenant-scoped keys and TTL management."""

    def __init__(self, redis: Redis, default_ttl: int = 300) -> None:
        self._redis = redis
        self._default_ttl = default_ttl

    async def get(
        self, tenant_id: str, entity: str, identifier: str | int
    ) -> Any | None:
        key = cache_key(tenant_id, entity, identifier)
        raw = await self._redis.get(key)
        if raw is None:
            return None
        return orjson.loads(raw)

    async def set(
        self,
        tenant_id: str,
        entity: str,
        identifier: str | int,
        value: Any,
        ttl: int | None = None,
    ) -> None:
        key = cache_key(tenant_id, entity, identifier)
        raw = orjson.dumps(value)
        # Every key gets an expiry, so orphaned keys always age out
        await self._redis.set(key, raw, ex=ttl or self._default_ttl)

    async def delete(
        self, tenant_id: str, entity: str, identifier: str | int
    ) -> None:
        key = cache_key(tenant_id, entity, identifier)
        await self._redis.delete(key)

    async def flush_tenant(self, tenant_id: str) -> int:
        """Delete all keys for a tenant. Used for GDPR deletion and tenant offboarding."""
        pattern = tenant_pattern(tenant_id)
        removed = 0
        batch: list[Any] = []
        # scan_iter drives the SCAN cursor; unlike KEYS, SCAN never blocks Redis
        async for key in self._redis.scan_iter(match=pattern, count=500):
            batch.append(key)
            if len(batch) >= 500:
                # UNLINK reclaims memory in the background instead of blocking
                removed += await self._redis.unlink(*batch)
                batch.clear()
        if batch:
            removed += await self._redis.unlink(*batch)
        return removed
```

Cache-Aside Pattern: The Cached Repository
The cleanest way to add caching is the cached repository decorator. It wraps the base repository from Part 3 and adds caching transparently. The service layer does not know or care whether the repository is cached:
```python
import structlog

from src.cache.memory import InMemoryCache
from src.cache.redis import RedisCache
from src.core.protocols import CatalogRepositoryProtocol
from src.schemas.catalog import CatalogItem

logger = structlog.get_logger()


class CachedCatalogRepository:
    """Cache-aside wrapper around the real catalog repository.

    Lookup order: in-memory -> Redis -> database.
    After a database read: populate both Redis and in-memory.
    On write: invalidate both caches.
    """

    def __init__(
        self,
        *,
        repo: CatalogRepositoryProtocol,
        memory: InMemoryCache,
        redis: RedisCache,
    ) -> None:
        self._repo = repo
        self._memory = memory
        self._redis = redis

    async def get_by_category(
        self, tenant_id: str, category: str
    ) -> list[CatalogItem]:
        # Layer 1: In-memory
        cached = self._memory.get(tenant_id, "catalog", category)
        if cached is not None:
            logger.debug("cache_hit", layer="memory", entity="catalog")
            return [CatalogItem.model_validate(item) for item in cached]

        # Layer 2: Redis (a hit also refills this process's in-memory layer)
        cached = await self._redis.get(tenant_id, "catalog", category)
        if cached is not None:
            logger.debug("cache_hit", layer="redis", entity="catalog")
            self._memory.set(tenant_id, "catalog", category, cached)
            return [CatalogItem.model_validate(item) for item in cached]

        # Layer 3: Database
        logger.debug("cache_miss", entity="catalog")
        items = await self._repo.get_by_category(tenant_id, category)

        # Populate both cache layers with JSON-safe dicts
        serializable = [item.model_dump(mode="json") for item in items]
        self._memory.set(tenant_id, "catalog", category, serializable)
        await self._redis.set(
            tenant_id, "catalog", category, serializable, ttl=600
        )

        return items

    async def update(self, tenant_id: str, item: CatalogItem) -> CatalogItem:
        result = await self._repo.update(tenant_id, item)

        # Invalidate cache for the affected category. Only this process's in-memory
        # copy is cleared; other workers keep theirs until its TTL expires. If an
        # update can move an item between categories, invalidate the old one too.
        self._memory.delete(tenant_id, "catalog", item.category)
        await self._redis.delete(tenant_id, "catalog", item.category)

        logger.info("cache_invalidated", entity="catalog", category=item.category)
        return result
```

The service layer receives a CatalogRepositoryProtocol. In production, dependency injection (Part 5) provides CachedCatalogRepository; in tests, it provides FakeRepository. The caching layer is an infrastructure concern and never leaks into business logic.
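The read and invalidate cycle can be exercised without Redis or a real database. In this toy sketch, FakeCatalogDB and CacheAsideCatalog are hypothetical stand-ins with a single dict cache and plain titles instead of CatalogItem models. The point is the query count: repeated reads cost nothing, and a write invalidates exactly one tenant's key.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeCatalogDB:
    """Stand-in for the real repository; counts how often it is queried."""
    rows: dict[tuple[str, str], list[str]]
    queries: int = 0

    async def get_by_category(self, tenant_id: str, category: str) -> list[str]:
        self.queries += 1
        return list(self.rows.get((tenant_id, category), []))

    async def add(self, tenant_id: str, category: str, title: str) -> None:
        self.rows.setdefault((tenant_id, category), []).append(title)


@dataclass
class CacheAsideCatalog:
    """Cache-aside in miniature: read cache, fall back to DB, invalidate on write."""
    db: FakeCatalogDB
    cache: dict[str, list[str]] = field(default_factory=dict)

    async def get_by_category(self, tenant_id: str, category: str) -> list[str]:
        key = f"v1:{tenant_id}:catalog:{category}"
        if key in self.cache:
            return self.cache[key]
        items = await self.db.get_by_category(tenant_id, category)
        self.cache[key] = items
        return items

    async def add(self, tenant_id: str, category: str, title: str) -> None:
        await self.db.add(tenant_id, category, title)  # write the source of truth first
        self.cache.pop(f"v1:{tenant_id}:catalog:{category}", None)  # then invalidate


async def main() -> FakeCatalogDB:
    db = FakeCatalogDB(rows={("powells", "fiction"): ["Dune"]})
    repo = CacheAsideCatalog(db)
    await repo.get_by_category("powells", "fiction")  # miss: 1 query
    await repo.get_by_category("powells", "fiction")  # hit: still 1 query
    await repo.add("powells", "fiction", "Emma")      # invalidates only this key
    print(await repo.get_by_category("powells", "fiction"), db.queries)
    return db


asyncio.run(main())
```

It prints ['Dune', 'Emma'] 2: one query for the initial miss and one after the invalidation.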
The ShelfWise Impact
Before caching: the catalog endpoint handles 500 requests per second, each executing a database query. That is 500 queries per second for data that changes when a publisher updates their catalog — roughly once per day.
After caching with a 10-minute Redis TTL and a 60-second in-memory TTL, only the first request for a key after its Redis TTL expires reaches the database; every request for that key until the next expiry is served from cache. Database load therefore tracks the number of distinct tenant/category keys rather than traffic, and drops from 500 queries/s to roughly 8/s (about one query per key per 10-minute window). Latency drops from 200ms (database) to 2ms (in-memory) or 5ms (Redis).
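The drop follows from a simple ceiling. Ignoring stampedes and explicit invalidations, each distinct tenant/category key can miss Redis at most once per TTL window, so database reads per second are bounded by the number of hot keys divided by the Redis TTL. Roughly 8/s is what about 4,800 hot keys would produce at a 600-second TTL; that key count is back-calculated for illustration, not a measured ShelfWise number, and both function names are hypothetical.

```python
def db_query_ceiling(hot_keys: int, redis_ttl_s: float) -> float:
    """Upper bound on DB reads/s: each key misses Redis at most once per TTL window
    (ignoring stampedes and explicit invalidations)."""
    return hot_keys / redis_ttl_s


def hit_ratio(requests_per_s: float, db_queries_per_s: float) -> float:
    """Fraction of requests served without touching the database."""
    return 1 - db_queries_per_s / requests_per_s


qps = db_query_ceiling(hot_keys=4_800, redis_ttl_s=600)
print(qps)  # 8.0
print(hit_ratio(500, qps))
```

The bound grows with the number of keys, not with traffic, so doubling requests per second leaves the database load unchanged while doubling the number of hot categories doubles it.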
Cache Stampede Prevention
When a popular cache key expires, hundreds of concurrent requests see a cache miss simultaneously and all hit the database. This is a cache stampede, and it can take down the database during traffic spikes.
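The failure mode is easy to reproduce in a single process. In this stdlib-only simulation (SlowDB and both getters are hypothetical), 200 concurrent requests for the same cold key each issue their own query. An in-process single-flight guard, one asyncio.Lock per key, collapses them into a single query. That guard protects only one worker, which is why multi-process deployments also need the Redis-level techniques described below.

```python
import asyncio


class SlowDB:
    """Stand-in database that counts queries and takes time to answer."""

    def __init__(self) -> None:
        self.queries = 0

    async def load(self, key: str) -> str:
        self.queries += 1
        await asyncio.sleep(0.05)  # simulated query latency
        return f"value-for-{key}"


async def naive_get(cache: dict[str, str], db: SlowDB, key: str) -> str:
    if key not in cache:
        cache[key] = await db.load(key)  # every concurrent miss reaches the DB
    return cache[key]


async def single_flight_get(
    cache: dict[str, str], db: SlowDB, locks: dict[str, asyncio.Lock], key: str
) -> str:
    if key in cache:
        return cache[key]
    lock = locks.setdefault(key, asyncio.Lock())
    async with lock:
        if key not in cache:  # re-check: another task may have filled it while we waited
            cache[key] = await db.load(key)
    return cache[key]


async def run(concurrency: int = 200) -> tuple[int, int]:
    naive_db, coalesced_db = SlowDB(), SlowDB()
    cache_a: dict[str, str] = {}
    cache_b: dict[str, str] = {}
    locks: dict[str, asyncio.Lock] = {}
    await asyncio.gather(*(naive_get(cache_a, naive_db, "hot") for _ in range(concurrency)))
    await asyncio.gather(
        *(single_flight_get(cache_b, coalesced_db, locks, "hot") for _ in range(concurrency))
    )
    return naive_db.queries, coalesced_db.queries


print(asyncio.run(run()))  # (200, 1)
```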
Probabilistic Early Expiration
Recompute the cache slightly before it expires. Each request that reads a nearly-expired key has a small probability of triggering a background refresh, and that probability rises as the key approaches expiry.
```python
# src/cache/stampede.py
import math
import random
from typing import Any

import orjson
from redis.asyncio import Redis

from src.cache.keys import cache_key


async def get_with_early_expiry(
    redis: Redis,
    tenant_id: str,
    entity: str,
    identifier: str | int,
    delta: float,
    beta: float = 1.0,
) -> tuple[Any | None, bool]:
    """Return the cached value and whether an early refresh should be triggered.

    Uses probabilistic early expiration (XFetch) to prevent stampedes.
    delta: typical time in seconds to recompute the value.
    beta: > 1 refreshes earlier, < 1 later.
    """
    key = cache_key(tenant_id, entity, identifier)
    pipe = redis.pipeline(transaction=False)
    pipe.get(key)
    pipe.pttl(key)  # milliseconds to expiry; -1 = no TTL, -2 = key already gone
    raw, remaining_ms = await pipe.execute()

    if raw is None:
        return None, True  # Cache miss: must refresh

    if remaining_ms == -1:
        return orjson.loads(raw), False  # No expiry set, nothing to refresh early

    # XFetch: refresh if delta * beta * -ln(U) >= time remaining, U uniform on (0, 1].
    # P(refresh) = exp(-remaining / (delta * beta)): about 5% with 3 s left and
    # delta=1 s, effectively zero while most of the TTL remains.
    remaining_s = remaining_ms / 1000
    u = 1.0 - random.random()
    should_refresh = -delta * beta * math.log(u) >= remaining_s
    return orjson.loads(raw), should_refresh
```

Distributed Lock for Single Recomputation
When a refresh is needed, only one process should recompute. A Redis lock serializes cache rebuilds, so the other processes wait for the result instead of querying the database themselves.
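Under the hood, redis-py's lock stores a random token under the lock key with SET ... NX and an expiry, and releases it with an atomic compare-and-delete script, so a process can only release a lock it still owns. The model below mimics that contract with a hypothetical in-memory FakeLockStore (not a Redis client) to show why the token check matters: if a recomputation outlives the lock timeout, a second process can take the lock, and the first process's late release must not delete it.

```python
import uuid


class FakeLockStore:
    """In-memory stand-in for the Redis commands a lock relies on (hypothetical)."""

    def __init__(self) -> None:
        self._data: dict[str, tuple[str, float]] = {}  # key -> (token, expires_at)

    def set_nx_px(self, key: str, token: str, ttl_s: float, now: float) -> bool:
        """Like SET key token NX PX ttl: succeed only if the key is absent or expired."""
        current = self._data.get(key)
        if current is not None and current[1] > now:
            return False
        self._data[key] = (token, now + ttl_s)
        return True

    def delete_if_owner(self, key: str, token: str, now: float) -> bool:
        """Like the compare-and-delete script a lock release runs atomically."""
        current = self._data.get(key)
        if current is None or current[1] <= now or current[0] != token:
            return False
        del self._data[key]
        return True


store = FakeLockStore()
lock_key = "lock:v1:powells:catalog:fiction"
a, b = uuid.uuid4().hex, uuid.uuid4().hex
assert store.set_nx_px(lock_key, a, ttl_s=10, now=0)       # process A acquires
assert not store.set_nx_px(lock_key, b, ttl_s=10, now=5)   # A still holds it
assert store.set_nx_px(lock_key, b, ttl_s=10, now=11)      # A's lock expired, B acquires
assert not store.delete_if_owner(lock_key, a, now=12)      # A's late release is refused
assert store.delete_if_owner(lock_key, b, now=12)          # B releases its own lock
```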
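```python
# src/cache/stampede.py (continued)
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

from redis.asyncio import Redis
from redis.exceptions import LockNotOwnedError

from src.cache.keys import cache_key
from src.cache.redis import RedisCache


async def refresh_with_lock(
    redis: Redis,
    tenant_id: str,
    entity: str,
    identifier: str | int,
    ttl: int,
    compute_fn: Callable[[], Awaitable[Any]],
) -> Any:
    """Recompute a cache value with distributed locking.

    Only one process acquires the lock and recomputes. Others wait briefly
    and retry from cache. Prevents N processes hitting the database simultaneously.
    """
    cache = RedisCache(redis)
    lock_key = f"lock:{cache_key(tenant_id, entity, identifier)}"
    # timeout=10: the lock auto-expires if its holder crashes mid-recompute
    lock = redis.lock(lock_key, timeout=10)

    if not await lock.acquire(blocking=False):
        # Another process is recomputing: poll the cache for up to ~1 second
        for _ in range(10):
            await asyncio.sleep(0.1)
            value = await cache.get(tenant_id, entity, identifier)
            if value is not None:
                return value
        # Holder is slow or died: answer from the source of truth without caching
        return await compute_fn()

    try:
        value = await compute_fn()
        await cache.set(tenant_id, entity, identifier, value, ttl=ttl)
        return value
    finally:
        try:
            await lock.release()
        except LockNotOwnedError:
            # Recompute outlived the lock timeout; another process may own it now
            pass
```

Cache Invalidation Strategies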
| Strategy | How It Works | Best For | Drawback |
|---|---|---|---|
| TTL-based | Key expires after fixed duration | Data that tolerates staleness (catalog, config) | Stale for up to TTL duration after source changes |
| Event-based | Invalidate on write/update event | Data that must be fresh immediately (inventory, pricing) | Requires event propagation infrastructure |
| Version-based | Include version in key; bump version on change | Schema migrations, deployment rollouts | Old versions linger until TTL expires |
Use TTL-based invalidation as the default. Add event-based invalidation only for data where staleness causes business impact — inventory counts, pricing, and account status.
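With two cache layers, event-based invalidation has a fan-out requirement. The in-memory layer lives inside each worker process, so deleting a key only in the process that received the event leaves every other worker serving its copy until the in-memory TTL expires. Closing that gap means broadcasting the event to every process, for example over a pub/sub channel. The simulation below uses one asyncio.Queue per worker in place of that transport; Worker and broadcast are hypothetical names.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Worker:
    """One application process with its own in-memory cache (simulated)."""
    name: str
    memory: dict[str, object] = field(default_factory=dict)
    inbox: asyncio.Queue[str] = field(default_factory=asyncio.Queue)

    async def listen(self) -> None:
        while True:
            key = await self.inbox.get()
            self.memory.pop(key, None)  # drop the local copy; the next read refills it
            self.inbox.task_done()


async def broadcast(workers: list[Worker], key: str) -> None:
    """Deliver the invalidation to every worker, not just the one that saw the write."""
    for worker in workers:
        worker.inbox.put_nowait(key)
    await asyncio.gather(*(w.inbox.join() for w in workers))


async def demo() -> list[Worker]:
    workers = [Worker(f"w{i}") for i in range(3)]
    for w in workers:
        w.memory["v1:powells:catalog:fiction"] = ["Dune"]
        w.memory["v1:strand:catalog:fiction"] = ["Emma"]
    listeners = [asyncio.create_task(w.listen()) for w in workers]
    await broadcast(workers, "v1:powells:catalog:fiction")
    for task in listeners:
        task.cancel()
    return workers


for w in asyncio.run(demo()):
    print(w.name, sorted(w.memory))  # only Strand's key survives in every worker
```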
Event-Based Invalidation
When a publisher updates Powell’s catalog, invalidate only Powell’s catalog cache. Other tenants are unaffected:
```python
import structlog

from src.cache.memory import InMemoryCache
from src.cache.redis import RedisCache

logger = structlog.get_logger()


class CatalogCacheInvalidator:
    """Listens for catalog update events and invalidates affected cache entries."""

    def __init__(self, *, memory: InMemoryCache, redis: RedisCache) -> None:
        self._memory = memory
        self._redis = redis

    async def on_catalog_updated(
        self, tenant_id: str, category: str
    ) -> None:
        """Invalidate cache for a specific tenant's category."""
        self._memory.delete(tenant_id, "catalog", category)
        await self._redis.delete(tenant_id, "catalog", category)
        logger.info(
            "catalog_cache_invalidated",
            tenant_id=tenant_id,
            category=category,
        )

    async def on_tenant_deleted(self, tenant_id: str) -> None:
        """GDPR: flush all cached data for a deleted tenant."""
        memory_count = self._memory.flush_tenant(tenant_id)
        redis_count = await self._redis.flush_tenant(tenant_id)
        logger.info(
            "tenant_cache_flushed",
            tenant_id=tenant_id,
            memory_keys=memory_count,
            redis_keys=redis_count,
        )
```

Graceful Degradation When Redis Is Unavailable
Redis is a cache, not the source of truth. When Redis is down, the application must continue serving requests from the database — slower, but functional. Never let a cache failure become an application failure.
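The calling side of that rule fits in one helper: a cache error is just a miss. In this stdlib sketch, DownCache is a hypothetical client for an unreachable cache, and the built-in ConnectionError stands in for redis-py's RedisError. In practice, client timeouts matter as much as the try/except: redis-py accepts socket_timeout and socket_connect_timeout, and without them an unreachable Redis can stall each request instead of failing fast.

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable
from typing import Any

logger = logging.getLogger("cache")


class DownCache:
    """Hypothetical cache client whose backend is unreachable."""

    async def get(self, key: str) -> Any:
        raise ConnectionError("cache unreachable")

    async def set(self, key: str, value: Any) -> None:
        raise ConnectionError("cache unreachable")


async def get_or_load(cache: Any, key: str, load: Callable[[], Awaitable[Any]]) -> Any:
    """Treat any cache failure as a miss; the source of truth still answers."""
    try:
        cached = await cache.get(key)
        if cached is not None:
            return cached
    except ConnectionError:
        logger.warning("cache_unavailable operation=get key=%s", key)
    value = await load()
    try:
        await cache.set(key, value)
    except ConnectionError:
        logger.warning("cache_unavailable operation=set key=%s", key)
    return value


async def load_from_db() -> list[str]:
    return ["Dune", "Emma"]  # stand-in for the real query


print(asyncio.run(get_or_load(DownCache(), "v1:powells:catalog:fiction", load_from_db)))
```

The request still returns ['Dune', 'Emma']; the only trace of the outage is two WARNING log lines.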
```python
from typing import Any

import orjson
import structlog
from redis.asyncio import Redis
from redis.exceptions import RedisError

from src.cache.keys import cache_key

logger = structlog.get_logger()


class ResilientRedisCache:
    """Redis cache that degrades gracefully on connection failures.

    Every Redis operation is wrapped in a try/except. On failure:
    - GET returns None (cache miss -> falls through to database)
    - SET is silently skipped (data is still in the database)
    - DELETE is silently skipped (key will expire via TTL)
    """

    def __init__(self, redis: Redis, default_ttl: int = 300) -> None:
        self._redis = redis
        self._default_ttl = default_ttl

    async def get(
        self, tenant_id: str, entity: str, identifier: str | int
    ) -> Any | None:
        try:
            key = cache_key(tenant_id, entity, identifier)
            raw = await self._redis.get(key)
            if raw is None:
                return None
            return orjson.loads(raw)
        except RedisError:
            logger.warning("redis_unavailable", operation="get")
            return None  # Degrade to database

    async def set(
        self,
        tenant_id: str,
        entity: str,
        identifier: str | int,
        value: Any,
        ttl: int | None = None,
    ) -> None:
        try:
            key = cache_key(tenant_id, entity, identifier)
            raw = orjson.dumps(value)
            await self._redis.set(key, raw, ex=ttl or self._default_ttl)
        except RedisError:
            logger.warning("redis_unavailable", operation="set")
            # Silently skip: data is in the database

    async def delete(
        self, tenant_id: str, entity: str, identifier: str | int
    ) -> None:
        try:
            key = cache_key(tenant_id, entity, identifier)
            await self._redis.delete(key)
        except RedisError:
            logger.warning("redis_unavailable", operation="delete")
            # Key will expire via TTL
```

Write-Through vs Write-Behind
Not all data uses cache-aside. Some data benefits from writing to the cache at the same time as the database (write-through) or writing to the cache first and syncing to the database later (write-behind):
| Pattern | Write Flow | Best For | Risk |
|---|---|---|---|
| Cache-aside | Write to DB, invalidate cache | Read-heavy data (catalog, config) | Stale reads between write and next cache miss |
| Write-through | Write to DB and cache simultaneously | Data read immediately after write (user profile) | Write latency increases by cache write time |
| Write-behind | Write to cache, async sync to DB | High-write-frequency data (analytics, counters) | Data loss if cache crashes before sync |
For ShelfWise, catalog data uses cache-aside (read-heavy, changes rarely). User session data uses write-through (read immediately after login). Analytics counters use write-behind (high frequency, eventual consistency is acceptable).
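Write-behind is the least familiar of the three, so here is a stdlib sketch of the analytics-counter case. WriteBehindCounters is hypothetical: it flushes by count for simplicity (a timer is the more common trigger), and its database attribute stands in for real storage.

```python
from collections import Counter


class WriteBehindCounters:
    """Accumulate increments in memory and persist them to the database in batches.

    Anything still in pending when the process dies is lost, which is the
    write-behind risk listed in the table.
    """

    def __init__(self, flush_every: int = 100) -> None:
        self.pending: Counter[str] = Counter()
        self.flush_every = flush_every
        self.database: Counter[str] = Counter()  # stand-in for the real table
        self.db_writes = 0
        self._since_flush = 0

    def increment(self, key: str, amount: int = 1) -> None:
        self.pending[key] += amount  # fast path: no database round-trip
        self._since_flush += 1
        if self._since_flush >= self.flush_every:
            self.flush()

    def flush(self) -> None:
        if self.pending:
            self.database.update(self.pending)  # one batched write adds all counts
            self.db_writes += 1
        self.pending.clear()
        self._since_flush = 0


counters = WriteBehindCounters(flush_every=100)
for _ in range(250):
    counters.increment("powells:views:dune")
print(counters.database["powells:views:dune"], counters.pending["powells:views:dune"], counters.db_writes)
```

After 250 increments it prints 200 50 2: two batched writes so far, and 50 increments that would be lost if the process died now.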
Key Takeaways
- Every cache key must include the tenant ID. The cache_key() function enforces the {version}:{tenant_id}:{entity}:{id} pattern. Missing the tenant prefix is a data breach, not a bug.
- Two cache layers. In-memory TTLCache for hot data at ~2ms, Redis for shared data at ~5ms, and the database as the source of truth at ~200ms. Whichever layer answers a miss fills the layers above it.
- Cached repository decorator. Wraps the base repository from Part 3 without modifying business logic. Caching is an infrastructure concern injected via DI from Part 5.
- Prevent cache stampedes. Probabilistic early expiration and distributed locks prevent hundreds of concurrent cache misses from overwhelming the database.
- Degrade gracefully. Redis is a cache, not the source of truth. When Redis is down, every operation behaves like a cache miss and the database handles the load. Log at WARNING, not ERROR.
- Flush tenant data on deletion. GDPR requires removing all tenant data, including cached copies. flush_tenant() uses cursor-based SCAN to find tenant keys without blocking Redis.
Next: Part 14 covers background tasks — async queues, worker context propagation, and the critical problem of tenant context disappearing when work moves off the request cycle.