FastAPI Mastery
Topic 22 — Final Topic 🎓
AI Application Patterns with FastAPI
FastAPI is the go-to framework for building AI backends. This topic covers the four pillars of production AI APIs: wiring LLM endpoints, streaming tokens to clients, building agent systems with tool-calling, and implementing RAG (Retrieval-Augmented Generation) pipelines.
22.1
LLM APIs
đŸ’Ŧ
Chat Endpoints
â–ŧ

A chat endpoint accepts a conversation history (a list of messages with roles like user, assistant, system) and returns the next assistant reply. This mirrors how OpenAI, Anthropic, and other LLM providers work under the hood.

👤 Client
sends messages[]
→
⚡ FastAPI
/chat endpoint
→
🤖 LLM
OpenAI / Claude
→
⚡ FastAPI
formats response
→
👤 Client
gets reply
python — chat endpoint with Pydantic models
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Literal
import openai

app = FastAPI()
client = openai.AsyncOpenAI()  # uses OPENAI_API_KEY env var

# ── Pydantic schemas ──────────────────────────────────────
class Message(BaseModel):
    role: Literal["user", "assistant", "system"]
    content: str

class ChatRequest(BaseModel):
    messages: list[Message]
    model: str = "gpt-4o-mini"
    temperature: float = 0.7
    max_tokens: int = 1024

class ChatResponse(BaseModel):
    reply: str
    model: str
    usage: dict

# ── Endpoint ──────────────────────────────────────────────
@app.post("/chat", response_model=ChatResponse)
async def chat(body: ChatRequest):
    response = await client.chat.completions.create(
        model=body.model,
        messages=[m.model_dump() for m in body.messages],
        temperature=body.temperature,
        max_tokens=body.max_tokens,
    )
    return ChatResponse(
        reply=response.choices[0].message.content,
        model=response.model,
        usage=dict(response.usage)
    )
💡
Always inject a system message server-side to set the AI's persona and constraints. Never let clients control it — they could override your safety instructions. Add it as the first message before forwarding to the LLM.
📝
Completion Endpoints
â–ŧ

A completion endpoint takes a plain text prompt and returns generated text — no conversation history needed. Great for one-shot tasks: summarization, classification, extraction, code generation. Think of it as "prompt in → answer out".

python — versatile completion endpoint
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from enum import Enum

class TaskType(str, Enum):
    summarize = "summarize"
    classify  = "classify"
    extract   = "extract"
    translate = "translate"

class CompletionRequest(BaseModel):
    prompt: str = Field(..., min_length=1, max_length=10000)
    task: TaskType = TaskType.summarize
    target_language: str | None = None  # for translate task

# System prompts per task — defined server-side, not by client!
SYSTEM_PROMPTS = {
    TaskType.summarize: "Summarize the given text concisely in 2-3 sentences.",
    TaskType.classify:  "Classify the sentiment as POSITIVE, NEGATIVE, or NEUTRAL.",
    TaskType.extract:   "Extract key facts as a JSON list of {fact, source} objects.",
    TaskType.translate: "Translate the text to {language}. Output only the translation.",
}

@app.post("/complete")
async def complete(body: CompletionRequest):
    system = SYSTEM_PROMPTS[body.task]
    if body.task == TaskType.translate:
        if not body.target_language:
            raise HTTPException(400, "target_language is required for translate")
        system = system.format(language=body.target_language)

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system},
            {"role": "user",   "content": body.prompt},
        ]
    )
    return {"result": response.choices[0].message.content}
â„šī¸
The key pattern here is server-controlled system prompts. By mapping task enum values to pre-defined prompts, you get consistent, safe behavior. Your API is the single source of truth for how the LLM behaves.
đŸ”ĸ
Embedding Endpoints
â–ŧ

Embeddings convert text into a fixed-size vector of numbers (e.g. 1536 floats for text-embedding-3-small). Semantically similar texts have vectors that are close together. Embeddings power search, recommendations, clustering, and RAG.

python — embedding endpoint (single + batch)
from fastapi import FastAPI
from pydantic import BaseModel
import openai, asyncio

class EmbedRequest(BaseModel):
    texts: list[str]             # send one or many at once
    model: str = "text-embedding-3-small"

