Your application authenticates users perfectly — bcrypt passwords, short-lived JWTs, Google SSO. Then a customer asks: “Can our marketing team read reports but not export data? Can project leads approve documents but not delete them? Can contractors see only their own projects?” You realize your entire authorization model is a boolean is_admin column.
Authentication tells you who the user is. Authorization tells you what they can do. This post builds three authorization models in FastAPI — each solving a different level of access control complexity — and shows you when to use each.
The Three Models
| Model | Question | Complexity | Best For |
|---|---|---|---|
| RBAC | ”Does the user’s role allow this?” | Low | Apps with well-defined organizational roles |
| ABAC | ”Do the user’s/resource’s attributes satisfy the policy?” | Medium | Apps needing contextual rules (time, location, data classification) |
| ReBAC | ”What is the user’s relationship to this specific resource?” | High | Apps with shared resources and complex ownership (Google Docs, GitHub) |
RBAC: Role-Based Access Control
RBAC is the most common model. Users are assigned roles, and roles have permissions. A user can do something if their role grants the required permission.
The Data Model
from __future__ import annotationsfrom uuid import UUID, uuid4
from sqlalchemy import ForeignKey, String, Table, Columnfrom sqlalchemy.orm import Mapped, mapped_column, relationship
from src.db.base import Base
# Association tablesuser_roles = Table( "user_roles", Base.metadata, Column("user_id", ForeignKey("users.id"), primary_key=True), Column("role_id", ForeignKey("roles.id"), primary_key=True),)
role_permissions = Table( "role_permissions", Base.metadata, Column("role_id", ForeignKey("roles.id"), primary_key=True), Column("permission_id", ForeignKey("permissions.id"), primary_key=True),)
class Role(Base): __tablename__ = "roles"
id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4) name: Mapped[str] = mapped_column(String(50), unique=True) # "admin", "editor", "viewer" description: Mapped[str] = mapped_column(String(200)) permissions: Mapped[list["Permission"]] = relationship(secondary=role_permissions)
class Permission(Base): __tablename__ = "permissions"
id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4) resource: Mapped[str] = mapped_column(String(50)) # "documents", "users" action: Mapped[str] = mapped_column(String(50)) # "create", "read", "update", "delete"
@property def key(self) -> str: return f"{self.resource}:{self.action}"FastAPI Dependencies for RBAC
from functools import wrapsfrom fastapi import Depends, HTTPException, status
async def get_user_permissions( user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db),) -> set[str]: """Load all permissions for the current user across all their roles.""" result = await db.execute( select(Permission) .join(role_permissions) .join(Role) .join(user_roles) .where(user_roles.c.user_id == user.id) ) permissions = result.scalars().all() return {p.key for p in permissions}
def require_permission(resource: str, action: str): """Dependency factory: check if the user has a specific permission.""" required = f"{resource}:{action}"
async def checker( permissions: set[str] = Depends(get_user_permissions), ): if required not in permissions: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Missing permission: {required}", )
return checker
def require_any_permission(*perms: str): """Dependency factory: check if the user has ANY of the listed permissions."""
async def checker( permissions: set[str] = Depends(get_user_permissions), ): if not permissions.intersection(perms): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Missing one of: {', '.join(perms)}", )
return checkerUsing RBAC in Routes
@router.get("/documents", dependencies=[Depends(require_permission("documents", "read"))])async def list_documents(db: AsyncSession = Depends(get_db)): ...
@router.post("/documents", dependencies=[Depends(require_permission("documents", "create"))])async def create_document(body: CreateDocumentRequest, db: AsyncSession = Depends(get_db)): ...
@router.delete( "/documents/{doc_id}", dependencies=[Depends(require_permission("documents", "delete"))],)async def delete_document(doc_id: UUID, db: AsyncSession = Depends(get_db)): ...When RBAC Breaks Down
RBAC struggles with contextual rules:
- “Editors can edit documents, but only during business hours”
- “Managers can approve expenses, but only up to $5,000”
- “Users can delete documents, but only ones they created”
These rules depend on attributes of the user, the resource, or the environment — not just the role. For that, you need ABAC.
ABAC: Attribute-Based Access Control
ABAC evaluates policies against attributes of four categories:
- Subject attributes: User role, department, clearance level
- Resource attributes: Document classification, owner, creation date
- Action: read, write, delete, approve
- Environment: Time of day, IP address, device type
Implementing ABAC in FastAPI
from dataclasses import dataclassfrom datetime import datetime, timezonefrom enum import Enumfrom typing import Any
class Effect(Enum): ALLOW = "allow" DENY = "deny"
@dataclassclass PolicyContext: subject: dict[str, Any] # User attributes resource: dict[str, Any] # Resource attributes action: str # The operation environment: dict[str, Any] # Contextual attributes
@dataclassclass Policy: name: str effect: Effect condition: callable # (PolicyContext) -> bool
def evaluate(self, ctx: PolicyContext) -> Effect | None: """Returns the effect if the condition matches, None otherwise.""" if self.condition(ctx): return self.effect return None
# Define policies as codepolicies: list[Policy] = [ # Admins can do anything Policy( name="admin_full_access", effect=Effect.ALLOW, condition=lambda ctx: "admin" in ctx.subject.get("roles", []), ),
# Editors can update documents during business hours Policy( name="editor_business_hours", effect=Effect.ALLOW, condition=lambda ctx: ( "editor" in ctx.subject.get("roles", []) and ctx.action in ("read", "update") and 9 <= ctx.environment.get("hour", 0) <= 17 ), ),
# Users can only delete their own documents Policy( name="owner_delete", effect=Effect.ALLOW, condition=lambda ctx: ( ctx.action == "delete" and ctx.resource.get("owner_id") == ctx.subject.get("user_id") ), ),
# Deny access to classified documents unless clearance matches Policy( name="classification_check", effect=Effect.DENY, condition=lambda ctx: ( ctx.resource.get("classification") == "confidential" and ctx.subject.get("clearance") != "confidential" ), ),]
def evaluate_policies(ctx: PolicyContext) -> bool: """Evaluate all policies. Deny takes precedence over Allow.""" result = Effect.DENY # Default deny
for policy in policies: effect = policy.evaluate(ctx) if effect == Effect.DENY: return False # Explicit deny — short circuit if effect == Effect.ALLOW: result = Effect.ALLOW
return result == Effect.ALLOWABAC FastAPI Dependency
from src.core.abac import PolicyContext, evaluate_policies
def require_abac(action: str): """ABAC dependency factory — evaluates policies against request context."""
async def checker( request: Request, user: User = Depends(get_current_user), resource: dict | None = None, ): ctx = PolicyContext( subject={ "user_id": str(user.id), "roles": user.role_names, "department": user.department, "clearance": user.clearance_level, }, resource=resource or {}, action=action, environment={ "hour": datetime.now(timezone.utc).hour, "ip": request.client.host, "method": request.method, }, )
if not evaluate_policies(ctx): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Access denied by policy for action: {action}", )
return checker
# Usage — with resource attributes loaded from DB@router.put("/documents/{doc_id}")async def update_document( doc_id: UUID, body: UpdateDocumentRequest, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db),): document = await db.get(Document, doc_id) if not document: raise HTTPException(status_code=404)
# Build resource context and evaluate checker = require_abac("update") await checker( request=..., user=user, resource={ "owner_id": str(document.owner_id), "classification": document.classification, }, )
# Proceed with update ...ReBAC: Relationship-Based Access Control
ReBAC determines access based on the relationship between the user and the resource. This is the model behind Google Docs (“anyone with the link can view”), GitHub (“repository collaborators can push”), and Notion (“workspace members can edit”).
A Simplified ReBAC Implementation
class ResourceRelation(Base): __tablename__ = "resource_relations"
id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4) subject_type: Mapped[str] = mapped_column(String(50)) # "user", "team" subject_id: Mapped[UUID] = mapped_column() relation: Mapped[str] = mapped_column(String(50)) # "owner", "editor", "viewer" resource_type: Mapped[str] = mapped_column(String(50)) # "document", "folder" resource_id: Mapped[UUID] = mapped_column()
__table_args__ = ( UniqueConstraint( "subject_type", "subject_id", "relation", "resource_type", "resource_id", name="uq_relation", ), Index("ix_resource_lookup", "resource_type", "resource_id"), )# Relation hierarchy — owner inherits editor, editor inherits viewerRELATION_HIERARCHY: dict[str, set[str]] = { "owner": {"owner", "editor", "viewer"}, "editor": {"editor", "viewer"}, "viewer": {"viewer"},}
async def check_relation( db: AsyncSession, user_id: UUID, resource_type: str, resource_id: UUID, required_relation: str,) -> bool: """Check if user has the required (or higher) relation to a resource.""" # Get all relations for this user and resource result = await db.execute( select(ResourceRelation.relation).where( ResourceRelation.subject_type == "user", ResourceRelation.subject_id == user_id, ResourceRelation.resource_type == resource_type, ResourceRelation.resource_id == resource_id, ) ) user_relations = {r for (r,) in result.all()}
# Check via team memberships too team_ids = await get_user_team_ids(db, user_id) if team_ids: result = await db.execute( select(ResourceRelation.relation).where( ResourceRelation.subject_type == "team", ResourceRelation.subject_id.in_(team_ids), ResourceRelation.resource_type == resource_type, ResourceRelation.resource_id == resource_id, ) ) user_relations.update(r for (r,) in result.all())
# Check if any relation grants the required access for rel in user_relations: if required_relation in RELATION_HIERARCHY.get(rel, set()): return True
return FalseReBAC FastAPI Dependency
def require_relation(resource_type: str, relation: str, id_param: str = "doc_id"): """Dependency that checks resource-level relationship."""
async def checker( request: Request, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): resource_id = request.path_params.get(id_param) if not resource_id: raise HTTPException(status_code=400, detail="Missing resource ID")
has_access = await check_relation( db, user.id, resource_type, UUID(resource_id), relation, )
if not has_access: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"You do not have '{relation}' access to this {resource_type}", )
return checker
# Usage@router.get( "/documents/{doc_id}", dependencies=[Depends(require_relation("document", "viewer"))],)async def get_document(doc_id: UUID, db: AsyncSession = Depends(get_db)): ...
@router.put( "/documents/{doc_id}", dependencies=[Depends(require_relation("document", "editor"))],)async def update_document(doc_id: UUID, db: AsyncSession = Depends(get_db)): ...Comparing the Three Models
| Dimension | RBAC | ABAC | ReBAC |
|---|---|---|---|
| Decision based on | User’s role | Multiple attributes | User-resource relationship |
| Granularity | Coarse (role-level) | Fine (attribute-level) | Fine (resource-level) |
| Complexity | Low | Medium | High |
| Performance | 1 DB query (roles) | Multiple attributes to gather | Graph traversal |
| Best for | Internal tools, admin panels | Compliance-heavy apps | Collaborative platforms |
| Example rule | ”Admins can delete" | "Editors can edit during business hours" | "Only the document owner can share” |
| Scaling challenge | Role explosion | Policy complexity | Relationship graph size |
Production Patterns
Pattern 1: Layered Authorization
Most production systems combine models. Check the cheap, coarse-grained rules first (RBAC), then the expensive, fine-grained rules (ABAC/ReBAC):
@router.delete("/documents/{doc_id}")async def delete_document( doc_id: UUID, user: User = Depends(get_current_user), permissions: set[str] = Depends(get_user_permissions), # RBAC db: AsyncSession = Depends(get_db),): # Layer 1: RBAC — does the role allow deletion at all? if "documents:delete" not in permissions: raise HTTPException(status_code=403, detail="Role lacks delete permission")
# Layer 2: ReBAC — does the user own or have editor access to this specific document? has_access = await check_relation(db, user.id, "document", doc_id, "owner") if not has_access: raise HTTPException(status_code=403, detail="Only the document owner can delete")
# Both checks passed — proceed ...Pattern 2: External Policy Engine
For complex ABAC rules, consider an external policy engine like Open Policy Agent (OPA) or AWS Cedar. Your FastAPI app sends the context; the engine returns the decision.
# Pseudocode: OPA integrationasync def evaluate_opa(ctx: PolicyContext) -> bool: async with httpx.AsyncClient() as client: response = await client.post( "http://opa:8181/v1/data/authz/allow", json={"input": asdict(ctx)}, ) return response.json().get("result", False)This separates policy from code — policies can be updated without redeploying your application.
What’s Next
In Part 8, we bring everything together. A comprehensive comparison matrix, a decision tree for choosing the right auth approach, common production combinations, and a security checklist for shipping auth in production.