FastAPI Mastery
Topic 14 of 22 — WebSockets
WebSockets 🔌
HTTP is request-response: the client asks, the server answers, and the exchange is over. The server cannot speak again until the client sends another request. WebSockets keep one connection open so that either side, including the server, can send data at any time. That makes them a good fit for chat apps, live dashboards, multiplayer games, and AI token streaming.
HTTP vs WebSocket
🌐 HTTP

Client sends request → Server replies → Connection closes. Client must poll again for new data. Simple, stateless, easy to cache.

🔌 WebSocket

Client upgrades connection → Both sides can send anytime → Connection stays open. Perfect for real-time push. Stateful, persistent.

Client                            Server
  │                                 │
  │── GET /ws (Upgrade: websocket) ▶│   HTTP handshake
  │◀── 101 Switching Protocols ─────│   Connection upgraded ✅
  │                                 │
  │── "hello" ─────────────────────▶│   Client sends anytime
  │◀── "world" ─────────────────────│   Server sends anytime
  │◀── "new message!" ──────────────│   Server pushes without request
  │── "bye" ───────────────────────▶│
  │                                 │
  │◀──── Connection Close ──────────│   Either side can close
14.1 Protocol
🤝
Handshake
A WebSocket starts life as a normal HTTP GET request with special headers asking to "upgrade" the connection. The server replies with 101 Switching Protocols, and from that moment the TCP connection becomes a full-duplex WebSocket channel.
What the upgrade HTTP request looks like
HTTP — Upgrade Request Headers
# Client sends this (browser/JS does it automatically)
GET /ws/chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

# Server responds with:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Key headers explained:
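Header / status line | Meaning
Upgrade: websocket | Client asks to switch protocols to WebSocket
Connection: Upgrade | Marks this request as a connection upgrade
Sec-WebSocket-Key | Random base64 nonce; the server hashes it to produce Sec-WebSocket-Accept, proving it understood the handshake
Sec-WebSocket-Version: 13 | WebSocket protocol version (13 is the version standardized in RFC 6455)
101 Switching Protocols | Status line, not a header: the server accepted and the connection is now a WebSocket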
💡
FastAPI (via Starlette) handles all this handshake logic for you automatically. You just call await websocket.accept() and the protocol switch happens behind the scenes.
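To make the Sec-WebSocket-Key / Sec-WebSocket-Accept exchange concrete, here is a minimal sketch of how a server derives the accept value as described in RFC 6455: append a fixed GUID to the client's key, hash with SHA-1, and base64-encode the digest. The function name compute_accept is illustrative only; Starlette and the ASGI server do this for you.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455; every WebSocket server uses the same value.
WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def compute_accept(sec_websocket_key: str) -> str:
    """Derive the Sec-WebSocket-Accept value from the client's Sec-WebSocket-Key."""
    digest = hashlib.sha1((sec_websocket_key + WS_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


# The key from the upgrade request above yields the accept value in the response.
print(compute_accept("dGhlIHNhbXBsZSBub25jZQ=="))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

The check is not authentication. It only proves the server speaks WebSocket and did not replay a cached HTTP response.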
The URL scheme — WebSocket URLs use ws:// or wss:// (secure):
JavaScript — browser connects
// Plain WebSocket (development)
const devSocket = new WebSocket("ws://localhost:8000/ws/chat");

// Secure WebSocket (production; wss:// is to ws:// what https:// is to http://)
const prodSocket = new WebSocket("wss://api.example.com/ws/chat");

// With path params / query params (roomId and token are defined elsewhere)
const roomSocket = new WebSocket(
  `wss://api.example.com/ws/${encodeURIComponent(roomId)}?token=${encodeURIComponent(token)}`
);
📦
Frames · Ping · Pong
After the handshake, data travels as frames. Each frame has a type (text, binary, ping, pong, close) and a payload. You don't usually deal with raw frames in FastAPI — the library handles it — but understanding frames explains why ping/pong exists.
Frame types
Frame Type | Opcode | Purpose
Text | 0x1 | UTF-8 string data (most common; typically JSON messages)
Binary | 0x2 | Raw bytes (images, files, audio)
Ping | 0x9 | Keep-alive probe: "are you still there?"
Pong | 0xA | Reply to ping: "yes, I'm here"
Close | 0x8 | Gracefully closing the connection
Continuation | 0x0 | Used to split large messages into multiple frames
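To show where the opcode in the table lives on the wire, here is a rough sketch of a parser for a single, complete frame following the RFC 6455 layout. The names parse_frame and Frame are made up for this illustration, and the sketch skips validation (reserved bits, truncated input); real servers handle all of this below the FastAPI layer.

```python
import struct
from typing import NamedTuple

OPCODES = {0x0: "continuation", 0x1: "text", 0x2: "binary",
           0x8: "close", 0x9: "ping", 0xA: "pong"}


class Frame(NamedTuple):
    fin: bool
    opcode: str
    payload: bytes


def parse_frame(data: bytes) -> Frame:
    """Parse one complete WebSocket frame from `data` (no validation)."""
    fin = bool(data[0] & 0x80)        # top bit: final fragment of a message
    opcode = data[0] & 0x0F           # low 4 bits: frame type
    masked = bool(data[1] & 0x80)     # client-to-server frames are masked
    length = data[1] & 0x7F
    offset = 2
    if length == 126:                 # 16-bit extended payload length
        (length,) = struct.unpack_from("!H", data, offset)
        offset += 2
    elif length == 127:               # 64-bit extended payload length
        (length,) = struct.unpack_from("!Q", data, offset)
        offset += 8
    mask = b""
    if masked:
        mask = data[offset:offset + 4]
        offset += 4
    payload = data[offset:offset + length]
    if masked:                        # unmask by XOR-ing with the 4-byte key
        payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))
    return Frame(fin, OPCODES.get(opcode, "reserved"), payload)


# Example frames from RFC 6455: an unmasked and a masked "Hello" text frame.
print(parse_frame(bytes.fromhex("810548656c6c6f")))
print(parse_frame(bytes.fromhex("818537fa213d7f9f4d5158")))
```

Both calls decode to a final text frame carrying b"Hello"; the second one only differs in that the payload was masked, as browsers always do when sending.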
Ping/Pong — keepalive mechanism
Client                            Server
  │                                 │
  │                                 │   (every 30s, server sends)
  │◀──────── PING ──────────────────│   "still there?"
  │───────── PONG ─────────────────▶│   "yes!"
  │                                 │
  │      (if no PONG received)      │
  │                                 │   Server detects dead connection
  │                                 │   Closes & cleans up resources
You cannot send a ping frame through FastAPI or Starlette: the ASGI WebSocket interface only carries text and binary messages, so ping and pong frames are sent and answered by the ASGI server (Uvicorn) underneath your app. Configure the keepalive there:
Python - keepalive via Uvicorn settings
# ASGI WebSocket messages ("websocket.send", "websocket.receive", ...) carry
# only text or bytes; there is no ping message type your app could send.
# Keepalive pings come from the ASGI server, so set them when starting Uvicorn.
import uvicorn

if __name__ == "__main__":
    # Equivalent CLI: uvicorn main:app --ws-ping-interval 20 --ws-ping-timeout 20
    uvicorn.run("main:app", ws_ping_interval=20, ws_ping_timeout=20)
Run Uvicorn with --ws-ping-interval 20 to auto-ping every 20 seconds. Dead connections that don't pong are closed automatically — no code needed.
14.2 FastAPI WebSockets
Accept Connections
Declare a WebSocket route with @app.websocket("/path"). The handler receives a WebSocket object. Always call await websocket.accept() first — this completes the handshake. Then enter a loop to send/receive messages.
Python — minimal WebSocket endpoint
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()   # ← MUST be first. Completes the handshake.

    while True:
        data = await websocket.receive_text()   # wait for a message
        await websocket.send_text(f"Echo: {data}")  # echo it back
Testing this from the browser console:
JavaScript — browser test
const ws = new WebSocket("ws://localhost:8000/ws");
ws.onopen    = () => ws.send("Hello FastAPI!");
ws.onmessage = (e) => console.log(e.data);  // logs "Echo: Hello FastAPI!"
ws.onclose   = () => console.log("closed");
Accept with path parameters and query params
Python — path + query params
from fastapi import WebSocket, Query

@app.websocket("/ws/{room_id}")
async def room_ws(
    websocket: WebSocket,
    room_id: str,                        # path param
    username: str | None = Query(None)  # ?username=alice (None if omitted)
):
    await websocket.accept()
    print(f"{username} joined room {room_id}")
    while True:
        msg = await websocket.receive_text()
        await websocket.send_text(f"[{room_id}] {username}: {msg}")

# Connect: ws://localhost:8000/ws/general?username=alice
Handling disconnects gracefully — when a client disconnects, receive_text() raises WebSocketDisconnect:
Python — handle disconnect
from fastapi import WebSocket
from starlette.websockets import WebSocketDisconnect

@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Got: {data}")
    except WebSocketDisconnect:
        print("Client disconnected — clean up here")
        # remove from connection manager, save last seen, etc.
📥
Receive Messages
FastAPI's WebSocket object provides three receive methods depending on what the client sends.
Method | Returns | Use When
receive_text() | str | Client sends UTF-8 text (JSON strings, chat messages)
receive_bytes() | bytes | Client sends binary data (files, images, audio)
receive_json() | dict/list | Client sends JSON; it is parsed for you
Python — receive text, bytes, JSON
@app.websocket("/ws/text")
async def text_ws(ws: WebSocket):
    await ws.accept()
    while True:
        msg: str = await ws.receive_text()
        print(f"Text: {msg}")

@app.websocket("/ws/binary")
async def binary_ws(ws: WebSocket):
    await ws.accept()
    while True:
        data: bytes = await ws.receive_bytes()
        print(f"Received {len(data)} bytes")
        # process image, audio chunk, etc.

@app.websocket("/ws/json")
async def json_ws(ws: WebSocket):
    await ws.accept()
    while True:
        payload: dict = await ws.receive_json()
        # payload = {"action": "move", "x": 10, "y": 20}
        action = payload.get("action")
        print(f"Action: {action}")
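Once payloads carry an "action" field, as in the JSON handler above, a dispatch table keeps the receive loop small. The following is a minimal sketch under that assumption; the handler names and the error message shapes are hypothetical, not part of FastAPI.

```python
from typing import Any, Callable

Handler = Callable[[dict], dict]


def handle_move(payload: dict) -> dict:
    return {"type": "moved", "x": payload["x"], "y": payload["y"]}


def handle_chat(payload: dict) -> dict:
    return {"type": "chat", "text": payload["text"]}


# Map each "action" value to the function that handles it.
HANDLERS: dict[str, Handler] = {"move": handle_move, "chat": handle_chat}


def dispatch(payload: Any) -> dict:
    """Route a decoded JSON payload to its handler; return an error dict on bad input."""
    if not isinstance(payload, dict):
        return {"type": "error", "msg": "payload must be a JSON object"}
    action = payload.get("action")
    handler = HANDLERS.get(action) if isinstance(action, str) else None
    if handler is None:
        return {"type": "error", "msg": f"unknown action: {action!r}"}
    try:
        return handler(payload)
    except KeyError as exc:  # a required field is missing from the payload
        return {"type": "error", "msg": f"missing field: {exc.args[0]}"}


print(dispatch({"action": "move", "x": 10, "y": 20}))
```

Inside the endpoint the loop body then becomes a single line: await ws.send_json(dispatch(await ws.receive_json())).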
Low-level receive() — returns a raw dict, handles both text and binary in one call:
Python — raw receive()
@app.websocket("/ws/any")
async def any_ws(ws: WebSocket):
    await ws.accept()
    while True:
        message = await ws.receive()
        # message is a dict such as {"type": "websocket.receive", "text": "..."};
        # depending on the server, the unused key may be None or absent
        if message["type"] == "websocket.receive":
            if message.get("text") is not None:      # also accepts an empty string
                print(f"Text: {message['text']}")
            elif message.get("bytes") is not None:
                print(f"Bytes: {len(message['bytes'])} bytes")
        elif message["type"] == "websocket.disconnect":
            break
📤
Send Messages
Mirror of receive — three methods for sending text, bytes, or JSON.
Method | Input | Use When
send_text(data) | str | Sending plain text or raw JSON strings
send_bytes(data) | bytes | Sending files, images, audio chunks
send_json(data) | dict/list | Sending structured data (serialized to JSON automatically)
Python — send examples
@app.websocket("/ws/demo")
async def demo_ws(ws: WebSocket):
    await ws.accept()

    # Send a plain string
    await ws.send_text("Welcome to the server!")

    # Send structured JSON (dict → serialized automatically)
    await ws.send_json({
        "type": "connected",
        "user_count": 42,
        "server_time": "2024-01-01T00:00:00Z"
    })

    # Send binary data (e.g., a small image thumbnail)
    with open("thumb.png", "rb") as f:
        await ws.send_bytes(f.read())

    # Receive and echo loop
    try:
        while True:
            msg = await ws.receive_text()
            await ws.send_json({"echo": msg, "length": len(msg)})
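    except WebSocketDisconnect:
        pass  # the client already closed the connection; nothing left to close

    # To end the session from the server side instead, break out of the loop
    # and close with a code:
    # await ws.close(code=1000)  # 1000 = normal closure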
WebSocket close codes
Code | Meaning
1000 | Normal closure
1001 | Going away (server restart / page navigate)
1006 | Abnormal closure (connection dropped)
1008 | Policy violation (auth failed)
1011 | Internal server error
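The codes in the table can be given names so that handlers do not scatter magic numbers. This is a small sketch using the standard library's IntEnum; the retry policy in should_reconnect is an assumption for illustration, not something the protocol prescribes.

```python
from enum import IntEnum


class CloseCode(IntEnum):
    NORMAL = 1000
    GOING_AWAY = 1001
    ABNORMAL = 1006          # reserved: reported locally, never sent in a close frame
    POLICY_VIOLATION = 1008
    INTERNAL_ERROR = 1011


# Assumed policy: do not retry after a clean close or an auth/policy rejection.
NO_RETRY = {CloseCode.NORMAL, CloseCode.POLICY_VIOLATION}


def should_reconnect(code: int) -> bool:
    """Return True if a client should try to reconnect after this close code."""
    return code not in NO_RETRY


for code in CloseCode:
    print(code.value, code.name, should_reconnect(code))
```

On the server side the same enum works directly with close(), for example await ws.close(code=CloseCode.POLICY_VIOLATION), because IntEnum members are plain integers.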
14.3 Advanced
🗂️
Connection Manager
With multiple clients connected, you need to track all active connections. A Connection Manager is a simple class that holds a list of connected WebSocket objects and provides connect, disconnect, and broadcast methods.
Python — ConnectionManager
from fastapi import FastAPI, WebSocket
from starlette.websockets import WebSocketDisconnect
from typing import List

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

    async def send_personal(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

# ── Instantiate ONE manager (shared across all connections) ──
manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # Send back to sender
            await manager.send_personal(f"You said: {data}", websocket)
            # Broadcast to every connected client (the sender included)
            await manager.broadcast(f"Client #{client_id}: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left")
⚠️
This in-memory list only works with a single server process. With multiple workers, use Redis Pub/Sub to relay messages across processes (shown in the Broadcast section).
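Two details of the ConnectionManager above are worth hardening: broadcast() also sends to the original sender, and a single failed send_text() aborts the whole loop. The sketch below shows one possible variant with an exclude parameter and pruning of failed connections. FakeSocket is a stand-in so the example runs without a server, and SafeConnectionManager is a hypothetical name.

```python
import asyncio


class FakeSocket:
    """Stand-in for a WebSocket: records sent text, optionally fails on send."""

    def __init__(self, name: str, broken: bool = False):
        self.name, self.broken, self.sent = name, broken, []

    async def send_text(self, message: str) -> None:
        if self.broken:
            raise RuntimeError(f"{self.name} is gone")
        self.sent.append(message)


class SafeConnectionManager:
    def __init__(self):
        self.active_connections = []

    def add(self, ws) -> None:
        self.active_connections.append(ws)

    def disconnect(self, ws) -> None:
        if ws in self.active_connections:      # tolerate double removal
            self.active_connections.remove(ws)

    async def broadcast(self, message: str, exclude=None) -> None:
        # Iterate over a copy so removals during the loop are safe.
        for ws in list(self.active_connections):
            if ws is exclude:
                continue
            try:
                await ws.send_text(message)
            except Exception:
                self.disconnect(ws)            # drop connections that fail to send


async def demo():
    manager = SafeConnectionManager()
    alice, bob = FakeSocket("alice"), FakeSocket("bob")
    ghost = FakeSocket("ghost", broken=True)
    for ws in (alice, bob, ghost):
        manager.add(ws)
    await manager.broadcast("hi from alice", exclude=alice)
    return alice.sent, bob.sent, len(manager.active_connections)


print(asyncio.run(demo()))  # ([], ['hi from alice'], 2)
```

In a real endpoint the manager would hold WebSocket objects and connect() would still call accept() first; only the broadcast loop changes.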
🏠
Rooms
A room is a named group of WebSocket connections. Messages sent to a room only go to clients in that room. Implement rooms with a dict mapping room names to lists of connections.
Python — Room-based ConnectionManager
from collections import defaultdict

class RoomManager:
    def __init__(self):
        # {"general": [ws1, ws2], "dev": [ws3]}
        self.rooms: dict[str, List[WebSocket]] = defaultdict(list)

    async def join(self, room: str, websocket: WebSocket):
        await websocket.accept()
        self.rooms[room].append(websocket)
        await self.broadcast_to_room(room, f"A new user joined #{room}")

    def leave(self, room: str, websocket: WebSocket):
        if websocket in self.rooms[room]:
            self.rooms[room].remove(websocket)
        if not self.rooms[room]:
            del self.rooms[room]   # clean up empty room

    async def broadcast_to_room(self, room: str, message: str):
        for ws in self.rooms.get(room, []):
            await ws.send_text(message)

    def room_count(self, room: str) -> int:
        return len(self.rooms.get(room, []))

room_manager = RoomManager()

@app.websocket("/ws/room/{room_name}")
async def room_ws(
    websocket: WebSocket,
    room_name: str,
    username: str = Query("anonymous")
):
    await room_manager.join(room_name, websocket)
    try:
        while True:
            text = await websocket.receive_text()
            await room_manager.broadcast_to_room(
                room_name,
                f"[{room_name}] {username}: {text}"
            )
    except WebSocketDisconnect:
        room_manager.leave(room_name, websocket)
        await room_manager.broadcast_to_room(room_name, f"{username} left")

# Connect to room: ws://localhost:8000/ws/room/general?username=alice
📡
Broadcast
Broadcast = send the same message to every connected client. The simple in-memory version works for a single process. For production multi-worker setups, use Redis Pub/Sub so workers can broadcast across processes.
In-memory broadcast (single process) — already in ConnectionManager above. Here's a standalone example with JSON payloads:
Python — JSON broadcast
import json
from datetime import datetime, timezone
from typing import List

connections: List[WebSocket] = []

async def broadcast_json(event_type: str, data: dict):
    payload = json.dumps({
        "type": event_type,
        "data": data,
        # timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
        "ts": datetime.now(timezone.utc).isoformat()
    })
    dead = []
    for ws in connections:
        try:
            await ws.send_text(payload)
        except Exception:
            dead.append(ws)  # collect failed connections
    for ws in dead:
        connections.remove(ws)  # clean up dead ones

# Trigger a broadcast from an HTTP endpoint (e.g., admin pushes an alert)
@app.post("/admin/broadcast")
async def admin_broadcast(message: str):
    await broadcast_json("admin_alert", {"message": message})
    return {"sent_to": len(connections)}
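The loop above awaits each send in turn, so one slow client delays everyone behind it. One possible refinement, sketched here under the assumption that per-client timeouts are acceptable, is to send concurrently with asyncio.gather and bound each send with asyncio.wait_for. SlowSocket and broadcast_concurrently are hypothetical names used only for this illustration.

```python
import asyncio


class SlowSocket:
    """Stand-in for a WebSocket whose send takes `delay` seconds."""

    def __init__(self, delay: float):
        self.delay, self.sent = delay, []

    async def send_text(self, message: str) -> None:
        await asyncio.sleep(self.delay)
        self.sent.append(message)


async def broadcast_concurrently(connections: list, message: str,
                                 timeout: float = 1.0) -> list:
    """Send to all connections at once; return those that failed or timed out."""

    async def send_one(ws) -> None:
        await asyncio.wait_for(ws.send_text(message), timeout)

    results = await asyncio.gather(*(send_one(ws) for ws in connections),
                                   return_exceptions=True)
    return [ws for ws, res in zip(connections, results)
            if isinstance(res, Exception)]


async def demo():
    fast, slow = SlowSocket(0.0), SlowSocket(0.5)
    failed = await broadcast_concurrently([fast, slow], "tick", timeout=0.05)
    return fast.sent, failed == [slow]


print(asyncio.run(demo()))  # (['tick'], True)
```

The caller decides what to do with the returned list, for example removing those connections the same way the dead list is handled above.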
Redis Pub/Sub broadcast (multi-worker) — each FastAPI worker subscribes to a Redis channel. When any worker publishes, all workers relay it to their local connections:
Python — Redis Pub/Sub broadcast
# pip install redis   (redis-py 4.2+ ships the asyncio client as redis.asyncio)
import asyncio
from contextlib import asynccontextmanager
from typing import List

import redis.asyncio as aioredis
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

redis_client = None
local_connections: List[WebSocket] = []

@asynccontextmanager
async def lifespan(app):
    global redis_client
    redis_client = aioredis.from_url("redis://localhost:6379")
    # Start the subscriber and keep a reference so the task is not garbage-collected
    subscriber = asyncio.create_task(redis_subscriber())
    yield
    subscriber.cancel()
    await redis_client.close()  # newer redis-py releases prefer aclose()

app = FastAPI(lifespan=lifespan)  # without this, the lifespan handler never runs

async def redis_subscriber():
    """Subscribe to Redis and relay each message to local WebSocket clients."""
    pubsub = redis_client.pubsub()
    await pubsub.subscribe("chat")
    async for message in pubsub.listen():
        if message["type"] == "message":
            text = message["data"].decode()
            for ws in list(local_connections):  # copy: the list may change while we await
                await ws.send_text(text)

@app.websocket("/ws")
async def ws_endpoint(ws: WebSocket):
    await ws.accept()
    local_connections.append(ws)
    try:
        while True:
            msg = await ws.receive_text()
            # Publish to Redis → ALL workers relay to their clients
            await redis_client.publish("chat", msg)
    except WebSocketDisconnect:
        local_connections.remove(ws)
Redis Pub/Sub is the standard pattern for scaling WebSockets horizontally. Each worker only tracks its own connections; Redis fans out messages across all workers.
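The fan-out idea can be tried without a Redis server. The sketch below replaces Redis with an in-memory stand-in (FakePubSub, a hypothetical class built on asyncio.Queue) and models two workers that each know only their own clients. It illustrates the message flow only; it does not cross process boundaries the way Redis does.

```python
import asyncio


class FakePubSub:
    """In-memory stand-in for a Redis channel: publish() reaches every subscriber."""

    def __init__(self):
        self.subscribers: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.subscribers.append(queue)
        return queue

    async def publish(self, message: str) -> None:
        for queue in self.subscribers:
            await queue.put(message)


class Worker:
    """Models one server process: it only knows its own local clients."""

    def __init__(self, hub: FakePubSub):
        self.inbox = hub.subscribe()
        self.local_clients: list[list[str]] = []   # each client is just an outbox list

    async def relay_once(self) -> None:
        # Take one message from the channel and copy it to every local client.
        message = await self.inbox.get()
        for outbox in self.local_clients:
            outbox.append(message)


async def demo():
    hub = FakePubSub()
    worker_a, worker_b = Worker(hub), Worker(hub)
    alice, bob = [], []
    worker_a.local_clients.append(alice)   # alice is connected to worker A
    worker_b.local_clients.append(bob)     # bob is connected to worker B
    await hub.publish("hello")             # e.g. alice's message, published by worker A
    await worker_a.relay_once()
    await worker_b.relay_once()
    return alice, bob


print(asyncio.run(demo()))  # (['hello'], ['hello'])
```

Bob receives the message even though no worker holds both connections, which is the property the Redis channel provides across real processes.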
🔐
Authentication
The browser WebSocket API does not let you set custom HTTP headers on the handshake, so you can't send Authorization: Bearer .... Common alternatives are a token in the query string, a token in the first message, or an existing session cookie (which the browser does send with the handshake).
Option A — Token in query param (simple)
Python — query param auth
from typing import Optional

import jwt  # pip install pyjwt
from fastapi import WebSocket, WebSocketDisconnect, Query

SECRET = "mysecret"  # demo only; load a real secret from configuration

def get_user_from_token(token: Optional[str]) -> Optional[dict]:
    """Return the decoded JWT payload, or None if the token is missing or invalid."""
    if not token:
        return None
    try:
        return jwt.decode(token, SECRET, algorithms=["HS256"])
    except jwt.InvalidTokenError:
        return None

@app.websocket("/ws")
async def protected_ws(
    websocket: WebSocket,
    token: Optional[str] = Query(None)   # ?token=eyJ...
):
    user = get_user_from_token(token)
    if not user:
        # Closing before accept() rejects the handshake, so the client typically
        # sees a failed connection (HTTP 403) rather than close code 1008.
        await websocket.close(code=1008)  # 1008 = policy violation
        return

    await websocket.accept()
    await websocket.send_json({"type": "auth_ok", "user": user["sub"]})
    try:
        while True:
            msg = await websocket.receive_text()
            await websocket.send_text(f"[{user['sub']}]: {msg}")
    except WebSocketDisconnect:
        pass

# JS: new WebSocket("ws://localhost:8000/ws?token=eyJhbGci...")
Option B: First-message token (keeps the token out of the URL, and therefore out of access logs and browser history). Accept the connection, then immediately wait for an auth message:
Python — first-message auth
@app.websocket("/ws/secure")
async def secure_ws(websocket: WebSocket):
    await websocket.accept()   # accept first (connection is open)

    # Wait for the first message — must be an auth token
    auth_msg = await websocket.receive_json()
    token = auth_msg.get("token")

    user = get_user_from_token(token)
    if not user:
        await websocket.send_json({"type": "error", "msg": "Unauthorized"})
        await websocket.close(code=1008)
        return

    await websocket.send_json({"type": "auth_ok"})

    try:
        while True:
            data = await websocket.receive_json()
            await websocket.send_json({"echo": data})
    except WebSocketDisconnect:
        pass

# JS client:
# ws.onopen = () => ws.send(JSON.stringify({token: "eyJhbGci..."}));
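With first-message auth the connection is already accepted, so a client that never sends its token would hold the socket open indefinitely. A common mitigation is a deadline on that first message. The helper below is a sketch under that assumption; wait_for_auth and its parameters are hypothetical, and the fake clients exist only so the example runs standalone.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def wait_for_auth(
    receive_json: Callable[[], Awaitable[object]],
    validate: Callable[[str], Optional[dict]],
    timeout: float = 5.0,
) -> Optional[dict]:
    """Return the user for a valid first message, or None on timeout or bad token."""
    try:
        message = await asyncio.wait_for(receive_json(), timeout)
    except asyncio.TimeoutError:
        return None
    token = message.get("token") if isinstance(message, dict) else None
    return validate(token) if isinstance(token, str) else None


async def demo():
    async def prompt_client():
        return {"token": "good"}

    async def silent_client():
        await asyncio.sleep(1)          # never answers within the deadline
        return {"token": "good"}

    def validate(token: str) -> Optional[dict]:
        return {"sub": "alice"} if token == "good" else None

    ok = await wait_for_auth(prompt_client, validate, timeout=0.05)
    timed_out = await wait_for_auth(silent_client, validate, timeout=0.05)
    return ok, timed_out


print(asyncio.run(demo()))  # ({'sub': 'alice'}, None)
```

In the endpoint above this would replace the bare receive_json() call: user = await wait_for_auth(websocket.receive_json, get_user_from_token), followed by the same close(code=1008) branch when the result is None.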
Option C — Cookie-based auth — if the user is already logged in with a session cookie:
Python — cookie auth
from fastapi import Cookie

@app.websocket("/ws")
async def cookie_ws(
    websocket: WebSocket,
    session: str = Cookie(None)   # reads the "session" cookie
):
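    if not session:
        await websocket.close(code=1008)
        return
    user = validate_session(session)  # your session lookup (assumed to return None when invalid)
    if not user:                      # unknown or expired session
        await websocket.close(code=1008)
        return
    await websocket.accept()
    await websocket.send_json({"type": "auth_ok", "user": user})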
🔁
Reconnection
WebSocket connections drop — network hiccups, server restarts, mobile going to sleep. The browser's native WebSocket does NOT auto-reconnect. You must implement it on the client side.
Client-side auto-reconnect (JavaScript)
JavaScript — reconnect with exponential backoff
class ReconnectingWebSocket {
  constructor(url, token) {
    this.url = url;
    this.token = token;
    this.retryDelay = 1000;   // start with 1 second
    this.maxDelay = 30000;    // max 30 seconds
    this.ws = null;
    this.connect();
  }

  connect() {
    // Use the URL passed to the constructor instead of a hard-coded one
    this.ws = new WebSocket(`${this.url}?token=${encodeURIComponent(this.token)}`);

    this.ws.onopen = () => {
      console.log("✅ Connected");
      this.retryDelay = 1000;  // reset delay on success
    };

    this.ws.onmessage = (e) => {
      console.log("Message:", e.data);
    };

    this.ws.onclose = (e) => {
      if (e.code === 1008) {   // auth failure: don't retry
        console.log("Auth failed, not reconnecting");
        return;
      }
      console.log(`Disconnected. Reconnecting in ${this.retryDelay}ms`);
      setTimeout(() => this.connect(), this.retryDelay);
      // exponential backoff: 1s → 2s → 4s → ... → 30s
      this.retryDelay = Math.min(this.retryDelay * 2, this.maxDelay);
    };

    this.ws.onerror = (e) => console.error("WS error:", e);
  }

  send(data) {
    // Messages are dropped silently unless the socket is open
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(data);
    }
  }
}

// Usage
const socket = new ReconnectingWebSocket("ws://localhost:8000/ws", myJwtToken);
// Wait for the connection to open; an immediate send() would be dropped
socket.ws.addEventListener("open", () => socket.send("Hello!"));
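The backoff arithmetic in the JavaScript class is easy to check in isolation. The Python sketch below reproduces the same capped doubling schedule; the with_jitter helper is an optional addition not present in the class above, included on the assumption that many clients reconnecting at once should spread out their retries.

```python
import random


def backoff_delays(base: float = 1.0, cap: float = 30.0, attempts: int = 7) -> list:
    """Capped exponential schedule: base, 2*base, 4*base, ... never above cap."""
    return [min(base * 2 ** n, cap) for n in range(attempts)]


def with_jitter(delay: float, rng=random) -> float:
    """Pick a random wait in [0, delay] so clients do not all retry together."""
    return rng.uniform(0, delay)


print(backoff_delays())  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

The sixth attempt would be 32 seconds uncapped, so it and every later attempt are clamped to the 30 second maximum, matching maxDelay in the class.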
Server-side: detect stale connections — the server should clean up dead connections proactively:
Python — heartbeat from server
import asyncio

@app.websocket("/ws")
async def ws_with_heartbeat(ws: WebSocket):
    await ws.accept()

    async def heartbeat():
        while True:
            await asyncio.sleep(15)
            try:
                await ws.send_json({"type": "ping"})
            except Exception:
                break  # connection dead: exit heartbeat loop

    # Run heartbeat and message handler concurrently
    hb_task = asyncio.create_task(heartbeat())
    try:
        while True:
            data = await ws.receive_text()
            if data == "pong":
                continue  # client responding to ping
            await ws.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        pass
    finally:
        hb_task.cancel()  # always stop the heartbeat, whatever ended the loop
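The recommended pattern: Uvicorn's --ws-ping-interval handles protocol-level keepalive (WebSocket ping/pong frames, which neither your Python code nor browser JavaScript sees). A JSON ping/pong message loop adds application-level liveness detection that both your server code and the client can observe. Use both in production.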
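The JSON heartbeat only helps if the server notices when replies stop arriving. One way to do that, sketched here with hypothetical names, is to record the time of each client's last message and periodically ask which clients have been silent too long. The 45 second timeout is an assumption (three missed 15 second pings), and the injectable clock exists so the example is deterministic.

```python
import time
from typing import Callable


class HeartbeatTracker:
    def __init__(self, timeout: float = 45.0,
                 clock: Callable[[], float] = time.monotonic):
        self.timeout = timeout
        self.clock = clock
        self.last_seen: dict[str, float] = {}

    def touch(self, client_id: str) -> None:
        """Call on every message from the client, including "pong"."""
        self.last_seen[client_id] = self.clock()

    def forget(self, client_id: str) -> None:
        self.last_seen.pop(client_id, None)

    def stale(self) -> list:
        """Return the ids of clients silent for longer than the timeout."""
        now = self.clock()
        return [cid for cid, seen in self.last_seen.items()
                if now - seen > self.timeout]


# Deterministic demo with a fake clock instead of real time.
now = [0.0]
tracker = HeartbeatTracker(timeout=45.0, clock=lambda: now[0])
tracker.touch("alice")
tracker.touch("bob")
now[0] = 30.0
tracker.touch("bob")        # bob answered a ping at t=30
now[0] = 50.0
print(tracker.stale())      # ['alice']
```

A background task could call stale() every few seconds and close those connections with an appropriate code.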
🎯 Putting it all together — a minimal chat app
Python — complete chat server
from fastapi import FastAPI, WebSocket, Query
from starlette.websockets import WebSocketDisconnect
from collections import defaultdict
from typing import List
import json

app = FastAPI()

class ChatManager:
    def __init__(self):
        self.rooms: dict[str, List[WebSocket]] = defaultdict(list)

    async def join(self, room: str, ws: WebSocket, username: str):
        await ws.accept()
        self.rooms[room].append(ws)
        await self.broadcast(room, "system", f"{username} joined")

    def leave(self, room: str, ws: WebSocket):
        # broadcast() may already have pruned this socket, so check first
        if ws in self.rooms[room]:
            self.rooms[room].remove(ws)

    async def broadcast(self, room: str, sender: str, text: str):
        msg = json.dumps({"from": sender, "text": text})
        dead = []
        for ws in list(self.rooms[room]):   # copy: the room may change while we await
            try:
                await ws.send_text(msg)
            except Exception:
                dead.append(ws)
        for ws in dead:
            if ws in self.rooms[room]:
                self.rooms[room].remove(ws)

chat = ChatManager()

@app.websocket("/chat/{room}")
async def chat_ws(
    ws: WebSocket,
    room: str,
    username: str = Query("anonymous")
):
    await chat.join(room, ws, username)
    try:
        while True:
            text = await ws.receive_text()
            await chat.broadcast(room, username, text)
    except WebSocketDisconnect:
        chat.leave(room, ws)
        await chat.broadcast(room, "system", f"{username} left")
Topic 14 — Key Takeaways
WebSocket = persistent, bidirectional connection
Starts as an HTTP upgrade request (101 Switching Protocols). After that, both sides can send frames anytime without waiting for a request.
Always accept() first, then loop receive/send
Use @app.websocket(), call await websocket.accept(), then while True: receive → process → send. Wrap in try/except WebSocketDisconnect.
ConnectionManager tracks active connections
A shared singleton with a list of WebSocket objects. connect(), disconnect(), broadcast(), and send_personal() are its four methods; the room-based variant adds broadcast_to_room().
Auth via query param or first-message token
Browsers can't send Authorization headers with WebSockets. Use ?token=... in the URL or send the token as the very first JSON message after connecting.
Scale with Redis Pub/Sub; reconnect with exponential backoff
Single-process: in-memory list. Multi-worker: Redis channel where every worker subscribes and relays. Client-side reconnect: retry with 1s → 2s → 4s backoff, skip retry on auth failure (code 1008).