class EmbedResponse(BaseModel):
    embeddings: list[list[float]]  # one vector per input text
    dimensions: int
    model: str

@app.post("/embed", response_model=EmbedResponse)
async def embed(body: EmbedRequest):
    # OpenAI accepts up to 2048 texts in one call
    response = await client.embeddings.create(
        model=body.model,
        input=body.texts,
    )
    vectors = [item.embedding for item in response.data]
    return EmbedResponse(
        embeddings=vectors,
        dimensions=len(vectors[0]),
        model=response.model,
    )

# Usage — check similarity between two texts
import numpy as np

def cosine_similarity(a: list[float], b: list[float]) -> float:
    a, b = np.array(a), np.array(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
ModelDimensionsUse Case
text-embedding-3-small1536Fast, cheap, great for most tasks
text-embedding-3-large3072Higher accuracy tasks
nomic-embed-text (local)768Free, runs on your own server
⚡
Always batch your embedding calls. Instead of calling the API once per document, send all texts in one request. This is 10–100× faster and dramatically reduces API costs.
22.2
AI Streaming
🌊
Token Streaming
â–ŧ

Without streaming, users wait for the entire LLM response before seeing anything — often 5–30 seconds. With token streaming, the LLM sends each word as it's generated. Users see output appear in real time, dramatically improving perceived responsiveness.

The LLM API sends chunks via a chunked HTTP response. FastAPI uses StreamingResponse with an async generator to forward those chunks immediately to the client.

Token stream demo (click to replay):

█
python — streaming chat endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import openai, json

@app.post("/chat/stream")
async def chat_stream(body: ChatRequest):
    # Async generator: yields each chunk as it arrives from OpenAI
    async def generate():
        stream = await client.chat.completions.create(
            model=body.model,
            messages=[m.model_dump() for m in body.messages],
            stream=True,   # ← this enables streaming!
        )
        async for chunk in stream:
            delta = chunk.choices[0].delta.content
            if delta is not None:
                # Send each token as a JSON line (NDJSON format)
                yield json.dumps({"token": delta}) + "\n"
        # Signal stream end
        yield json.dumps({"done": True}) + "\n"

    return StreamingResponse(
        generate(),
        media_type="application/x-ndjson"  # newline-delimited JSON
    )

# ── JavaScript client-side consumption ───────────────────
# const response = await fetch("/chat/stream", { method: "POST", body: ... })
# const reader = response.body.getReader();
# const decoder = new TextDecoder();
# while (true) {
#   const { done, value } = await reader.read();
#   if (done) break;
#   const lines = decoder.decode(value).split("\n").filter(Boolean);
#   for (const line of lines) {
#     const { token } = JSON.parse(line);
#     if (token) appendToUI(token);
#   }
# }
📡
SSE Streaming
â–ŧ

Server-Sent Events (SSE) is a browser-native streaming format. The client uses the EventSource API and the server sends data: ...\n\n formatted messages. SSE is the format used by OpenAI's own API and most chat interfaces (ChatGPT, Claude.ai).

📌
NDJSON vs SSE: NDJSON works with fetch() (full control, works with POST). SSE works with EventSource (browser-native reconnect, GET only). For AI chat, NDJSON + fetch is usually better because you need to send POST bodies with messages.
python — SSE streaming endpoint
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio

async def sse_event(data: str, event: str = None) -> str:
    """Format a Server-Sent Event string."""
    lines = []
    if event:
        lines.append(f"event: {event}")
    lines.append(f"data: {data}")
    lines.append("")  # blank line terminates the event
    return "\n".join(lines) + "\n"

@app.get("/chat/sse")
async def chat_sse(prompt: str, request: Request):
    async def generate():
        stream = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        )
        async for chunk in stream:
            # Check if client disconnected
            if await request.is_disconnected():
                break
            delta = chunk.choices[0].delta.content
            if delta:
                yield await sse_event(
                    data=json.dumps({"token": delta}),
                    event="token"
                )
        yield await sse_event(data="[DONE]", event="done")

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable nginx buffering!
        }
    )

