Python's built-in logging module is the standard. FastAPI (via Uvicorn) already emits access logs — you configure how detailed they are, where they go, and what format they use.
Log levels control what gets shown — from most verbose to most critical:
DEBUG: Fine-grained detail. Enable only in development; it is very noisy.
INFO: Normal app events such as requests handled or startup complete.
WARNING: Something unexpected but not breaking, such as a retried DB call.
ERROR: Something failed. Log and investigate promptly.
CRITICAL: System-level failure. The app may be unresponsive. Page your on-call.
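To see how a threshold filters these levels, here is a minimal standard-library sketch. The `ListHandler` class and `emit_all_levels` helper are hypothetical names introduced for illustration; they log one message per level and keep only what passes the chosen threshold.

```python
import logging


class ListHandler(logging.Handler):
    """Collect messages in memory so the effect of a level is easy to inspect."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(f"{record.levelname}:{record.getMessage()}")


def emit_all_levels(threshold: int) -> list[str]:
    """Log one message per level and return those that pass `threshold`."""
    logger = logging.getLogger(f"demo.levels.{threshold}")
    logger.setLevel(threshold)
    logger.propagate = False  # keep the demo output out of the root logger
    handler = ListHandler()
    logger.addHandler(handler)
    try:
        logger.debug("cache lookup")
        logger.info("request handled")
        logger.warning("retrying DB call")
        logger.error("payment failed")
        logger.critical("database unreachable")
    finally:
        logger.removeHandler(handler)
    return handler.messages


if __name__ == "__main__":
    print(emit_all_levels(logging.WARNING))
```

With the threshold at WARNING, the DEBUG and INFO messages are dropped before they reach any handler, which is why a production level of WARNING keeps logs quiet.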
# logging_config.py
import logging
import sys


def setup_logging(level: str = "INFO") -> None:
    """Configure root logging once, writing to stdout."""
    logging.basicConfig(
        level=getattr(logging, level.upper()),
        format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        handlers=[logging.StreamHandler(sys.stdout)],
    )


# main.py
import logging  # needed for logging.getLogger below

from fastapi import FastAPI

from logging_config import setup_logging

setup_logging()
logger = logging.getLogger(__name__)

app = FastAPI()


# Note: on_event is deprecated in recent FastAPI releases in favour of lifespan handlers.
@app.on_event("startup")
async def on_startup():
    logger.info("🚀 App started successfully")


@app.get("/orders/{order_id}")
async def get_order(order_id: int):
    logger.debug("Fetching order %s", order_id)
    if order_id == 0:
        logger.warning("Attempted to fetch order ID 0: invalid")
        return {"error": "invalid id"}
    logger.info("Order %s fetched OK", order_id)
    return {"order_id": order_id, "status": "shipped"}
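Create each logger with logger = logging.getLogger(__name__). The log output then shows exactly which module each message came from, which is invaluable in large apps.

The --log-level flag sets the level of Uvicorn's own loggers (server and access logs); the level of your application loggers comes from setup_logging().

# Development: verbose
uvicorn main:app --log-level debug

# Production: only warnings and above
uvicorn main:app --log-level warning

# Or via environment variable (Uvicorn reads UVICORN_-prefixed variables)
UVICORN_LOG_LEVEL=info uvicorn main:app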
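If the application's own loggers should also follow an environment variable, the app has to read it itself. The following is a minimal sketch under that assumption: the `LOG_LEVEL` variable name and the `resolve_log_level` helper are illustrative choices, not something Uvicorn or FastAPI provide.

```python
import logging
import os

# Accepted names, mapped to the standard logging constants.
_LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}


def resolve_log_level(default: int = logging.INFO) -> int:
    """Read LOG_LEVEL (an assumed variable name) and fall back to `default`."""
    name = os.getenv("LOG_LEVEL", "").strip().upper()
    return _LEVELS.get(name, default)


if __name__ == "__main__":
    logging.basicConfig(level=resolve_log_level())
    logging.getLogger(__name__).info("logging configured")
```

Falling back to a default instead of raising keeps a typo in deployment configuration from preventing startup; whether that is the right trade-off depends on how strict you want configuration to be.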
Plain text logs are readable by humans but hard for machines to search. Structured logging emits JSON — every log entry is a machine-parseable object that log aggregators (Datadog, Loki, CloudWatch) can index, filter, and alert on.
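To make the idea concrete, here is a sketch of JSON logging using only the standard library. `JsonFormatter`, the `demo` helper, and the convention of passing key-value pairs through `extra={"context": {...}}` are all assumptions made for this illustration.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Hypothetical formatter: render each record as one JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "level": record.levelname.lower(),
            "logger": record.name,
            "event": record.getMessage(),
        }
        # Key-value context passed via extra={"context": {...}} (assumed convention).
        entry.update(getattr(record, "context", {}))
        return json.dumps(entry, default=str)


def demo() -> dict:
    """Log one entry to an in-memory stream and parse it back."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    log = logging.getLogger("demo.json")
    log.setLevel(logging.INFO)
    log.propagate = False
    log.addHandler(handler)
    try:
        log.error(
            "payment.failed",
            extra={"context": {"order_id": 42, "reason": "card_declined"}},
        )
    finally:
        log.removeHandler(handler)
    return json.loads(stream.getvalue())


if __name__ == "__main__":
    print(demo())
```

Each entry is a flat object, so an aggregator can index `level` and `order_id` as fields rather than parsing them out of a sentence.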
pip install structlog
import logging

import structlog


def setup_structlog() -> None:
    structlog.configure(
        processors=[
            structlog.stdlib.add_log_level,
            # add_logger_name is omitted: it needs a stdlib logger, and the
            # loggers created by PrintLoggerFactory have no name attribute.
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.JSONRenderer(),  # outputs JSON
        ],
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        logger_factory=structlog.PrintLoggerFactory(),
    )


setup_structlog()
log = structlog.get_logger()

# Usage: add key-value context to any log call
log.info("order.created", order_id=42, user_id="usr_abc", total=99.99, currency="USD")
log.error("payment.failed", order_id=42, reason="card_declined", attempt=2)
With JSON logs you can query level=error AND order_id=42 directly in your log platform, with no regex parsing needed. This turns minutes of grep-ing into a one-second search.

Log every incoming request and outgoing response automatically: method, path, status code, duration, and a unique request ID for correlating all log lines that belong to one request.
import logging
import time
import uuid

from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

logger = logging.getLogger("api.access")


class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Generate a short unique ID for this request
        request_id = str(uuid.uuid4())[:8]
        start_time = time.perf_counter()

        # Attach request_id to the request state (usable in route handlers)
        request.state.request_id = request_id

        logger.info("[%s] → %s %s", request_id, request.method, request.url.path)

        try:
            response = await call_next(request)
        except Exception:
            # logger.exception logs at ERROR level and includes the traceback
            logger.exception("[%s] ✗ UNHANDLED", request_id)
            raise

        duration_ms = (time.perf_counter() - start_time) * 1000
        logger.info(
            "[%s] ← %s %s %d (%.1fms)",
            request_id, request.method, request.url.path,
            response.status_code, duration_ms,
        )

        # Add request ID to response header so clients can reference it
        response.headers["X-Request-ID"] = request_id
        return response


# Register in main.py
app.add_middleware(RequestLoggingMiddleware)
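A request ID such as [a3f9bc12] appears in both log lines the middleware writes for a request, and route handlers can add it to their own messages via request.state.request_id. With 1000 concurrent requests you can filter by ID and see the complete story for just one of them.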
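One way to get the ID onto every log line without passing it around by hand is a context variable plus a logging filter. The sketch below uses only the standard library; `request_id_var`, `RequestIDFilter`, and `demo` are hypothetical names, and the assumption is that a middleware would set the variable at the start of each request.

```python
import contextvars
import io
import logging

# Holds the ID of the request currently being handled; "-" outside any request.
request_id_var = contextvars.ContextVar("request_id", default="-")


class RequestIDFilter(logging.Filter):
    """Copy the current request ID onto every record that passes through."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True


def demo() -> list[str]:
    """Log once inside a simulated request and once outside it."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("[%(request_id)s] %(message)s"))
    handler.addFilter(RequestIDFilter())  # handler-level, so child loggers are covered
    log = logging.getLogger("demo.request_id")
    log.setLevel(logging.INFO)
    log.propagate = False
    log.addHandler(handler)
    try:
        token = request_id_var.set("a3f9bc12")  # a middleware would do this per request
        log.info("order fetched")
        request_id_var.reset(token)
        log.info("background job")
    finally:
        log.removeHandler(handler)
    return stream.getvalue().splitlines()


if __name__ == "__main__":
    print("\n".join(demo()))
```

Context variables are scoped per asyncio task, so concurrent requests do not overwrite each other's IDs the way a module-level global would.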
Prometheus is the industry-standard metrics system. It periodically scrapes your app's /metrics endpoint and stores time-series data. Grafana then visualises it as dashboards and alerts.
pip install prometheus-fastapi-instrumentator
from fastapi import FastAPI
from prometheus_fastapi_instrumentator import Instrumentator

app = FastAPI()

# One line: auto-creates the /metrics endpoint with default metrics
Instrumentator().instrument(app).expose(app)


@app.get("/orders")
async def list_orders():
    return ["order1", "order2"]
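http://localhost:8000/metrics returns the collected metrics in Prometheus's plain-text exposition format. Once Prometheus scrapes that endpoint, you can query the data with PromQL:

# Requests per second (last 5 min)
rate(http_requests_total[5m])

# Error rate (5xx) as percentage; sum() both sides so the label sets match
sum(rate(http_requests_total{status="5xx"}[5m])) / sum(rate(http_requests_total[5m])) * 100

# 99th percentile latency across all series
histogram_quantile(0.99, sum by (le) (rate(http_request_duration_seconds_bucket[5m])))

# Requests per endpoint
sum by (handler) (rate(http_requests_total[5m]))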
The auto-instrumented HTTP metrics tell you how your API is performing. But you also want business metrics: orders created per minute, active checkouts, payment failure rate. Create these with prometheus_client directly.
Counter: Only goes up. Total orders, errors, emails sent. Resets only when the process restarts.
Gauge: Goes up and down. Active users, queue size, current connections.
Histogram: Tracks distributions with buckets. Ideal for latency and request size.
Summary: Like a histogram, but designed to calculate quantiles client-side, which makes it less flexible for aggregation. The Python client's Summary currently exposes only a count and a sum.
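The bucket idea behind a histogram is easiest to see in a few lines of plain Python. This is a teaching sketch only: `BucketHistogram` is a hypothetical class, not part of prometheus_client, and it ignores labels and thread safety.

```python
import bisect


class BucketHistogram:
    """Hypothetical teaching class, not part of prometheus_client."""

    def __init__(self, bounds):
        self.bounds = sorted(bounds)
        # One slot per upper bound, plus a final slot acting as the +Inf bucket.
        self.counts = [0] * (len(self.bounds) + 1)
        self.count = 0
        self.sum = 0.0

    def observe(self, value: float) -> None:
        # bisect_left picks the first bound that is >= value ("le" semantics).
        self.counts[bisect.bisect_left(self.bounds, value)] += 1
        self.count += 1
        self.sum += value

    def cumulative(self) -> list[int]:
        """Counts as Prometheus exposes them: each bucket includes all smaller ones."""
        running, result = 0, []
        for bucket_count in self.counts:
            running += bucket_count
            result.append(running)
        return result


if __name__ == "__main__":
    hist = BucketHistogram([0.1, 0.5, 1.0])
    for seconds in (0.05, 0.1, 0.3, 0.7, 2.0):
        hist.observe(seconds)
    print(hist.cumulative())
```

Because only per-bucket counts are stored, histograms from many instances can be added together and quantiles estimated afterwards on the server, which is the aggregation flexibility a summary gives up.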
# metrics.py
from prometheus_client import Counter, Gauge, Histogram

# Counter: total orders created
ORDERS_CREATED = Counter(
    "orders_created_total",
    "Total orders created",
    labelnames=["payment_method", "currency"],
)

# Gauge: orders currently being processed
ACTIVE_CHECKOUTS = Gauge(
    "active_checkouts",
    "Orders in checkout flow right now",
)

# Histogram: payment processing time
PAYMENT_DURATION = Histogram(
    "payment_duration_seconds",
    "Time taken to process a payment",
    buckets=[.1, .25, .5, 1, 2, 5],
)
import time

from metrics import ORDERS_CREATED, ACTIVE_CHECKOUTS, PAYMENT_DURATION

# app, Order, and process_payment are defined elsewhere in your application.


@app.post("/checkout")
async def checkout(order: Order):
    ACTIVE_CHECKOUTS.inc()  # gauge up
    start = time.perf_counter()
    try:
        result = await process_payment(order)
        ORDERS_CREATED.labels(
            payment_method=order.method,
            currency=order.currency,
        ).inc()  # counter up
        return result
    finally:
        elapsed = time.perf_counter() - start
        PAYMENT_DURATION.observe(elapsed)  # histogram observation
        ACTIVE_CHECKOUTS.dec()  # gauge down
Use labels (like payment_method) to slice and dice in Grafana. In the code above only ORDERS_CREATED carries labels, so it can answer "how many card orders in EUR?"; to ask "what's the p99 latency for card payments in EUR?" you would declare the same labels on PAYMENT_DURATION.

Tracing records the entire journey of a request: through your FastAPI handler, into the database, out to a microservice, and back. Each step is a span. Together they form a trace.
OpenTelemetry (OTel) is the vendor-neutral standard. You instrument once and export to Jaeger, Zipkin, Datadog, Honeycomb, or any other backend.
pip install opentelemetry-api \
opentelemetry-sdk \
opentelemetry-instrumentation-fastapi \
opentelemetry-instrumentation-sqlalchemy \
opentelemetry-exporter-otlp
# tracing_setup.py
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


def setup_tracing(service_name: str = "fastapi-app"):
    resource = Resource.create({"service.name": service_name})
    provider = TracerProvider(resource=resource)

    # Export to an OTLP collector (Jaeger, Tempo, Honeycomb …)
    exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
    provider.add_span_processor(BatchSpanProcessor(exporter))

    trace.set_tracer_provider(provider)
    return trace.get_tracer(service_name)


# main.py
from fastapi import FastAPI
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

from tracing_setup import setup_tracing

tracer = setup_tracing("order-service")
app = FastAPI()

# Auto-instrument FastAPI routes and SQLAlchemy queries.
# engine is your application's SQLAlchemy Engine, created elsewhere.
FastAPIInstrumentor.instrument_app(app)
SQLAlchemyInstrumentor().instrument(engine=engine)
FastAPIInstrumentor automatically creates a span for every route. SQLAlchemyInstrumentor creates a span for every DB query. Together they show you how much time your requests spend in the database versus application code, with zero manual work.

Auto-instrumentation gets you HTTP and DB spans for free. For your own business logic (calling an external API, running a pricing calculation, sending an email), add custom spans manually to get full visibility.
from fastapi import HTTPException
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)

# app, OrderIn, PaymentError, and the awaited helpers are defined elsewhere.


@app.post("/orders")
async def create_order(order: OrderIn):
    # The outer span (for the route) is created automatically by FastAPIInstrumentor.
    # Add child spans for the steps inside:
    with tracer.start_as_current_span("validate.inventory") as span:
        span.set_attribute("product.id", order.product_id)
        span.set_attribute("quantity", order.quantity)
        in_stock = await check_inventory(order.product_id, order.quantity)
        span.set_attribute("in_stock", in_stock)
        if not in_stock:
            raise HTTPException(400, "Out of stock")

    with tracer.start_as_current_span("payment.process") as span:
        span.set_attribute("amount", order.total)
        span.set_attribute("currency", order.currency)
        try:
            charge = await charge_card(order)
            span.set_attribute("charge.id", charge.id)
        except PaymentError as e:
            span.record_exception(e)  # capture exception in trace
            span.set_status(Status(StatusCode.ERROR, str(e)))
            raise

    with tracer.start_as_current_span("notification.send_email"):
        await send_confirmation_email(order.user_email)

    return {"order_id": charge.order_id, "status": "confirmed"}
Call span.record_exception(e) when catching errors: it attaches the full stack trace to the span in your tracing backend. Now you see not just that something failed but where and why, with full context.

| Pillar | Answers | Tool | Storage |
|---|---|---|---|
| Logs | What happened and when? | structlog / Python logging | Loki, CloudWatch, Datadog |
| Metrics | How much, how fast, how often? | Prometheus client | Prometheus + Grafana |
| Traces | Why was this request slow? | OpenTelemetry | Jaeger, Tempo, Honeycomb |
Add trace_id to your log messages so you can jump from a log line directly to the trace:

import logging

from opentelemetry import trace


class TraceIDFilter(logging.Filter):
    """Inject current OTel trace ID into every log record."""

    def filter(self, record):
        ctx = trace.get_current_span().get_span_context()
        record.trace_id = format(ctx.trace_id, "032x") if ctx.is_valid else "none"
        return True


# Now your log format can include %(trace_id)s
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | trace=%(trace_id)s | %(message)s"
)

# Attach the filter to the handlers, not the root logger: logger-level filters
# are skipped for records that propagate up from child loggers.
for handler in logging.getLogger().handlers:
    handler.addFilter(TraceIDFilter())
With the same trace_id in both your logs and your trace backend, you can click a log line in Datadog or Loki and jump straight into Jaeger to see the full distributed trace. This is the holy grail of debugging in production.
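One detail decides whether the trace ID actually reaches every log line: where the filter is attached. In the standard library, a filter on a logger only runs for records created on that logger, while a filter on a handler runs for every record the handler processes, including those from child loggers. The sketch below demonstrates this with a fixed, made-up trace ID; `StaticTraceFilter` and `seen_by_child` are hypothetical names and no OpenTelemetry code is involved.

```python
import logging


class StaticTraceFilter(logging.Filter):
    """Stand-in for a trace-ID filter; uses a fixed, made-up ID instead of OTel."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id = format(0xABC123, "032x")  # 32 hex digits, zero-padded
        return True


def seen_by_child(attach_to_handler: bool) -> bool:
    """Return True if a record logged on a child logger received trace_id."""
    parent = logging.getLogger(f"demo.parent.{attach_to_handler}")
    parent.setLevel(logging.INFO)
    parent.propagate = False
    captured = []

    class Capture(logging.Handler):
        def emit(self, record):
            captured.append(hasattr(record, "trace_id"))

    handler = Capture()
    parent.addHandler(handler)
    target = handler if attach_to_handler else parent
    target.addFilter(StaticTraceFilter())
    try:
        parent.getChild("worker").info("hello")  # record originates on the child
    finally:
        parent.removeHandler(handler)
    return captured == [True]


if __name__ == "__main__":
    print("filter on handler:", seen_by_child(True))
    print("filter on logger: ", seen_by_child(False))
```

A format string that references `%(trace_id)s` fails for any record that skipped the filter, so attaching the filter to the handlers is the safer default.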