FastAPI Mastery
Topic 19 of 22 — Observability
You can't fix what you can't see. Observability is the three-pillar discipline of Logging (what happened), Metrics (how much / how fast), and Tracing (why it was slow). Master all three and you'll diagnose production issues in minutes instead of hours.
Request ──► FastAPI ──► Response
               │
      ┌────────┼──────────────────────┐
      │        │   Observability      │
      │        ▼                      │
      │  📋 LOGS                      │  ← What happened? (text events)
      │  "POST /orders 201 45ms"      │
      │                               │
      │  📊 METRICS                   │  ← How much/fast? (numbers over time)
      │  requests_total = 1042        │
      │  p99_latency = 120ms          │
      │                               │
      │  🔍 TRACES                    │  ← Why slow? (request journey)
      │  [DB 40ms][Auth 5ms]          │
      └───────────────────────────────┘
19.1
Logging
📋
Setup & Configuration

Python's built-in logging module is the standard. FastAPI (via Uvicorn) already emits access logs — you configure how detailed they are, where they go, and what format they use.

Log levels control what gets shown — from most verbose to most critical:

DEBUG

Fine-grained detail. Only enable in development — very noisy.

INFO

Normal app events — requests handled, startup complete, etc.

WARNING

Something unexpected but not breaking. Retried DB call, etc.

ERROR

Something failed. Log and investigate promptly.

CRITICAL

System-level failure. App may be unresponsive. Page your on-call.

Python — logging_config.py
import logging
import sys

def setup_logging(level: str = "INFO"):
    logging.basicConfig(
        level=getattr(logging, level.upper()),  # accept "info" as well as "INFO"
        format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        handlers=[logging.StreamHandler(sys.stdout)],
    )

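# main.py
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI
from logging_config import setup_logging

setup_logging("DEBUG")  # DEBUG so the logger.debug() call below is emitted
logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # on_event("startup") is deprecated in recent FastAPI; lifespan replaces it
    logger.info("🚀 App started successfully")
    yield

app = FastAPI(lifespan=lifespan)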

@app.get("/orders/{order_id}")
async def get_order(order_id: int):
    logger.debug("Fetching order %s", order_id)
    if order_id == 0:
        logger.warning("Attempted to fetch order ID 0 — invalid")
        return {"error": "invalid id"}
    logger.info("Order %s fetched OK", order_id)
    return {"order_id": order_id, "status": "shipped"}
2024-03-15 09:12:01 | INFO     | main | 🚀 App started successfully
2024-03-15 09:12:05 | DEBUG    | main | Fetching order 42
2024-03-15 09:12:05 | INFO     | main | Order 42 fetched OK
2024-03-15 09:12:09 | WARNING  | main | Attempted to fetch order ID 0 — invalid
Use named loggers per module: logger = logging.getLogger(__name__). This makes log output show exactly which file the message came from — invaluable in large apps.
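Named loggers also form a dot-separated hierarchy, which lets you raise or lower verbosity for one module without touching the rest. The sketch below illustrates that idea with the standard library only; the logger names `app.orders` and `app.payments` are hypothetical and stand in for your own module names.

```python
import io
import logging

# Hypothetical module names, used only to illustrate the logger hierarchy.
orders_log = logging.getLogger("app.orders")
payments_log = logging.getLogger("app.payments")

# One handler on the shared parent; child loggers propagate records up to it.
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter("%(levelname)s | %(name)s | %(message)s"))

parent = logging.getLogger("app")
parent.addHandler(handler)
parent.setLevel(logging.INFO)          # default for everything under "app"
payments_log.setLevel(logging.DEBUG)   # turn up detail for one module only

orders_log.debug("hidden: orders inherits INFO from 'app'")
payments_log.debug("shown: payments overrides to DEBUG")
orders_log.info("shown: INFO passes the inherited threshold")

captured_lines = buffer.getvalue().splitlines()
print("\n".join(captured_lines))
```

Only two of the three messages reach the handler: the `app.orders` debug call is dropped because that logger inherits the INFO threshold from its parent.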
Configuring Uvicorn log level from CLI:
bash
# Development — verbose
uvicorn main:app --log-level debug

# Production — only warnings and above
uvicorn main:app --log-level warning

# Or via environment variable (Uvicorn reads options prefixed with UVICORN_)
UVICORN_LOG_LEVEL=info uvicorn main:app
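The Uvicorn flag controls Uvicorn's own loggers; your application loggers still need a level of their own. A level name read from the environment can arrive in any case or be misspelled, and `getattr(logging, "info")` would return the `logging.info` function rather than a number. The following is a minimal sketch of a tolerant resolver; `resolve_log_level` and the `LOG_LEVEL` variable are assumptions made for this example, not part of Uvicorn or FastAPI.

```python
import logging
import os

def resolve_log_level(name: str, default: int = logging.INFO) -> int:
    """Map a level name such as 'debug' or 'WARNING' to its numeric value."""
    level = logging.getLevelName(name.strip().upper())
    # getLevelName returns an int for known names and a string otherwise.
    return level if isinstance(level, int) else default

# LOG_LEVEL is an application-defined variable here (an assumption),
# not a setting that Uvicorn itself reads.
level = resolve_log_level(os.environ.get("LOG_LEVEL", "INFO"))
logging.basicConfig(level=level)
```

Falling back to a default instead of raising keeps a typo in deployment config from crashing the app at startup; whether that trade-off suits you is a design choice.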
🏗️
Structured Logging (JSON)

Plain text logs are readable by humans but hard for machines to search. Structured logging emits JSON — every log entry is a machine-parseable object that log aggregators (Datadog, Loki, CloudWatch) can index, filter, and alert on.

bash — install structlog
pip install structlog
Python — structured_logging.py
import structlog
import logging

def setup_structlog():
    structlog.configure(
        processors=[
            structlog.processors.add_log_level,
            # add_logger_name is left out: it reads logger.name, which PrintLogger lacks
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.JSONRenderer(),   # ← outputs JSON!
        ],
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        logger_factory=structlog.PrintLoggerFactory(),
    )

setup_structlog()
log = structlog.get_logger()

# Usage — add key-value context to any log call
log.info("order.created",
    order_id=42,
    user_id="usr_abc",
    total=99.99,
    currency="USD"
)

log.error("payment.failed",
    order_id=42,
    reason="card_declined",
    attempt=2
)
{"event": "order.created", "level": "info", "timestamp": "2024-03-15T09:12:01Z", "order_id": 42, "user_id": "usr_abc", "total": 99.99, "currency": "USD"}
{"event": "payment.failed", "level": "error", "timestamp": "2024-03-15T09:12:03Z", "order_id": 42, "reason": "card_declined", "attempt": 2}
💡
With JSON logs you can run queries like level=error AND order_id=42 directly in your log platform — no regex parsing needed. This turns minutes of grep-ing into a one-second search.
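To make the "query instead of grep" idea concrete, here is a small standard-library sketch that filters JSON log lines by field values, roughly what a log platform does for `level=error AND order_id=42`. The `query` helper and the third sample line are made up for illustration.

```python
import json

# Sample lines in the shape shown above; the order_id=57 entry is invented.
raw_logs = """\
{"event": "order.created", "level": "info", "order_id": 42, "user_id": "usr_abc"}
{"event": "payment.failed", "level": "error", "order_id": 42, "reason": "card_declined"}
{"event": "payment.failed", "level": "error", "order_id": 57, "reason": "timeout"}
"""

def query(lines: str, **conditions) -> list:
    """Return parsed entries whose fields equal every given condition."""
    entries = (json.loads(line) for line in lines.splitlines() if line.strip())
    return [
        entry for entry in entries
        if all(entry.get(key) == value for key, value in conditions.items())
    ]

matches = query(raw_logs, level="error", order_id=42)
print(matches)
```

Because every entry is already a dictionary of typed fields, the filter is an equality check rather than a regular expression over free text.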
🔀
Request Logging Middleware

Log every incoming request and outgoing response automatically — method, path, status code, duration, and a unique request ID for correlating all log lines belonging to one request.

Python — middleware/logging.py
import time, uuid, logging
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

logger = logging.getLogger("api.access")

class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Generate unique ID for this request
        request_id = str(uuid.uuid4())[:8]
        start_time = time.perf_counter()

        # Attach request_id to the request state (usable in route handlers)
        request.state.request_id = request_id

        logger.info(
            "[%s] → %s %s",
            request_id, request.method, request.url.path
        )

        try:
            response = await call_next(request)
        except Exception:
            # logger.exception records the full traceback, not just the message
            logger.exception("[%s] ✗ UNHANDLED", request_id)
            raise

        duration_ms = (time.perf_counter() - start_time) * 1000
        logger.info(
            "[%s] ← %s %s %d (%.1fms)",
            request_id, request.method, request.url.path,
            response.status_code, duration_ms
        )

        # Add request ID to response header so clients can reference it
        response.headers["X-Request-ID"] = request_id
        return response

# Register in main.py
app.add_middleware(RequestLoggingMiddleware)
2024-03-15 09:12:01 | INFO | api.access | [a3f9bc12] → POST /orders
2024-03-15 09:12:01 | INFO | main | Order validated, calling payment service
2024-03-15 09:12:01 | INFO | api.access | [a3f9bc12] ← POST /orders 201 (43.2ms)
2024-03-15 09:12:05 | INFO | api.access | [b7de1a3f] → GET /orders/999
2024-03-15 09:12:05 | INFO | api.access | [b7de1a3f] ← GET /orders/999 404 (2.1ms)
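Notice how [a3f9bc12] tags both access-log lines for that request. The handler's own line (from the main logger) is not tagged, because only the middleware formats the ID into its messages; route handlers can read it from request.state.request_id and include it themselves. Once every line carries the ID, you can filter by it and see the complete story of one request even with 1000 running concurrently.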
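One way to get the request ID onto every log line without passing it around is a context variable plus a logging filter. The sketch below shows the mechanism with the standard library only; `request_id_var` and `RequestIDFilter` are hypothetical names, and it assumes the middleware would call `request_id_var.set(request_id)` before awaiting `call_next`.

```python
import contextvars
import io
import logging

# Holds the ID of the request being handled in the current task or thread.
request_id_var = contextvars.ContextVar("request_id", default="-")

class RequestIDFilter(logging.Filter):
    """Copy the current request ID onto every record that passes through."""
    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("[%(request_id)s] %(name)s | %(message)s"))
handler.addFilter(RequestIDFilter())   # on the handler, so all loggers are covered

demo_logger = logging.getLogger("demo.orders")   # hypothetical logger name
demo_logger.addHandler(handler)
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False

# In the middleware this would be request_id_var.set(request_id) inside dispatch().
token = request_id_var.set("a3f9bc12")
demo_logger.info("Order validated, calling payment service")
request_id_var.reset(token)
demo_logger.info("Logged outside any request")

output_lines = stream.getvalue().splitlines()
print("\n".join(output_lines))
```

The first line is tagged with the ID and the second falls back to the default, so handler code never has to mention the request ID at all.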
19.2
Metrics
📊
Prometheus Integration

Prometheus is the industry-standard metrics system. It periodically scrapes your app's /metrics endpoint and stores time-series data. Grafana then visualises it as dashboards and alerts.

FastAPI App               Prometheus              Grafana
┌──────────┐   scrape    ┌──────────┐   query    ┌──────────┐
│ /metrics │ ◄────────── │  Server  │ ─────────► │Dashboard │
│  (text)  │  every 15s  │  (TSDB)  │            │ & Alerts │
└──────────┘             └──────────┘            └──────────┘
bash — install
pip install prometheus-fastapi-instrumentator
Python — main.py (auto-instrument)
from fastapi import FastAPI
from prometheus_fastapi_instrumentator import Instrumentator

app = FastAPI()

# One line — auto-creates /metrics endpoint with default metrics
Instrumentator().instrument(app).expose(app)

@app.get("/orders")
async def list_orders():
    return ["order1", "order2"]
After startup, visiting http://localhost:8000/metrics returns:
# HELP http_requests_total Total number of requests by method, handler, status
# TYPE http_requests_total counter
http_requests_total{handler="/orders",method="GET",status="2xx"} 1042
# HELP http_request_duration_seconds Duration of HTTP requests
# TYPE http_request_duration_seconds histogram
http_request_duration_seconds_bucket{le="0.1"} 990
http_request_duration_seconds_bucket{le="0.5"} 1040
http_request_duration_seconds_bucket{le="+Inf"} 1042
http_request_duration_seconds_sum 52.4
http_request_duration_seconds_count 1042
💡
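The library automatically tracks request counts, grouped status codes (2xx, 4xx, 5xx), and latency histograms for every endpoint, with zero code changes to your routes. Percentiles such as p50, p95, and p99 are not stored as such; Prometheus estimates them from the histogram buckets at query time.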
Useful PromQL queries for Grafana dashboards:
PromQL
# Requests per second (last 5 min)
rate(http_requests_total[5m])

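# Error rate (5xx) as percentage
# sum() on both sides so the division is not matched label-by-label
sum(rate(http_requests_total{status="5xx"}[5m]))
  / sum(rate(http_requests_total[5m])) * 100

# 99th percentile latency across all endpoints
histogram_quantile(0.99, sum by (le) (rate(http_request_duration_seconds_bucket[5m])))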

# Requests per endpoint
sum by (handler) (rate(http_requests_total[5m]))
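It can help to see roughly what `histogram_quantile` does with bucket data. The sketch below is a simplified Python re-implementation of the idea (linear interpolation inside the bucket that contains the target rank), applied to the raw bucket counts from the sample `/metrics` output. It is an illustration under stated assumptions, not Prometheus's actual code, and real queries apply it to per-second rates rather than raw counts.

```python
import math

def histogram_quantile(q: float, buckets: list) -> float:
    """Estimate a quantile from cumulative (upper_bound, count) buckets.

    Simplified: assumes observations are spread evenly inside each bucket
    and that the lowest bucket starts at 0.
    """
    total = buckets[-1][1]
    rank = q * total
    lower_bound, lower_count = 0.0, 0.0
    for upper_bound, count in buckets:
        if count >= rank:
            if math.isinf(upper_bound) or count == lower_count:
                return lower_bound   # cannot interpolate into +Inf or an empty bucket
            fraction = (rank - lower_count) / (count - lower_count)
            return lower_bound + (upper_bound - lower_bound) * fraction
        lower_bound, lower_count = upper_bound, count
    return lower_bound

# Cumulative counts taken from the sample /metrics output.
sample_buckets = [(0.1, 990), (0.5, 1040), (math.inf, 1042)]
p99 = histogram_quantile(0.99, sample_buckets)
p50 = histogram_quantile(0.50, sample_buckets)
print(f"p50 ~ {p50:.3f}s, p99 ~ {p99:.3f}s")
```

With these counts the estimated p99 lands at roughly 0.43 seconds, inside the 0.1 to 0.5 bucket. The estimate can only be as precise as the bucket boundaries, which is why bucket choice matters.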
🎯
Custom Business Metrics

The auto-instrumented HTTP metrics tell you how your API is performing. But you also want business metrics: orders created per minute, active checkouts, payment failure rate. Create these with prometheus_client directly.

Counter

Only goes up. Total orders, errors, emails sent. Never resets except on restart.

Gauge

Goes up and down. Active users, queue size, current connections.

Histogram

Tracks distributions with buckets. Ideal for latency and request size.

Summary

Like histogram but calculates quantiles client-side. Less flexible for aggregation.
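The differences between the first three types are easier to see in a toy model. The classes below are deliberately minimal stand-ins written for this example; they are not the `prometheus_client` API, only a sketch of the semantics described above, including the cumulative ("less than or equal") bucket counting a histogram uses.

```python
from bisect import bisect_left

class MiniCounter:
    """Monotonic total: can only be incremented."""
    def __init__(self) -> None:
        self.value = 0.0
    def inc(self, amount: float = 1.0) -> None:
        if amount < 0:
            raise ValueError("counters cannot decrease")
        self.value += amount

class MiniGauge:
    """Current level: free to rise and fall."""
    def __init__(self) -> None:
        self.value = 0.0
    def inc(self, amount: float = 1.0) -> None:
        self.value += amount
    def dec(self, amount: float = 1.0) -> None:
        self.value -= amount

class MiniHistogram:
    """Cumulative bucket counts plus a running sum and count."""
    def __init__(self, bounds: list) -> None:
        self.bounds = sorted(bounds) + [float("inf")]
        self.bucket_counts = [0] * len(self.bounds)
        self.total = 0.0
        self.count = 0
    def observe(self, value: float) -> None:
        self.total += value
        self.count += 1
        # Every bucket whose upper bound is >= value is incremented.
        first = bisect_left(self.bounds, value)
        for index in range(first, len(self.bounds)):
            self.bucket_counts[index] += 1

orders = MiniCounter()
orders.inc()
orders.inc()

active = MiniGauge()
active.inc()
active.inc()
active.dec()

latency = MiniHistogram([0.1, 0.5, 1.0])
for seconds in (0.05, 0.3, 0.5, 2.0):
    latency.observe(seconds)

print(orders.value, active.value, latency.bucket_counts)
```

Note that the 2.0-second observation only shows up in the final `+Inf` bucket, which is how slow outliers stay visible even when they exceed every configured bound.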

Python — metrics.py
from prometheus_client import Counter, Gauge, Histogram

# Counter: total orders created
ORDERS_CREATED = Counter(
    "orders_created_total",
    "Total orders created",
    labelnames=["payment_method", "currency"]
)

# Gauge: orders currently being processed
ACTIVE_CHECKOUTS = Gauge(
    "active_checkouts",
    "Orders in checkout flow right now"
)

# Histogram: payment processing time
PAYMENT_DURATION = Histogram(
    "payment_duration_seconds",
    "Time taken to process a payment",
    buckets=[.1, .25, .5, 1, 2, 5]
)
Python — using metrics in routes
import time
from metrics import ORDERS_CREATED, ACTIVE_CHECKOUTS, PAYMENT_DURATION

@app.post("/checkout")
async def checkout(order: Order):
    ACTIVE_CHECKOUTS.inc()          # gauge up
    start = time.perf_counter()

    try:
        result = await process_payment(order)

        ORDERS_CREATED.labels(
            payment_method=order.method,
            currency=order.currency
        ).inc()                         # counter up

        return result
    finally:
        elapsed = time.perf_counter() - start
        PAYMENT_DURATION.observe(elapsed)  # histogram bucket
        ACTIVE_CHECKOUTS.dec()          # gauge down
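Use labels on metrics (like payment_method) to slice and dice in Grafana. With the labels above you can ask "how many card orders in EUR were created per minute?" directly. To ask the same question of latency, add matching labels to PAYMENT_DURATION as well. Keep label values to a small, fixed set, since every distinct combination creates a separate time series.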
19.3
Distributed Tracing
🔍
OpenTelemetry Setup

Tracing records the entire journey of a request — through your FastAPI handler, into the database, out to a microservice, and back. Each step is a span. Together they form a trace.

OpenTelemetry (OTel) is the vendor-neutral standard. You instrument once and export to Jaeger, Zipkin, Datadog, Honeycomb, or any other backend.

Incoming Request
      │
      ▼
Trace ID: abc-123
┌──────────────────────────────────────────────┐
│ Span: GET /orders/42          0ms → 85ms     │
│ ├─ Span: auth.verify          2ms → 8ms      │
│ ├─ Span: db.query_order      10ms → 55ms     │  ← slow!
│ │   └─ Span: db.connect      10ms → 12ms     │
│ └─ Span: cache.set           57ms → 60ms     │
└──────────────────────────────────────────────┘
→ "The DB query took 45ms: that's the bottleneck"
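The way a trace viewer points at a bottleneck can be sketched in a few lines: model spans as a tree and compare each span's self time, meaning its duration minus the time spent in its direct children. The `SpanNode` class below is a hypothetical data structure for this illustration, not an OpenTelemetry type, and it assumes sibling spans do not overlap.

```python
from dataclasses import dataclass, field

@dataclass
class SpanNode:
    name: str
    start_ms: float
    end_ms: float
    children: list = field(default_factory=list)

    @property
    def duration_ms(self) -> float:
        return self.end_ms - self.start_ms

    @property
    def self_time_ms(self) -> float:
        """Duration not covered by direct children (assumes they do not overlap)."""
        return self.duration_ms - sum(child.duration_ms for child in self.children)

def walk(span: SpanNode):
    """Yield the span and all of its descendants."""
    yield span
    for child in span.children:
        yield from walk(child)

# Timings copied from the trace diagram above.
trace_root = SpanNode("GET /orders/42", 0, 85, [
    SpanNode("auth.verify", 2, 8),
    SpanNode("db.query_order", 10, 55, [SpanNode("db.connect", 10, 12)]),
    SpanNode("cache.set", 57, 60),
])

slowest = max(walk(trace_root), key=lambda span: span.self_time_ms)
print(slowest.name, slowest.self_time_ms)
```

Ranking by self time rather than total duration keeps the root span from always winning, since it necessarily contains everything beneath it.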
bash — install OTel
pip install opentelemetry-api \
            opentelemetry-sdk \
            opentelemetry-instrumentation-fastapi \
            opentelemetry-instrumentation-sqlalchemy \
            opentelemetry-exporter-otlp
Python — tracing_setup.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource

def setup_tracing(service_name: str = "fastapi-app"):
    resource = Resource.create({"service.name": service_name})
    provider = TracerProvider(resource=resource)

    # Export to an OTLP collector (Jaeger, Tempo, Honeycomb …)
    exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
    provider.add_span_processor(BatchSpanProcessor(exporter))

    trace.set_tracer_provider(provider)
    return trace.get_tracer(service_name)

# main.py
from fastapi import FastAPI
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from tracing_setup import setup_tracing

tracer = setup_tracing("order-service")
app = FastAPI()

# Auto-instrument FastAPI routes and SQLAlchemy queries
FastAPIInstrumentor.instrument_app(app)
# `engine` is your existing SQLAlchemy Engine (for an AsyncEngine, pass engine.sync_engine)
SQLAlchemyInstrumentor().instrument(engine=engine)
💡
FastAPIInstrumentor automatically creates a span for every route. SQLAlchemyInstrumentor creates a span for every DB query. Together they show you exactly how much time your requests spend in the database vs application code — with zero manual work.
🔖
Custom Spans & Attributes

Auto-instrumentation gets you HTTP and DB spans for free. For your own business logic — calling an external API, running a pricing calculation, sending an email — add custom spans manually to get full visibility.

Python — custom spans in routes
from fastapi import HTTPException
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

@app.post("/orders")
async def create_order(order: OrderIn):
    # The outer span (for the route) is created automatically by FastAPIInstrumentor
    # Add child spans for the steps inside:

    with tracer.start_as_current_span("validate.inventory") as span:
        span.set_attribute("product.id", order.product_id)
        span.set_attribute("quantity", order.quantity)
        in_stock = await check_inventory(order.product_id, order.quantity)
        span.set_attribute("in_stock", in_stock)

    if not in_stock:
        raise HTTPException(400, "Out of stock")

    with tracer.start_as_current_span("payment.process") as span:
        span.set_attribute("amount", order.total)
        span.set_attribute("currency", order.currency)
        try:
            charge = await charge_card(order)
            span.set_attribute("charge.id", charge.id)
        except PaymentError as e:
            span.record_exception(e)          # capture exception in trace
            span.set_status(trace.StatusCode.ERROR)
            raise

    with tracer.start_as_current_span("notification.send_email"):
        await send_confirmation_email(order.user_email)

    return {"order_id": charge.order_id, "status": "confirmed"}
What this looks like in Jaeger UI:
POST /orders                     0ms ──────────────── 95ms
│
├─ validate.inventory            2ms ──── 12ms
│    product.id = "sku-42"
│    quantity = 3
│    in_stock = true
│
├─ payment.process               14ms ──────────────── 88ms  ← bottleneck!
│    amount = 149.99
│    currency = "USD"
│    charge.id = "ch_xyz"
│
└─ notification.send_email       90ms ─── 95ms
Use span.record_exception(e) when catching errors — it attaches the full stack trace to the span in your tracing backend. Now you see not just that something failed but where and why, with full context.
Putting all three pillars together:
Pillar    Answers                          Tool                         Storage
Logs      What happened and when?          structlog / Python logging   Loki, CloudWatch, Datadog
Metrics   How much, how fast, how often?   Prometheus client            Prometheus + Grafana
Traces    Why was this request slow?       OpenTelemetry                Jaeger, Tempo, Honeycomb
The golden correlation: add trace_id to your log messages so you can jump from a log line directly to the trace:
Python — inject trace ID into logs
from opentelemetry import trace
import logging

class TraceIDFilter(logging.Filter):
    """Inject current OTel trace ID into every log record."""
    def filter(self, record):
        ctx = trace.get_current_span().get_span_context()
        record.trace_id = format(ctx.trace_id, "032x") if ctx.is_valid else "none"
        return True

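# Now your log format can include %(trace_id)s
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | trace=%(trace_id)s | %(message)s",
)
# Attach the filter to the handlers, not the root logger: logger-level filters
# are skipped for records that propagate up from child loggers.
for handler in logging.getLogger().handlers:
    handler.addFilter(TraceIDFilter())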
2024-03-15 09:12:01 | INFO | trace=4a3f9bc12de8a1b3c2e5f6789abc0123 | Order 42 created
2024-03-15 09:12:01 | ERROR | trace=4a3f9bc12de8a1b3c2e5f6789abc0123 | Payment failed: card_declined
💡
With the same trace_id in both your logs and your trace backend, you can click a log line in Datadog or Loki and jump straight into Jaeger to see the full distributed trace. This is the holy grail of debugging in production.
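The link between the two systems is just a string match on the 32-character hex ID. As a small illustration, the sketch below formats an integer trace ID the same way the filter does and pulls the ID back out of a log line; `format_trace_id` and `extract_trace_id` are hypothetical helpers written for this example, and the `trace=` prefix matches the log format used above.

```python
import re
from typing import Optional

TRACE_ID_PATTERN = re.compile(r"trace=([0-9a-f]{32})\b")

def format_trace_id(trace_id: int) -> str:
    """Render a 128-bit trace ID as 32 lowercase hex characters."""
    return format(trace_id, "032x")

def extract_trace_id(log_line: str) -> Optional[str]:
    """Return the trace ID embedded in a log line, or None if absent."""
    match = TRACE_ID_PATTERN.search(log_line)
    return match.group(1) if match else None

line = (
    "2024-03-15 09:12:01 | ERROR | "
    "trace=4a3f9bc12de8a1b3c2e5f6789abc0123 | Payment failed: card_declined"
)
found = extract_trace_id(line)
print(found)
```

Zero-padding to a fixed width matters here: without it, IDs with leading zero bits would produce shorter strings that no longer match what the tracing backend displays.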
Topic 19 Complete! You now understand all three pillars of observability: Logging (structured JSON, request middleware, request IDs), Metrics (Prometheus auto-instrumentation, custom counters/gauges/histograms), and Tracing (OpenTelemetry, auto spans for FastAPI + SQLAlchemy, custom spans with attributes, and correlating trace IDs in logs). Reply "next" to continue to Topic 20: Architecture Patterns.