# Client: const source = new EventSource("/chat/sse?prompt=Hello");
# source.addEventListener("token", e => appendText(JSON.parse(e.data).token));
# source.addEventListener("done", () => source.close());
âš ī¸
Add X-Accel-Buffering: no to response headers. Without it, Nginx will buffer the entire response and only send it when the stream is complete — completely defeating the purpose of streaming!
🔌
WebSocket Streaming
â–ŧ

For interactive AI applications (voice interfaces, real-time collaboration, multi-turn chat with interruption), WebSockets are the best choice. Unlike SSE, WebSockets are bidirectional — the client can send new messages while the AI is still responding.

python — WebSocket chat with token streaming
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json

@app.websocket("/ws/chat")
async def ws_chat(websocket: WebSocket):
    await websocket.accept()
    # Keep track of conversation history
    history: list[dict] = []

    try:
        while True:
            # 1. Receive user message
            data = await websocket.receive_text()
            user_msg = json.loads(data)["message"]
            history.append({"role": "user", "content": user_msg})

            # 2. Stream LLM response back token by token
            full_response = ""
            stream = await client.chat.completions.create(
                model="gpt-4o-mini",
                messages=history,
                stream=True
            )
            async for chunk in stream:
                delta = chunk.choices[0].delta.content
                if delta:
                    full_response += delta
                    await websocket.send_json({
                        "type": "token",
                        "content": delta
                    })

            # 3. Signal end; add assistant reply to history
            await websocket.send_json({"type": "done"})
            history.append({"role": "assistant", "content": full_response})

    except WebSocketDisconnect:
        pass  # Client disconnected cleanly
MethodDirectionBest ForClient API
NDJSON via fetchServer → ClientMost AI chat UIsfetch() + ReadableStream
SSE (EventSource)Server → ClientDashboard feeds, GET streamsnew EventSource(url)
WebSocketBidirectionalVoice, real-time collabnew WebSocket(url)
22.3
Agent APIs
🔧
Tool Calling APIs
â–ŧ

Tool calling (also called "function calling") lets the LLM request execution of real functions. You define tools with JSON schemas, the model decides when to use them, and your FastAPI backend runs the actual code and returns the result. This is how AI agents take actions in the real world.

👤
User sends message
"What's the weather in Tokyo and book me a flight there?"
↓
🤖
LLM decides: use tool
Returns tool_call: get_weather(city="Tokyo")
↓
⚡
FastAPI executes tool
Calls actual weather API, gets back JSON result
↓
🤖
LLM receives result, may call more tools
Uses weather data, then calls book_flight(destination="Tokyo", ...)
↓
đŸ’Ŧ
LLM returns final answer
"Tokyo is 22°C and sunny. I've booked your flight — confirmation #TX449."
python — tool calling agent loop
from fastapi import FastAPI
from pydantic import BaseModel
import openai, json

# ── Define tools as JSON schemas ───────────────────────────
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather for a city",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {"type": "string", "description": "City name"},
                    "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
                },
                "required": ["city"]
            }
        }
    }
]

# ── Actual tool implementations ────────────────────────────
async def get_weather(city: str, unit: str = "celsius") -> dict:
    # In production: call a real weather API
    return {"city": city, "temp": 22, "unit": unit, "condition": "sunny"}

TOOL_MAP = {"get_weather": get_weather}

# ── Agent loop ─────────────────────────────────────────────
@app.post("/agent")
async def agent(body: ChatRequest):
    messages = [m.model_dump() for m in body.messages]

    while True:  # loop until LLM gives final answer (no more tool calls)
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=TOOLS,
            tool_choice="auto"
        )
        msg = response.choices[0].message

        # No tool calls → final answer, return it
        if not msg.tool_calls:
            return {"reply": msg.content}

        # Append assistant's tool-call message to history
        messages.append(msg)

        # Execute each requested tool and append results
        for tc in msg.tool_calls:
            fn_name = tc.function.name
            fn_args = json.loads(tc.function.arguments)
            result  = await TOOL_MAP[fn_name](**fn_args)

            messages.append({
                "role":         "tool",
                "tool_call_id": tc.id,
                "content":      json.dumps(result)
            })
