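Before diving into each tool, understand how they fit together. In production you never run uvicorn main:app on its own. Instead you layer three components, each sitting in front of the next: Nginx (reverse proxy), Gunicorn (process manager), and Uvicorn workers (which run your FastAPI app).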
Each worker is an independent OS process with its own Python interpreter and event loop. Multiple workers let you use all CPU cores and handle many concurrent requests.
A common rule of thumb for the worker count is (2 × CPU cores) + 1, so a 4-core machine gets 9 workers. IO-heavy async apps need fewer: thanks to async/await, a few workers can still handle thousands of concurrent connections. For those apps, start with CPU cores + 1.

```bash
# Development: single worker, auto-reload on file change
uvicorn main:app --reload --host 0.0.0.0 --port 8000
# ⚠️ --reload starts an extra process that watches your files and restarts the server.
# Never use --reload in production.
```
--reload watches your file system and restarts Uvicorn when any .py file changes. It is a development convenience only. In production, you achieve zero-downtime updates via Gunicorn's graceful reload (covered in 21.2).
```bash
# Reload only specific directories (faster)
uvicorn main:app --reload --reload-dir app/

# Reload delay: avoid thrashing on rapid saves
uvicorn main:app --reload --reload-delay 0.5
```
Uvicorn can run on two event loop implementations: Python's built-in asyncio loop and uvloop. uvloop is a drop-in replacement built on libuv, the same library that powers Node.js. Its authors report it as 2–4× faster than the default asyncio loop for IO-bound workloads.
```bash
# Standard install
pip install uvicorn

# With uvloop + httptools (faster HTTP parsing): recommended for prod
pip install "uvicorn[standard]"
# Installs extras including uvloop, httptools, websockets, watchfiles
```
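```bash
# Full production command (but prefer Gunicorn, see 21.2)
# Keep comments off the continued lines: text after a trailing "\" breaks the continuation.
# --log-level info: Uvicorn logs access lines at INFO, so "warning" would hide them.
# --proxy-headers: read X-Forwarded-For / X-Forwarded-Proto, but only from the addresses
#   in --forwarded-allow-ips (here Nginx on the same host; list your proxy IPs otherwise).
uvicorn main:app \
  --host 0.0.0.0 \
  --port 8000 \
  --workers 5 \
  --loop uvloop \
  --http httptools \
  --log-level info \
  --access-log \
  --proxy-headers \
  --forwarded-allow-ips "127.0.0.1"
```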
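The trust rule behind --proxy-headers and --forwarded-allow-ips is easier to see in code. The following is a simplified model written for this explanation, not Uvicorn's implementation. The function name resolve_client_ip and its rules are illustrative assumptions: ignore the header unless the direct peer is a trusted proxy, then take the rightmost hop that is not itself a trusted proxy.

```python
from __future__ import annotations


def resolve_client_ip(peer_ip: str, x_forwarded_for: str | None, trusted: set[str]) -> str:
    """Pick the client IP for a request (simplified model, not Uvicorn's code).

    peer_ip: address of the TCP peer (Nginx, when it sits in front).
    x_forwarded_for: raw header, e.g. "203.0.113.7" or "6.6.6.6, 203.0.113.7".
    trusted: proxy IPs whose headers we believe; "*" means trust everyone.
    """
    trust_all = "*" in trusted
    if not trust_all and peer_ip not in trusted:
        # The header came straight from the client and could say anything: ignore it.
        return peer_ip

    hops = [h.strip() for h in (x_forwarded_for or "").split(",") if h.strip()]
    if not hops:
        return peer_ip
    if trust_all:
        # Every hop counts as trusted, so the leftmost (client-supplied!) entry wins.
        return hops[0]
    # Walk from the right: each trusted proxy appended the address it saw.
    for hop in reversed(hops):
        if hop not in trusted:
            return hop
    return peer_ip


print(resolve_client_ip("127.0.0.1", "6.6.6.6, 203.0.113.7", {"127.0.0.1"}))  # → 203.0.113.7
print(resolve_client_ip("198.51.100.9", "1.2.3.4", {"*"}))                   # → 1.2.3.4
```

The rightmost-untrusted rule matters because a client can put any values it likes at the start of X-Forwarded-For. Only the entries appended by your own proxies can be trusted. With "*", the spoofed value wins, which is exactly the risk described next.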
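--proxy-headers makes Uvicorn take the client address from X-Forwarded-For (and the scheme from X-Forwarded-Proto). It does this only for connections coming from an address listed in --forwarded-allow-ips. Keep that list limited to your Nginx host(s). If Uvicorn is reachable directly from the internet and trusts everyone (for example --forwarded-allow-ips "*"), clients can spoof their IP address.

Gunicorn normally manages sync workers (for Django/Flask). For FastAPI, you tell Gunicorn to spawn Uvicorn workers using the UvicornWorker class. This gives you Gunicorn's process management with Uvicorn's async speed. Recent Uvicorn releases deprecate the uvicorn.workers module in favour of the separate uvicorn-worker package, so check which one your installed version expects.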
```bash
# The key flag: -k uvicorn.workers.UvicornWorker
gunicorn main:app \
  -k uvicorn.workers.UvicornWorker \
  --workers 5 \
  --bind 0.0.0.0:8000 \
  --timeout 120 \
  --graceful-timeout 30 \
  --keep-alive 5 \
  --log-level warning \
  --access-logfile -   # "-" = log to stdout

# UvicornWorker already uses uvloop + httptools when they are installed (uvicorn[standard]).
# UvicornH11Worker is the pure-Python variant (asyncio loop + h11 parser), e.g. for PyPy:
gunicorn main:app -k uvicorn.workers.UvicornH11Worker --workers 5
```
| Worker Class | Use Case | Async? |
|---|---|---|
| sync (default) | Django, Flask (WSGI) | No |
| UvicornWorker | FastAPI, Starlette (ASGI) | Yes |
| gevent | Legacy async WSGI | Partial |
Put all Gunicorn configuration in a gunicorn.conf.py file instead of passing long command-line flags. This file is Python, so you can calculate worker count dynamically.
```python
# gunicorn.conf.py
import multiprocessing

# Server socket
bind = "0.0.0.0:8000"
backlog = 2048              # pending connections queue

# Workers
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000   # Gunicorn docs: only used by gthread/eventlet/gevent workers
threads = 1                 # keep at 1 for async workers

# Timeouts
timeout = 120               # restart a worker that stops heartbeating for 120s; for async
                            # workers this catches hangs and is not a per-request limit
graceful_timeout = 30       # give workers 30s to finish in-flight requests before a kill
keepalive = 5               # keep idle client connections open for 5s

# Restarts (contain slow memory leaks)
max_requests = 1000         # recycle a worker after 1000 requests...
max_requests_jitter = 50    # ...plus 0-50 random extra, so workers don't all restart at once

# Logging
accesslog = "-"             # stdout
errorlog = "-"              # stderr
loglevel = "warning"

# Security: request limits for Gunicorn's own HTTP parser (with UvicornWorker,
# Uvicorn parses requests, so these may not take effect)
limit_request_line = 4094
limit_request_fields = 100
```
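Because gunicorn.conf.py is ordinary Python, the worker-count rule of thumb can live in a small helper instead of a hard-coded expression. The following is a minimal sketch. worker_count and its io_bound_async flag are hypothetical names, not Gunicorn settings. It uses (2 × cores) + 1 by default and cores + 1 for IO-heavy async apps. Gunicorn itself falls back to the WEB_CONCURRENCY environment variable for its default worker count, so the sketch lets that variable override the computed value.

```python
# Sketch for gunicorn.conf.py (worker_count is a hypothetical helper, not a Gunicorn API)
from __future__ import annotations

import multiprocessing
import os


def worker_count(cores: int | None = None, io_bound_async: bool = False) -> int:
    """Rule of thumb: (2 x cores) + 1, or cores + 1 for IO-heavy async apps."""
    cores = cores or multiprocessing.cpu_count()
    return cores + 1 if io_bound_async else 2 * cores + 1


# An explicit WEB_CONCURRENCY (e.g. set by your platform) wins over the computed default.
workers = int(os.environ.get("WEB_CONCURRENCY", worker_count(io_bound_async=True)))
```

Gunicorn only picks up the names it recognises as settings (such as workers) from the config module, so a helper function alongside them is harmless.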
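```bash
# Start from the config file and write the master PID where the reload command expects it
gunicorn main:app -c gunicorn.conf.py --pid /tmp/gunicorn.pid

# Zero-downtime reload: send HUP to the master process
kill -HUP $(cat /tmp/gunicorn.pid)
# Gunicorn starts new workers, then gracefully shuts down the old ones
# (with preload_app = True, HUP does not pick up new application code)
```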
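The max_requests and max_requests_jitter arithmetic from the config file above is easy to check. As we understand Gunicorn's worker code, each worker picks its own limit once, as max_requests plus a random integer from 0 to max_requests_jitter. The sketch below reproduces that idea. restart_threshold is a hypothetical name, and the seed only makes the demo repeatable.

```python
from __future__ import annotations

import random


def restart_threshold(max_requests: int = 1000, jitter: int = 50,
                      rng: random.Random | None = None) -> int:
    """Requests a worker serves before being recycled: max_requests + randint(0, jitter)."""
    rng = rng or random.Random()
    return max_requests + rng.randint(0, jitter)  # randint includes both ends


# Nine workers (4-core box, 2 x 4 + 1), each drawing its own limit at startup.
rng = random.Random(42)
thresholds = [restart_threshold(rng=rng) for _ in range(9)]
print(sorted(thresholds))  # every value falls between 1000 and 1050
```

Workers that started together therefore reach their limits at different request counts, so they are recycled at different times rather than all at once.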
max_requests + max_requests_jitter is the production trick for containing slow memory leaks. A worker that has handled 1000–1050 requests is gracefully replaced with a fresh one. Without this, a leak keeps growing until the server runs out of RAM.

Nginx sits in front of Gunicorn and acts as a reverse proxy. Clients talk to Nginx on port 80/443, and Nginx forwards their requests to Gunicorn on a local port (or Unix socket). The benefits are:

- SSL termination.
- Connection buffering, so slow clients tie up Nginx instead of your Gunicorn workers.
- Static file serving.
- Request rate limiting.
Request flow: Nginx (port 443, SSL + proxy) → Gunicorn (port 8000) → workers
```nginx
# Upstream: your Gunicorn process(es)
upstream fastapi_app {
    server 127.0.0.1:8000;
    # Multiple Gunicorn instances (different machines / ports)
    # server 127.0.0.1:8001;
    # server 127.0.0.1:8002;
    keepalive 32;   # keep up to 32 idle backend connections open (per Nginx worker)
}

# Send "Connection: upgrade" only for WebSocket requests; otherwise clear the header
# so the keepalive connections to the upstream can be reused
map $http_upgrade $connection_upgrade {
    default upgrade;
    ''      '';
}

# Redirect HTTP → HTTPS
server {
    listen 80;
    server_name api.example.com;
    return 301 https://$host$request_uri;
}

server {
    listen 443 ssl http2;   # on Nginx >= 1.25.1, prefer "listen 443 ssl;" plus "http2 on;"
    server_name api.example.com;

    # SSL (use certbot/Let's Encrypt in practice)
    ssl_certificate     /etc/letsencrypt/live/api.example.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/api.example.com/privkey.pem;
    ssl_protocols TLSv1.2 TLSv1.3;

    # Security headers
    add_header X-Frame-Options DENY;
    add_header X-Content-Type-Options nosniff;

    # Proxy to Gunicorn
    location / {
        proxy_pass http://fastapi_app;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;   # WebSockets + upstream keepalive
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Timeouts
        proxy_connect_timeout 5s;
        proxy_read_timeout 120s;
    }

    # Serve static files directly (bypass Python entirely)
    location /static/ {
        alias /app/static/;
        expires 30d;
    }
}
```
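Never manage SSL certificates manually in production. Use Certbot to get free Let's Encrypt certificates. These certificates are valid for 90 days, and Certbot sets up a cron job or systemd timer that renews them automatically before they expire.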
```bash
# Install certbot
apt install certbot python3-certbot-nginx

# Get certificate + auto-configure Nginx
certbot --nginx -d api.example.com

# Test auto-renewal (the real renewal runs via a cron job / systemd timer)
certbot renew --dry-run
```
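You may also want an independent expiry check, for example in a monitoring job. Python's ssl.cert_time_to_seconds parses the notAfter date format that certificates use, which is also the format returned in ssl.SSLSocket.getpeercert()["notAfter"]. The following is a minimal sketch that assumes a 30-day renewal window. That matches Certbot's long-standing default, but verify it for your Certbot version. days_left and needs_renewal are hypothetical helpers.

```python
from __future__ import annotations

import ssl
import time


def days_left(not_after: str, now: float | None = None) -> float:
    """Days until a certificate's notAfter date, e.g. "Jan  1 00:00:00 2030 GMT"."""
    expires = ssl.cert_time_to_seconds(not_after)  # seconds since the epoch (UTC)
    now = time.time() if now is None else now
    return (expires - now) / 86400


def needs_renewal(not_after: str, window_days: int = 30, now: float | None = None) -> bool:
    # Assumption: renew once fewer than window_days remain before expiry.
    return days_left(not_after, now) < window_days


print(needs_renewal("Jan  1 00:00:00 2030 GMT",
                    now=ssl.cert_time_to_seconds("Dec 15 00:00:00 2029 GMT")))  # → True
```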
When you have multiple servers (horizontal scaling), Nginx distributes requests across them. There are three main strategies, plus optional per-server weights:
```nginx
# These are alternatives: each block defines the same upstream name, so use only one.

# 1. Round Robin (default): requests go to each server in turn
upstream api_servers {
    server 10.0.0.1:8000;
    server 10.0.0.2:8000;
    server 10.0.0.3:8000;
}

# 2. Least Connections: send to the server with the fewest active connections
upstream api_servers {
    least_conn;
    server 10.0.0.1:8000;
    server 10.0.0.2:8000;
}

# 3. IP Hash: the same client always goes to the same server (sticky sessions)
upstream api_servers {
    ip_hash;
    server 10.0.0.1:8000;
    server 10.0.0.2:8000;
}

# Weighted: 10.0.0.1 gets 3× as much traffic as 10.0.0.2
upstream api_servers {
    server 10.0.0.1:8000 weight=3;
    server 10.0.0.2:8000 weight=1;
}
```
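The selection rules behind these directives are simple enough to model in a few lines of Python. This is a toy model for intuition, not Nginx's code, and the Backend class and function names are hypothetical. ip_hash here uses an arbitrary SHA-256 based hash rather than Nginx's own. weighted_pick implements smooth weighted round robin, which we understand to be the scheme Nginx uses for weight=. It interleaves picks instead of sending three requests in a row to one server.

```python
import hashlib
import itertools
from dataclasses import dataclass


@dataclass
class Backend:
    addr: str
    weight: int = 1
    active: int = 0          # open connections right now (used by least_conn)
    current_weight: int = 0  # running score (used by weighted_pick)


def round_robin(backends):
    """Round robin: hand out backends in a fixed cycle."""
    return itertools.cycle(backends)


def least_conn(backends):
    """Least connections: the backend with the fewest active connections."""
    return min(backends, key=lambda b: b.active)


def ip_hash(backends, client_ip):
    """IP hash: the same client IP maps to the same backend while the list is unchanged."""
    digest = hashlib.sha256(client_ip.encode()).digest()
    return backends[int.from_bytes(digest[:4], "big") % len(backends)]


def weighted_pick(backends):
    """Smooth weighted round robin: every round, each backend earns its weight;
    the richest backend is picked and pays back the total weight."""
    total = sum(b.weight for b in backends)
    for b in backends:
        b.current_weight += b.weight
    best = max(backends, key=lambda b: b.current_weight)
    best.current_weight -= total
    return best


pair = [Backend("10.0.0.1:8000", weight=3), Backend("10.0.0.2:8000", weight=1)]
print([weighted_pick(pair).addr for _ in range(4)])
# → ['10.0.0.1:8000', '10.0.0.1:8000', '10.0.0.2:8000', '10.0.0.1:8000']
```

Note how ip_hash loses its stickiness when a backend is added or removed, because the modulus changes. That is the usual trade-off of simple hash-based routing.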
Rate limiting prevents abuse by capping how many requests a client can make in a time window. You can rate-limit at two levels: Nginx (very fast, before Python runs) and FastAPI middleware (more flexible, per-user or per-route).
Level 1 — Nginx rate limiting (edge, cheapest):
```nginx
# Define a shared memory zone that tracks request rates per client IP
# 10m = 10 MB of shared memory (~160,000 IPs); rate = 10 requests/second
# (limit_req_zone belongs in the http {} context)
limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;

server {
    location /api/ {
        # Allow bursts of up to 20 extra requests. With nodelay they are served
        # immediately, and anything beyond the burst is rejected.
        limit_req zone=api_limit burst=20 nodelay;
        limit_req_status 429;   # return 429 Too Many Requests (the default is 503)
        proxy_pass http://fastapi_app;
    }
}
```
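Nginx's limit_req is built on a leaky bucket. The following is a simplified single-client model of what rate=10r/s with burst=20 nodelay does. The class and its exact accounting are our own approximation, not Nginx's source. Each accepted request adds one unit of "excess", and excess drains at the configured rate. With nodelay, a request is served immediately unless it would push the excess past burst, in which case it is rejected (the 429 above).

```python
class LeakyBucket:
    """Single-client model of limit_req with burst + nodelay (approximation)."""

    def __init__(self, rate: float, burst: int):
        self.rate = rate      # requests per second that drain from the bucket
        self.burst = burst    # extra requests tolerated above the steady rate
        self.excess = 0.0     # how far ahead of the steady rate the client is
        self.last = None      # time of the last accepted request (seconds)

    def allow(self, now: float) -> bool:
        if self.last is None:            # first request from this client
            self.last = now
            return True
        # Drain for the elapsed time, then count this request.
        excess = max(self.excess - self.rate * (now - self.last) + 1, 0.0)
        if excess > self.burst:
            return False                 # rejected; state left unchanged
        self.excess, self.last = excess, now
        return True


bucket = LeakyBucket(rate=10, burst=20)
first = [bucket.allow(0.0) for _ in range(25)]   # 25 requests arrive at once
print(first.count(True))                         # → 21 (1 + a burst of 20)
print(sum(bucket.allow(1.0) for _ in range(15))) # → 10 (one second drained 10)
```

Without nodelay, Nginx would instead queue the burst requests and release them at the configured rate, which smooths traffic at the cost of added latency.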
Level 2 — FastAPI middleware (fine-grained, per-user):
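```python
import time

import redis.asyncio as redis
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

app = FastAPI()
redis_client = redis.from_url("redis://localhost")


class RateLimitMiddleware(BaseHTTPMiddleware):
    """Fixed-window limiter; Redis holds the counters so all workers share them."""

    def __init__(self, app, limit: int = 100, window: int = 60):
        super().__init__(app)
        self.limit = limit    # max requests
        self.window = window  # per N seconds

    async def dispatch(self, request: Request, call_next):
        # Use the authenticated user ID if available, else fall back to the client IP.
        # Assumes an auth middleware running before this one stored a *verified* JWT
        # user ID in request.state.user_id. Never key on a raw client header such as
        # X-User-ID: clients can set it to anything and bypass the limit.
        user_id = getattr(request.state, "user_id", None)
        client_key = user_id or (request.client.host if request.client else "unknown")

        # One counter per client per window. The key changes when a new window starts,
        # so EXPIRE only cleans up old keys and cannot keep extending a live counter.
        now = int(time.time())
        key = f"rate:{client_key}:{now // self.window}"

        async with redis_client.pipeline(transaction=True) as pipe:
            count, _ = await pipe.incr(key).expire(key, self.window).execute()

        if count > self.limit:
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded. Try again later."},
                headers={"Retry-After": str(self.window - now % self.window)},
            )

        response = await call_next(request)
        response.headers["X-RateLimit-Limit"] = str(self.limit)
        response.headers["X-RateLimit-Remaining"] = str(max(0, self.limit - count))
        return response


# Add to app
app.add_middleware(RateLimitMiddleware, limit=100, window=60)
```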
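For unit tests or a single-process dev server, the counting idea behind the Redis middleware (one counter per client per time window) can be modelled without Redis. The following is a minimal sketch that assumes a single process. The counters live in a dict, so they are not shared between Gunicorn workers, which is why the middleware above uses Redis. FixedWindowLimiter is a hypothetical name, and the clock is injectable so tests can control time.

```python
from __future__ import annotations

import time
from collections import defaultdict


class FixedWindowLimiter:
    """In-memory fixed-window counter: at most `limit` hits per client per `window` seconds."""

    def __init__(self, limit: int, window: int, clock=time.time):
        self.limit = limit
        self.window = window
        self.clock = clock                 # injectable for tests
        self.counts = defaultdict(int)     # client -> hits in the current window
        self._window_id = None

    def hit(self, client: str) -> tuple[bool, int]:
        """Record one request; return (allowed, remaining)."""
        window_id = int(self.clock()) // self.window
        if window_id != self._window_id:   # a new window started: forget old counts
            self.counts.clear()
            self._window_id = window_id
        self.counts[client] += 1
        count = self.counts[client]
        return count <= self.limit, max(0, self.limit - count)


fake_now = [0.0]
limiter = FixedWindowLimiter(limit=3, window=60, clock=lambda: fake_now[0])
print([limiter.hit("203.0.113.7")[0] for _ in range(4)])  # → [True, True, True, False]
fake_now[0] = 60.0                                        # the next window begins
print(limiter.hit("203.0.113.7"))                         # → (True, 2)
```

Fixed windows are cheap but can admit up to 2 × limit requests around a boundary (the end of one window plus the start of the next). A sliding window or token bucket smooths that out at the cost of more state.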
Retries handle transient failures: a DB connection blip, a momentary network hiccup, a downstream API returning 503. The key rule: always use exponential backoff with jitter — don't hammer a struggling service at a fixed rate.
```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    fn: Callable[[], Awaitable[T]],
    *,
    max_attempts: int = 3,
    base_delay: float = 0.5,   # seconds
    max_delay: float = 10.0,
    exceptions: tuple[type[BaseException], ...] = (Exception,),
) -> T:
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return await fn()
        except exceptions:
            if attempt == max_attempts - 1:
                raise  # last attempt: re-raise the original error
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = delay * random.uniform(0.8, 1.2)  # ±20% jitter
            await asyncio.sleep(jitter)
    raise AssertionError("unreachable")  # keeps type checkers happy


# Usage in a FastAPI route (app and payment_service are defined elsewhere in your project)
@app.get("/orders/{order_id}")
async def get_order(order_id: int):
    return await retry_async(
        lambda: payment_service.get_order(order_id),
        max_attempts=3,
        exceptions=(ConnectionError, TimeoutError),
    )
```
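It helps to see the delays the retry_async formula actually produces. The hypothetical backoff_schedule helper below applies the same rule, min(base_delay × 2^attempt, max_delay) scaled by ±20% jitter, so you can estimate how long a worst-case retry sequence keeps a request waiting.

```python
from __future__ import annotations

import random


def backoff_schedule(attempts: int, base_delay: float = 0.5, max_delay: float = 10.0,
                     jitter: float = 0.2, rng: random.Random | None = None) -> list[float]:
    """Sleep before each retry: min(base * 2**n, max), scaled by a random factor in ±jitter."""
    rng = rng or random.Random()
    delays = []
    for n in range(attempts):
        delay = min(base_delay * (2 ** n), max_delay)   # exponential growth, capped
        delays.append(delay * rng.uniform(1 - jitter, 1 + jitter))
    return delays


print(backoff_schedule(6, jitter=0.0))  # → [0.5, 1.0, 2.0, 4.0, 8.0, 10.0]
```

retry_async sleeps only between attempts, so with max_attempts=3 it sleeps at most twice, roughly 0.5 s and then 1 s. The per-attempt timeouts of the call itself come on top of that.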
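```python
# pip install tenacity httpx
import httpx
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_random_exponential


def is_transient(exc: BaseException) -> bool:
    """Retry network failures and 429/5xx responses; other 4xx errors won't fix themselves."""
    if isinstance(exc, httpx.TransportError):  # connect errors, timeouts, resets
        return True
    if isinstance(exc, httpx.HTTPStatusError):
        status = exc.response.status_code
        return status == 429 or status >= 500
    return False


@retry(
    stop=stop_after_attempt(3),
    wait=wait_random_exponential(multiplier=0.5, max=10),  # exponential backoff + jitter
    retry=retry_if_exception(is_transient),
    reraise=True,  # after the last attempt, raise the original error instead of RetryError
)
async def call_payment_api(order_id: int):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://payments.internal/orders/{order_id}")
        response.raise_for_status()
        return response.json()
```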
Retries are for transient failures. But if a service is truly down, retrying makes things worse — you pile up requests, exhaust connection pools, and slow down your own API. A circuit breaker detects sustained failures and stops calling the broken service for a cooldown period.
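```python
import time
from enum import Enum

from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()


class State(Enum):
    CLOSED = "closed"        # normal operation: calls go through
    OPEN = "open"            # too many failures: reject calls immediately
    HALF_OPEN = "half_open"  # cooldown over: let a single probe through


class CircuitOpenError(Exception):
    """Raised when the breaker rejects a call without attempting it."""


class CircuitBreaker:
    # State is per process: each Gunicorn worker keeps its own breaker.
    def __init__(self, failure_threshold=5, recovery_timeout=30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.state = State.CLOSED
        self._probe_in_flight = False

    async def call(self, fn, *args, **kwargs):
        if self.state == State.OPEN:
            if time.monotonic() - self.last_failure_time < self.recovery_timeout:
                raise CircuitOpenError("Circuit OPEN: service unavailable")
            self.state = State.HALF_OPEN
        is_probe = False
        if self.state == State.HALF_OPEN:
            if self._probe_in_flight:  # only one probe at a time
                raise CircuitOpenError("Circuit HALF_OPEN: probe already in flight")
            self._probe_in_flight = is_probe = True
        try:
            result = await fn(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        finally:
            if is_probe:
                self._probe_in_flight = False
        self._on_success()
        return result

    def _on_success(self):
        self.failure_count = 0
        self.state = State.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        # A failed probe reopens the circuit right away.
        if self.state == State.HALF_OPEN or self.failure_count >= self.failure_threshold:
            self.state = State.OPEN


# Usage (payment_service is your own async client, not defined here)
payment_cb = CircuitBreaker(failure_threshold=5, recovery_timeout=30)


@app.post("/checkout")
async def checkout(order_id: int):
    try:
        return await payment_cb.call(payment_service.charge, order_id)
    except Exception:  # breaker is open, or the charge itself failed
        return JSONResponse(
            status_code=503,
            content={"status": "payment_service_unavailable", "retry_after": 30},
            headers={"Retry-After": "30"},
        )
```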
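For production, consider a library such as circuitbreaker (pip install circuitbreaker) or pybreaker instead of a hand-rolled class. pybreaker, for example, is thread-safe, supports listener callbacks for metrics, and can store its state in Redis so that all workers share one breaker.

Every external call (HTTP requests, DB queries, Redis lookups) must have a timeout. Without one, a single slow dependency piles up in-flight requests and holds connection-pool slots until your entire API stops responding.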
```python
import asyncio

import httpx
from fastapi import FastAPI, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

app = FastAPI()


# 1. HTTP client timeout (a separate limit for each phase)
async def call_payments() -> httpx.Response:
    timeout = httpx.Timeout(
        connect=2.0,  # connection establishment
        read=10.0,    # max wait for each chunk of the response
        write=5.0,    # sending the request body
        pool=2.0,     # waiting for a free connection from the pool
    )
    async with httpx.AsyncClient(timeout=timeout) as client:
        return await client.get("https://payments.internal/charge")


# 2. asyncio.wait_for: put a deadline on any coroutine
#    (slow_db_query stands for your own coroutine, not defined here)
async def load_report():
    try:
        return await asyncio.wait_for(slow_db_query(), timeout=5.0)  # 5-second timeout
    except asyncio.TimeoutError:
        raise HTTPException(status_code=504, detail="Database timeout")


# 3. App-wide timeout middleware (applies to every route, not per route)
class TimeoutMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        try:
            return await asyncio.wait_for(call_next(request), timeout=30.0)
        except asyncio.TimeoutError:
            return JSONResponse({"detail": "Request timed out"}, status_code=504)


app.add_middleware(TimeoutMiddleware)
```
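A self-contained demo can show that asyncio.wait_for really cancels the slow work when the deadline passes, rather than merely giving up waiting. slow_dependency is a stand-in for a slow DB query or HTTP call, and the delays are arbitrary.

```python
from __future__ import annotations

import asyncio


async def slow_dependency(delay: float, log: list[str]) -> str:
    """Stand-in for a slow DB query or HTTP call."""
    try:
        await asyncio.sleep(delay)
        return "data"
    except asyncio.CancelledError:
        log.append("cancelled")   # wait_for cancels the inner work on timeout
        raise


async def fetch_with_deadline(delay: float, timeout: float, log: list[str]) -> str:
    try:
        return await asyncio.wait_for(slow_dependency(delay, log), timeout=timeout)
    except asyncio.TimeoutError:
        return "504 Gateway Timeout"   # what a route would turn into an HTTPException


log: list[str] = []
print(asyncio.run(fetch_with_deadline(0.01, timeout=1.0, log=log)))  # → data
print(asyncio.run(fetch_with_deadline(1.0, timeout=0.05, log=log)))  # → 504 Gateway Timeout
print(log)                                                            # → ['cancelled']
```

Because the inner coroutine is cancelled, any cleanup in it (closing a cursor, releasing a pool connection) runs before the timeout is reported. That is what keeps pools from draining under slow dependencies.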
| Layer | What to set | Typical value |
|---|---|---|
| Nginx proxy_read_timeout | Max time to wait for Gunicorn's response | 120s |
| Gunicorn timeout | Restart a worker that stops responding to the master (for async workers, not a per-request limit) | 120s |
| httpx client read timeout | Max wait for the next chunk of an external HTTP response | 10–30s |
| SQLAlchemy pool_timeout | Max wait for a DB connection from the pool | 5–10s |
| Redis socket_timeout | Max wait for a Redis command | 1–2s |
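The values above follow a pattern worth keeping: each inner layer gives up before the layer wrapping it. That way the innermost failure surfaces as a clean error from your app (such as a 504) instead of the proxy cutting the connection first. Below is a small, hypothetical check you could run in CI. It assumes an app-wide request deadline like the 30 s TimeoutMiddleware in the timeout example. It also counts retries, since three 10 s attempts already use up a 30 s deadline. The worst-case figure ignores backoff sleeps, so it is optimistic.

```python
from __future__ import annotations


def timeout_problems(proxy_timeout: float, app_deadline: float,
                     dependencies: dict[str, tuple[float, int]]) -> list[str]:
    """dependencies maps a name to (per-attempt timeout in seconds, number of attempts)."""
    problems = []
    if app_deadline >= proxy_timeout:
        problems.append(f"app deadline {app_deadline}s is not below the proxy timeout {proxy_timeout}s")
    for name, (per_try, attempts) in dependencies.items():
        worst_case = per_try * attempts   # ignores backoff sleeps, so this is optimistic
        if worst_case >= app_deadline:
            problems.append(f"{name}: {attempts} x {per_try}s = {worst_case}s "
                            f"is not below the {app_deadline}s app deadline")
    return problems


print(timeout_problems(
    proxy_timeout=120,   # Nginx proxy_read_timeout
    app_deadline=30,     # app-wide TimeoutMiddleware
    dependencies={"httpx read": (10, 3), "sqlalchemy pool": (5, 1), "redis": (2, 1)},
))
# → ['httpx read: 3 x 10s = 30s is not below the 30s app deadline']
```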