Six weeks after ShelfWise launched, we needed to add a fulfillment_status column to the orders table. A developer ran ALTER TABLE orders ADD COLUMN fulfillment_status VARCHAR(20). The table had 50 million rows. PostgreSQL took an ACCESS EXCLUSIVE lock for eight minutes. During those eight minutes, every INSERT, UPDATE, and SELECT on the orders table queued behind the lock. Every tenant’s checkout flow failed. Every order status page returned a 504.
The developer did nothing wrong by PostgreSQL standards. But in a multi-tenant SaaS serving real customers, a migration that blocks reads for eight minutes is an eight-minute outage.
This post covers the patterns that make migrations safe: expand-contract, online DDL, tenant-aware migration runners, and recovery from partial failures.
The Expand-Contract Pattern
The core idea is to never make a breaking schema change in a single step. Instead, every migration is split into three phases (expand, migrate, contract), each of which is backward-compatible on its own.
Each phase is a separate deployment. If Phase 2 reveals a problem, you roll back Phase 2’s code without touching the schema. The database is always in a state that works with both the current and previous versions of the application.
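One way to see why the expanded schema is the safe meeting point is a toy compatibility check. The sketch below is illustrative only: the Column and AppVersion types and the can_insert rule are hypothetical simplifications (they ignore column defaults and reads) and are not taken from ShelfWise's codebase.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Column:
    name: str
    nullable: bool


@dataclass(frozen=True)
class AppVersion:
    name: str
    writes: frozenset  # names of the columns this version populates on INSERT


def can_insert(app: AppVersion, schema: frozenset) -> bool:
    """True if the app only writes existing columns and fills every NOT NULL one."""
    existing = {c.name for c in schema}
    required = {c.name for c in schema if not c.nullable}
    return app.writes <= existing and required <= app.writes


STATUS = Column("status", nullable=False)
BEFORE = frozenset({STATUS})
EXPANDED = frozenset({STATUS, Column("fulfillment_status", nullable=True)})
CONTRACTED = frozenset({STATUS, Column("fulfillment_status", nullable=False)})

OLD_APP = AppVersion("v1", frozenset({"status"}))
NEW_APP = AppVersion("v2", frozenset({"status", "fulfillment_status"}))

# Only the expanded schema accepts writes from both versions, which is why the
# rolling deploy of new code has to happen while the schema is in that state.
compatible = {
    (app.name, label): can_insert(app, schema)
    for app in (OLD_APP, NEW_APP)
    for label, schema in (
        ("before", BEFORE),
        ("expanded", EXPANDED),
        ("contracted", CONTRACTED),
    )
}
```

Under these assumptions the old version breaks only against the contracted schema and the new version breaks only against the original one, so neither the contract step nor the code deploy can be skipped or reordered.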
| Strategy | Downtime Risk | Rollback Safety | Complexity | When to Use |
|---|---|---|---|---|
| Single ALTER TABLE | High — locks table | None — cannot undo | Trivial | Never in production |
| Expand-Contract | None — each step is safe | High — each phase reversible | Moderate | Default for all migrations |
| Shadow Table | None — writes to both | High — switch back to original | High | Large-scale restructuring |
| Blue-Green Database | None — full DB copy | Complete — switch DNS | Very high | Major version upgrades |
Concrete Example: Adding fulfillment_status
Here is how ShelfWise should have added that fulfillment_status column.
Phase 1: Expand. Add the column as nullable with no default. PostgreSQL treats this as a metadata-only change: there is no table rewrite, and the ACCESS EXCLUSIVE lock is needed only for the catalog update (milliseconds, provided the statement does not have to queue behind a long-running transaction to acquire it). PostgreSQL 11 extended the same treatment to columns with a constant default.

```python
"""Add fulfillment_status column (nullable, no default)."""
from alembic import op
import sqlalchemy as sa

revision = "001_expand"
down_revision = "000_previous"


def upgrade() -> None:
    # Metadata-only change: no table rewrite for a nullable column with no default
    op.add_column(
        "orders",
        sa.Column("fulfillment_status", sa.String(20), nullable=True),
    )


def downgrade() -> None:
    op.drop_column("orders", "fulfillment_status")
```

Phase 2: Migrate. Deploy code that writes to the new column. Backfill existing rows in batches.

```python
"""Backfill fulfillment_status from existing order status."""
from alembic import op
import sqlalchemy as sa

revision = "002_migrate"
down_revision = "001_expand"


# Data migration, kept separate from the schema migration
def upgrade() -> None:
    # Alembic normally wraps a revision in a single transaction, which would
    # turn every batch into one long-running transaction. autocommit_block()
    # (Alembic 1.2+) lets each 10,000-row batch commit on its own.
    with op.get_context().autocommit_block():
        conn = op.get_bind()
        while True:
            result = conn.execute(
                sa.text("""
                    UPDATE orders
                    SET fulfillment_status = CASE
                        WHEN status = 'shipped' THEN 'shipped'
                        WHEN status = 'delivered' THEN 'delivered'
                        ELSE 'pending'
                    END
                    WHERE fulfillment_status IS NULL
                      AND id IN (
                        SELECT id FROM orders
                        WHERE fulfillment_status IS NULL
                        LIMIT 10000
                    )
                """)
            )
            if result.rowcount == 0:
                break


def downgrade() -> None:
    # Nullify the column; code still handles NULL.
    # Note: this is a single full-table UPDATE, not a batched one.
    conn = op.get_bind()
    conn.execute(sa.text("UPDATE orders SET fulfillment_status = NULL"))
```

Phase 3: Contract. After confirming all rows are backfilled and the new code is stable, add the NOT NULL constraint and drop any old columns.

```python
"""Make fulfillment_status NOT NULL."""
from alembic import op
import sqlalchemy as sa

revision = "003_contract"
down_revision = "002_migrate"


def upgrade() -> None:
    # Enforce NOT NULL with ADD CONSTRAINT ... NOT VALID followed by VALIDATE.
    # The two statements must commit separately: inside one transaction, the
    # ACCESS EXCLUSIVE lock taken by ADD CONSTRAINT would be held through the
    # whole validation scan. autocommit_block() (Alembic 1.2+) runs each
    # statement in its own transaction.
    with op.get_context().autocommit_block():
        op.execute(
            "ALTER TABLE orders ADD CONSTRAINT orders_fulfillment_status_nn "
            "CHECK (fulfillment_status IS NOT NULL) NOT VALID"
        )
        op.execute(
            "ALTER TABLE orders VALIDATE CONSTRAINT orders_fulfillment_status_nn"
        )


def downgrade() -> None:
    op.execute(
        "ALTER TABLE orders DROP CONSTRAINT orders_fulfillment_status_nn"
    )
```

The NOT VALID + VALIDATE two-step is critical. NOT VALID adds the constraint without scanning existing rows, so its ACCESS EXCLUSIVE lock is held only briefly. VALIDATE scans existing rows but takes only a SHARE UPDATE EXCLUSIVE lock, which does not block reads or writes. A plain ALTER COLUMN SET NOT NULL takes an ACCESS EXCLUSIVE lock and scans the entire table while holding it. On PostgreSQL 12 and later, SET NOT NULL skips that scan when a validated CHECK (... IS NOT NULL) constraint already exists, so the column itself can be marked NOT NULL cheaply afterwards.
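The lock names above are easier to reason about as a lookup. The following is a minimal sketch that encodes only the part of PostgreSQL's table-level lock conflict matrix this post relies on; the CONFLICTS and DDL_LOCKS tables and the blocked_by helper are illustrative names, not an API.

```python
# Lock modes that conflict with the lock an ordinary statement requests.
# Subset of PostgreSQL's documented table-level lock conflict matrix.
CONFLICTS = {
    # Plain SELECT takes ACCESS SHARE
    "ACCESS SHARE": {"ACCESS EXCLUSIVE"},
    # INSERT / UPDATE / DELETE take ROW EXCLUSIVE
    "ROW EXCLUSIVE": {"SHARE", "SHARE ROW EXCLUSIVE", "EXCLUSIVE", "ACCESS EXCLUSIVE"},
}

# Lock mode held on the table by each DDL operation discussed in this post.
DDL_LOCKS = {
    "ALTER TABLE ... SET NOT NULL": "ACCESS EXCLUSIVE",
    "VALIDATE CONSTRAINT": "SHARE UPDATE EXCLUSIVE",
    "CREATE INDEX": "SHARE",
    "CREATE INDEX CONCURRENTLY": "SHARE UPDATE EXCLUSIVE",
}


def blocked_by(held_lock: str) -> dict:
    """Report whether reads and writes queue behind a lock held in this mode."""
    return {
        "reads": held_lock in CONFLICTS["ACCESS SHARE"],
        "writes": held_lock in CONFLICTS["ROW EXCLUSIVE"],
    }


impact = {ddl: blocked_by(lock) for ddl, lock in DDL_LOCKS.items()}
```

Read this way, the rule of thumb is that anything holding ACCESS EXCLUSIVE for longer than a catalog update is an outage, and anything holding SHARE is a write outage.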
Online DDL for PostgreSQL
PostgreSQL has specific patterns for safe DDL. Here are the operations that matter most and how to run them without locking.
CREATE INDEX CONCURRENTLY
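A standard CREATE INDEX takes a SHARE lock on the table, blocking all writes for the duration. On a 50M-row table, that can be minutes. CREATE INDEX CONCURRENTLY avoids the write block, but it cannot run inside a transaction block, so the migration has to step outside Alembic's per-migration transaction.

```python
"""Add index on fulfillment_status for query performance."""
from alembic import op

revision = "004_index"
down_revision = "003_contract"


def upgrade() -> None:
    # CONCURRENTLY does not block writes; it takes longer but is safe.
    # autocommit_block() (Alembic 1.2+) runs the statement outside a transaction.
    # If a concurrent build fails it leaves an INVALID index behind, which
    # IF NOT EXISTS will silently skip: drop it before retrying.
    with op.get_context().autocommit_block():
        op.execute(
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS "
            "ix_orders_fulfillment_status ON orders (fulfillment_status)"
        )


def downgrade() -> None:
    with op.get_context().autocommit_block():
        op.execute("DROP INDEX CONCURRENTLY IF EXISTS ix_orders_fulfillment_status")
```

Instant ADD COLUMN (PostgreSQL 11+)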
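Since PostgreSQL 11, ADD COLUMN with a non-volatile default is instant: the default is stored in the catalog and there is no table rewrite. That also covers ADD COLUMN ... NOT NULL DEFAULT 'value', because every existing row reads back the default and no scan is needed. The traps are elsewhere: a volatile default (for example clock_timestamp()) still rewrites the table, NOT NULL without a default fails outright on a table that already has rows, and adding NOT NULL to an existing column scans the table under an ACCESS EXCLUSIVE lock. For an existing column, use the expand-contract pattern instead.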
```python
from alembic import op
import sqlalchemy as sa

# The calls below are alternatives, not steps to run together.

# Safe: instant metadata change
op.add_column("orders", sa.Column("priority", sa.Integer, nullable=True))

# Safe: instant with a constant default (PG 11+)
op.add_column("orders", sa.Column("priority", sa.Integer, server_default="0"))

# FAILS on a non-empty table: existing rows would hold NULL in a NOT NULL column
# op.add_column("orders", sa.Column("priority", sa.Integer, nullable=False))
```

Multi-Tenant Migration Runner
In a schema-per-tenant architecture, every migration must run across every tenant schema. A naive loop runs them sequentially: with 200 tenants and 5 seconds per migration, that is 1,000 seconds, or almost 17 minutes.
Worse, if the migration fails on tenant 151, tenants 1-150 are on the new schema and tenants 151-200 are on the old one. Your application must handle both schema versions simultaneously.
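The arithmetic is worth making explicit, because it drives the choice of concurrency limit. This is a back-of-the-envelope sketch: rollout_seconds is a hypothetical helper, and it assumes every tenant takes the same time and that concurrent migrations do not slow each other down, neither of which holds exactly in practice.

```python
import math


def rollout_seconds(tenants: int, seconds_per_tenant: float, concurrency: int = 1) -> float:
    """Wall-clock estimate when tenants are processed in waves of `concurrency`."""
    if concurrency < 1:
        raise ValueError("concurrency must be at least 1")
    waves = math.ceil(tenants / concurrency)
    return waves * seconds_per_tenant


sequential = rollout_seconds(200, 5)                   # one tenant at a time
five_at_once = rollout_seconds(200, 5, concurrency=5)  # five tenants per wave

print(f"sequential: {sequential / 60:.1f} min, 5 at a time: {five_at_once / 60:.1f} min")
```

The estimate is an upper bound on the benefit: it says nothing about how long the application has to tolerate mixed schema versions if one tenant fails partway through.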
Tenant Schema Version Tracking
Track which schema version each tenant is on. This is the foundation for safe rollouts and partial failure recovery.
```python
from dataclasses import dataclass
from datetime import UTC, datetime
from enum import StrEnum

from sqlalchemy import String, Integer, DateTime, select, update
from sqlalchemy.ext.asyncio import AsyncSession, AsyncEngine
from sqlalchemy.orm import Mapped, mapped_column

from src.db.base import Base


class MigrationStatus(StrEnum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    ROLLED_BACK = "rolled_back"


class TenantSchemaVersion(Base):
    """Track migration state per tenant; stored in the shared schema."""
    __tablename__ = "tenant_schema_versions"

    tenant_id: Mapped[int] = mapped_column(Integer, primary_key=True)
    current_revision: Mapped[str] = mapped_column(String(50))
    # Nullable column, so the annotation is Optional as well
    target_revision: Mapped[str | None] = mapped_column(String(50), nullable=True)
    status: Mapped[str] = mapped_column(String(20), default=MigrationStatus.COMPLETED)
    last_error: Mapped[str | None] = mapped_column(String(500), nullable=True)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(UTC),
    )


@dataclass
class TenantMigrationResult:
    tenant_id: int
    success: bool
    from_revision: str
    to_revision: str
    error: str | None = None
    duration_ms: float = 0.0
```

The Migration Runner
The runner processes tenants with controlled concurrency. It does not blindly run all tenants in parallel — that would overload the database. Instead, it uses a semaphore to limit concurrent migrations.
```python
import asyncio
import time
from collections.abc import Sequence
from datetime import UTC, datetime

import structlog
from alembic import command
from alembic.config import Config
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncEngine

# AsyncSessionLocal is the project's async session factory. The listing does
# not show where it lives, so this import path is an assumption.
from src.db.session import AsyncSessionLocal
from src.db.tenant_migrations import (
    TenantSchemaVersion,
    TenantMigrationResult,
    MigrationStatus,
)

logger = structlog.get_logger()

MAX_CONCURRENT_MIGRATIONS = 5


class TenantMigrationRunner:
    """Run Alembic migrations across all tenant schemas with concurrency control."""

    def __init__(self, alembic_config: Config, engine: AsyncEngine) -> None:
        self._config = alembic_config
        self._engine = engine
        self._semaphore = asyncio.Semaphore(MAX_CONCURRENT_MIGRATIONS)

    async def migrate_all(
        self,
        target_revision: str,
    ) -> list[TenantMigrationResult]:
        """Migrate all tenants to the target revision."""
        tenants = await self._get_tenants()

        tasks = [
            self._migrate_tenant(tenant, target_revision)
            for tenant in tenants
        ]
        # _migrate_tenant catches its own errors, so gather never raises here
        results = await asyncio.gather(*tasks, return_exceptions=False)

        succeeded = sum(1 for r in results if r.success)
        failed = sum(1 for r in results if not r.success)
        logger.info(
            "migration_batch_complete",
            total=len(results),
            succeeded=succeeded,
            failed=failed,
            target=target_revision,
        )

        return results

    async def _migrate_tenant(
        self,
        tenant: TenantSchemaVersion,
        target_revision: str,
    ) -> TenantMigrationResult:
        async with self._semaphore:
            start = time.monotonic()
            try:
                await self._mark_status(
                    tenant.tenant_id, MigrationStatus.RUNNING, target_revision,
                )

                # Run Alembic upgrade for this tenant's schema.
                # str(engine.url) masks the password in SQLAlchemy 2.0, so
                # render the URL explicitly with the password included.
                self._config.set_main_option(
                    "sqlalchemy.url",
                    self._engine.url.render_as_string(hide_password=False),
                )
                # env.py is expected to read "tenant_schema" and target that schema
                self._config.set_main_option(
                    "tenant_schema", f"tenant_{tenant.tenant_id}",
                )
                # Note: command.upgrade is synchronous and blocks the event
                # loop while it runs. The semaphore caps how many tenants are
                # in flight, but the upgrades themselves execute one after
                # another within this process.
                command.upgrade(self._config, target_revision)

                await self._mark_status(
                    tenant.tenant_id, MigrationStatus.COMPLETED, target_revision,
                )

                duration = (time.monotonic() - start) * 1000
                logger.info(
                    "tenant_migration_completed",
                    tenant_id=tenant.tenant_id,
                    revision=target_revision,
                    duration_ms=duration,
                )
                return TenantMigrationResult(
                    tenant_id=tenant.tenant_id,
                    success=True,
                    from_revision=tenant.current_revision,
                    to_revision=target_revision,
                    duration_ms=duration,
                )

            except Exception as exc:
                duration = (time.monotonic() - start) * 1000
                error_msg = str(exc)[:500]  # last_error is String(500)
                await self._mark_status(
                    tenant.tenant_id,
                    MigrationStatus.FAILED,
                    target_revision,
                    error=error_msg,
                )
                logger.error(
                    "tenant_migration_failed",
                    tenant_id=tenant.tenant_id,
                    error=error_msg,
                    duration_ms=duration,
                )
                return TenantMigrationResult(
                    tenant_id=tenant.tenant_id,
                    success=False,
                    from_revision=tenant.current_revision,
                    to_revision=target_revision,
                    error=error_msg,
                    duration_ms=duration,
                )

    async def retry_failed(self, target_revision: str) -> list[TenantMigrationResult]:
        """Retry only tenants that failed the previous migration attempt."""
        async with AsyncSessionLocal() as session:
            stmt = select(TenantSchemaVersion).where(
                TenantSchemaVersion.status == MigrationStatus.FAILED,
            )
            failed = (await session.execute(stmt)).scalars().all()

        tasks = [
            self._migrate_tenant(tenant, target_revision)
            for tenant in failed
        ]
        return await asyncio.gather(*tasks, return_exceptions=False)

    async def _get_tenants(self) -> Sequence[TenantSchemaVersion]:
        async with AsyncSessionLocal() as session:
            stmt = select(TenantSchemaVersion).order_by(
                TenantSchemaVersion.tenant_id,
            )
            return (await session.execute(stmt)).scalars().all()

    async def _mark_status(
        self,
        tenant_id: int,
        status: MigrationStatus,
        target_revision: str,
        error: str | None = None,
    ) -> None:
        values = {
            "status": status,
            "target_revision": target_revision,
            "last_error": error,
            "updated_at": datetime.now(UTC),
        }
        if status == MigrationStatus.COMPLETED:
            # Record the new revision so the tracking table stays accurate
            values["current_revision"] = target_revision

        async with AsyncSessionLocal() as session:
            stmt = (
                update(TenantSchemaVersion)
                .where(TenantSchemaVersion.tenant_id == tenant_id)
                .values(**values)
            )
            await session.execute(stmt)
            await session.commit()
```

The retry_failed method is the recovery mechanism. When the migration for tenant 151 fails, you fix the issue, then call retry_failed. It picks up only the tenants that failed, not the 150 that already succeeded.
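The two ideas the runner combines, a semaphore that caps in-flight work and per-tenant error capture that keeps one failure from aborting the batch, can be seen in isolation. This is a minimal sketch, not the runner itself: run_bounded and fake_migrate are hypothetical names, and the fake worker is a coroutine that sleeps, standing in for a migration.

```python
import asyncio


async def run_bounded(items, worker, limit: int):
    """Run worker(item) for every item, at most `limit` at a time.

    Returns (results, peak): results maps each item to its outcome or to the
    exception it raised; peak is the highest number of workers seen running
    at the same moment.
    """
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0
    results = {}

    async def guarded(item):
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            try:
                results[item] = await worker(item)
            except Exception as exc:  # record the failure, keep the batch going
                results[item] = exc
            finally:
                active -= 1

    await asyncio.gather(*(guarded(item) for item in items))
    return results, peak


async def fake_migrate(tenant_id: int) -> str:
    """Hypothetical stand-in for a per-tenant migration."""
    await asyncio.sleep(0.01)
    if tenant_id == 7:
        raise RuntimeError("constraint violated")
    return "completed"


results, peak = asyncio.run(run_bounded(range(1, 21), fake_migrate, limit=5))
failed = [tenant for tenant, outcome in results.items() if isinstance(outcome, Exception)]
```

The cap only produces real overlap when the worker yields to the event loop, as the sleep does here. A blocking call inside the semaphore serializes the work regardless of the limit.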
Blue-Green Schema Compatibility
During the expand phase, two versions of your application are running simultaneously: the old version (pre-migration) and the new version (post-migration). Both must work with the current schema.
The rule is strict: new code must handle the column being NULL. Old code must ignore the new column. If either assumption breaks, you get errors during the rolling deploy.
```python
# src/models/order.py: Phase 2 code (handles both states)
from decimal import Decimal

from sqlalchemy import ForeignKey, Numeric, String
from sqlalchemy.orm import Mapped, mapped_column

from src.db.base import Base
# TenantMixin and TimestampMixin are the project's own mixins; their import
# path is not shown in the original listing.


class Order(TenantMixin, TimestampMixin, Base):
    __tablename__ = "orders"

    id: Mapped[int] = mapped_column(primary_key=True)
    customer_id: Mapped[int] = mapped_column(ForeignKey("customers.id"))
    status: Mapped[str] = mapped_column(String(20))
    total: Mapped[Decimal] = mapped_column(Numeric(10, 2))

    # New column, nullable during the expand phase
    fulfillment_status: Mapped[str | None] = mapped_column(
        String(20), nullable=True,
    )

    @property
    def effective_fulfillment_status(self) -> str:
        """Backward-compatible accessor during migration."""
        if self.fulfillment_status is not None:
            return self.fulfillment_status
        # Derive from the old status column for rows not yet backfilled
        if self.status == "shipped":
            return "shipped"
        if self.status == "delivered":
            return "delivered"
        return "pending"
```

The effective_fulfillment_status property provides a consistent interface regardless of whether the row has been backfilled. Service layer code uses this property instead of the raw column. After Phase 3, the property simplifies to a direct column access.
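Because the fallback is plain logic, it can be checked without a database. The sketch below restates the property as a free function (a hypothetical refactoring for testing, not the model code itself) and runs it over backfilled and unbackfilled rows.

```python
from typing import Optional


def effective_fulfillment_status(status: str, fulfillment_status: Optional[str]) -> str:
    """Same rule as the model property, written as a plain function."""
    if fulfillment_status is not None:
        return fulfillment_status
    # Row not backfilled yet: derive the value from the legacy status column
    if status in ("shipped", "delivered"):
        return status
    return "pending"


# (legacy status, stored fulfillment_status) -> value the service layer sees
cases = {
    ("shipped", None): effective_fulfillment_status("shipped", None),
    ("delivered", None): effective_fulfillment_status("delivered", None),
    ("placed", None): effective_fulfillment_status("placed", None),
    # Once the column is populated it wins, even if it disagrees with status
    ("placed", "shipped"): effective_fulfillment_status("placed", "shipped"),
}
```

The mapping here deliberately matches the CASE expression used by the backfill, so a row reads the same before and after it is backfilled.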
Data Migrations: Separate and Batchable
Data migrations (backfilling values, transforming formats) are fundamentally different from schema migrations (adding columns, creating indexes). They should never be in the same Alembic revision.
The reason is operational: a schema migration that takes 50 milliseconds becomes a schema migration that takes 20 minutes when you add a backfill. If the backfill fails at row 40 million, you cannot cleanly downgrade the schema portion.

```python
"""Standalone backfill script, run independently of Alembic."""
import asyncio

import structlog
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

logger = structlog.get_logger()

BATCH_SIZE = 5_000
SLEEP_BETWEEN_BATCHES = 0.1  # seconds; reduces DB pressure


async def backfill(database_url: str) -> int:
    engine = create_async_engine(database_url)
    total_updated = 0

    try:
        while True:
            # One transaction per batch: engine.begin() commits on exit, so
            # row locks are released and progress is durable after each batch.
            async with engine.begin() as conn:
                result = await conn.execute(text("""
                    WITH batch AS (
                        SELECT id FROM orders
                        WHERE fulfillment_status IS NULL
                        LIMIT :batch_size
                        FOR UPDATE SKIP LOCKED
                    )
                    UPDATE orders
                    SET fulfillment_status = CASE
                        WHEN status = 'shipped' THEN 'shipped'
                        WHEN status = 'delivered' THEN 'delivered'
                        ELSE 'pending'
                    END
                    FROM batch
                    WHERE orders.id = batch.id
                """), {"batch_size": BATCH_SIZE})
                updated = result.rowcount

            if updated == 0:
                break

            total_updated += updated
            logger.info(
                "backfill_progress",
                updated_this_batch=updated,
                total_updated=total_updated,
            )
            await asyncio.sleep(SLEEP_BETWEEN_BATCHES)
    finally:
        await engine.dispose()

    logger.info("backfill_complete", total_updated=total_updated)
    return total_updated


if __name__ == "__main__":
    import sys
    asyncio.run(backfill(sys.argv[1]))
```

Key decisions in this script:
- FOR UPDATE SKIP LOCKED prevents conflicts with concurrent writes and other backfill instances
- SLEEP_BETWEEN_BATCHES reduces sustained load on the database; without it, the backfill monopolizes I/O
- Batch size of 5,000 is small enough to avoid long-running transactions but large enough to be efficient
- CTE-based update ensures the SELECT and UPDATE are in the same statement, preventing TOCTOU races
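The commit-per-batch loop can be tried without a PostgreSQL instance. The sketch below is a stand-in that uses sqlite3 from the standard library so it runs anywhere; SQLite has no FOR UPDATE SKIP LOCKED, so that part is omitted, and the table contents and the backfill_in_batches helper are made up for the demonstration.

```python
import sqlite3


def backfill_in_batches(conn: sqlite3.Connection, batch_size: int) -> list:
    """Backfill NULL fulfillment_status values, committing after every batch.

    Returns the number of rows updated by each batch.
    """
    batch_sizes = []
    while True:
        cur = conn.execute(
            """
            UPDATE orders
            SET fulfillment_status = CASE
                WHEN status IN ('shipped', 'delivered') THEN status
                ELSE 'pending'
            END
            WHERE id IN (
                SELECT id FROM orders
                WHERE fulfillment_status IS NULL
                LIMIT ?
            )
            """,
            (batch_size,),
        )
        conn.commit()  # each batch is its own short transaction
        if cur.rowcount == 0:
            break
        batch_sizes.append(cur.rowcount)
    return batch_sizes


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT, fulfillment_status TEXT)"
)
statuses = ["shipped", "delivered", "placed", "cancelled", "shipped"] * 5  # 25 rows
conn.executemany("INSERT INTO orders (status) VALUES (?)", [(s,) for s in statuses])
conn.commit()

batches = backfill_in_batches(conn, batch_size=10)
remaining = conn.execute(
    "SELECT COUNT(*) FROM orders WHERE fulfillment_status IS NULL"
).fetchone()[0]
pending = conn.execute(
    "SELECT COUNT(*) FROM orders WHERE fulfillment_status = 'pending'"
).fetchone()[0]
```

Because the loop selects rows by fulfillment_status IS NULL, it is safe to stop and restart: a rerun simply continues with whatever is still unfilled.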
Migration Rollback Strategy
Every migration must have a working downgrade. This is not optional. If Phase 3 reveals a bug, you need to roll back to Phase 2 without data loss.
The test is straightforward: run upgrade, insert test data, run downgrade, verify the application works, run upgrade again, verify the test data survived. If any step fails, the migration is not ready.
```python
import pytest
from alembic import command
from alembic.config import Config


@pytest.fixture
def alembic_config(test_database_url: str) -> Config:
    config = Config("alembic.ini")
    config.set_main_option("sqlalchemy.url", test_database_url)
    return config


def test_migration_round_trip(alembic_config: Config) -> None:
    """Every migration must survive upgrade → downgrade → upgrade."""
    command.upgrade(alembic_config, "head")
    command.downgrade(alembic_config, "base")
    command.upgrade(alembic_config, "head")
    # If we get here without errors, the migrations are reversible
```

Partial Failure Recovery
The worst migration scenario is partial success across tenants. Tenants 1-150 are on revision 003_contract. Tenant 151 failed because it had a row violating the new constraint. Tenants 152-200 are still on 002_migrate.
The migration runner’s status tracking handles this:
```python
# CLI command for migration operations
async def migration_status(engine: AsyncEngine) -> None:
    """Print migration status for all tenants."""
    async with AsyncSessionLocal() as session:
        stmt = select(TenantSchemaVersion).order_by(
            TenantSchemaVersion.status,
            TenantSchemaVersion.tenant_id,
        )
        versions = (await session.execute(stmt)).scalars().all()

    by_status: dict[str, list[int]] = {}
    for v in versions:
        by_status.setdefault(v.status, []).append(v.tenant_id)

    for status, tenant_ids in by_status.items():
        print(f"{status}: {len(tenant_ids)} tenants")
        if status == MigrationStatus.FAILED:
            for tid in tenant_ids:
                tenant = next(v for v in versions if v.tenant_id == tid)
                print(f"  Tenant {tid}: {tenant.last_error}")
```

The recovery workflow:
- Run migration_status to identify failed tenants
- Fix the data issue on the failed tenant (remove constraint-violating rows, fix bad data)
- Run retry_failed to resume from where it stopped
- Verify all tenants report COMPLETED
Your application code must work with tenants on different schema versions during this window. The effective_fulfillment_status property pattern from earlier handles this at the model level.
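The last step of the workflow, deciding whether the rollout is actually finished, can be sketched as a pure function over the tracking table. The plan_recovery helper and the sample data are hypothetical. Note one assumption it makes visible: retry_failed as listed selects only rows in the failed state, so tenants that were never attempted (still pending) need another migrate_all pass rather than a retry.

```python
from collections import Counter


def plan_recovery(statuses: dict) -> dict:
    """Group tenants by what has to happen next.

    `statuses` maps tenant_id -> status string as stored in
    tenant_schema_versions ("pending", "running", "completed", "failed", ...).
    """
    return {
        "counts": Counter(statuses.values()),
        # Candidates for retry_failed once the underlying data issue is fixed
        "retry": sorted(t for t, s in statuses.items() if s == "failed"),
        # Never attempted: these need a regular migration pass, not a retry
        "not_started": sorted(t for t, s in statuses.items() if s == "pending"),
        "done": all(s == "completed" for s in statuses.values()),
    }


# The scenario described above: 1-150 migrated, 151 failed, 152-200 untouched
snapshot = {tenant: "completed" for tenant in range(1, 151)}
snapshot[151] = "failed"
snapshot.update({tenant: "pending" for tenant in range(152, 201)})

plan = plan_recovery(snapshot)
```

Treating "done" as "every tenant reports completed" rather than "no tenant reports failed" keeps a half-finished rollout from being mistaken for a clean one.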
What Happens Without This
That eight-minute ALTER TABLE was ShelfWise’s first production incident. The lesson was expensive: 200 tenants lost order processing for eight minutes. The postmortem identified three root causes: no expand-contract discipline, no concurrent DDL, and no migration testing against production-scale data.
The second incident was worse. A data migration ran inside the schema migration. It failed at row 30 million on tenant 151. The migration had already modified the schema for tenants 1-150. The downgrade path had not been tested. Recovery took four hours and a point-in-time backup restore.
Both incidents were entirely preventable with the patterns in this post.
Checklist
Before running any migration in production:
- Schema changes use expand-contract: nullable first, backfill second, constrain third
- CREATE INDEX CONCURRENTLY for all index creation on existing tables
- NOT VALID + VALIDATE CONSTRAINT for adding NOT NULL constraints
- Data migrations are separate scripts, not Alembic revisions
- Backfills run in batches with sleep intervals between them
- Every migration has a tested downgrade path
- Multi-tenant runner tracks per-tenant migration status
- retry_failed recovers from partial failure without re-running successful tenants
- Application code handles both old and new schema during the migration window
- New code runs against old schema in CI before deploying the migration
The next post in this series will tackle API versioning — how to evolve your endpoints without breaking existing integrations, and how multi-tenancy adds a layer of complexity when different tenants need to stay on different API versions.