âš ī¸
Security: The LLM controls which tools get called and with what arguments. Always validate inputs in your tool functions, enforce authorization (does this user have permission to book flights?), and set a max iteration limit on the agent loop to prevent infinite loops.
🧠
Agent State APIs
â–ŧ

Long-running agents need persistent state. The agent may need to pause, resume, or be queried mid-task. Agent State APIs give clients visibility and control over what the agent is doing, and persist conversation + tool history across multiple API calls.

python — stateful agent with session persistence
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from enum import Enum
import uuid, redis.asyncio as redis, json

class AgentStatus(str, Enum):
    idle    = "idle"
    running = "running"
    waiting = "waiting"  # waiting for human input
    done    = "done"
    error   = "error"

class AgentSession(BaseModel):
    session_id: str
    status: AgentStatus
    messages: list[dict]  # full history
    current_task: str | None = None

redis_client = redis.Redis(host="localhost", decode_responses=True)

# Create a new agent session
@app.post("/agent/sessions")
async def create_session(initial_prompt: str):
    session_id = str(uuid.uuid4())
    session = AgentSession(
        session_id=session_id,
        status=AgentStatus.idle,
        messages=[{"role": "user", "content": initial_prompt}]
    )
    await redis_client.setex(
        f"session:{session_id}", 3600,  # 1hr TTL
        session.model_dump_json()
    )
    return {"session_id": session_id}

# Get current agent state
@app.get("/agent/sessions/{session_id}")
async def get_session(session_id: str):
    data = await redis_client.get(f"session:{session_id}")
    if not data:
        raise HTTPException(404, "Session not found or expired")
    return AgentSession.model_validate_json(data)

# Send a message to an existing session
@app.post("/agent/sessions/{session_id}/messages")
async def send_message(session_id: str, message: str):
    session = await get_session(session_id)
    session.messages.append({"role": "user", "content": message})
    session.status = AgentStatus.running
    # Persist and dispatch to background task ...
    return {"status": "queued"}
🔄
Workflow APIs
â–ŧ

A workflow is a sequence of AI steps where each step's output feeds into the next. Unlike a single agent loop, workflows have a defined structure: Step 1 → Step 2 → Step 3. They're great for complex multi-stage tasks: research → outline → draft → review.

python — multi-step AI workflow endpoint
from fastapi import FastAPI
from pydantic import BaseModel
import asyncio

class BlogRequest(BaseModel):
    topic: str
    target_audience: str
    word_count: int = 800

class BlogResponse(BaseModel):
    research:  str  # Step 1 output
    outline:   str  # Step 2 output
    draft:     str  # Step 3 output
    final:     str  # Step 4 output

async def llm(system: str, prompt: str) -> str:
    """Thin helper to call LLM with a system + user message."""
    r = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system},
            {"role": "user",   "content": prompt},
        ]
    )
    return r.choices[0].message.content

@app.post("/workflow/blog", response_model=BlogResponse)
async def blog_workflow(body: BlogRequest):
    # Step 1: Research key points
    research = await llm(
        "You are a research assistant. List 5 key facts about the topic.",
        f"Topic: {body.topic} | Audience: {body.target_audience}"
    )

    # Step 2: Create outline using research
    outline = await llm(
        "You are a content strategist. Create a blog outline.",
        f"Topic: {body.topic}\nResearch:\n{research}"
    )

    # Step 3: Write draft based on outline
    draft = await llm(
        f"Write a {body.word_count}-word blog post for {body.target_audience}.",
        f"Outline:\n{outline}\nResearch:\n{research}"
    )

    # Step 4: Polish the draft
    final = await llm(
        "Polish this blog post: fix grammar, improve flow, add a strong CTA.",
        draft
    )

    return BlogResponse(
        research=research, outline=outline, draft=draft, final=final
    )
⚡
When workflow steps are independent (don't need each other's outputs), run them in parallel with asyncio.gather(step1(), step2(), step3()). This can cut workflow time by 2–3×.
22.4
RAG APIs

RAG (Retrieval-Augmented Generation) solves the LLM's biggest limitation: it doesn't know your private data or recent events. RAG works in two phases: indexing (embed your documents and store vectors) and retrieval + generation (find relevant chunks, inject into prompt, get accurate answer).

