FastAPI Mastery
Topic 13 of 22 — Background Processing
59.1% complete
Topic 13
Background Processing ⚙️
Sometimes you need to do work after sending a response — like sending an email, writing audit logs, or resizing an image. FastAPI's BackgroundTasks lets you queue functions that run after the HTTP response is sent, so the user never waits. This topic also covers scheduling patterns for delayed and retried tasks.
How Background Tasks Work
Client FastAPI Server │ │ │──── POST /register ───────▶│ │ │ 1. Validate request │ │ 2. Save user to DB │ │ 3. Queue background task (send_welcome_email) │◀─── 201 Created ──────────│ 4. Return response immediately ✅ │ │ │ │ 5. [After response] Run send_welcome_email() │ │ ↳ Connect to SMTP │ │ ↳ Send email │ │ ↳ Done (user already got 201)
💡
The key insight: the response goes out first, then the background task runs. The client never waits for the email to send.
13.1 BackgroundTasks
📧
Email Jobs
The most classic background task: send a welcome email after registration. You don't want users to wait 2 seconds while FastAPI connects to SMTP. Queue it as a background task so it runs after the 201 response is sent.
Step 1 — Inject BackgroundTasks as a parameter
Python
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel

app = FastAPI()

# ── The background task function (plain Python function) ──
def send_welcome_email(email: str, username: str):
    # In real code: connect to SMTP, render template, send
    print(f"Sending welcome email to {email} for {username}")
    # imagine: smtp.send(to=email, subject="Welcome!", body=...)

class UserCreate(BaseModel):
    username: str
    email: str

# ── Route: inject BackgroundTasks ──
@app.post("/register", status_code=201)
async def register(
    user: UserCreate,
    background_tasks: BackgroundTasks   # FastAPI injects this
):
    # 1. Save user to DB (pretend)
    new_user = {"id": 1, "username": user.username, "email": user.email}

    # 2. Queue the email — doesn't run yet!
    background_tasks.add_task(send_welcome_email, user.email, user.username)

    # 3. Return immediately — email runs AFTER this
    return {"message": "User created", "user": new_user}
add_task(fn, *args, **kwargs) — the first argument is the function itself. All remaining positional and keyword args are forwarded to it.
Step 2 — Using async background tasks — If your task does I/O (like calling an async email client), define it as async def:
Python — async background task
import asyncio

async def send_welcome_email_async(email: str, username: str):
    await asyncio.sleep(0.5)   # simulate async SMTP call
    print(f"[ASYNC] Email sent to {email}")

@app.post("/register-async", status_code=201)
async def register_async(user: UserCreate, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_welcome_email_async, user.email, user.username)
    return {"message": "Registered"}
Step 3 — Real-world email with FastAPI-Mail
Python — production email
# pip install fastapi-mail
from fastapi_mail import FastMail, MessageSchema, ConnectionConfig

conf = ConnectionConfig(
    MAIL_USERNAME="you@example.com",
    MAIL_PASSWORD="secret",
    MAIL_FROM="you@example.com",
    MAIL_PORT=587,
    MAIL_SERVER="smtp.gmail.com",
    MAIL_STARTTLS=True,
    MAIL_SSL_TLS=False,
)

async def send_welcome_email(email: str, username: str):
    message = MessageSchema(
        subject="Welcome!",
        recipients=[email],
        body=f"Hi {username}, welcome aboard!",
        subtype="plain"
    )
    fm = FastMail(conf)
    await fm.send_message(message)

@app.post("/register")
async def register(user: UserCreate, bt: BackgroundTasks):
    bt.add_task(send_welcome_email, user.email, user.username)
    return {"status": "ok"}
⚠️
BackgroundTasks runs in the same process. If your server restarts mid-email, the task is lost. For critical tasks, use a proper job queue like Celery or ARQ (covered in 13.2).
📋
Audit Logging
Audit logging means recording who did what and when — e.g., "User 42 deleted record 99 at 14:32". It must not slow down the API response, making it a perfect background task use case.
Basic audit log function
Python
import json
from datetime import datetime, timezone

