Python’s async keyword does not make your code faster. It makes your I/O-bound code more efficient. Understanding the difference determines whether your FastAPI app handles 10 concurrent requests or 10,000.
The confusion usually sounds like this: “We added async everywhere, so it should be concurrent now.” Adding async def to a function makes it a coroutine — that is necessary but not sufficient. You also need to avoid blocking the event loop, use asyncio.gather() correctly, and understand when SQLAlchemy’s async session is and is not safe to use concurrently.
Concurrency vs Parallelism in Python
Parallelism means multiple CPU cores executing code simultaneously. Python’s GIL (Global Interpreter Lock) prevents this for CPU-bound work — only one thread executes Python bytecode at a time.
Concurrency means managing multiple tasks by interleaving them — while one task is waiting for I/O (a database response, an HTTP call, a file read), another task runs. This is what asyncio gives you.
The event loop processes one coroutine at a time. When a coroutine hits an await, it suspends and the loop picks up another waiting coroutine. The result: your FastAPI server handles many concurrent requests without spawning a thread per request.
Async does not reduce total work. A 300ms database query takes 300ms either way. Async lets your server do other work during that wait instead of blocking a thread.
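The point about wall-clock time can be checked with a small stdlib-only sketch. Here asyncio.sleep stands in for an I/O wait, and the function names and durations are made up for illustration, not taken from the BookStore code:

```python
import asyncio
import time


async def fake_query(name: str, seconds: float) -> str:
    # asyncio.sleep stands in for waiting on a database or HTTP response.
    await asyncio.sleep(seconds)
    return name


async def run_sequentially() -> float:
    # Each await finishes before the next one starts: waits add up.
    start = time.perf_counter()
    await fake_query("orders", 0.3)
    await fake_query("books", 0.2)
    return time.perf_counter() - start


async def run_concurrently() -> float:
    # Both waits overlap: total time is close to the longest single wait.
    start = time.perf_counter()
    await asyncio.gather(fake_query("orders", 0.3), fake_query("books", 0.2))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {asyncio.run(run_sequentially()):.2f}s")
    print(f"concurrent: {asyncio.run(run_concurrently()):.2f}s")
```

Neither version makes an individual wait shorter. The concurrent version only overlaps the waits, which is the whole benefit asyncio offers.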
When asyncio.gather() Is Wrong
The most dangerous misuse of asyncio.gather() is passing coroutines that share a single AsyncSession. This looks correct but produces intermittent production failures:
```python
# BUG: two coroutines sharing the same session in gather()
async def get_order_details(order_id: int, session: AsyncSession):
    order, books = await asyncio.gather(
        session.execute(select(Order).where(Order.id == order_id)),
        session.execute(select(Book).join(OrderItem).where(OrderItem.order_id == order_id)),
    )
    # SQLAlchemy async sessions are NOT safe for concurrent use.
    # This causes: sqlalchemy.exc.InvalidRequestError
    # It may work in tests (sequential execution under low load)
    # and fail in production under concurrent traffic.
```

The reason is that AsyncSession maintains internal state (transaction context, identity map, pending flush queue) that is not designed for concurrent access. Two coroutines modifying this state at the same time can corrupt it.
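To see why interleaving at an await is enough to cause trouble, here is a toy model. ToySession is a hypothetical stand-in written for this illustration; it is not SQLAlchemy's implementation, it only mimics the "one operation at a time" constraint:

```python
import asyncio


class ToySession:
    """Hypothetical stand-in for a session that allows one operation at a time."""

    def __init__(self) -> None:
        self._busy = False

    async def execute(self, query: str) -> str:
        if self._busy:
            raise RuntimeError("concurrent operation on a shared session")
        self._busy = True
        try:
            await asyncio.sleep(0.01)  # simulated network round trip
            return f"rows for {query}"
        finally:
            self._busy = False


async def shared_session() -> list:
    # Both coroutines use the same session; the second starts while the
    # first is still suspended at its await.
    session = ToySession()
    return await asyncio.gather(
        session.execute("orders"),
        session.execute("books"),
        return_exceptions=True,
    )


async def sequential_session() -> list:
    # Same session, but one operation at a time: no conflict.
    session = ToySession()
    return [await session.execute("orders"), await session.execute("books")]
```

In the shared case the second call fails because the first one is still in flight. A single thread does not protect you here, because the interleaving happens at every await.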
Correct Pattern: Separate Sessions for Parallel Work
When you genuinely need to parallelize independent database queries — for example, loading a dashboard that needs both recent orders and a reading list — give each coroutine its own session:
```python
# CORRECT: independent sessions for parallel queries
async def fetch_dashboard_data(user_id: int):
    async def get_recent_orders():
        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(Order).where(Order.user_id == user_id).limit(5)
            )
            return result.scalars().all()

    async def get_reading_list():
        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(Book).join(ReadingList).where(ReadingList.user_id == user_id)
            )
            return result.scalars().all()

    orders, books = await asyncio.gather(
        get_recent_orders(),
        get_reading_list(),
    )
    return {"orders": orders, "books": books}
```

This works because the two coroutines are genuinely independent. They query different tables, hold separate connections from the pool, and do not share any mutable state. The total wall-clock time approaches max(query_a, query_b) instead of query_a + query_b.
When the queries are dependent — when query B uses a result from query A — do not parallelize them. Run them sequentially in the same session:
```python
# CORRECT: dependent queries run sequentially in one session
async def get_order_with_user(order_id: int, session: AsyncSession):
    result = await session.execute(
        select(Order).where(Order.id == order_id)
    )
    order = result.scalar_one_or_none()
    if order is None:
        raise OrderNotFoundError(order_id)

    user_result = await session.execute(
        select(User).where(User.id == order.user_id)
    )
    return order, user_result.scalar_one()
```

Lazy Loading Is a Trap in Async SQLAlchemy
SQLAlchemy’s lazy loading fires a database query implicitly when you access a relationship attribute that has not been loaded yet. In a synchronous context, this just works, which is part of why it is a common pitfall: it looks fine in sync code. In async SQLAlchemy, it raises a MissingGreenlet error because a plain attribute access cannot be awaited. The implicit query tries to perform I/O outside the greenlet context that SQLAlchemy sets up for awaited calls such as session.execute():
```python
# N+1 bug in async context: raises MissingGreenlet
async def list_orders(session: AsyncSession):
    result = await session.execute(select(Order))
    orders = result.scalars().all()
    for order in orders:
        # Triggers a SYNC lazy load, which raises:
        # sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called
        print(order.user.name)
```

The fix is to use eager loading strategies (selectinload or joinedload) so the related objects are fetched as part of the original query:
```python
# CORRECT: eager load with selectinload
async def list_orders(session: AsyncSession):
    result = await session.execute(
        select(Order)
        .options(selectinload(Order.user))
        .options(selectinload(Order.items).selectinload(OrderItem.book))
    )
    orders = result.scalars().all()
    # order.user and order.items are pre-loaded: no lazy queries, no MissingGreenlet
    for order in orders:
        print(order.user.name)  # safe
```

| Strategy | selectinload | joinedload |
|---|---|---|
| How it works | Runs a second SELECT IN query for the relationship | Uses SQL JOIN in the original query |
| SQL queries generated | 2 queries (parent + children batch) | 1 query (larger result set) |
| Async safe | Yes | Yes |
| Best for | Collections (one-to-many) | Single objects (many-to-one) |
| Memory usage | Lower (separate result sets) | Higher (cartesian product for collections) |
Use selectinload for one-to-many relationships (an order’s items, an author’s posts). Use joinedload for many-to-one relationships where the JOIN does not multiply rows (an order’s user, a post’s author).
Timeouts and Cancellation
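A slow database query does not block the event loop, but it suspends the coroutine awaiting it and holds a connection from the pool for as long as it runs. Under load, a few slow queries can exhaust the pool and leave other requests waiting for a connection. Production systems need query timeouts to prevent one slow query from cascading: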
```python
import asyncio

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from src.core.exceptions import ServiceTimeoutError
from src.models import Book
from src.schemas.book import BookResponse


async def get_book_with_timeout(book_id: int, session: AsyncSession) -> BookResponse | None:
    try:
        async with asyncio.timeout(5.0):  # Python 3.11+
            result = await session.execute(
                select(Book).where(Book.id == book_id)
            )
            book = result.scalar_one_or_none()
            return BookResponse.model_validate(book) if book else None
    except TimeoutError:
        # Reset the session before surfacing the error to the caller.
        await session.rollback()
        raise ServiceTimeoutError("Database query exceeded 5 second limit")
```

Always rollback() the session after a timeout. A session that times out mid-transaction may have partial writes. Rolling back ensures the session is in a consistent state before its connection is returned to the pool.
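The cancel-then-clean-up sequence can be observed without a database. The sketch below uses asyncio.wait_for rather than asyncio.timeout so that it also runs on Python versions before 3.11. QueryTimeout, slow_query, and the cleanup_log list are hypothetical names for this illustration, with the "rollback" entry marking where session.rollback() would go:

```python
import asyncio


class QueryTimeout(Exception):
    """Hypothetical application-level error, standing in for ServiceTimeoutError."""


async def slow_query(seconds: float, cleanup_log: list) -> str:
    try:
        await asyncio.sleep(seconds)  # simulated database round trip
        return "row"
    except asyncio.CancelledError:
        # The timeout cancels the awaited operation; record it and re-raise.
        cleanup_log.append("cancelled")
        raise


async def query_with_timeout(seconds: float, limit: float, cleanup_log: list) -> str:
    try:
        return await asyncio.wait_for(slow_query(seconds, cleanup_log), timeout=limit)
    except asyncio.TimeoutError:
        cleanup_log.append("rollback")  # where session.rollback() would go
        raise QueryTimeout(f"query exceeded {limit} second limit") from None
```

The order matters: the pending operation is cancelled first, and only then does the caller see the timeout and get a chance to roll back.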
Background Tasks Without Blocking the Response
Some work should happen after the response is sent — not before. Sending a confirmation email, triggering a webhook, updating analytics counters. These do not affect the response body, so making the client wait for them is pure overhead.
FastAPI’s BackgroundTasks is the correct tool here:
```python
from fastapi import BackgroundTasks

from src.api.deps import OrderServiceDep
from src.schemas.order import OrderCreate, OrderResponse


@router.post("/orders/")
async def create_order(
    data: OrderCreate,
    background_tasks: BackgroundTasks,
    service: OrderServiceDep,
) -> OrderResponse:
    order = await service.place_order(data)
    # Response is sent to the client immediately.
    # send_order_confirmation runs after; the client does not wait.
    background_tasks.add_task(send_order_confirmation, order.id, data.user_email)
    return order


async def send_order_confirmation(order_id: int, email: str) -> None:
    # Runs in the same event loop after the response is returned.
    # Use a fresh session here: the request-scoped session is already closed.
    async with AsyncSessionLocal() as session:
        order = await session.get(Order, order_id)
        await email_client.send(
            to=email,
            subject=f"Order #{order_id} confirmed",
            body=render_confirmation_email(order),
        )
```

BackgroundTasks runs the task in the same event loop as the request, but after the response is fully sent. It is appropriate for lightweight async tasks. For CPU-heavy work or tasks that must survive server restarts, use a task queue (Celery, ARQ) instead.
Detecting Blocking Code in an Async Server
The hardest async bugs are the ones that work locally and fail at scale — blocking code that appears to work because load is low. A synchronous time.sleep() or a requests.get() call inside an async def blocks the entire event loop, not just the current coroutine.
The tool to find these is asyncio’s debug mode, enabled during testing or development:

```python
import asyncio


async def enable_asyncio_debug() -> None:
    # Call from a dev startup hook or test fixture, inside the running loop.
    # Setting the environment variable PYTHONASYNCIODEBUG=1 has the same effect.
    asyncio.get_running_loop().set_debug(True)
    # Debug mode logs a warning whenever a callback or task step holds the loop
    # longer than loop.slow_callback_duration (0.1 s by default).
```

The most common sources of accidental blocking in FastAPI code:
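- requests.get(): use httpx.AsyncClient instead
- open() / Path.read_text(): use aiofiles for I/O-heavy file work
- CPU-heavy work: offload with loop.run_in_executor(None, sync_fn), which runs sync_fn in the default thread pool. For pure-Python CPU-bound work, pass a ProcessPoolExecutor instead, since the GIL still serializes threads.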
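The difference between blocking the loop and offloading can be measured with a small stdlib-only sketch. A background "ticker" task counts how often it gets to run while a 0.2 second blocking call is in progress. The function names and durations are illustrative assumptions, with time.sleep standing in for requests.get() or a synchronous driver call:

```python
import asyncio
import time


def blocking_io(seconds: float) -> None:
    time.sleep(seconds)  # stands in for requests.get() or a sync driver call


async def count_ticks(stop: asyncio.Event) -> int:
    # Counts how many times this task is scheduled before stop is set.
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(0.01)
        ticks += 1
    return ticks


async def measure(offload: bool) -> int:
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # let the ticker start
    if offload:
        # Runs in the default thread pool; the loop stays free for the ticker.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, blocking_io, 0.2)
    else:
        blocking_io(0.2)  # BUG: freezes the loop, the ticker cannot run
    stop.set()
    return await ticker


if __name__ == "__main__":
    print("ticks while blocked:  ", asyncio.run(measure(offload=False)))
    print("ticks while offloaded:", asyncio.run(measure(offload=True)))
```

With the direct call the ticker is starved for the full 0.2 seconds. With run_in_executor it keeps running, which is what every other request on the server experiences as well.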
Key Takeaways
- Async is concurrency, not parallelism. Python’s GIL prevents true parallel execution. asyncio makes I/O-bound waiting efficient, not CPU-bound computation faster.
- AsyncSession is not concurrent-safe. Never pass a shared session to coroutines inside asyncio.gather(). Give each coroutine its own session.
- Lazy loading raises MissingGreenlet in async context. Use selectinload for one-to-many and joinedload for many-to-one relationships in all async queries.
- Separate sessions for gather(). Independent queries can run in parallel safely, but each coroutine must own its session. The session factory is cheap.
- Use BackgroundTasks for post-response work. Email, webhooks, analytics counters: none of these should block the client response.
Next: Part 8 brings everything together — a complete testing strategy for the BookStore API covering unit tests (with fakes), integration tests (with a real test database), and API-level tests with TestClient and dependency overrides.