📄
Document Upload
â–ŧ

The first step is getting documents into your system. Users upload PDFs, text files, or raw text. The document gets stored in object storage (S3) or locally, and its content is extracted for the next pipeline step.

1
User uploads file
PDF, DOCX, TXT via multipart form upload to FastAPI endpoint
2
Extract text
Parse PDF with pypdf, DOCX with python-docx, etc. Get raw text content.
3
Chunk processing
Split text into overlapping chunks (~500 tokens each)
4
Embed chunks
Call embedding API for each chunk, get float vectors
5
Store in vector DB
Save (chunk text + vector + metadata) into Pinecone, Weaviate, pgvector
python — document upload and indexing endpoint
from fastapi import FastAPI, UploadFile, File, BackgroundTasks
from pydantic import BaseModel
import pypdf, io, uuid

class UploadResponse(BaseModel):
    document_id: str
    filename: str
    status: str  # "queued" — indexing happens in background

@app.post("/documents", response_model=UploadResponse)
async def upload_document(
    file: UploadFile = File(...),
    background_tasks: BackgroundTasks = None
):
    contents = await file.read()
    doc_id   = str(uuid.uuid4())

    # Extract text based on file type
    if file.content_type == "application/pdf":
        reader = pypdf.PdfReader(io.BytesIO(contents))
        text   = " ".join(page.extract_text() for page in reader.pages)
    elif file.content_type == "text/plain":
        text = contents.decode()
    else:
        raise HTTPException(400, "Unsupported file type")

    # Index in background so upload feels instant
    background_tasks.add_task(index_document, doc_id, file.filename, text)

    return UploadResponse(
        document_id=doc_id,
        filename=file.filename,
        status="queued"
    )
âœ‚ī¸
Chunk Processing
â–ŧ

LLMs have context limits. You can't embed a 100-page PDF as one unit. Chunking splits the document into smaller pieces that fit in an embedding model. The art is finding the right chunk size and overlap — too small loses context, too large loses precision.

python — chunking + batch embedding
from dataclasses import dataclass

@dataclass
class Chunk:
    doc_id:   str
    chunk_id: str
    text:     str
    start:    int   # character position in source
    metadata: dict

def split_into_chunks(
    text: str,
    doc_id: str,
    chunk_size: int = 500,   # tokens (~2000 chars)
    overlap:    int = 50,    # overlap to preserve context
) -> list[Chunk]:
    words   = text.split()
    chunks  = []
    i       = 0

    while i < len(words):
        window      = words[i : i + chunk_size]
        chunk_text  = " ".join(window)
        chunk_start = len(" ".join(words[:i]))

        chunks.append(Chunk(
            doc_id   = doc_id,
            chunk_id = f"{doc_id}_{i}",
            text     = chunk_text,
            start    = chunk_start,
            metadata = {"word_index": i, "word_count": len(window)}
        ))
        i += chunk_size - overlap  # slide window with overlap

    return chunks

async def index_document(doc_id: str, filename: str, text: str):
    # 1. Split into chunks
    chunks = split_into_chunks(text, doc_id)

    # 2. Embed ALL chunks in one API call (batch)
    texts_to_embed = [c.text for c in chunks]
    embed_response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=texts_to_embed
    )
    vectors = [item.embedding for item in embed_response.data]

    # 3. Store in vector DB (pseudocode — depends on your DB choice)
    for chunk, vector in zip(chunks, vectors):
        await vector_db.upsert(
            id=chunk.chunk_id,
            values=vector,
            metadata={"text": chunk.text, "doc_id": doc_id, "filename": filename}
        )
Chunk StrategyWhen to Use
Fixed-size (500 words + overlap)General purpose, most documents
Sentence / paragraph boundariesWhen semantic units matter (articles, books)
Recursive splittingCode files (split by function/class)
Semantic chunkingGroup sentences by topic shift (most accurate, slowest)
🔍
Retrieval Endpoint
â–ŧ

The retrieval endpoint takes a search query, embeds it, and finds the most similar chunks in the vector database using cosine similarity. It's the "search" engine of your RAG system.

