Topic 14
WebSockets 🔌
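HTTP is request-response: the client asks, the server answers, and the server cannot send anything more until the client asks again (even when the underlying TCP connection is kept alive for reuse). WebSockets keep a single connection open in both directions, so the server can push data to the client at any time. That makes them a good fit for chat apps, live dashboards, multiplayer games, and AI token streaming.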
HTTP vs WebSocket
🌐 HTTP
Client sends request → Server replies → Exchange ends. Client must poll again for new data. Simple, stateless, easy to cache.
🔌 WebSocket
Client upgrades connection → Both sides can send anytime → Connection stays open. Perfect for real-time push. Stateful, persistent.
```text
Client                                Server
  │                                      │
  │── GET /ws (Upgrade: websocket) ─────▶│  HTTP handshake
  │◀── 101 Switching Protocols ──────────│  Connection upgraded ✅
  │                                      │
  │── "hello" ──────────────────────────▶│  Client sends anytime
  │◀── "world" ──────────────────────────│  Server sends anytime
  │◀── "new message!" ───────────────────│  Server pushes without a request
  │── "bye" ────────────────────────────▶│
  │                                      │
  │◀──── Connection Close ───────────────│  Either side can close
```
14.1 Protocol
Handshake
A WebSocket starts life as a normal HTTP GET request with special headers asking to "upgrade" the connection. The server replies with 101 Switching Protocols, and from that moment the TCP connection becomes a full-duplex WebSocket channel.
What the upgrade HTTP request looks like
HTTP — Upgrade Request Headers
```http
# Client sends this (the browser/JS WebSocket API does it automatically)
GET /ws/chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

# Server responds with:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```
Key headers explained:
| Header | Meaning |
|---|---|
| Upgrade: websocket | Client is requesting a protocol upgrade |
| Connection: Upgrade | Marks this as a connection upgrade request |
| Sec-WebSocket-Key | Random 16-byte nonce, base64-encoded; lets the client verify that the server really speaks WebSocket |
| Sec-WebSocket-Version: 13 | WebSocket protocol version (13 is the version defined by RFC 6455) |
| 101 Switching Protocols | Server accepted; the connection is now a WebSocket |
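The Sec-WebSocket-Accept value in the example response is not random: RFC 6455 defines it as the base64 encoding of the SHA-1 hash of the client's key concatenated with a fixed GUID. A minimal sketch of that computation (the function name is ours; the ASGI server does this for you during the handshake):

```python
import base64
import hashlib

# Fixed GUID from RFC 6455, section 1.3; every WebSocket server uses this value.
WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

def websocket_accept(sec_websocket_key: str) -> str:
    """Compute the Sec-WebSocket-Accept header for a client's Sec-WebSocket-Key."""
    digest = hashlib.sha1((sec_websocket_key + WS_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")

print(websocket_accept("dGhlIHNhbXBsZSBub25jZQ=="))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

Running it on the key from the request above reproduces the Accept value in the response, which is exactly what the client checks before treating the connection as open.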
FastAPI (through Starlette and the ASGI server, e.g. Uvicorn) handles all the handshake logic for you. You just call `await websocket.accept()` and the 101 response is sent behind the scenes.

The URL scheme: WebSocket URLs use `ws://` or `wss://` (secure, over TLS; `wss://` is to `ws://` what `https://` is to `http://`):

JavaScript — browser connects
```javascript
// Plain WebSocket (development)
const devWs = new WebSocket("ws://localhost:8000/ws/chat");

// Secure WebSocket (production; wss:// is WebSocket over TLS, like https://)
const prodWs = new WebSocket("wss://api.example.com/ws/chat");

// With path params / query params (roomId and token defined elsewhere)
const roomWs = new WebSocket(`wss://api.example.com/ws/${roomId}?token=${token}`);
```
Frames · Ping · Pong
After the handshake, data travels as frames. Each frame has a type (text, binary, ping, pong, close) and a payload. You don't usually deal with raw frames in FastAPI — the library handles it — but understanding frames explains why ping/pong exists.
Frame types
| Frame Type | Opcode | Purpose |
|---|---|---|
| Text | 0x1 | UTF-8 string data (most common — JSON messages) |
| Binary | 0x2 | Raw bytes (images, files, audio) |
| Ping | 0x9 | Keep-alive probe — "are you still there?" |
| Pong | 0xA | Reply to ping — "yes, I'm here" |
| Close | 0x8 | Gracefully closing the connection |
| Continuation | 0x0 | Used to split large messages into multiple frames |
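To make the opcodes concrete, here is a minimal sketch that decodes one complete frame using the header layout from RFC 6455 §5.2: a FIN bit, a 4-bit opcode, a mask bit, a 7/16/64-bit payload length, and an optional 4-byte masking key. `parse_frame` is a hypothetical helper for illustration only; FastAPI never asks you to do this, and the sketch assumes the whole frame is already in the buffer and that no extensions are in use.

```python
import struct

# Opcode values from RFC 6455, section 5.2
OPCODES = {0x0: "continuation", 0x1: "text", 0x2: "binary",
           0x8: "close", 0x9: "ping", 0xA: "pong"}

def parse_frame(data: bytes) -> dict:
    """Decode one complete frame held entirely in `data` (no streaming, no extensions)."""
    b0, b1 = data[0], data[1]
    fin = bool(b0 & 0x80)        # 1 = final fragment of the message
    opcode = b0 & 0x0F           # low 4 bits: frame type
    masked = bool(b1 & 0x80)     # client-to-server frames are always masked
    length = b1 & 0x7F           # 0-125 = real length; 126/127 = extended length follows
    offset = 2
    if length == 126:
        (length,) = struct.unpack_from("!H", data, offset)   # 16-bit big-endian length
        offset += 2
    elif length == 127:
        (length,) = struct.unpack_from("!Q", data, offset)   # 64-bit big-endian length
        offset += 8
    mask = b""
    if masked:
        mask = data[offset:offset + 4]
        offset += 4
    payload = bytearray(data[offset:offset + length])
    if masked:
        for i in range(len(payload)):
            payload[i] ^= mask[i % 4]  # unmask: XOR with the 4-byte key, cycling
    return {"fin": fin, "type": OPCODES.get(opcode, hex(opcode)), "payload": bytes(payload)}

print(parse_frame(bytes([0x81, 0x05]) + b"Hello"))  # {'fin': True, 'type': 'text', 'payload': b'Hello'}
```

A message split across several frames arrives as one text or binary frame with FIN=0, followed by continuation frames, the last of which has FIN=1.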
Ping/Pong — keepalive mechanism
```text
Client                                Server
  │                                      │
  │                                      │  (every 30s, server sends)
  │◀─────────────── PING ────────────────│  "still there?"
  │──────────────── PONG ───────────────▶│  "yes!"
  │                                      │
  │  (if no PONG received)               │
  │                                      │  Server detects dead connection
  │                                      │  Closes & cleans up resources
```
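Ping/pong frames are handled by the ASGI server (Uvicorn, through its `websockets` or `wsproto` backend), not by FastAPI or Starlette: the server answers incoming pings with pongs for you. The ASGI WebSocket interface only carries text and binary data (plus connect, accept, and close events), so a FastAPI handler cannot send a protocol-level ping frame itself:
Python: keepalive lives in the server configuration
```python
# There is no ASGI message type for ping/pong frames, so you cannot send a
# protocol-level ping from a FastAPI handler. Configure the server instead:
#
#   uvicorn main:app --ws-ping-interval 20 --ws-ping-timeout 20
#
# or programmatically:
import uvicorn

if __name__ == "__main__":
    uvicorn.run("main:app", ws_ping_interval=20.0, ws_ping_timeout=20.0)

# For an application-level heartbeat, send an ordinary message instead,
# e.g. inside a handler: await websocket.send_json({"type": "ping"})
```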
Run Uvicorn with `--ws-ping-interval 20 --ws-ping-timeout 20` to send a ping every 20 seconds and close any connection whose pong does not arrive within 20 seconds. No application code needed.
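Conceptually, that keepalive is a loop: sleep for the ping interval, send a ping, then wait up to the ping timeout for the pong. The sketch below models the loop with plain asyncio. It is not Uvicorn's code; `send_ping` and `pong_received` are hypothetical stand-ins for the server's frame I/O, and `max_pings` exists only so the demo terminates.

```python
import asyncio
from typing import Awaitable, Callable, Optional

async def keepalive(
    send_ping: Callable[[], Awaitable[None]],
    pong_received: asyncio.Event,
    interval: float,
    timeout: float,
    max_pings: Optional[int] = None,
) -> str:
    """Ping every `interval` seconds; report "dead" if a pong takes longer than `timeout`."""
    sent = 0
    while max_pings is None or sent < max_pings:
        await asyncio.sleep(interval)
        pong_received.clear()          # forget any earlier pong
        await send_ping()
        sent += 1
        try:
            await asyncio.wait_for(pong_received.wait(), timeout)
        except asyncio.TimeoutError:
            return "dead"              # a real server would close the socket here
    return "alive"

async def demo() -> tuple:
    # A peer that answers every ping immediately.
    ev_ok = asyncio.Event()
    async def responsive_ping() -> None:
        ev_ok.set()                    # simulate the pong arriving
    # A peer that has silently vanished (e.g. a phone went to sleep).
    ev_dead = asyncio.Event()
    async def silent_ping() -> None:
        pass
    ok = await keepalive(responsive_ping, ev_ok, interval=0.01, timeout=0.05, max_pings=3)
    dead = await keepalive(silent_ping, ev_dead, interval=0.01, timeout=0.05)
    return ok, dead

print(asyncio.run(demo()))  # ('alive', 'dead')
```

The silent peer never produces an error on its own; only the missing pong reveals that it is gone, which is why a timeout is needed in addition to the interval.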
14.2 FastAPI WebSockets
Accept Connections
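Declare a WebSocket route with `@app.websocket("/path")`. The handler receives a `WebSocket` object. Call `await websocket.accept()` before sending or receiving anything: this completes the handshake. (To reject a client instead, call `await websocket.close()` without accepting.) Then enter a loop to send and receive messages.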
Python — minimal WebSocket endpoint
```python
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()  # ← MUST be first. Completes the handshake.
    while True:
        data = await websocket.receive_text()        # wait for a message
        await websocket.send_text(f"Echo: {data}")   # echo it back
```
Testing this from the browser console:
JavaScript — browser test
```javascript
const ws = new WebSocket("ws://localhost:8000/ws");
ws.onopen = () => ws.send("Hello FastAPI!");
ws.onmessage = (e) => console.log(e.data);  // logs "Echo: Hello FastAPI!"
ws.onclose = () => console.log("closed");
```
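Under the hood, `accept()`, `receive_text()`, and `send_text()` are translated into ASGI messages exchanged with the server. The sketch below is the same echo endpoint written as a raw ASGI app, to show that mapping. The message names ("websocket.connect", "websocket.accept", "websocket.receive", "websocket.send", "websocket.disconnect") come from the ASGI WebSocket spec; `run_fake_server` is a hypothetical in-memory stand-in for Uvicorn, not a real testing API.

```python
import asyncio

async def echo_app(scope, receive, send):
    """Raw ASGI version of the FastAPI echo endpoint above."""
    assert scope["type"] == "websocket"
    message = await receive()                      # first event: "websocket.connect"
    if message["type"] != "websocket.connect":
        return
    await send({"type": "websocket.accept"})       # what websocket.accept() sends
    while True:
        message = await receive()
        if message["type"] == "websocket.disconnect":
            return                                 # FastAPI raises WebSocketDisconnect here
        if message.get("text") is not None:        # what receive_text() returns
            await send({"type": "websocket.send", "text": f"Echo: {message['text']}"})

async def run_fake_server(app, texts):
    """Hypothetical stand-in for Uvicorn: feeds client events in, records what the app sends."""
    inbox: asyncio.Queue = asyncio.Queue()
    sent = []
    inbox.put_nowait({"type": "websocket.connect"})
    for text in texts:
        inbox.put_nowait({"type": "websocket.receive", "text": text})
    inbox.put_nowait({"type": "websocket.disconnect", "code": 1000})

    async def send(message):
        sent.append(message)

    await app({"type": "websocket", "path": "/ws"}, inbox.get, send)
    return sent

for msg in asyncio.run(run_fake_server(echo_app, ["Hello FastAPI!"])):
    print(msg)
# {'type': 'websocket.accept'}
# {'type': 'websocket.send', 'text': 'Echo: Hello FastAPI!'}
```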
Accept with path parameters and query params
Python — path + query params
```python
from fastapi import WebSocket, Query

@app.websocket("/ws/{room_id}")
async def room_ws(
    websocket: WebSocket,
    room_id: str,                  # path param
    username: str = Query(None),   # ?username=alice
):
    await websocket.accept()
    print(f"{username} joined room {room_id}")
    while True:
        msg = await websocket.receive_text()
        await websocket.send_text(f"[{room_id}] {username}: {msg}")

# Connect: ws://localhost:8000/ws/general?username=alice
```
Handling disconnects gracefully: when a client disconnects, `receive_text()` raises `WebSocketDisconnect`:

Python — handle disconnect
```python
from fastapi import WebSocket
from starlette.websockets import WebSocketDisconnect

@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Got: {data}")
    except WebSocketDisconnect:
        print("Client disconnected: clean up here")
        # remove from connection manager, save last seen, etc.
```
Receive Messages
FastAPI's `WebSocket` object provides three receive methods, depending on what the client sends.

| Method | Returns | Use When |
|---|---|---|
| receive_text() | str | Client sends UTF-8 text (JSON strings, chat messages) |
| receive_bytes() | bytes | Client sends binary data (files, images, audio) |
| receive_json() | dict/list | Client sends JSON; it is parsed for you automatically |
Python — receive text, bytes, JSON
```python
@app.websocket("/ws/text")
async def text_ws(ws: WebSocket):
    await ws.accept()
    while True:
        msg: str = await ws.receive_text()
        print(f"Text: {msg}")

@app.websocket("/ws/binary")
async def binary_ws(ws: WebSocket):
    await ws.accept()
    while True:
        data: bytes = await ws.receive_bytes()
        print(f"Received {len(data)} bytes")
        # process image, audio chunk, etc.

@app.websocket("/ws/json")
async def json_ws(ws: WebSocket):
    await ws.accept()
    while True:
        payload: dict = await ws.receive_json()
        # payload = {"action": "move", "x": 10, "y": 20}
        action = payload.get("action")
        print(f"Action: {action}")
```
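Once messages are JSON with an `action` field, a common way to grow the `json_ws` handler is a dispatch table mapping action names to handler functions, so the receive loop stays short. A stdlib-only sketch; the action names, reply shapes, and the `on`/`dispatch` helpers are hypothetical. In the endpoint you would call `await ws.send_json(dispatch(payload))` in place of the `print`.

```python
from typing import Any, Callable, Dict

Handler = Callable[[Dict[str, Any]], Dict[str, Any]]
HANDLERS: Dict[str, Handler] = {}

def on(action: str) -> Callable[[Handler], Handler]:
    """Decorator that registers a handler for one "action" value."""
    def register(fn: Handler) -> Handler:
        HANDLERS[action] = fn
        return fn
    return register

@on("move")
def handle_move(payload: Dict[str, Any]) -> Dict[str, Any]:
    return {"type": "moved", "x": payload["x"], "y": payload["y"]}

@on("ping")
def handle_ping(payload: Dict[str, Any]) -> Dict[str, Any]:
    return {"type": "pong"}

def dispatch(payload: Any) -> Dict[str, Any]:
    """Route one decoded JSON message; unknown or malformed input becomes an error reply."""
    if not isinstance(payload, dict):
        return {"type": "error", "msg": "expected a JSON object"}
    handler = HANDLERS.get(payload.get("action"))
    if handler is None:
        return {"type": "error", "msg": f"unknown action: {payload.get('action')!r}"}
    return handler(payload)

print(dispatch({"action": "move", "x": 10, "y": 20}))  # {'type': 'moved', 'x': 10, 'y': 20}
```

Replying with an error object instead of raising keeps one bad message from tearing down the whole connection.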
Low-level `receive()` returns the raw ASGI message dict and handles both text and binary in one call:

Python — raw receive()
```python
@app.websocket("/ws/any")
async def any_ws(ws: WebSocket):
    await ws.accept()
    while True:
        message = await ws.receive()
        # message is a dict, e.g. {"type": "websocket.receive", "text": "..."}
        # or {"type": "websocket.receive", "bytes": b"..."}
        if message["type"] == "websocket.receive":
            # Compare with None so an empty string or empty bytes still counts
            if message.get("text") is not None:
                print(f"Text: {message['text']}")
            elif message.get("bytes") is not None:
                print(f"Bytes: {len(message['bytes'])} bytes")
        elif message["type"] == "websocket.disconnect":
            break  # receive() must not be called again after a disconnect
```
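The typed helpers are thin layers over this raw message. In Starlette, `receive_text()` calls `receive()`, raises `WebSocketDisconnect` if it got a disconnect message, and returns the `"text"` key; `receive_json()` additionally runs `json.loads`. A simplified, stdlib-only sketch of that layering (the `TypedReceiver` and `Disconnected` names are ours, and the queue stands in for the server's receive channel):

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict

class Disconnected(Exception):
    """Stand-in for starlette.websockets.WebSocketDisconnect."""
    def __init__(self, code: int) -> None:
        super().__init__(code)
        self.code = code

class TypedReceiver:
    """Builds receive_text/bytes/json on top of a raw ASGI receive() callable."""
    def __init__(self, receive: Callable[[], Awaitable[Dict[str, Any]]]) -> None:
        self._receive = receive

    async def _next(self) -> Dict[str, Any]:
        message = await self._receive()
        if message["type"] == "websocket.disconnect":
            raise Disconnected(message.get("code", 1000))  # fallback code is our choice
        return message

    async def receive_text(self) -> str:
        return (await self._next())["text"]

    async def receive_bytes(self) -> bytes:
        return (await self._next())["bytes"]

    async def receive_json(self) -> Any:
        return json.loads((await self._next())["text"])

async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    for message in [
        {"type": "websocket.receive", "text": "hi"},
        {"type": "websocket.receive", "bytes": b"\x00\x01"},
        {"type": "websocket.receive", "text": '{"action": "move", "x": 10}'},
        {"type": "websocket.disconnect", "code": 1001},
    ]:
        queue.put_nowait(message)
    rx = TypedReceiver(queue.get)
    results = [await rx.receive_text(), await rx.receive_bytes(), await rx.receive_json()]
    try:
        await rx.receive_text()
    except Disconnected as exc:
        results.append(exc.code)
    return results

print(asyncio.run(demo()))  # ['hi', b'\x00\x01', {'action': 'move', 'x': 10}, 1001]
```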
Send Messages
Mirror of receive — three methods for sending text, bytes, or JSON.
| Method | Input | Use When |
|---|---|---|
| send_text(data) | str | Sending plain text or raw JSON strings |
| send_bytes(data) | bytes | Sending files, images, audio chunks |
| send_json(data) | dict/list | Sending structured data (auto-serialized to JSON) |
Python — send examples
```python
from starlette.websockets import WebSocketDisconnect

@app.websocket("/ws/demo")
async def demo_ws(ws: WebSocket):
    await ws.accept()

    # Send a plain string
    await ws.send_text("Welcome to the server!")

    # Send structured JSON (dict → serialized automatically)
    await ws.send_json({
        "type": "connected",
        "user_count": 42,
        "server_time": "2024-01-01T00:00:00Z",
    })

    # Send binary data (e.g., a small image thumbnail).
    # open()/read() block the event loop; acceptable only for small files.
    with open("thumb.png", "rb") as f:
        await ws.send_bytes(f.read())

    # Receive and echo loop
    try:
        while True:
            msg = await ws.receive_text()
            await ws.send_json({"echo": msg, "length": len(msg)})
    except WebSocketDisconnect:
        pass  # the client is gone; nothing left to close

    # To end the session from the server side instead, call (inside the loop):
    # await ws.close(code=1000)  # 1000 = normal closure
```
WebSocket close codes
| Code | Meaning |
|---|---|
| 1000 | Normal closure |
| 1001 | Going away (server restart / page navigate) |
| 1006 | Abnormal closure (connection dropped without a Close frame; reported locally, never sent on the wire) |
| 1008 | Policy violation (auth failed) |
| 1011 | Internal server error |
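The close code travels inside the Close frame itself. Per RFC 6455 §5.5.1, the payload is an optional 2-byte big-endian status code followed by an optional UTF-8 reason, and control-frame payloads are capped at 125 bytes, leaving at most 123 bytes for the reason. A sketch of encoding and decoding that payload (the function names are ours; Starlette and Uvicorn do this for you when you call `close(code=...)`):

```python
import struct
from typing import Optional, Tuple

MAX_REASON_BYTES = 123           # 125-byte control-frame limit minus the 2-byte code
NEVER_SENT = {1005, 1006, 1015}  # reserved for local reporting only (RFC 6455 §7.4.1)

def encode_close_payload(code: int, reason: str = "") -> bytes:
    """Build the payload of a Close frame: 2-byte big-endian code + UTF-8 reason."""
    if code in NEVER_SENT:
        raise ValueError(f"{code} must not appear in a Close frame")
    reason_bytes = reason.encode("utf-8")
    if len(reason_bytes) > MAX_REASON_BYTES:
        raise ValueError("close reason longer than 123 bytes")
    return struct.pack("!H", code) + reason_bytes

def decode_close_payload(payload: bytes) -> Tuple[Optional[int], str]:
    """Return (code, reason); an empty payload means no code was sent."""
    if not payload:
        return None, ""          # the receiving side reports this as 1005
    if len(payload) == 1:
        raise ValueError("a Close payload cannot be exactly 1 byte")
    (code,) = struct.unpack("!H", payload[:2])
    return code, payload[2:].decode("utf-8")

print(encode_close_payload(1008, "auth failed"))  # b'\x03\xf0auth failed'
```

This is why 1006 in the table above can only ever be observed locally: no peer is allowed to send it, so it signals that the connection vanished without any Close frame.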
14.3 Advanced
Connection Manager
With multiple clients connected, you need to track all active connections. A Connection Manager is a simple class that holds a list of connected `WebSocket` objects and provides connect, disconnect, and broadcast methods.
Python — ConnectionManager
```python
from fastapi import FastAPI, WebSocket
from starlette.websockets import WebSocketDisconnect
from typing import List

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        # Iterate over a copy: another handler may disconnect while we await.
        for connection in list(self.active_connections):
            await connection.send_text(message)

    async def send_personal(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)


# ── Instantiate ONE manager (shared across all connections) ──
manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # Send back to sender
            await manager.send_personal(f"You said: {data}", websocket)
            # Broadcast to everyone (the sender included)
            await manager.broadcast(f"Client #{client_id}: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left")
```
This in-memory list only works within a single server process. With multiple workers, use Redis Pub/Sub to share messages across processes (shown in the Broadcast section below).
Rooms
A room is a named group of WebSocket connections. Messages sent to a room only go to clients in that room. Implement rooms with a `dict` mapping room names to lists of connections.
Python — Room-based ConnectionManager
```python
from collections import defaultdict
from typing import Dict, List

from fastapi import Query, WebSocket
from starlette.websockets import WebSocketDisconnect

# Assumes `app` from the earlier examples.

class RoomManager:
    def __init__(self):
        # {"general": [ws1, ws2], "dev": [ws3]}
        self.rooms: Dict[str, List[WebSocket]] = defaultdict(list)

    async def join(self, room: str, websocket: WebSocket):
        await websocket.accept()
        self.rooms[room].append(websocket)
        await self.broadcast_to_room(room, f"A new user joined #{room}")

    def leave(self, room: str, websocket: WebSocket):
        members = self.rooms.get(room)
        if members is None:
            return
        if websocket in members:
            members.remove(websocket)
        if not members:
            del self.rooms[room]  # clean up empty room

    async def broadcast_to_room(self, room: str, message: str):
        # Copy the list: members may leave while we await.
        for ws in list(self.rooms.get(room, [])):
            await ws.send_text(message)

    def room_count(self, room: str) -> int:
        return len(self.rooms.get(room, []))


room_manager = RoomManager()

@app.websocket("/ws/room/{room_name}")
async def room_ws(
    websocket: WebSocket,
    room_name: str,
    username: str = Query("anonymous"),
):
    await room_manager.join(room_name, websocket)
    try:
        while True:
            text = await websocket.receive_text()
            await room_manager.broadcast_to_room(
                room_name, f"[{room_name}] {username}: {text}"
            )
    except WebSocketDisconnect:
        room_manager.leave(room_name, websocket)
        await room_manager.broadcast_to_room(room_name, f"{username} left")

# Connect to room: ws://localhost:8000/ws/room/general?username=alice
```
Broadcast
Broadcast = send the same message to every connected client. The simple in-memory version works for a single process. For production multi-worker setups, use Redis Pub/Sub so workers can broadcast across processes.
In-memory broadcast (single process) — already in ConnectionManager above. Here's a standalone example with JSON payloads:
Python — JSON broadcast
```python
import json
from datetime import datetime, timezone
from typing import List

from fastapi import WebSocket

# Assumes `app` from the earlier examples; your endpoints append/remove sockets here.
connections: List[WebSocket] = []

async def broadcast_json(event_type: str, data: dict):
    payload = json.dumps({
        "type": event_type,
        "data": data,
        "ts": datetime.now(timezone.utc).isoformat(),  # utcnow() is deprecated
    })
    dead = []
    for ws in list(connections):
        try:
            await ws.send_text(payload)
        except Exception:
            dead.append(ws)  # collect failed connections
    for ws in dead:
        if ws in connections:
            connections.remove(ws)  # clean up dead ones

# Trigger a broadcast from an HTTP endpoint (e.g., admin pushes an alert)
@app.post("/admin/broadcast")
async def admin_broadcast(message: str):  # read from the query string: ?message=...
    await broadcast_json("admin_alert", {"message": message})
    return {"sent_to": len(connections)}
```
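The loop in `broadcast_json` sends to clients one at a time, so one slow client delays everyone after it. A variant that sends concurrently with `asyncio.gather(..., return_exceptions=True)` and prunes the sockets that failed is sketched below. It works with anything that has an async `send_text`, including FastAPI's `WebSocket`; `FakeSocket` is a hypothetical test double for the demo.

```python
import asyncio
from typing import List, Protocol

class TextSender(Protocol):
    async def send_text(self, data: str) -> None: ...

async def broadcast_concurrently(connections: List[TextSender], message: str) -> int:
    """Send `message` to all connections at once, drop the ones that fail, return deliveries."""
    targets = list(connections)  # snapshot: the list may change while we await
    results = await asyncio.gather(
        *(ws.send_text(message) for ws in targets), return_exceptions=True
    )
    for ws, result in zip(targets, results):
        if isinstance(result, Exception) and ws in connections:
            connections.remove(ws)
    return sum(1 for r in results if not isinstance(r, Exception))

class FakeSocket:
    """Hypothetical stand-in for a WebSocket: records messages or fails if 'closed'."""
    def __init__(self, alive: bool = True) -> None:
        self.alive = alive
        self.inbox: List[str] = []

    async def send_text(self, data: str) -> None:
        if not self.alive:
            raise ConnectionError("socket closed")
        self.inbox.append(data)

async def demo() -> None:
    alive, dead = FakeSocket(), FakeSocket(alive=False)
    conns = [alive, dead]
    delivered = await broadcast_concurrently(conns, "hello")
    print(delivered, len(conns))  # 1 1

asyncio.run(demo())
```

`gather` still waits for the slowest send to finish; wrapping each send in `asyncio.wait_for` with a timeout would also stop one stalled client from holding up the whole broadcast.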
Redis Pub/Sub broadcast (multi-worker) — each FastAPI worker subscribes to a Redis channel. When any worker publishes, all workers relay it to their local connections:
Python — Redis Pub/Sub broadcast
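```python
# pip install redis   (redis-py >= 4.2 ships the asyncio client as redis.asyncio)
import asyncio
from contextlib import asynccontextmanager
from typing import List

import redis.asyncio as aioredis
from fastapi import FastAPI, WebSocket
from starlette.websockets import WebSocketDisconnect

redis_client = None
local_connections: List[WebSocket] = []

@asynccontextmanager
async def lifespan(app: FastAPI):
    global redis_client
    redis_client = aioredis.from_url("redis://localhost:6379", decode_responses=True)
    # Start subscriber background task (keep a reference so it can be cancelled)
    task = asyncio.create_task(redis_subscriber())
    yield
    task.cancel()
    await redis_client.aclose()  # redis-py >= 5.0.1; use close() on older versions

app = FastAPI(lifespan=lifespan)

async def redis_subscriber():
    """Subscribe to Redis, relay to local WebSocket clients."""
    pubsub = redis_client.pubsub()
    await pubsub.subscribe("chat")
    async for message in pubsub.listen():
        if message["type"] == "message":
            text = message["data"]  # already str because of decode_responses=True
            for ws in list(local_connections):
                try:
                    await ws.send_text(text)
                except Exception:
                    if ws in local_connections:
                        local_connections.remove(ws)  # drop dead socket, keep relaying

@app.websocket("/ws")
async def ws_endpoint(ws: WebSocket):
    await ws.accept()
    local_connections.append(ws)
    try:
        while True:
            msg = await ws.receive_text()
            # Publish to Redis → ALL workers relay to their clients
            await redis_client.publish("chat", msg)
    except WebSocketDisconnect:
        if ws in local_connections:
            local_connections.remove(ws)
```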
Redis Pub/Sub is the standard pattern for scaling WebSockets horizontally. Each worker only tracks its own connections; Redis fans out messages across all workers.
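To see why every worker must subscribe, here is an in-process model of the same fan-out. `Broker` plays the role of the Redis channel and each `Worker` only knows its own local clients (plain lists of received messages here). Everything in it is hypothetical scaffolding for illustration; in production the broker is Redis and each worker is a separate process.

```python
import asyncio
from typing import Dict, List

class Broker:
    """Stand-in for a Redis channel: every subscriber gets every published message."""
    def __init__(self) -> None:
        self.subscribers: List[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.subscribers.append(queue)
        return queue

    async def publish(self, message: str) -> None:
        for queue in self.subscribers:
            await queue.put(message)

class Worker:
    def __init__(self, broker: Broker) -> None:
        self.local_clients: Dict[str, List[str]] = {}  # client id -> messages received
        self.queue = broker.subscribe()

    async def relay_one(self) -> None:
        """Like redis_subscriber(): take one published message, deliver it to local clients only."""
        message = await self.queue.get()
        for inbox in self.local_clients.values():
            inbox.append(message)

async def demo() -> Dict[str, List[str]]:
    broker = Broker()
    w1, w2 = Worker(broker), Worker(broker)
    w1.local_clients["alice"] = []
    w2.local_clients["bob"] = []
    # alice is connected to w1, and w1 publishes her message instead of sending it directly...
    await broker.publish("alice: hi")
    # ...so both workers relay it, and bob on w2 receives it too.
    await asyncio.gather(w1.relay_one(), w2.relay_one())
    return {**w1.local_clients, **w2.local_clients}

print(asyncio.run(demo()))  # {'alice': ['alice: hi'], 'bob': ['alice: hi']}
```

If w1 had only looped over its own connections, bob would never see the message; routing everything through the broker is what makes the worker count invisible to clients.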
Authentication
The browser `WebSocket` API cannot send custom HTTP headers during the handshake, so you can't use `Authorization: Bearer ...`. The common alternatives are a token in a query parameter, a token in the first message, or a session cookie (browsers attach cookies to the handshake request).
Option A — Token in query param (simple)
Python — query param auth
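```python
from typing import Optional

import jwt  # pip install pyjwt
from fastapi import Query, WebSocket
from starlette.websockets import WebSocketDisconnect

SECRET = "mysecret"  # load from config/environment in real code

def get_user_from_token(token: Optional[str]) -> Optional[dict]:
    if not token:
        return None
    try:
        return jwt.decode(token, SECRET, algorithms=["HS256"])
    except jwt.InvalidTokenError:
        return None

@app.websocket("/ws")
async def protected_ws(
    websocket: WebSocket,
    token: Optional[str] = Query(None),  # ?token=eyJ...
):
    user = get_user_from_token(token)
    if not user:
        # Closing *before* accept() rejects the handshake (HTTP 403), and the
        # browser then reports close code 1006. Accept first so the client
        # actually receives 1008 and can tell auth failure from a network drop.
        await websocket.accept()
        await websocket.close(code=1008)  # 1008 = policy violation
        return
    await websocket.accept()
    await websocket.send_json({"type": "auth_ok", "user": user["sub"]})
    try:
        while True:
            msg = await websocket.receive_text()
            await websocket.send_text(f"[{user['sub']}]: {msg}")
    except WebSocketDisconnect:
        pass

# JS: new WebSocket("ws://localhost:8000/ws?token=eyJhbGci...")
```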
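Option B — First-message token (more secure). The token never appears in the URL, so it stays out of server access logs, proxy logs, and browser history. Accept the connection, then immediately wait for an auth message: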
Python — first-message auth
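```python
import asyncio

@app.websocket("/ws/secure")
async def secure_ws(websocket: WebSocket):
    await websocket.accept()  # accept first (connection is open)

    # Wait for the first message; it must be an auth token.
    # Time out so unauthenticated clients can't hold sockets open forever.
    try:
        auth_msg = await asyncio.wait_for(websocket.receive_json(), timeout=10)
        token = auth_msg.get("token") if isinstance(auth_msg, dict) else None
    except WebSocketDisconnect:
        return  # client left before authenticating
    except (asyncio.TimeoutError, ValueError):  # no message in time, or invalid JSON
        token = None

    user = get_user_from_token(token)
    if not user:
        await websocket.send_json({"type": "error", "msg": "Unauthorized"})
        await websocket.close(code=1008)
        return

    await websocket.send_json({"type": "auth_ok"})
    try:
        while True:
            data = await websocket.receive_json()
            await websocket.send_json({"echo": data})
    except WebSocketDisconnect:
        pass

# JS client:
# ws.onopen = () => ws.send(JSON.stringify({token: "eyJhbGci..."}));
```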
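Option C — Cookie-based auth. If the user is already logged in with a session cookie, the browser sends it with the handshake. Because browsers attach cookies to cross-origin WebSocket handshakes too, also check the `Origin` header; otherwise any site the user visits could open an authenticated socket (cross-site WebSocket hijacking):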
Python — cookie auth
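```python
from typing import Optional

from fastapi import Cookie, WebSocket

ALLOWED_ORIGINS = {"https://app.example.com"}  # hypothetical: your front-end origin(s)

@app.websocket("/ws")
async def cookie_ws(
    websocket: WebSocket,
    session: Optional[str] = Cookie(None),  # reads the "session" cookie
):
    if websocket.headers.get("origin") not in ALLOWED_ORIGINS:
        await websocket.close(code=1008)  # reject cross-site handshakes
        return
    user = validate_session(session) if session else None  # your session lookup
    if not user:
        await websocket.close(code=1008)
        return
    await websocket.accept()
    await websocket.send_json({"type": "auth_ok", "user": user})
    # ...then the usual receive/send loop
```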
Reconnection
WebSocket connections drop: network hiccups, server restarts, mobile devices going to sleep. The browser's native `WebSocket` does NOT auto-reconnect. You must implement it on the client side.
Client-side auto-reconnect (JavaScript)
JavaScript — reconnect with exponential backoff
```javascript
class ReconnectingWebSocket {
  constructor(url, token) {
    this.url = url;
    this.token = token;
    this.retryDelay = 1000;  // start with 1 second
    this.maxDelay = 30000;   // max 30 seconds
    this.closedByUser = false;
    this.ws = null;
    this.connect();
  }

  connect() {
    // Use the URL passed to the constructor; encode the token for the query string
    this.ws = new WebSocket(`${this.url}?token=${encodeURIComponent(this.token)}`);

    this.ws.onopen = () => {
      console.log("✅ Connected");
      this.retryDelay = 1000;  // reset delay on success
    };

    this.ws.onmessage = (e) => {
      console.log("Message:", e.data);
    };

    this.ws.onclose = (e) => {
      if (this.closedByUser) return;  // we called close(); don't reconnect
      if (e.code === 1008) {          // auth failure: don't retry
        console.log("Auth failed, not reconnecting");
        return;
      }
      console.log(`Disconnected. Reconnecting in ${this.retryDelay}ms`);
      setTimeout(() => this.connect(), this.retryDelay);
      // exponential backoff: 1s → 2s → 4s → ... → 30s
      this.retryDelay = Math.min(this.retryDelay * 2, this.maxDelay);
    };

    this.ws.onerror = (e) => console.error("WS error:", e);
  }

  send(data) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(data);
    }  // otherwise the message is dropped; queue it if that matters
  }

  close() {
    this.closedByUser = true;
    this.ws?.close(1000);
  }
}

// Usage
const socket = new ReconnectingWebSocket("ws://localhost:8000/ws", myJwtToken);
// Note: a send() right after construction is dropped, because the socket isn't OPEN yet.
socket.send("Hello!");
```
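The same capped exponential schedule, written as a small Python helper (for example for a Python client or a test). The optional "full jitter" mode, which picks a random delay between 0 and the capped value, is an addition that the JavaScript above does not do; it keeps many clients that dropped at the same moment (say, during a server restart) from reconnecting in lockstep. The function name and defaults are assumptions that mirror the JS class.

```python
import random
from typing import List, Optional

def backoff_delays(attempts: int, base_ms: int = 1000, max_ms: int = 30000,
                   jitter: bool = False, rng: Optional[random.Random] = None) -> List[float]:
    """Delay before each reconnect attempt: base_ms * 2**n, capped at max_ms."""
    rng = rng or random.Random()
    delays = []
    for n in range(attempts):
        capped = min(base_ms * 2 ** n, max_ms)
        # Full jitter: anywhere between 0 and the capped delay
        delays.append(rng.uniform(0, capped) if jitter else float(capped))
    return delays

print(backoff_delays(7))  # [1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 30000.0, 30000.0]
```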
Server-side: detect stale connections — the server should clean up dead connections proactively:
Python — heartbeat from server
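```python
import asyncio
import time

from fastapi import WebSocket
from starlette.websockets import WebSocketDisconnect

@app.websocket("/ws")
async def ws_with_heartbeat(ws: WebSocket):
    await ws.accept()
    last_seen = time.monotonic()  # refreshed by every incoming message, "pong" included

    async def heartbeat():
        while True:
            await asyncio.sleep(15)
            if time.monotonic() - last_seen > 45:  # ~3 missed pings; threshold is a choice
                await ws.close()                    # stale: free the socket
                return
            try:
                await ws.send_json({"type": "ping"})
            except Exception:
                return  # connection dead: exit heartbeat loop

    # Run heartbeat and message handler concurrently
    hb_task = asyncio.create_task(heartbeat())
    try:
        while True:
            data = await ws.receive_text()
            last_seen = time.monotonic()
            if data == "pong":
                continue  # client answers each {"type": "ping"} with the text "pong"
            await ws.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        pass
    finally:
        hb_task.cancel()  # stop the heartbeat however the loop ended
```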
The recommended pattern: Uvicorn's `--ws-ping-interval` handles protocol-level keepalive (WebSocket ping/pong frames, invisible to your code), while a JSON ping/pong message loop handles application-level liveness detection that your handler can act on. Use both in production.
🎯 Putting it all together — a minimal chat app
Python — complete chat server
```python
from fastapi import FastAPI, WebSocket, Query
from starlette.websockets import WebSocketDisconnect
from collections import defaultdict
from typing import Dict, List
import json

app = FastAPI()

class ChatManager:
    def __init__(self):
        self.rooms: Dict[str, List[WebSocket]] = defaultdict(list)

    async def join(self, room: str, ws: WebSocket, username: str):
        await ws.accept()
        self.rooms[room].append(ws)
        await self.broadcast(room, "system", f"{username} joined")

    def leave(self, room: str, ws: WebSocket):
        # broadcast() may already have removed this socket as dead
        if ws in self.rooms[room]:
            self.rooms[room].remove(ws)

    async def broadcast(self, room: str, sender: str, text: str):
        msg = json.dumps({"from": sender, "text": text})
        dead = []
        for ws in list(self.rooms[room]):
            try:
                await ws.send_text(msg)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.leave(room, ws)


chat = ChatManager()

@app.websocket("/chat/{room}")
async def chat_ws(
    ws: WebSocket,
    room: str,
    username: str = Query("anonymous"),
):
    await chat.join(room, ws, username)
    try:
        while True:
            text = await ws.receive_text()
            await chat.broadcast(room, username, text)
    except WebSocketDisconnect:
        chat.leave(room, ws)
        await chat.broadcast(room, "system", f"{username} left")
```
Topic 14 — Key Takeaways
✓ WebSocket = persistent, bidirectional connection
Starts as an HTTP upgrade request (101 Switching Protocols). After that, both sides can send frames anytime without waiting for a request.

✓ Always accept() first, then loop receive/send
Use @app.websocket(), call await websocket.accept(), then while True: receive → process → send. Wrap the loop in try/except WebSocketDisconnect.

✓ ConnectionManager tracks active connections
A shared singleton holding a list of WebSocket objects. connect(), disconnect(), broadcast(), and send_personal() are the key methods; RoomManager adds broadcast_to_room() for rooms.

✓ Auth via query param, first-message token, or session cookie
Browsers can't send Authorization headers with WebSockets. Use ?token=... in the URL, send the token as the very first JSON message after connecting, or rely on a session cookie (and check the Origin header).

✓ Scale with Redis Pub/Sub; reconnect with exponential backoff
Single-process: in-memory list. Multi-worker: Redis channel where every worker subscribes and relays. Client-side reconnect: retry with 1s → 2s → 4s backoff, skip retry on auth failure (code 1008).