Your application uses “Login with Google” via OAuth 2.0. After the authorization code exchange, you have an access token. You call the /userinfo endpoint to get the user’s email and name. It works — until Google rate-limits your /userinfo calls, or the endpoint is down, or the access token has expired by the time you call it.
The access token was never meant to carry identity. It is a key that unlocks API access. You are making an extra HTTP call to answer a question that should be answered in the original token exchange. OpenID Connect fixes this.
What OpenID Connect Adds to OAuth 2.0
OpenID Connect (OIDC) is a thin identity layer on top of OAuth 2.0. It adds exactly three things:
- ID Token: A JWT that contains the user’s identity claims (
sub,email,name). Returned alongside the access token. No extra API call needed. - UserInfo Endpoint: A standardized endpoint (optional) for fetching additional claims.
- Discovery Document: A well-known URL (
.well-known/openid-configuration) that describes the provider’s endpoints, supported scopes, and signing keys.
The Key Difference
| With OAuth 2.0 Only | With OpenID Connect |
|---|---|
| Exchange code → get access token | Exchange code → get access token + ID token |
Call /userinfo API to learn who the user is | Decode the ID token locally — user identity is in the JWT |
| Non-standard user profile endpoints | Standardized claims (sub, email, name, picture) |
| Each provider has different scopes | openid scope is universal across all OIDC providers |
The ID Token
An ID token is a JWT (signed, not encrypted by default) that contains standardized claims:
{ "iss": "https://accounts.google.com", "sub": "110248495921238986420", "aud": "your-client-id.apps.googleusercontent.com", "exp": 1700000000, "iat": 1699999000, "nonce": "random-nonce-from-your-app", "email": "user@gmail.com", "email_verified": true, "name": "Jane Doe", "picture": "https://lh3.googleusercontent.com/..."}| Claim | Purpose | Validation |
|---|---|---|
iss | Who issued the token | Must match the expected provider URL |
sub | Subject — unique user ID at the provider | Use this as the stable identifier, not email |
aud | Audience — your client ID | Must match YOUR client ID (prevents token confusion) |
exp | Expiry timestamp | Must be in the future |
iat | Issued-at timestamp | Should be recent |
nonce | Replay protection | Must match the nonce you sent in the auth request |
email | User’s email | Check email_verified before trusting it |
Critical: The sub claim is the only stable identifier. Emails can change. Names are not unique. The sub is guaranteed unique per provider.
Implementing Google SSO with OIDC
Step 1: Discovery
Instead of hardcoding Google’s endpoint URLs, fetch them from the discovery document:
import httpxfrom functools import lru_cache
GOOGLE_DISCOVERY_URL = "https://accounts.google.com/.well-known/openid-configuration"
@lru_cache(maxsize=1)async def get_google_config() -> dict: async with httpx.AsyncClient() as client: response = await client.get(GOOGLE_DISCOVERY_URL) return response.json() # Returns: # { # "authorization_endpoint": "https://accounts.google.com/o/oauth2/v2/auth", # "token_endpoint": "https://oauth2.googleapis.com/token", # "userinfo_endpoint": "https://openidconnect.googleapis.com/v1/userinfo", # "jwks_uri": "https://www.googleapis.com/oauth2/v3/certs", # ... # }Step 2: Initiate the Flow
import secretsfrom urllib.parse import urlencode
from fastapi import APIRouter, Requestfrom fastapi.responses import RedirectResponse
from src.core.config import settingsfrom src.core.oidc import get_google_config
router = APIRouter(prefix="/auth/google", tags=["oidc"])
@router.get("/login")async def google_login(request: Request): config = await get_google_config() state = secrets.token_urlsafe(32) nonce = secrets.token_urlsafe(32)
params = urlencode({ "client_id": settings.google_client_id, "redirect_uri": settings.google_redirect_uri, "response_type": "code", "scope": "openid email profile", # "openid" triggers OIDC "state": state, "nonce": nonce, "access_type": "offline", # Request refresh token "prompt": "consent", # Force consent screen })
response = RedirectResponse(f"{config['authorization_endpoint']}?{params}")
# Store state AND nonce for verification response.set_cookie("oauth_state", state, httponly=True, secure=True, samesite="lax", max_age=600) response.set_cookie("oauth_nonce", nonce, httponly=True, secure=True, samesite="lax", max_age=600)
return responseThe scope: "openid email profile" is what triggers OIDC. The openid scope tells the authorization server to return an ID token.
Step 3: Validate the ID Token
import jwtfrom jwt import PyJWKClient
# Cache the JWKS client — it fetches Google's public keysjwks_client = PyJWKClient("https://www.googleapis.com/oauth2/v3/certs")
def validate_id_token(id_token: str, expected_nonce: str) -> dict: """Validate a Google ID token and return the claims.""" # Fetch the signing key from Google's JWKS signing_key = jwks_client.get_signing_key_from_jwt(id_token)
# Decode and validate claims = jwt.decode( id_token, signing_key.key, algorithms=["RS256"], audience=settings.google_client_id, # Validates 'aud' claim issuer="https://accounts.google.com", # Validates 'iss' claim options={"require": ["sub", "iss", "aud", "exp", "iat", "nonce"]}, )
# Verify nonce (replay protection) if claims.get("nonce") != expected_nonce: raise ValueError("Nonce mismatch — possible replay attack")
# Verify email is verified if not claims.get("email_verified", False): raise ValueError("Email not verified by Google")
return claimsWhy validate locally instead of calling /userinfo? The ID token is already signed by Google. Validating the signature and claims is a local operation — no network call, no rate limiting, no additional latency. You only need the /userinfo endpoint for claims not included in the ID token.
Step 4: Handle the Callback
@router.get("/callback")async def google_callback( code: str, state: str, request: Request, response: Response, db: AsyncSession = Depends(get_db),): # Verify state stored_state = request.cookies.get("oauth_state") stored_nonce = request.cookies.get("oauth_nonce") if not stored_state or not secrets.compare_digest(stored_state, state): raise HTTPException(status_code=400, detail="Invalid OAuth state")
config = await get_google_config()
# Exchange code for tokens async with httpx.AsyncClient() as client: token_response = await client.post( config["token_endpoint"], data={ "code": code, "client_id": settings.google_client_id, "client_secret": settings.google_client_secret, "redirect_uri": settings.google_redirect_uri, "grant_type": "authorization_code", }, ) tokens = token_response.json()
# Validate the ID token (no extra API call needed!) id_token = tokens.get("id_token") if not id_token: raise HTTPException(status_code=400, detail="No ID token in response")
claims = validate_id_token(id_token, expected_nonce=stored_nonce)
# Find or create user using the stable 'sub' claim user = await find_or_create_oauth_user( db, provider="google", provider_id=claims["sub"], # Stable — never changes email=claims["email"], # May change — not a primary key display_name=claims.get("name", claims["email"]), avatar_url=claims.get("picture"), )
# Issue your own session/JWT jwt_token = create_access_token(user_id=str(user.id), roles=user.roles)
# Clean up OAuth cookies response.delete_cookie("oauth_state") response.delete_cookie("oauth_nonce")
return RedirectResponse( f"{settings.frontend_url}/auth/callback?token={jwt_token}" )Single Sign-On (SSO) Architecture
SSO means a user authenticates once and gains access to multiple applications without re-entering credentials. OIDC is the most common protocol for implementing SSO.
How SSO works at the session level:
- User opens App 1, which redirects to the IdP.
- User logs in at the IdP. The IdP sets a session cookie on its domain.
- User is redirected back to App 1 with an authorization code.
- User opens App 2, which redirects to the IdP.
- The IdP sees its session cookie — user is already authenticated. No login prompt.
- User is redirected back to App 2 with an authorization code.
The user logged in once but has sessions in both applications.
SP-Initiated vs. IdP-Initiated SSO
| Pattern | Flow | Common In |
|---|---|---|
| SP-initiated | User starts at the app → redirected to IdP → redirected back | Most web apps (the flow we built above) |
| IdP-initiated | User starts at the IdP portal → clicks app → redirected to app | Enterprise dashboards (Okta, Azure AD portal) |
OIDC vs. SAML: When to Use Which
SAML (Security Assertion Markup Language) is the older enterprise SSO standard. Both solve SSO, but they differ significantly:
| Dimension | OpenID Connect | SAML 2.0 |
|---|---|---|
| Year | 2014 | 2005 |
| Format | JSON + JWT | XML |
| Transport | HTTPS | HTTPS (XML SOAP) |
| Token | ID Token (JWT, ~1KB) | SAML Assertion (XML, ~10KB) |
| Best for | Web apps, SPAs, mobile, APIs | Enterprise legacy, on-prem |
| Complexity | Moderate | High (XML signatures, canonicalization) |
| Mobile support | Native | Poor (XML parsing, no native SDK support) |
| Library ecosystem | Excellent (python-jose, PyJWT, authlib) | Limited (python3-saml, pysaml2) |
| Common providers | Google, GitHub, Auth0, Okta | Okta, Azure AD, PingFederate, ADFS |
The practical answer: Use OIDC unless a customer’s enterprise IdP only supports SAML. Even then, consider putting a SAML-to-OIDC bridge (Auth0, Okta) in front so your application only speaks OIDC.
The OAuth User Model
When users can sign in via multiple providers, your user model needs a linked accounts table:
class OAuthAccount(Base): __tablename__ = "oauth_accounts"
id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4) user_id: Mapped[UUID] = mapped_column(ForeignKey("users.id")) provider: Mapped[str] = mapped_column(String(50)) # "google", "github" provider_user_id: Mapped[str] = mapped_column(String(255)) # The 'sub' claim provider_email: Mapped[str | None] = mapped_column(String(320), nullable=True)
__table_args__ = ( UniqueConstraint("provider", "provider_user_id", name="uq_provider_account"), )
user: Mapped["User"] = relationship(back_populates="oauth_accounts")async def find_or_create_oauth_user( db: AsyncSession, provider: str, provider_id: str, email: str, display_name: str, avatar_url: str | None = None,) -> User: # Check if this OAuth account exists result = await db.execute( select(OAuthAccount) .where(OAuthAccount.provider == provider, OAuthAccount.provider_user_id == provider_id) .options(joinedload(OAuthAccount.user)) ) oauth_account = result.scalar_one_or_none()
if oauth_account: return oauth_account.user
# Check if a user with this email exists (link accounts) result = await db.execute(select(User).where(User.email == email)) user = result.scalar_one_or_none()
if not user: user = User( email=email, display_name=display_name, hashed_password="", # No password — OAuth-only user is_active=True, is_verified=True, # Email verified by provider ) db.add(user) await db.flush()
# Link the OAuth account link = OAuthAccount( user_id=user.id, provider=provider, provider_user_id=provider_id, provider_email=email, ) db.add(link) await db.commit()
return userWhat’s Next
In Part 7, we shift from authentication to authorization. You know who the user is — now you need to decide what they can do. We will implement RBAC (role-based), ABAC (attribute-based), and policy-based authorization patterns using FastAPI’s dependency injection.