python — semantic search / retrieval endpoint
from fastapi import FastAPI
from pydantic import BaseModel

class SearchResult(BaseModel):
    chunk_id:   str
    text:       str
    score:      float   # cosine similarity (0-1, higher = more similar)
    doc_id:     str
    filename:   str

class SearchResponse(BaseModel):
    query:   str
    results: list[SearchResult]

@app.get("/search", response_model=SearchResponse)
async def search(q: str, top_k: int = 5, doc_id: str | None = None):
    # 1. Embed the search query
    embed_response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=[q]
    )
    query_vector = embed_response.data[0].embedding

    # 2. Search vector DB for nearest neighbors
    filter = {"doc_id": doc_id} if doc_id else {}
    matches = await vector_db.query(
        vector=query_vector,
        top_k=top_k,
        include_metadata=True,
        filter=filter
    )

    # 3. Format results
    results = [
        SearchResult(
            chunk_id=m.id,
            text=m.metadata["text"],
            score=m.score,
            doc_id=m.metadata["doc_id"],
            filename=m.metadata["filename"]
        )
        for m in matches
    ]
    return SearchResponse(query=q, results=results)
💡
Hybrid search — combining vector search with keyword search (BM25) — consistently outperforms either alone. Most production RAG systems use both and merge the results. Libraries like rank-bm25 handle the keyword side.
đŸ’Ŧ
Chat Endpoint (RAG)
â–ŧ

The RAG chat endpoint ties it all together: retrieve relevant chunks, inject them as context into the system prompt, then let the LLM answer the user's question based on that context. This is how "Chat with your PDF" products work.

python — complete RAG chat endpoint
from fastapi import FastAPI
from pydantic import BaseModel

class RAGRequest(BaseModel):
    question: str
    doc_id:   str | None = None  # restrict to one document
    top_k:    int = 5

class RAGResponse(BaseModel):
    answer:  str
    sources: list[dict]  # chunks used to generate the answer

@app.post("/rag/chat", response_model=RAGResponse)
async def rag_chat(body: RAGRequest):
    # ── Step 1: Retrieve relevant chunks ─────────────────────
    embed_r = await client.embeddings.create(
        model="text-embedding-3-small", input=[body.question]
    )
    query_vec = embed_r.data[0].embedding

    matches = await vector_db.query(
        vector=query_vec, top_k=body.top_k,
        include_metadata=True,
        filter={"doc_id": body.doc_id} if body.doc_id else {}
    )

    # ── Step 2: Build context from retrieved chunks ───────────
    context_parts = []
    sources       = []

    for i, match in enumerate(matches, 1):
        context_parts.append(f"[Source {i}] {match.metadata['text']}")
        sources.append({
            "rank":     i,
            "filename": match.metadata["filename"],
            "score":    round(match.score, 3),
            "excerpt":  match.metadata["text"][:200] + "..."
        })

    context = "\n\n".join(context_parts)

    # ── Step 3: Call LLM with context in system prompt ───────
    system = f"""You are a helpful assistant. Answer the user's question
ONLY using the provided context below. If the answer is not in the context,
say "I don't have enough information in the provided documents."

CONTEXT:
{context}"""

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system},
            {"role": "user",   "content": body.question},
        ]
    )

    return RAGResponse(
        answer=response.choices[0].message.content,
        sources=sources
    )
🏆
Always return sources. Showing users which document chunks were used to generate the answer builds trust and lets them verify accuracy. It also makes it much easier to debug when the RAG system returns wrong answers.
đŸ—„ī¸
Popular vector databases: pgvector (Postgres extension, great if you already use Postgres), Pinecone (managed, scales to billions of vectors), Weaviate (open source, hybrid search built-in), Chroma (local-first, great for development/prototyping).
🎓
🎉 Congratulations — FastAPI Mastery Complete!
You've now covered all 22 topics from Python internals to production AI systems. You can build:
  • High-performance async APIs with full type safety
  • Secure authentication & authorization systems
  • Production deployments with Nginx + Gunicorn + Uvicorn
  • Real-time WebSocket & streaming applications
  • AI-powered backends: LLM APIs, agent systems, and RAG pipelines