Your multi-tenant platform works. Tenants are isolated, background tasks process orders, caches are warm, rate limits are enforced. Then an enterprise prospect — Penguin Random House — sends a security questionnaire: “Describe your role-based access controls. How are API keys scoped and rotated? Provide audit logs for all state changes in the last 90 days. Include evidence of cross-tenant isolation verification.”
You have none of this. The deal is worth $400K ARR. You have six weeks.
This post builds three security pillars into ShelfWise: granular RBAC with tenant-scoped permissions, cryptographically secure API key management, and an append-only audit trail that satisfies SOC 2 auditors.
The RBAC Model
Most applications start with a boolean is_admin column and hope for the best. That breaks the moment a tenant needs a “warehouse staff” role that can update inventory but not view financial reports. ShelfWise needs a permission model that tenants can customize without code changes.
The schema uses a standard role-permission join table. Permissions are strings in resource.action format — no enums, because tenants will create custom resources.
```python
from __future__ import annotations

from uuid import UUID, uuid4

from sqlalchemy import ForeignKey, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship

from src.db.base import Base


class Permission(Base):
    __tablename__ = "permissions"

    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    resource: Mapped[str] = mapped_column(String(100))  # "orders", "inventory"
    action: Mapped[str] = mapped_column(String(50))  # "create", "read", "update", "delete"

    __table_args__ = (
        UniqueConstraint("resource", "action", name="uq_permission_resource_action"),
    )

    @property
    def key(self) -> str:
        return f"{self.resource}.{self.action}"


class Role(Base):
    __tablename__ = "roles"

    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    tenant_id: Mapped[UUID] = mapped_column(ForeignKey("tenants.id"), index=True)
    name: Mapped[str] = mapped_column(String(100))
    description: Mapped[str] = mapped_column(String(500), default="")
    is_system: Mapped[bool] = mapped_column(default=False)  # Seed roles, not deletable

    permissions: Mapped[list[Permission]] = relationship(
        secondary="role_permissions", lazy="selectin"
    )

    __table_args__ = (
        UniqueConstraint("tenant_id", "name", name="uq_role_tenant_name"),
    )


class RolePermission(Base):
    __tablename__ = "role_permissions"

    role_id: Mapped[UUID] = mapped_column(ForeignKey("roles.id"), primary_key=True)
    permission_id: Mapped[UUID] = mapped_column(
        ForeignKey("permissions.id"), primary_key=True
    )
```

Notice tenant_id on Role. Each tenant gets their own role set seeded from system defaults. Penguin Random House can create a “warehouse_lead” role without affecting any other tenant.
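To make the seeding idea concrete, here is a minimal in-memory sketch of copying system defaults into a tenant's own role set. The template names, their permission keys, and the helpers `seed_roles_for_tenant` and `add_custom_role` are illustrative assumptions, not part of the ShelfWise codebase; a real implementation would insert `Role` and `RolePermission` rows instead of building dataclasses.

```python
from __future__ import annotations

from dataclasses import dataclass
from uuid import UUID, uuid4

# Hypothetical system defaults; role names and permission keys are illustrative.
SYSTEM_ROLE_TEMPLATES: dict[str, frozenset[str]] = {
    "admin": frozenset(
        {"orders.create", "orders.read", "inventory.read", "inventory.update"}
    ),
    "viewer": frozenset({"orders.read", "inventory.read"}),
}


@dataclass(frozen=True)
class SeededRole:
    """In-memory stand-in for a Role row plus its permission keys."""

    tenant_id: UUID
    name: str
    permission_keys: frozenset[str]
    is_system: bool = True  # mirrors Role.is_system: seeded roles are not deletable


def seed_roles_for_tenant(tenant_id: UUID) -> list[SeededRole]:
    """Copy every system template into a role owned by this tenant."""
    return [
        SeededRole(tenant_id=tenant_id, name=name, permission_keys=keys)
        for name, keys in sorted(SYSTEM_ROLE_TEMPLATES.items())
    ]


def add_custom_role(
    roles: list[SeededRole], tenant_id: UUID, name: str, permission_keys: set[str]
) -> list[SeededRole]:
    """Add a tenant-defined role, enforcing the (tenant_id, name) uniqueness rule."""
    if any(r.tenant_id == tenant_id and r.name == name for r in roles):
        raise ValueError(f"role {name!r} already exists for this tenant")
    custom = SeededRole(tenant_id, name, frozenset(permission_keys), is_system=False)
    return [*roles, custom]


tenant_a, tenant_b = uuid4(), uuid4()
roles = seed_roles_for_tenant(tenant_a) + seed_roles_for_tenant(tenant_b)
roles = add_custom_role(
    roles, tenant_a, "warehouse_lead", {"inventory.update", "orders.read"}
)
```

Because every role carries its own `tenant_id`, the custom role added for `tenant_a` never appears in `tenant_b`'s role set.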
Permission Checking as a FastAPI Dependency
The permission check is a dependency that runs before the endpoint handler. It reads the current user’s roles, collects permissions, and checks for the required one. No decorator magic, no global state — a pure dependency chain.
```python
from __future__ import annotations

from typing import Annotated
from uuid import UUID

from fastapi import Depends, HTTPException, Request, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from src.core.tenant import Tenant
from src.db.session import get_session

# Permission, Role, RolePermission, UserRole, AuthenticatedUser and
# decode_and_verify are defined elsewhere in the project; their imports are omitted here.


async def get_current_user(request: Request) -> AuthenticatedUser:
    """Extract and validate JWT from Authorization header."""
    token = request.headers.get("Authorization", "").removeprefix("Bearer ")
    if not token:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
    return await decode_and_verify(token)


async def get_user_permissions(
    user: Annotated[AuthenticatedUser, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> frozenset[str]:
    """Load all permission keys for the current user."""
    stmt = (
        select(Permission.resource, Permission.action)
        .join(RolePermission, RolePermission.permission_id == Permission.id)
        .join(Role, Role.id == RolePermission.role_id)
        .join(UserRole, UserRole.role_id == Role.id)
        # Filtering on Role.tenant_id keeps roles from other tenants out of the result
        .where(UserRole.user_id == user.id, Role.tenant_id == user.tenant_id)
    )
    rows = (await session.execute(stmt)).all()
    return frozenset(f"{r.resource}.{r.action}" for r in rows)


def require_permission(permission: str):
    """Factory that returns a dependency checking for a specific permission."""

    async def _check(
        user_perms: Annotated[frozenset[str], Depends(get_user_permissions)],
    ) -> None:
        if permission not in user_perms:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Missing permission: {permission}",
            )

    return _check
```

Usage in a route is one line:

```python
@router.post("/orders", dependencies=[Depends(require_permission("orders.create"))])
async def create_order(
    payload: CreateOrderRequest,
    service: Annotated[OrderService, Depends(get_order_service)],
) -> OrderResponse:
    return await service.create(payload)
```

API Key Management
JWT auth works for users in browsers. Machine-to-machine integrations — Penguin Random House’s ERP syncing inventory every hour — need API keys. The requirements are strict: keys must be cryptographically random, stored as hashes (never plaintext), scoped to specific permissions, and revocable without rotating the user’s password.
| Storage Approach | How It Works | Risk | Verdict |
|---|---|---|---|
| Plaintext in DB | Store raw key, compare directly | DB breach exposes all keys | Never acceptable |
| Symmetric encryption | Encrypt key with app secret | App secret compromise exposes all keys | Marginal improvement |
| SHA-256 hash | Store hash, compare hash of incoming key | Cannot recover key — show once at creation | Production standard |
| Bcrypt/Argon2 | Slow hash, timing-safe compare | Higher CPU cost per request | Overkill for high-entropy keys |
ShelfWise uses SHA-256 for API keys. The keys are 256-bit random, so brute-forcing a hash is computationally infeasible — we do not need the slow-hash defense that passwords require.
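A quick back-of-the-envelope calculation shows why a fast hash is enough here. The attacker rate of 10^12 guesses per second is an assumption chosen to be generous, and `expected_years_to_brute_force` is a hypothetical helper written only for this illustration.

```python
KEY_BITS = 256
GUESSES_PER_SECOND = 10**12  # assumed attacker hash rate, deliberately generous
SECONDS_PER_YEAR = 365 * 24 * 60 * 60


def expected_years_to_brute_force(bits: int, guesses_per_second: int) -> float:
    """Expected time to hit one specific key: on average half the key space."""
    expected_guesses = 2 ** (bits - 1)
    return expected_guesses / guesses_per_second / SECONDS_PER_YEAR


years = expected_years_to_brute_force(KEY_BITS, GUESSES_PER_SECOND)
print(f"{years:.2e} years")
```

Under these assumptions the expected search time is on the order of 10^57 years. A human-chosen password with roughly 40 bits of entropy would fall in under a second at the same rate, which is why passwords need a slow hash and random keys do not.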
```python
from __future__ import annotations

import hashlib
import secrets
from dataclasses import dataclass
from datetime import UTC, datetime
from uuid import UUID, uuid4


@dataclass(frozen=True, slots=True)
class APIKeyCreate:
    """Result of creating a new API key. The raw key is shown once."""

    id: UUID
    raw_key: str  # Shown to user exactly once
    key_prefix: str  # "sk_live_abc12345", for identification in logs
    key_hash: str  # Stored in database


def generate_api_key(tenant_id: UUID) -> APIKeyCreate:
    """Generate a cryptographically secure API key."""
    raw = secrets.token_urlsafe(32)  # 256 bits of entropy
    prefix = f"sk_live_{raw[:8]}"
    key_hash = hashlib.sha256(raw.encode()).hexdigest()

    return APIKeyCreate(
        id=uuid4(),
        raw_key=raw,
        key_prefix=prefix,
        key_hash=key_hash,
    )


def verify_api_key(raw_key: str, stored_hash: str) -> bool:
    """Constant-time comparison to prevent timing attacks."""
    computed = hashlib.sha256(raw_key.encode()).hexdigest()
    return secrets.compare_digest(computed, stored_hash)
```

The API key model stores the hash, a human-readable prefix for log correlation, scoped permissions, and an expiration date:
```python
class APIKey(Base):
    __tablename__ = "api_keys"

    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    tenant_id: Mapped[UUID] = mapped_column(ForeignKey("tenants.id"), index=True)
    created_by: Mapped[UUID] = mapped_column(ForeignKey("users.id"))
    name: Mapped[str] = mapped_column(String(100))  # "ERP Sync Key"
    key_prefix: Mapped[str] = mapped_column(String(20))  # "sk_live_abc12345"
    key_hash: Mapped[str] = mapped_column(String(64))  # SHA-256 hex
    scopes: Mapped[list[str]] = mapped_column(ARRAY(String))  # ["inventory.read", "inventory.update"]
    expires_at: Mapped[datetime | None] = mapped_column(default=None)
    revoked_at: Mapped[datetime | None] = mapped_column(default=None)
    last_used_at: Mapped[datetime | None] = mapped_column(default=None)
```

API key auth is a separate dependency from JWT auth. The router chooses which auth scheme to use, or accepts either:
```python
async def authenticate_api_key(
    request: Request,
    session: Annotated[AsyncSession, Depends(get_session)],
) -> APIKeyContext:
    """Authenticate via X-API-Key header."""
    raw_key = request.headers.get("X-API-Key")
    if not raw_key:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)

    # Look up all non-revoked, non-expired keys and compare hashes.
    # Note: this scans every active key on each request. Because SHA-256 is
    # deterministic, filtering on APIKey.key_hash in the query would avoid the scan.
    stmt = select(APIKey).where(
        APIKey.revoked_at.is_(None),
        or_(APIKey.expires_at.is_(None), APIKey.expires_at > datetime.now(UTC)),
    )
    keys = (await session.scalars(stmt)).all()

    for key in keys:
        if verify_api_key(raw_key, key.key_hash):
            # Record usage; this commit runs inline on every authenticated request
            key.last_used_at = datetime.now(UTC)
            await session.commit()
            return APIKeyContext(
                tenant_id=key.tenant_id,
                scopes=frozenset(key.scopes),
                key_id=key.id,
            )

    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
```

Audit Logging: Every State Change, Forever
SOC 2 requires that every state change is recorded with who made it, what changed, when, and from where. The audit log is append-only — no UPDATE, no DELETE, ever.
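The append-only rule is stronger when the database itself rejects mutations instead of relying on application discipline. The sketch below shows one way to do that with triggers. It uses the standard-library `sqlite3` module and a cut-down table purely so it runs anywhere; the trigger names and the `open_audit_db` and `is_rejected` helpers are assumptions for illustration. On PostgreSQL the same idea would typically be a trigger or revoked UPDATE and DELETE privileges.

```python
import sqlite3


def open_audit_db() -> sqlite3.Connection:
    """Create an in-memory audit table whose rows cannot be changed or removed."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE audit_logs (
            id INTEGER PRIMARY KEY,
            action TEXT NOT NULL,
            resource_id TEXT NOT NULL
        );

        -- Abort any UPDATE before it touches a row
        CREATE TRIGGER audit_logs_no_update
        BEFORE UPDATE ON audit_logs
        BEGIN
            SELECT RAISE(ABORT, 'audit_logs is append-only');
        END;

        -- Abort any DELETE before it removes a row
        CREATE TRIGGER audit_logs_no_delete
        BEFORE DELETE ON audit_logs
        BEGIN
            SELECT RAISE(ABORT, 'audit_logs is append-only');
        END;
        """
    )
    return conn


def is_rejected(conn: sqlite3.Connection, sql: str) -> bool:
    """Return True if the database refuses to run the statement."""
    try:
        conn.execute(sql)
    except sqlite3.DatabaseError:
        return True
    return False


conn = open_audit_db()
conn.execute(
    "INSERT INTO audit_logs (action, resource_id) VALUES (?, ?)",
    ("order.created", "order-1"),
)
update_blocked = is_rejected(conn, "UPDATE audit_logs SET action = 'x' WHERE id = 1")
delete_blocked = is_rejected(conn, "DELETE FROM audit_logs WHERE id = 1")
row_count = conn.execute("SELECT COUNT(*) FROM audit_logs").fetchone()[0]
```

Inserts still succeed, while both the UPDATE and the DELETE are aborted and the original row survives.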
```python
class AuditLog(Base):
    """Append-only audit trail. No UPDATE or DELETE operations permitted."""

    __tablename__ = "audit_logs"

    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    tenant_id: Mapped[UUID] = mapped_column(index=True)
    actor_id: Mapped[UUID | None] = mapped_column(default=None)  # None for system actions
    actor_type: Mapped[str] = mapped_column(String(20))  # "user", "api_key", "system"
    action: Mapped[str] = mapped_column(String(100))  # "order.created", "inventory.updated"
    resource_type: Mapped[str] = mapped_column(String(50))
    resource_id: Mapped[str] = mapped_column(String(100))
    before_state: Mapped[dict | None] = mapped_column(JSON, default=None)
    after_state: Mapped[dict | None] = mapped_column(JSON, default=None)
    ip_address: Mapped[str | None] = mapped_column(String(45), default=None)
    user_agent: Mapped[str | None] = mapped_column(String(500), default=None)
    timestamp: Mapped[datetime] = mapped_column(
        default=lambda: datetime.now(UTC), index=True
    )
```

The audit service captures before/after state diffs. It participates in the same database transaction as the operation it is auditing: if the operation rolls back, so does the audit entry. No phantom audit records.
```python
from __future__ import annotations

from dataclasses import asdict
from typing import Any
from uuid import UUID

from sqlalchemy.ext.asyncio import AsyncSession

from src.models.audit_log import AuditLog


class AuditService:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def log(
        self,
        *,
        tenant_id: UUID,
        actor_id: UUID | None,
        actor_type: str,
        action: str,
        resource_type: str,
        resource_id: str,
        before: dict[str, Any] | None = None,
        after: dict[str, Any] | None = None,
        ip_address: str | None = None,
        user_agent: str | None = None,
    ) -> None:
        entry = AuditLog(
            tenant_id=tenant_id,
            actor_id=actor_id,
            actor_type=actor_type,
            action=action,
            resource_type=resource_type,
            resource_id=str(resource_id),
            before_state=before,
            after_state=after,
            ip_address=ip_address,
            user_agent=user_agent,
        )
        self._session.add(entry)
        # No commit: the caller's transaction boundary controls this
```

Integration with the order service shows audit logging as a natural extension, not a bolt-on:
```python
# src/services/order_service.py (excerpt)
async def create_order(
    self,
    payload: CreateOrderRequest,
    actor: AuthenticatedUser,
    request_meta: RequestMeta,
) -> Order:
    order = Order(tenant_id=actor.tenant_id, **payload.model_dump())
    self._session.add(order)
    # Flush before auditing. Assuming Order.id uses default=uuid4 like the models
    # above, SQLAlchemy applies that default at INSERT time, so order.id is still
    # None until the flush has run.
    await self._session.flush()

    await self._audit.log(
        tenant_id=actor.tenant_id,
        actor_id=actor.id,
        actor_type="user",
        action="order.created",
        resource_type="order",
        resource_id=str(order.id),
        after=order.to_audit_dict(),
        ip_address=request_meta.ip,
        user_agent=request_meta.user_agent,
    )
    # The audit entry stays pending in the same transaction until the caller commits
    return order
```

Tenant Isolation Verification
Trust but verify. Automated tests should prove that tenant isolation holds — not just that it works today, but that every new feature maintains it.
```python
import pytest
from uuid import uuid4

from httpx import AsyncClient

TENANT_A_TOKEN: str  # JWT for tenant A
TENANT_B_TOKEN: str  # JWT for tenant B

# create_tenant_with_data and get_tenant_id_from_token are project test helpers
# defined elsewhere.


@pytest.fixture
async def seeded_tenants(async_client: AsyncClient) -> tuple[str, str]:
    """Create two tenants with separate data."""
    tenant_a = await create_tenant_with_data(async_client, name="Acme Books")
    tenant_b = await create_tenant_with_data(async_client, name="Beta Publishing")
    return tenant_a.token, tenant_b.token


class TestCrossTenantIsolation:
    """Verify that no endpoint leaks data across tenants."""

    @pytest.mark.asyncio
    async def test_tenant_a_cannot_read_tenant_b_orders(
        self, async_client: AsyncClient, seeded_tenants: tuple[str, str]
    ) -> None:
        token_a, token_b = seeded_tenants

        # Create order as tenant B
        resp = await async_client.post(
            "/api/v1/orders",
            json={"items": [{"book_id": str(uuid4()), "quantity": 1}]},
            headers={"Authorization": f"Bearer {token_b}"},
        )
        order_id = resp.json()["id"]

        # Attempt to read as tenant A: must fail
        resp = await async_client.get(
            f"/api/v1/orders/{order_id}",
            headers={"Authorization": f"Bearer {token_a}"},
        )
        assert resp.status_code == 404  # Not 403: do not confirm existence

    @pytest.mark.asyncio
    async def test_tenant_a_cannot_list_tenant_b_inventory(
        self, async_client: AsyncClient, seeded_tenants: tuple[str, str]
    ) -> None:
        token_a, _ = seeded_tenants

        resp = await async_client.get(
            "/api/v1/inventory",
            headers={"Authorization": f"Bearer {token_a}"},
        )
        items = resp.json()["items"]

        # Every item must belong to tenant A
        for item in items:
            assert item["tenant_id"] == get_tenant_id_from_token(token_a)
```

Security Headers Middleware
Defense in depth. Even if your application code is perfect, missing security headers leave the door open to XSS, clickjacking, and protocol downgrade attacks.
```python
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response


class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next) -> Response:
        response = await call_next(request)
        response.headers["Strict-Transport-Security"] = (
            "max-age=63072000; includeSubDomains; preload"
        )
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Permissions-Policy"] = (
            "camera=(), microphone=(), geolocation=()"
        )
        response.headers["Content-Security-Policy"] = (
            "default-src 'self'; script-src 'self'; style-src 'self' 'unsafe-inline'"
        )
        return response
```

Putting It Together: The Penguin Random House Checklist
ShelfWise’s enterprise tenant needs:
- Custom RBAC roles: “warehouse_lead” with `inventory.*` and `orders.read` permissions. Done: roles are tenant-scoped, created via the admin API.
- Scoped API keys: ERP sync key with only `inventory.read` and `inventory.update`. Done: keys are scoped to specific permission strings, checked on every request.
- Audit logs for 90 days: every order, inventory change, and user action logged with before/after state. Done: append-only `audit_logs` table, partitioned monthly, queryable by resource type and date range.
- Quarterly isolation verification: automated test suite proving cross-tenant access is impossible. Done: `TestCrossTenantIsolation` runs in CI on every merge to main.
- Key rotation without downtime: old key and new key active simultaneously during migration window. Done: multiple active keys per tenant, revocation is explicit.
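The checklist grants `inventory.*`, while `require_permission` as shown earlier tests exact set membership, so a wildcard grant needs a matching rule. Here is a minimal sketch of one possible rule. The helpers `scope_allows` and `has_permission` are hypothetical names, and treating `.*` as "any action on this resource" is an assumption about the intended semantics.

```python
def scope_allows(granted: str, required: str) -> bool:
    """Check one granted scope against one required permission key."""
    if granted == required:
        return True
    if granted.endswith(".*"):
        # "inventory.*" becomes the prefix "inventory.", so it matches
        # "inventory.update" but not "inventory_reports.read".
        return required.startswith(granted[:-1])
    return False


def has_permission(granted_scopes: frozenset[str], required: str) -> bool:
    """True if any granted scope covers the required permission."""
    return any(scope_allows(granted, required) for granted in granted_scopes)


warehouse_lead = frozenset({"inventory.*", "orders.read"})
can_update_inventory = has_permission(warehouse_lead, "inventory.update")
can_create_orders = has_permission(warehouse_lead, "orders.create")
```

Keeping the trailing dot in the prefix is the important detail: without it, a grant on one resource would silently cover every resource whose name starts with the same letters.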
Security is not a feature you ship once. It is a set of constraints that every future feature must satisfy. The RBAC model, API key system, and audit trail built here become the foundation that Parts 19 and 20 build on — because a health check that skips auth, or an event handler that skips audit logging, undoes everything.