Topic 2 · Section 2.1
The Event Loop
The event loop is the beating heart of every FastAPI application. It's a single-threaded loop that juggles thousands of concurrent connections by cleverly switching between tasks whenever one is waiting for I/O. Understanding it explains every async design decision FastAPI makes.
2.1.1
Event Loop Fundamentals
Event Loop Architecture
The event loop is a single loop that runs forever, continuously checking: "Is there a task ready to run? Run it. Did it pause to wait for I/O? Switch to the next task." This is called cooperative multitasking: tasks voluntarily yield control using await.
Event Loop — Core Cycle
┌────────────────────────────────────────────────────────────┐
│                         EVENT LOOP                         │
│                                                            │
│  ┌──────────┐    ┌──────────┐    ┌───────────────────┐     │
│  │  Ready   │───▶│   Run    │───▶│ Task hits `await` │     │
│  │  Queue   │    │ callback │    │ → registers I/O   │     │
│  └──────────┘    └──────────┘    └────────┬──────────┘     │
│       ▲                                   │                │
│  ┌────┴──────┐                   ┌────────▼──────────┐     │
│  │ I/O done  │◀──────────────────│ OS / selector     │     │
│  │ → wake up │                   │ (epoll / kqueue)  │     │
│  └───────────┘                   └───────────────────┘     │
└────────────────────────────────────────────────────────────┘
One thread. No blocking. Thousands of concurrent I/O ops.
Why FastAPI uses this: A traditional thread-per-request server ties up one thread for each request while it waits on a DB or HTTP call. With the event loop, a single thread can keep thousands of requests in flight concurrently: whenever one waits for a DB query, another runs.
python · event_loop_basics.py
import asyncio


async def main():
    # Inside a coroutine, get_running_loop() returns the loop that is executing us
    loop = asyncio.get_running_loop()
    print(f"Loop running: {loop.is_running()}")  # True (we're inside it)

    # Schedule a callback to run on the next loop iteration
    loop.call_soon(lambda: print("Scheduled callback ran!"))

    # Schedule a callback after a delay
    loop.call_later(1.0, lambda: print("Ran after 1 second"))

    # Yield once so the call_soon callback can fire
    await asyncio.sleep(0)  # zero sleep = yield to the event loop

    # Stay alive long enough for the call_later callback:
    # asyncio.run() closes the loop as soon as main() returns
    await asyncio.sleep(1.1)


# Entry point: creates an event loop and runs it until main() completes
asyncio.run(main())

# In FastAPI, Uvicorn creates and manages the event loop for you:
#   uvicorn main:app --host 0.0.0.0 --port 8000
# Uvicorn runs the event loop; FastAPI registers route handlers as coroutines.
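To make the "one thread, many waits" claim concrete, here is a minimal standard-library sketch. The request count (1,000) and the 50 ms simulated wait are arbitrary assumptions, and `fake_request` is a hypothetical stand-in for a handler that awaits a DB or HTTP call.

```python
import asyncio
import threading
import time


async def fake_request(i: int, seen_threads: set) -> int:
    seen_threads.add(threading.get_ident())  # record which OS thread ran this coroutine
    await asyncio.sleep(0.05)                # simulated I/O wait; yields to the event loop
    return i


async def serve_many(n: int):
    seen_threads = set()
    start = time.perf_counter()
    # Start n "requests" and wait for all of them to finish
    results = await asyncio.gather(*(fake_request(i, seen_threads) for i in range(n)))
    elapsed = time.perf_counter() - start
    return results, seen_threads, elapsed


results, seen_threads, elapsed = asyncio.run(serve_many(1000))
print(f"{len(results)} requests on {len(seen_threads)} thread(s) in {elapsed:.2f}s")
```

Run one after another, the same waits would add up to roughly 50 seconds. Here they overlap, because every coroutine hands control back to the loop at its await.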
Cooperative Multitasking
Unlike OS threads (preemptive: the OS forcibly switches between them), asyncio uses cooperative multitasking: a coroutine runs until it explicitly yields with await. If a coroutine never awaits, it blocks the event loop, and every other request has to wait for it.
Critical FastAPI pitfall: Calling time.sleep(), a blocking DB driver (psycopg2), or any CPU-heavy code inside an async def handler BLOCKS the event loop; all other requests stall until it finishes. Always use async alternatives or offload to a thread pool.
python · cooperative_multitasking.py
import asyncio
import time


# ❌ WRONG: blocks the event loop (request B waits for request A's sleep)
async def bad_handler():
    time.sleep(2)  # BLOCKING: freezes the entire loop!
    return {"status": "done"}


# ✅ CORRECT: yields to the event loop during the wait
async def good_handler():
    await asyncio.sleep(2)  # NON-BLOCKING: other requests run during this
    return {"status": "done"}


# Demo: cooperative switching between two coroutines
async def task_a():
    print("A: starting")
    await asyncio.sleep(1)  # yields here → task_b gets to run
    print("A: done")


async def task_b():
    print("B: starting")
    await asyncio.sleep(0.5)  # yields → but finishes before A
    print("B: done")


async def main():
    await asyncio.gather(task_a(), task_b())


asyncio.run(main())
# Output:
# A: starting
# B: starting   ← B runs while A is sleeping
# B: done       ← B finishes first (0.5s)
# A: done       ← A finishes later (1s)
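One way to see the stall for yourself is to run a "heartbeat" coroutine next to a handler and measure the longest gap between ticks. This is a rough sketch, not a benchmark: `heartbeat`, `max_gap`, and both handlers are hypothetical names, and the 0.3 s block and 10 ms tick are arbitrary. It assumes Python 3.9+ for `asyncio.to_thread`.

```python
import asyncio
import time


async def heartbeat(gaps: list, stop: asyncio.Event) -> None:
    """Tick every ~10 ms and record the time between ticks."""
    last = time.perf_counter()
    while not stop.is_set():
        await asyncio.sleep(0.01)
        now = time.perf_counter()
        gaps.append(now - last)  # a large gap means the loop could not run us
        last = now


async def blocking_handler() -> None:
    time.sleep(0.3)  # blocks the event loop thread


async def offloaded_handler() -> None:
    await asyncio.to_thread(time.sleep, 0.3)  # same blocking call, run in a worker thread


async def max_gap(handler) -> float:
    gaps: list = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(gaps, stop))
    await asyncio.sleep(0.05)  # let the heartbeat tick a few times
    await handler()
    await asyncio.sleep(0.05)  # give the heartbeat a chance to record the gap
    stop.set()
    await hb
    return max(gaps)


blocked_gap = asyncio.run(max_gap(blocking_handler))
offloaded_gap = asyncio.run(max_gap(offloaded_handler))
print(f"blocking handler:  longest heartbeat gap {blocked_gap:.3f}s")
print(f"offloaded handler: longest heartbeat gap {offloaded_gap:.3f}s")
```

With the blocking handler, the longest gap is at least as long as the blocking call itself. With the offloaded handler, the heartbeat keeps ticking close to its 10 ms interval.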
2.1.2
Scheduling — Ready Queue & Delayed Queue
Ready Queue, Delayed Queue & Task Switching
The event loop maintains two internal queues:
• Ready queue: callbacks/coroutines that can run right now (I/O completed, sleep expired, freshly created tasks).
• Delayed queue: callbacks scheduled for a future time via call_later() or asyncio.sleep(n). A heap (min-priority queue) sorted by scheduled time.
Each loop iteration (in CPython's default implementation): poll the OS selector for I/O events, move any delayed callbacks whose time has elapsed onto the ready queue, then run the callbacks currently in the ready queue. Anything those callbacks schedule waits for the next iteration.
python · scheduling_internals.py
import asyncio


async def demonstrate_scheduling():
    loop = asyncio.get_running_loop()

    # call_soon → added to the READY queue immediately
    loop.call_soon(lambda: print("[ready] I run next iteration"))

    # call_later → added to the DELAYED queue, moved to ready after the delay
    loop.call_later(0.1, lambda: print("[delayed] I run after 0.1s"))
    loop.call_later(0.5, lambda: print("[delayed] I run after 0.5s"))

    # call_at → schedule at an absolute event loop time
    now = loop.time()
    loop.call_at(now + 0.2, lambda: print("[delayed] I run at loop_time+0.2"))

    # asyncio.sleep internally uses call_later
    await asyncio.sleep(1)  # wait for all callbacks to fire


# Task switching: every `await` is a potential switch point
async def show_switch_points():
    print("1: before await")
    await asyncio.sleep(0)  # switch point: other tasks can run here
    print("2: after first await")
    await asyncio.sleep(0)  # another switch point
    print("3: done")


asyncio.run(demonstrate_scheduling())
asyncio.run(show_switch_points())
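The two-queue idea can be modelled in a few lines. The sketch below is a toy, not asyncio's implementation: `ToyLoop` is a hypothetical class, it has no I/O selector (it just sleeps until the next timer is due), and it stops when both queues are empty.

```python
import heapq
import itertools
import time
from collections import deque


class ToyLoop:
    def __init__(self):
        self.ready = deque()          # callbacks that can run right now (FIFO)
        self.delayed = []             # min-heap of (due_time, seq, callback)
        self._seq = itertools.count() # tie-breaker so the heap never compares callbacks

    def call_soon(self, callback):
        self.ready.append(callback)

    def call_later(self, delay, callback):
        due = time.monotonic() + delay
        heapq.heappush(self.delayed, (due, next(self._seq), callback))

    def run(self):
        while self.ready or self.delayed:
            if not self.ready:
                # Nothing runnable: wait for the earliest timer
                # (a real loop would block in the OS selector here)
                wait = self.delayed[0][0] - time.monotonic()
                if wait > 0:
                    time.sleep(wait)
            # Promote timers whose due time has passed
            now = time.monotonic()
            while self.delayed and self.delayed[0][0] <= now:
                _, _, callback = heapq.heappop(self.delayed)
                self.ready.append(callback)
            # Run only what is ready now; newly scheduled work waits a turn
            for _ in range(len(self.ready)):
                self.ready.popleft()()


order = []
loop = ToyLoop()
loop.call_later(0.02, lambda: order.append("late"))
loop.call_later(0.01, lambda: order.append("early"))
loop.call_soon(lambda: order.append("soon"))
loop.run()
print(order)  # ['soon', 'early', 'late']
```

The heap is what lets the loop find the next timer cheaply: the earliest due time is always at index 0, regardless of the order in which timers were registered.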
Topic 2 · Section 2.2
Coroutines
Coroutines are the unit of concurrency in FastAPI. Every route handler you define with async def is a coroutine function, and each request that reaches it runs as a coroutine. Understanding how coroutines are created, suspended, and resumed explains how your API handles concurrent requests.
2.2.1
async — Coroutine Creation & Lifecycle
Coroutine Creation
Declaring a function with async def instead of def makes it a coroutine function. Calling it doesn't run the body; it returns a coroutine object. The event loop runs that object when you await it or wrap it in a Task.
async def fn() → fn() called → coroutine object → await / Task → executes
python · coroutine_creation.py
import asyncio
import inspect


# 1. Define a coroutine function
async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.1)  # simulates DB/HTTP call
    return {"id": user_id, "name": "Alice"}


# 2. Calling it does NOT run the code; it returns a coroutine object
coro = fetch_user(42)
print(type(coro))                 # <class 'coroutine'>
print(inspect.iscoroutine(coro))  # True
coro.close()  # never awaited: close it to avoid a "never awaited" RuntimeWarning


# 3. To actually run it, you must await it or pass it to asyncio.run()
async def main():
    result = await fetch_user(42)  # NOW it executes
    print(result)                  # {'id': 42, 'name': 'Alice'}


asyncio.run(main())

# ── FastAPI route: every handler is a coroutine ──
# @app.get("/users/{user_id}")
# async def get_user(user_id: int):            ← coroutine function
#     user = await user_service.get(user_id)   ← suspends here, other requests run
#     return user
Coroutine Lifecycle
A coroutine moves through four states: Created → Running → Suspended (at each await that has to wait) → Closed (returned or raised). You can inspect these states programmatically with inspect.getcoroutinestate().
Coroutine Lifecycle
CREATED           RUNNING           SUSPENDED          CLOSED
────────          ───────           ─────────          ──────
coro = fn()   →   await coro    →   hits await    →    return/raise
(not started)     (executing)       (waiting I/O)      (done)
                      │                  │
                      └──────────────────┘
                      can oscillate many times
                      before finally closing
python
import asyncio
import inspect


async def my_coro():
    await asyncio.sleep(0.1)
    return "done"


async def inspect_lifecycle():
    coro = my_coro()
    print(coro.cr_frame)                    # frame object (becomes None once the coroutine is closed)
    print(inspect.getcoroutinestate(coro))  # CORO_CREATED

    task = asyncio.create_task(coro)
    await asyncio.sleep(0)                  # let it start
    print(inspect.getcoroutinestate(coro))  # CORO_SUSPENDED (at sleep)

    result = await task
    print(inspect.getcoroutinestate(coro))  # CORO_CLOSED
    print(result)                           # done


asyncio.run(inspect_lifecycle())
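The same states can be observed without any event loop by driving a coroutine by hand with send(), which is roughly what the loop does for you. In this sketch `YieldOnce` and `job` are hypothetical names, and the string yielded from `__await__` is only legal because nothing from asyncio is involved; under asyncio, `__await__` has to yield what the loop expects.

```python
import inspect


class YieldOnce:
    """Minimal awaitable: suspends once, then produces a value."""

    def __await__(self):
        yield "paused"            # hands control back to whoever is driving the coroutine
        return "resumed value"    # becomes the result of `await YieldOnce()`


async def job():
    got = await YieldOnce()
    return got.upper()


coro = job()
states = [inspect.getcoroutinestate(coro)]      # before the first send()

signal = coro.send(None)                        # run until the first suspension
states.append(inspect.getcoroutinestate(coro))

try:
    coro.send(None)                             # resume; the coroutine runs to its return
except StopIteration as stop:
    final = stop.value                          # the return value travels in StopIteration
states.append(inspect.getcoroutinestate(coro))

print(states)  # ['CORO_CREATED', 'CORO_SUSPENDED', 'CORO_CLOSED']
print(signal, final)  # paused RESUMED VALUE
```

CORO_RUNNING never shows up in the list because the state is only "running" while the coroutine's own code is executing, and all three checks happen from the outside.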
2.2.2
await — Suspension & Resumption
How await suspends and resumes
When the awaited operation is not finished yet, await expr does three things:
1. Suspends the current coroutine (saves its state/frame).
2. Gives control back to the event loop — other coroutines can now run.
3. Resumes this coroutine once the awaited thing completes, with the result.
You can only await awaitable objects: coroutines, Tasks, Futures, or objects implementing __await__.
python · await_mechanics.py
import asyncio


async def slow_db_query(query: str) -> list:
    print(f"  DB: executing '{query}'")
    await asyncio.sleep(0.2)  # simulates 200ms DB latency
    print(f"  DB: '{query}' done")
    return ["row1", "row2"]


async def handle_request_a():
    print("Request A: start")
    rows = await slow_db_query("SELECT * FROM users")
    # ↑ SUSPENDS here. Event loop runs request B while waiting.
    print(f"Request A: got {len(rows)} rows")


async def handle_request_b():
    print("Request B: start")
    rows = await slow_db_query("SELECT * FROM orders")
    print(f"Request B: got {len(rows)} rows")


async def main():
    # Both run CONCURRENTLY: total time ~0.2s, not 0.4s
    await asyncio.gather(
        handle_request_a(),
        handle_request_b(),
    )


asyncio.run(main())

# What you CANNOT await (runtime TypeError):
# await 42             → int is not awaitable
# await "hello"        → str is not awaitable
# await time.sleep(1)  → blocks for 1s, then fails: None is not awaitable (use asyncio.sleep!)
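The last kind of awaitable, an object implementing `__await__`, is easy to sketch. `Delay` below is a hypothetical class; delegating to an inner coroutine is just one simple way to satisfy the protocol, which only requires that `__await__` return an iterator.

```python
import asyncio


class Delay:
    """Hypothetical awaitable: waits `seconds`, then produces `value`."""

    def __init__(self, seconds: float, value):
        self.seconds = seconds
        self.value = value

    def __await__(self):
        # Delegate to a coroutine and return its iterator
        return self._wait().__await__()

    async def _wait(self):
        await asyncio.sleep(self.seconds)
        return self.value


async def main():
    value = await Delay(0.01, "ready")  # works: Delay implements __await__

    try:
        await 42                        # int has no __await__
    except TypeError as exc:
        error = type(exc).__name__

    return value, error


value, error = asyncio.run(main())
print(value, error)  # ready TypeError
```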
FastAPI rule: Use async def for route handlers whose I/O (DB, HTTP, file) goes through awaitable libraries. Use plain def for handlers that call blocking or CPU-only code (FastAPI runs those in a thread pool automatically).
Topic 2 · Section 2.3
Tasks
Tasks are scheduled coroutines: they run concurrently in the background without you having to await them immediately. Mastering tasks lets you run multiple DB queries concurrently, fire background jobs, and implement timeouts in FastAPI routes.
2.3.1
Task Management
create_task — Fire and Concurrently Run
asyncio.create_task(coro) wraps a coroutine in a Task and schedules it on the event loop without waiting for it; the task starts running the next time the current coroutine yields. This is how you get concurrency inside a single FastAPI handler: start several async operations, then await their results.
python · create_task.py
import asyncio
import time


async def fetch_user(uid: int):
    await asyncio.sleep(0.2)
    return {"id": uid, "name": "Alice"}


async def fetch_orders(uid: int):
    await asyncio.sleep(0.3)
    return ["order1", "order2"]


async def fetch_stats(uid: int):
    await asyncio.sleep(0.1)
    return {"logins": 42}


async def get_user_dashboard(uid: int):
    start = time.perf_counter()

    # ❌ Sequential: total time 0.2 + 0.3 + 0.1 = 0.6s
    # user = await fetch_user(uid)
    # orders = await fetch_orders(uid)
    # stats = await fetch_stats(uid)

    # ✅ Concurrent with create_task: total time max(0.2, 0.3, 0.1) = 0.3s
    task_user = asyncio.create_task(fetch_user(uid))
    task_orders = asyncio.create_task(fetch_orders(uid))
    task_stats = asyncio.create_task(fetch_stats(uid))

    # Now await their results
    user = await task_user
    orders = await task_orders
    stats = await task_stats

    print(f"Done in {time.perf_counter() - start:.2f}s")  # ~0.3s
    return {"user": user, "orders": orders, "stats": stats}


asyncio.run(get_user_dashboard(1))
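One caveat when a task is started and not awaited right away: the event loop keeps only weak references to tasks, so the asyncio documentation recommends holding a reference until the task finishes. A minimal sketch of that pattern follows; `spawn`, `audit_log`, and `handler` are hypothetical names.

```python
import asyncio

background_tasks = set()  # strong references keep unfinished tasks alive


def spawn(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)  # drop the reference once done
    return task


async def audit_log(event: str, sink: list) -> None:
    await asyncio.sleep(0.01)  # simulated write
    sink.append(event)


async def handler(sink: list) -> dict:
    spawn(audit_log("login", sink))  # started, not awaited
    return {"status": "ok"}          # the response does not wait for the log write


async def main():
    sink: list = []
    response = await handler(sink)
    pending_at_return = len(background_tasks)  # the task is still in flight
    await asyncio.gather(*background_tasks)    # demo only: wait so we can inspect the result
    return response, pending_at_return, sink


response, pending_at_return, sink = asyncio.run(main())
print(response, pending_at_return, sink)  # {'status': 'ok'} 1 ['login']
```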
gather, wait & as_completed
Three patterns for running multiple coroutines concurrently — each with different trade-offs for when you need all results, partial results on failure, or streaming results as they arrive.
python · gather_wait_as_completed.py
import asyncio


async def work(name: str, delay: float):
    await asyncio.sleep(delay)
    return f"{name} done"


async def main():
    # ── gather() ─────────────────────────────────────────
    # Runs all, waits for ALL to finish, returns a list in input order.
    # By default the first exception propagates to the caller, but the
    # other awaitables are NOT cancelled; they keep running.
    results = await asyncio.gather(
        work("A", 0.1),
        work("B", 0.3),
        work("C", 0.2),
        return_exceptions=True,  # exceptions become values, not raised
    )
    print(results)  # ['A done', 'B done', 'C done'] ← in input order

    # ── wait() ───────────────────────────────────────────
    # More control: separate done from not-done sets.
    tasks = {asyncio.create_task(work(n, d)) for n, d in [("X", 0.1), ("Y", 0.5)]}
    done, pending = await asyncio.wait(tasks, timeout=0.2)
    print(f"done: {len(done)}, pending: {len(pending)}")  # done: 1, pending: 1
    for t in pending:
        t.cancel()  # cancel tasks that didn't finish in time

    # ── as_completed() ───────────────────────────────────
    # Yields results as they finish (fastest first).
    # Great for: "show first result immediately, load rest after"
    coros = [work("fast", 0.1), work("slow", 0.5), work("mid", 0.3)]
    # A plain `for` works on every Python version; 3.13+ also accepts `async for`
    for fut in asyncio.as_completed(coros):
        result = await fut
        print(f"Got: {result}")  # fast → mid → slow


asyncio.run(main())
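The difference between gather()'s default error handling and return_exceptions=True is easy to get wrong, so here is a small sketch. `ok` and `boom` are hypothetical helpers and the delays are arbitrary.

```python
import asyncio

finished = []


async def ok(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    finished.append(name)
    return name


async def boom() -> None:
    await asyncio.sleep(0.01)
    raise ValueError("boom")


async def main():
    slow = asyncio.create_task(ok("slow", 0.05))

    # Default: the first exception is raised at the await
    try:
        await asyncio.gather(boom(), slow)
    except ValueError as exc:
        first_error = str(exc)

    still_running = not slow.done()  # gather did not cancel the sibling task
    await slow                       # it completes normally

    # return_exceptions=True: failures come back as values, in input order
    collected = await asyncio.gather(ok("a", 0.01), boom(), return_exceptions=True)
    return first_error, still_running, collected


first_error, still_running, collected = asyncio.run(main())
print(first_error, still_running, collected)
```

If the surviving tasks should stop when one fails, you have to cancel them yourself in the except block.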
2.3.2
Cancellation & Timeout Handling
Cancel Signals, Cleanup & Timeouts
Cancelling a task raises CancelledError inside it at the await where it is suspended (or at its next await). Tasks can catch it to clean up, but should re-raise it afterwards. asyncio.timeout() (Python 3.11+) or asyncio.wait_for() enforce deadlines, which is critical for preventing runaway requests in production FastAPI.
python · cancellation_timeout.py
import asyncio


# ── Manual cancellation ──────────────────────────────────
async def long_running():
    try:
        print("working...")
        await asyncio.sleep(10)  # long operation
        print("done")
    except asyncio.CancelledError:
        print("cleanup: releasing resources")
        raise  # ALWAYS re-raise CancelledError!


async def cancel_demo():
    task = asyncio.create_task(long_running())
    await asyncio.sleep(0.5)  # let it start
    task.cancel()             # inject CancelledError
    try:
        await task
    except asyncio.CancelledError:
        print("task was cancelled")


# ── Timeout with wait_for (works in all Python 3.7+) ─────
async def timeout_demo():
    try:
        result = await asyncio.wait_for(
            long_running(),
            timeout=2.0,  # raise TimeoutError after 2s
        )
    except asyncio.TimeoutError:
        print("Request timed out after 2s")


# ── asyncio.timeout context manager (Python 3.11+) ───────
async def timeout_context():
    try:
        async with asyncio.timeout(2.0):
            await asyncio.sleep(10)  # will be cancelled
    except TimeoutError:
        print("timed out!")


# ── FastAPI timeout pattern ───────────────────────────────
# @app.get("/data")
# async def get_data():
#     try:
#         result = await asyncio.wait_for(slow_service(), timeout=5.0)
#         return result
#     except asyncio.TimeoutError:
#         raise HTTPException(504, "Gateway Timeout")

asyncio.run(cancel_demo())
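Because a timeout is delivered as a cancellation, cleanup written with try/finally runs on the timeout path too. A minimal sketch, where `query` is a hypothetical stand-in for an operation that holds a resource such as a DB connection:

```python
import asyncio

events = []


async def query() -> str:
    events.append("acquire")      # e.g. check out a connection
    try:
        await asyncio.sleep(1)    # slower than the deadline below
        return "rows"
    finally:
        events.append("release")  # runs on success, error, and cancellation


async def main() -> str:
    try:
        return await asyncio.wait_for(query(), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"


outcome = asyncio.run(main())
print(outcome, events)  # timed out ['acquire', 'release']
```

wait_for() cancels the inner coroutine and waits for that cancellation to finish before raising, so the resource is already released by the time the except block runs.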
Topic 2 · Section 2.4
Concurrency Models
Not all work is equal. I/O-bound tasks (waiting for DB, HTTP, files) are handled beautifully by async. CPU-bound tasks (image processing, ML inference, heavy computation) need threads or processes — or they'll block your event loop and kill throughput.
2.4.1
I/O Bound — HTTP, DB calls, File operations
Async I/O — The Right Tool for FastAPI
I/O-bound operations spend most of their time waiting for an external system (database, HTTP server, file system). During that wait, your CPU is idle. async/await lets you use that idle time to serve other requests; this is the core value of FastAPI's async architecture.
❌ Sync / Blocking
- Thread waits idle during I/O
- Need 1 thread per concurrent request
- 100 concurrent → 100 threads → high memory
- Thread switching overhead
✅ Async / Non-blocking
- Loop handles other requests during I/O wait
- 1 thread → 10,000+ concurrent I/O ops
- Low memory, no thread switching
- Native FastAPI model — zero config
python · async_io_fastapi.py
import asyncio

import aiofiles  # async file I/O
import httpx     # async HTTP client
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# NOTE: `User` is assumed to be a SQLAlchemy ORM model defined elsewhere in your app.


# ── Async HTTP requests (non-blocking) ──
async def fetch_github_user(username: str) -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"https://api.github.com/users/{username}")
        resp.raise_for_status()
        return resp.json()


# ── Async DB query (non-blocking) ──
async def get_users(db: AsyncSession) -> list:
    result = await db.execute(select(User))
    return result.scalars().all()


# ── Async file read (non-blocking) ──
async def read_config(path: str) -> str:
    async with aiofiles.open(path, 'r') as f:
        return await f.read()


# ── FastAPI: run all three concurrently ──
async def dashboard_endpoint(username: str, db: AsyncSession):
    gh_user, db_users, config = await asyncio.gather(
        fetch_github_user(username),  # async HTTP, non-blocking
        get_users(db),                # async DB, non-blocking
        read_config("config.yaml"),   # async file, non-blocking
    )
    return {"github": gh_user, "users": db_users}
2.4.2
CPU Bound — ThreadPoolExecutor & ProcessPoolExecutor
Offloading CPU Work Without Blocking the Loop
CPU-bound work (image resizing, password hashing, ML inference, data processing) runs Python bytecode and is bound by the GIL. Running it in an async def function blocks the event loop.
Solution: offload to a thread pool (for blocking I/O or light CPU) or a process pool (for heavy CPU that bypasses the GIL) using loop.run_in_executor().
FastAPI shortcut: If you define a route with def (not async def), FastAPI automatically runs it in a thread pool. Only do this for truly synchronous blocking code; don't use it as a lazy alternative to async.
python · cpu_bound_offload.py
import asyncio
import hashlib
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


# ── Synchronous CPU-heavy functions ──
def hash_password(password: str) -> str:
    # PBKDF2 with 100,000 iterations is deliberately CPU-intensive
    # (demo only: real code needs a random per-user salt)
    return hashlib.pbkdf2_hmac('sha256', password.encode(), b'salt', 100_000).hex()


def heavy_compute(n: int) -> int:
    return sum(i * i for i in range(n))


async def main():
    loop = asyncio.get_running_loop()

    # ── ThreadPoolExecutor ──────────────────────────────
    # Good for: blocking I/O (sync DB drivers), light CPU work
    # GIL is still held, but I/O ops release it
    with ThreadPoolExecutor(max_workers=4) as pool:
        hashed = await loop.run_in_executor(pool, hash_password, "mysecretpassword")
        print(f"Hash: {hashed[:20]}...")

    # ── ProcessPoolExecutor ─────────────────────────────
    # Good for: heavy CPU (ML inference, image processing)
    # Spawns separate processes → bypasses GIL
    with ProcessPoolExecutor(max_workers=2) as pool:
        result = await loop.run_in_executor(pool, heavy_compute, 10_000_000)
        print(f"Sum: {result}")

    # ── asyncio.to_thread (Python 3.9+, simpler syntax) ─
    hashed = await asyncio.to_thread(hash_password, "mypassword")
    print(f"to_thread hash: {hashed[:20]}...")


# The guard is required for ProcessPoolExecutor on platforms that
# start workers by re-importing this module (e.g. Windows, macOS)
if __name__ == "__main__":
    asyncio.run(main())

# ── FastAPI patterns ────────────────────────────────────
#
# Pattern 1: auto thread pool with plain def
# @app.post("/login")
# def login(data: LoginSchema):               ← FastAPI runs in thread pool
#     hashed = hash_password(data.password)   ← safe: not blocking event loop
#     ...
#
# Pattern 2: explicit offload in async def
# @app.post("/process-image")
# async def process_image(file: UploadFile):
#     data = await file.read()
#     result = await asyncio.to_thread(resize_image, data, (800, 600))
#     return {"size": len(result)}
ProcessPoolExecutor caution: Each process has its own memory space — you can't share Python objects directly. Use it only for pure functions that take serializable input/output (numbers, bytes, strings). Heavy ML models → load once per process with an initializer.
Decision Map — Which Concurrency Tool?
Concurrency Decision Tree for FastAPI
Your task is...
Waiting for DB / HTTP / file?
└─▶ async def + await (async SQLAlchemy, httpx, aiofiles)
Using a sync library that blocks (psycopg2, requests)?
└─▶ asyncio.to_thread() or ThreadPoolExecutor
Pure CPU work (hashing, image resize, <100ms)?
└─▶ asyncio.to_thread() (simple, uses thread pool)
Heavy CPU work (ML inference, video encode, >1s)?
└─▶ ProcessPoolExecutor (bypasses GIL, separate process)
Long-running background job (email, report)?
└─▶ FastAPI BackgroundTasks (Topic 13)
or Celery / ARQ (Topic 13.2)
Plain sync code that is short / not I/O?
└─▶ plain def handler (FastAPI auto-threads it)
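The tree above can be written down as a small lookup function, which is sometimes handy as a checklist in code review. This is only a sketch: `Workload` and `choose_tool` are hypothetical names, and the 1-second cut-off for "heavy" CPU work is an assumption taken from the rough thresholds in the tree (work between 100 ms and 1 s is a judgment call that the sketch sends to a thread).

```python
from dataclasses import dataclass


@dataclass
class Workload:
    kind: str                        # "io", "cpu", or "background"
    has_async_library: bool = True   # only relevant for kind == "io"
    cpu_seconds: float = 0.0         # rough estimate, only relevant for kind == "cpu"


def choose_tool(w: Workload) -> str:
    if w.kind == "background":
        return "BackgroundTasks or a task queue (Celery / ARQ)"
    if w.kind == "io":
        # Async driver available → await it; sync-only library → push it to a thread
        return "async def + await" if w.has_async_library else "asyncio.to_thread()"
    if w.kind == "cpu":
        # Assumed threshold: anything over ~1s goes to a separate process
        return "ProcessPoolExecutor" if w.cpu_seconds > 1.0 else "asyncio.to_thread()"
    raise ValueError(f"unknown workload kind: {w.kind!r}")


print(choose_tool(Workload("io")))                           # async def + await
print(choose_tool(Workload("io", has_async_library=False)))  # asyncio.to_thread()
print(choose_tool(Workload("cpu", cpu_seconds=5.0)))         # ProcessPoolExecutor
```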