# ── Simple file-based audit log ──
def write_audit_log(user_id: int, action: str, resource: str, detail: dict = {}):
    entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "user_id": user_id,
        "action": action,       # e.g. "DELETE"
        "resource": resource,   # e.g. "invoice:99"
        "detail": detail,
    }
    # In production: write to DB, Elasticsearch, or a log service
    with open("audit.log", "a") as f:
        f.write(json.dumps(entry) + "\n")

# ── Route that logs every deletion ──
@app.delete("/invoices/{invoice_id}")
async def delete_invoice(
    invoice_id: int,
    background_tasks: BackgroundTasks,
    current_user_id: int = 42   # normally from JWT
):
    # 1. Delete from DB
    print(f"Deleting invoice {invoice_id}")

    # 2. Queue the audit log — never blocks the response
    background_tasks.add_task(
        write_audit_log,
        user_id=current_user_id,
        action="DELETE",
        resource=f"invoice:{invoice_id}",
        detail={"invoice_id": invoice_id}
    )

    return {"deleted": invoice_id}
Audit logging via DB (async) — writing to a database instead of a file:
Python — async DB audit log
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, DateTime, JSON
from datetime import datetime, timezone

engine = create_async_engine("sqlite+aiosqlite:///./audit.db")
SessionLocal = async_sessionmaker(engine)

class Base(DeclarativeBase): pass

class AuditLog(Base):
    __tablename__ = "audit_logs"
    id: Mapped[int] = mapped_column(primary_key=True)
    timestamp: Mapped[datetime] = mapped_column(DateTime(timezone=True))
    user_id: Mapped[int]
    action: Mapped[str] = mapped_column(String(50))
    resource: Mapped[str] = mapped_column(String(200))

async def log_to_db(user_id: int, action: str, resource: str):
    async with SessionLocal() as session:
        log = AuditLog(
            timestamp=datetime.now(timezone.utc),
            user_id=user_id,
            action=action,
            resource=resource
        )
        session.add(log)
        await session.commit()

@app.delete("/users/{user_id}")
async def delete_user(user_id: int, bt: BackgroundTasks):
    # ... delete logic ...
    bt.add_task(log_to_db, user_id=1, action="DELETE_USER", resource=f"user:{user_id}")
    return {"deleted": user_id}
Multiple background tasks in one request — you can queue as many as you need:
Python — multiple tasks
@app.post("/orders")
async def create_order(order: dict, bt: BackgroundTasks):
    order_id = 99  # from DB insert

    # Queue multiple tasks — they run sequentially after response
    bt.add_task(send_order_confirmation_email, order["email"], order_id)
    bt.add_task(write_audit_log, user_id=1, action="CREATE_ORDER", resource=f"order:{order_id}")
    bt.add_task(notify_warehouse_system, order_id)

    return {"order_id": order_id, "status": "created"}
Tasks run in the order they were added, one after another. They do not run in parallel.
Passing BackgroundTasks through dependencies — in larger apps you inject it into service classes:
Python — DI pattern
from fastapi import Depends class OrderService: def __init__(self, background_tasks: BackgroundTasks): self.bt = background_tasks async def create(self, order: dict) -> dict: order_id = 99 # queue tasks from inside the service self.bt.add_task(send_order_confirmation_email, order["email"], order_id) return {"order_id": order_id} @app.post("/orders") async def create_order( order: dict, service: OrderService = Depends(OrderService) ): return await service.create(order)
13.2 Scheduling Concepts
BackgroundTasks runs immediately after the response. But sometimes you need tasks that run later (delayed tasks) or that retry on failure. For these, you need a dedicated task queue. Let's understand the concepts and then see practical implementations.
BackgroundTasks

Runs inside FastAPI process. Simple, no setup. Best for: non-critical fire-and-forget tasks. No retry, no delay, lost on crash.

Task Queue (Celery/ARQ)

Separate worker process + message broker (Redis/RabbitMQ). Supports retries, delays, scheduling, persistence, monitoring.

⏱️
Delayed Tasks
A delayed task runs at a specific time in the future — e.g., "send a reminder email in 24 hours" or "expire this token after 1 hour". Native BackgroundTasks can't do this (it runs immediately). You need a scheduler.
Option A — Simple delay with asyncio.sleep (in-process) — works for short delays but blocks the worker if the process restarts:
Python — asyncio delay (simple)
import asyncio
from fastapi import BackgroundTasks

async def send_reminder_after_delay(email: str, delay_seconds: int):
    await asyncio.sleep(delay_seconds)    # wait before doing work
    print(f"Reminder sent to {email}")

@app.post("/subscribe")
async def subscribe(email: str, bt: BackgroundTasks):
    # Will send a reminder in 1 hour (3600 seconds)
    bt.add_task(send_reminder_after_delay, email, delay_seconds=3600)
    return {"message": "Subscribed! Reminder in 1 hour."}
⚠️
asyncio.sleep in a background task holds a coroutine open for the entire delay. For delays over a few seconds, use a proper scheduler like APScheduler or ARQ.
Option B — APScheduler (production-grade in-process scheduler)
Python — APScheduler
# pip install apscheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.date import DateTrigger
from datetime import datetime, timedelta, timezone
from contextlib import asynccontextmanager

scheduler = AsyncIOScheduler()

@asynccontextmanager
async def lifespan(app):
    scheduler.start()     # start the scheduler on startup
    yield
    scheduler.shutdown()  # clean up on shutdown

app = FastAPI(lifespan=lifespan)

async def send_reminder(email: str):
    print(f"Reminder sent to {email}")

@app.post("/subscribe")
async def subscribe(email: str):
    # Schedule reminder 1 hour from now
    run_at = datetime.now(timezone.utc) + timedelta(hours=1)
    scheduler.add_job(
        send_reminder,
        trigger=DateTrigger(run_date=run_at),
        args=[email],
        id=f"reminder_{email}"  # unique job ID
    )
    return {"scheduled_for": run_at.isoformat()}

# ── Recurring schedule (cron-style) ──
from apscheduler.triggers.cron import CronTrigger

async def daily_report():
    print("Generating daily report...")

# Add this in lifespan before yield:
# scheduler.add_job(daily_report, CronTrigger(hour=8, minute=0))  # every day at 8am
Option C — ARQ (Redis-backed, production) — for tasks that must survive server restarts:
Python — ARQ task queue
# pip install arq redis
# Needs: Redis running (docker run -p 6379:6379 redis)
from arq import create_pool
from arq.connections import RedisSettings

# ── Task definition (in tasks.py) ──
async def send_reminder_task(ctx, email: str):
    print(f"Sending reminder to {email}")
    return "done"

# ── Worker settings (in worker.py) ──
class WorkerSettings:
    functions = [send_reminder_task]
    redis_settings = RedisSettings()

# ── FastAPI integration ──
redis_pool = None

@asynccontextmanager
async def lifespan(app):
    global redis_pool
    redis_pool = await create_pool(RedisSettings())
    yield
    await redis_pool.close()

app = FastAPI(lifespan=lifespan)

@app.post("/subscribe")
async def subscribe(email: str):
    # Enqueue with a 1-hour delay (_defer_by in seconds)
    await redis_pool.enqueue_job(
        "send_reminder_task",
        email,
        _defer_by=3600   # delay by 3600 seconds = 1 hour
    )
    return {"queued": True}

# Run the worker separately: python -m arq worker.WorkerSettings
💡
ARQ stores the task in Redis. Even if FastAPI restarts, the task survives and the worker picks it up when it's due.
🔄
Retries
Retries mean: if a background task fails (network error, SMTP down, etc.), try again automatically. Native BackgroundTasks does not retry — if it fails, it's gone. You need ARQ or Celery for real retries.
Manual retry wrapper (simple approach)
Python — manual retry
import asyncio async def send_email_with_retry(email: str, max_retries: int = 3): for attempt in range(max_retries): try: # Try sending the email await do_send_email(email) print(f"Email sent on attempt {attempt + 1}") return # Success — exit except Exception as e: wait = 2 ** attempt # exponential backoff: 1s, 2s, 4s print(f"Attempt {attempt+1} failed: {e}. Retrying in {wait}s") if attempt + 1 < max_retries: await asyncio.sleep(wait) print(f"All {max_retries} attempts failed for {email}") async def do_send_email(email: str): # Simulate a flaky SMTP server import random if random.random() < 0.7: # 70% chance of failure raise ConnectionError("SMTP unavailable") print(f"✓ Email sent to {email}")
ARQ built-in retry support — the cleanest production approach:
Python — ARQ retries
from arq import Retry

async def send_reminder_task(ctx, email: str):
    try:
        await do_send_email(email)
    except Exception as e:
        print(f"Task failed: {e}")
        # Raise Retry to tell ARQ to try again in 60 seconds
        raise Retry(defer=60)   # retry after 60s

class WorkerSettings:
    functions = [send_reminder_task]
    max_tries = 5    # total attempts before giving up
    redis_settings = RedisSettings()
Celery retry (the most feature-rich option)
Python — Celery with FastAPI
# pip install celery redis
from celery import Celery

# ── celery_app.py ──
celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1"
)
celery_app.conf.task_serializer = "json"

# ── tasks.py ──
@celery_app.task(
    bind=True,
    max_retries=5,
    default_retry_delay=60    # 60 seconds between retries
)
def send_email_task(self, email: str):
    try:
        do_send_email_sync(email)  # Celery tasks are sync
    except Exception as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
        # exponential backoff: 1s, 2s, 4s, 8s, 16s

# ── FastAPI main.py ──
from .celery_app import celery_app
from .tasks import send_email_task

@app.post("/register")
async def register(user: UserCreate):
    # Enqueue — Celery worker handles retry independently
    send_email_task.delay(user.email)
    return {"status": "queued"}

# Run worker: celery -A celery_app worker --loglevel=info
# Schedule delayed: send_email_task.apply_async(args=[email], countdown=3600)
Exponential backoff means each retry waits longer than the last: 1s → 2s → 4s → 8s. This gives flaky services time to recover without hammering them.
Choosing the right tool
Use Case Tool Why
Fire-and-forget, non-critical BackgroundTasks Zero setup, built into FastAPI
Recurring schedule (every 5 min) APScheduler Cron-like, in-process, simple
Delayed + must survive restarts ARQ Redis-backed, async-native, lightweight
Complex workflows, many workers Celery Mature, feature-rich, best monitoring
Dead letter queue concept — tasks that fail all retries go to a "dead letter queue" so you can inspect and requeue them manually:
Python — dead letter pattern (Celery)
# When all retries exhausted, on_failure is called @celery_app.task(bind=True, max_retries=3) def send_email_task(self, email: str): try: do_send(email) except Exception as exc: raise self.retry(exc=exc, max_retries=3) # Celery "task_failure" signal — fires when all retries exhausted from celery.signals import task_failure @task_failure.connect def on_task_failure(sender, task_id, exception, args, kwargs, **kw): # Log to dead-letter DB table / alert on Slack print(f"DEAD LETTER: task {task_id} failed permanently. Args: {args}")
1
Start with BackgroundTasks
For simple email sends, audit logs — no extra infrastructure needed.
2
Add APScheduler if you need cron
Daily reports, token cleanup, expiry checks — all can use APScheduler in-process.
3
Graduate to ARQ or Celery when you need retries + persistence
Payments, critical notifications, data pipelines — these must not be lost on server restart.
Topic 13 — Key Takeaways
BackgroundTasks = fire-and-forget after response
Inject BackgroundTasks into any route, call .add_task(fn, *args). Response goes out first, then tasks run in order. Supports both sync and async functions.
Use BackgroundTasks for email sends and audit logs
Perfect for non-critical work where losing a task on crash is acceptable. Queue multiple tasks per request. Pass it through DI into service classes.
Delayed tasks need APScheduler or ARQ
APScheduler for in-process cron. ARQ for Redis-backed delayed + durable tasks. Both integrate cleanly with FastAPI's lifespan.
Retries require a task queue
Manual retry wrappers work for simple cases. ARQ's Retry exception and Celery's self.retry() give you production-grade retry + exponential backoff + dead letter queues.