Ugur Aslim

Posted on • Originally published at uguraslim.com

FastAPI Performance Tuning: Caching, Async, and Production Bottlenecks

I've tuned FastAPI applications from handling 50 req/s to 5,000+ req/s in production. The gap between a "working" FastAPI app and a fast one isn't magic—it's understanding where time actually goes and fixing it systematically.

Most performance advice is generic ("use caching", "go async"). This post is specific: here's what killed my APIs, how I measured it, and the exact patterns I use now in CitizenApp.

The Three Bottlenecks That Matter

Before optimizing anything, I profile. FastAPI sits on Starlette, so per-request timing costs one small middleware, but you have to add it and actually read the numbers.
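One way to get per-request timing is a plain ASGI middleware. This is a minimal sketch that needs only the standard library; the class name `TimingMiddleware` and its `sink` parameter are illustrative names, not something taken from CitizenApp.

```python
import time
from typing import Callable


class TimingMiddleware:
    """Plain ASGI middleware that reports how long each HTTP request took."""

    def __init__(self, app, sink: Callable[[str], None] = print):
        self.app = app    # the wrapped ASGI application
        self.sink = sink  # hypothetical hook for timing lines; defaults to stdout

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # Pass lifespan and websocket traffic through untouched.
            await self.app(scope, receive, send)
            return

        start = time.perf_counter()
        try:
            await self.app(scope, receive, send)
        finally:
            elapsed_ms = (time.perf_counter() - start) * 1000
            self.sink(f"{scope['method']} {scope['path']} {elapsed_ms:.1f}ms")
```

In a FastAPI app this would be registered with `app.add_middleware(TimingMiddleware)`. The measured time covers everything inside the wrapped app, including sending the response body.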

1. Synchronous database calls block the event loop. This is the killer. One 200ms database query made synchronously inside an async def endpoint stalls every other request on that worker's event loop until it returns. I learned this the hard way when our auth endpoint hit production traffic.

2. Missing connection pooling. SQLAlchemy without proper pool configuration (NullPool, or an engine created inside the request path) opens new database connections per request. I've watched a 4-core Render instance exhaust Postgres connections (100 default limit) at only 300 concurrent users.

3. Uncompressed responses. This sounds trivial until you're serving 5MB JSON arrays. Gzip can cut that to around 200KB when the JSON is repetitive. On a 10Mbps connection, that's the difference between 4 seconds and 160ms of transfer time per request.
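The transfer-time figures in point 3 can be checked with a couple of lines. This sketch assumes decimal units (1 MB = 1,000,000 bytes, 1 Mbps = 1,000,000 bits per second) and ignores latency and protocol overhead; `transfer_seconds` is a helper name made up for the example.

```python
def transfer_seconds(size_bytes: int, link_mbps: float) -> float:
    """Time to push size_bytes over a link of link_mbps megabits per second."""
    bits = size_bytes * 8
    return bits / (link_mbps * 1_000_000)


uncompressed = transfer_seconds(5_000_000, 10)  # 5 MB JSON payload
compressed = transfer_seconds(200_000, 10)      # same payload at ~200 KB after gzip
print(f"{uncompressed:.2f}s vs {compressed:.2f}s")  # prints "4.00s vs 0.16s"
```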

The Pattern I Use: Async-First with Strategic Sync

Here's my rule: make the database layer async first and worry about serialization last.

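from fastapi import Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session

# app, User, get_db, and get_async_db are defined elsewhere in the project.

# ❌ Sync session: each request holds a thread-pool thread for the whole query. Don't do this.
@app.get("/users/{user_id}")
def get_user(user_id: int, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.id == user_id).first()
    return user

# ✅ Async session: the query is awaited, so the event loop keeps serving other requests. Do this.
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_async_db)):
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    return user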

Why async matters: FastAPI runs on Uvicorn with an event loop. If your endpoint is a plain def, FastAPI hands it to a thread pool (through Starlette), which adds context-switch overhead and caps concurrency at the pool size. Async endpoints stay on the event loop.

But here's what burned me: not all libraries are async-friendly. Requests library? Sync only. Anthropic Python SDK? Sync by default. When I need sync code, I use run_in_threadpool:

import anthropic
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool

app = FastAPI()

@app.post("/analyze")
async def analyze(text: str):
    client = anthropic.Anthropic()  # sync client

    # Calling client.messages.create(...) directly here would block the event loop.
    # Run it in a worker thread instead:
    response = await run_in_threadpool(
        client.messages.create,
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": text}],
    )
    return {"analysis": response.content[0].text}

This offloads the blocking call to the thread pool Starlette uses for sync work, so the event loop stays free.
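The effect is easy to reproduce without FastAPI. The sketch below uses the standard library's `asyncio.to_thread` (Python 3.9+) as a stand-in for `run_in_threadpool`; the idea is the same, but it is not the same function. The handler names and the 0.2 second sleep are made up for the demonstration.

```python
import asyncio
import time


def blocking_call(seconds: float) -> str:
    """Stand-in for a sync SDK or database call."""
    time.sleep(seconds)
    return "done"


async def handler_blocking(seconds: float) -> str:
    # Runs the sync call on the event loop: nothing else can progress meanwhile.
    return blocking_call(seconds)


async def handler_offloaded(seconds: float) -> str:
    # Runs the sync call in a worker thread: the event loop stays free.
    return await asyncio.to_thread(blocking_call, seconds)


async def time_concurrent(handler, n: int = 3, seconds: float = 0.2) -> float:
    """Fire n handlers at once and return the total wall-clock time."""
    start = time.perf_counter()
    await asyncio.gather(*(handler(seconds) for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print("blocking: ", round(asyncio.run(time_concurrent(handler_blocking)), 2))
    print("offloaded:", round(asyncio.run(time_concurrent(handler_offloaded)), 2))
```

The blocking variant takes roughly the sum of the sleeps because the calls run one after another; the offloaded variant takes roughly the length of a single sleep.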

Connection Pooling: The Silent Killer

SQLAlchemy's default pool settings are tuned per process. In production every worker process gets its own pool, so the connection count multiplies by the number of workers and you can exhaust the database's limit.

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.pool import NullPool, AsyncAdaptedQueuePool

# Development (single worker, temporary connections):
engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool  # Don't pool in dev
)

# Production (multiple workers, connection pooling).
# Async engines reject the plain QueuePool; AsyncAdaptedQueuePool is the
# asyncio-compatible equivalent and is the default for create_async_engine.
engine = create_async_engine(
    DATABASE_URL,
    poolclass=AsyncAdaptedQueuePool,
    pool_size=5,  # persistent connections per worker process
    max_overflow=10,  # extra connections allowed when the pool is exhausted
    pool_recycle=3600,  # replace connections older than one hour
    echo=False
)

async def get_async_db():
    async with AsyncSession(engine) as session:
        yield session

The math: (pool_size + max_overflow) * num_workers should be less than your database's max connections. On Render's standard plan (2 workers), I use pool_size=5 and max_overflow=10, so the worst case is (5 + 10) * 2 = 30 connections. Postgres default allows 100, so we're safe.
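The same budget check as a small helper, in case you want it in a deploy script or a startup assertion. The function names and the `reserved` parameter (headroom for migrations, admin sessions, and so on) are assumptions added for illustration.

```python
def max_db_connections(pool_size: int, max_overflow: int, workers: int) -> int:
    """Worst-case connections opened by all worker processes combined."""
    return (pool_size + max_overflow) * workers


def fits_database(
    pool_size: int,
    max_overflow: int,
    workers: int,
    db_max_connections: int,
    reserved: int = 0,  # hypothetical headroom for non-app connections
) -> bool:
    """True if the worst case stays under the database limit minus the headroom."""
    worst_case = max_db_connections(pool_size, max_overflow, workers)
    return worst_case <= db_max_connections - reserved


print(max_db_connections(5, 10, 2))                       # prints 30
print(fits_database(5, 10, 2, db_max_connections=100))    # prints True
```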

Caching: Where It Actually Helps

I cache three things: rarely-changing data, expensive computations, and database queries.

from functools import lru_cache
from typing import Optional

import redis.asyncio as redis
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# Option 1: Simple in-memory cache (per process, so single worker only)
@lru_cache(maxsize=128)
def get_org_plan(org_id: int) -> str:
    # This would hit database
    return "pro"  # stubbed

# Option 2: Redis (multi-worker, production)
redis_client: Optional[redis.Redis] = None

async def init_redis():
    global redis_client
    redis_client = redis.from_url("redis://localhost")

async def get_cached_org_plan(org_id: int, db: AsyncSession) -> str:
    key = f"org:{org_id}:plan"
    cached = await redis_client.get(key)
    if cached is not None:
        return cached.decode()

    # Cache miss: fetch from DB (Org is the ORM model, defined elsewhere)
    result = await db.execute(select(Org).where(Org.id == org_id))
    org = result.scalar_one()

    # Cache for 1 hour
    await redis_client.setex(key, 3600, org.plan)
    return org.plan

Don't cache everything. Cache invalidation is hard. I cache only:

  • Organization settings (invalidate on update)
  • AI model pricing (stale data is acceptable)
  • Public user profiles (public = cacheable)

What I don't cache: auth tokens, user permissions, real-time data. The invalidation complexity isn't worth it.
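To make "invalidate on update" concrete, here is a minimal in-process sketch of a TTL cache with explicit invalidation. `TTLCache` and its injectable `clock` are illustrative names; like `lru_cache`, it lives in one process only, so with multiple workers the same steps would go through Redis (delete the key in the update path).

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TTLCache:
    """Tiny cache where entries expire after ttl_seconds or on invalidate()."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._store: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            # Expired: drop the entry and report a miss.
            del self._store[key]
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        self._store[key] = (self._clock() + self._ttl, value)

    def invalidate(self, key: str) -> None:
        # Call this from the code path that updates the underlying record.
        self._store.pop(key, None)
```

The update handler for an organization would write to the database and then call `invalidate("org:42:plan")`, so the next read repopulates the cache with fresh data.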

Compression and Middleware

from fastapi.middleware.gzip import GZipMiddleware
from fastapi.middleware.cors import CORSMiddleware

# Added first, so it sits closest to the endpoint
app.add_middleware(GZipMiddleware, minimum_size=1000)

# Added last, so it wraps everything and sees the request first
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.citizenapp.com"],
    allow_credentials=True,
)

Middleware order matters in FastAPI. The last add_middleware is the first evaluated. I add compression first, so it ends up closest to the actual endpoint, and CORS last, so it sees every request before anything else.
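A toy model of that wrapping order, using plain functions rather than Starlette's classes. It is not Starlette's implementation, only a sketch of the rule that each later-added middleware wraps the ones added before it; all names here are made up.

```python
from typing import Callable, List

Handler = Callable[[List[str]], None]


def make_middleware(name: str) -> Callable[[Handler], Handler]:
    """Return a wrapper that logs when a request enters and leaves it."""
    def wrap(inner: Handler) -> Handler:
        def handler(trace: List[str]) -> None:
            trace.append(f"{name}:request")
            inner(trace)
            trace.append(f"{name}:response")
        return handler
    return wrap


def endpoint(trace: List[str]) -> None:
    trace.append("endpoint")


def build_stack(added_in_order: List[Callable[[Handler], Handler]]) -> Handler:
    """Wrap the endpoint so that each later-added middleware sits further out."""
    app: Handler = endpoint
    for wrap in added_in_order:
        app = wrap(app)
    return app


trace: List[str] = []
build_stack([make_middleware("gzip"), make_middleware("cors")])(trace)
print(trace)
# ['cors:request', 'gzip:request', 'endpoint', 'gzip:response', 'cors:response']
```

With gzip added first and CORS second, the request passes through CORS, then gzip, then the endpoint, and the response travels back out in the reverse order.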

Gzip minimum_size=1000: don't compress small responses. The overhead isn't worth it.
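The overhead is easy to see with the standard library's `gzip` module. The payloads below are invented for the demonstration; real ratios depend on your data.

```python
import gzip
import json

# A tiny response and a large, repetitive one (both made up for the demo).
small = json.dumps({"status": "ok"}).encode()
large = json.dumps(
    [{"id": i, "name": "item", "active": True} for i in range(5000)]
).encode()


def gzip_ratio(payload: bytes) -> float:
    """Compressed size divided by original size; above 1.0 means gzip made it bigger."""
    return len(gzip.compress(payload)) / len(payload)


print(f"small: {len(small)} bytes, ratio {gzip_ratio(small):.2f}")
print(f"large: {len(large)} bytes, ratio {gzip_ratio(large):.2f}")
```

The small payload comes out larger than it went in, because the gzip header and trailer alone outweigh any savings. The large one shrinks substantially.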

Gotcha: Task Queues Hide Performance Issues

I used to ship everything async and assume it was fast. Then I shipped an endpoint that called an external API (3-second timeout). Users felt 3-second latency.

The fix: offload to a task queue.

import requests
from celery import Celery

celery_app = Celery("tasks", broker="redis://localhost")

@celery_app.task
def send_email_task(email: str):
    # This blocks for 2 seconds. Celery workers handle it, not your API.
    requests.post("https://sendgrid.com/...", ...)

@app.post("/send-email")
async def send_email(email: str):
    # Fire and forget: .delay() only enqueues the job on the broker
    send_email_task.delay(email)
    return {"status": "queued"}

Users get instant response. Background job handles the slow operation.
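If you want to see the shape of the pattern without running a broker, here is a standard-library sketch with a `queue.Queue` and one worker thread. This is not Celery: jobs live in process memory and are lost on restart, and there are no retries. The function names and the `sent` list are stand-ins invented for the example.

```python
import queue
import threading
from typing import Dict, List

jobs: "queue.Queue[str]" = queue.Queue()
sent: List[str] = []  # stand-in for "emails actually delivered"


def send_email_blocking(email: str) -> None:
    """Placeholder for the slow call to the email provider."""
    sent.append(email)


def worker() -> None:
    """Pull jobs off the queue forever and run them outside the request path."""
    while True:
        email = jobs.get()
        try:
            send_email_blocking(email)
        finally:
            jobs.task_done()


threading.Thread(target=worker, daemon=True).start()


def enqueue_email(email: str) -> Dict[str, str]:
    """What the endpoint does: enqueue and return immediately."""
    jobs.put(email)
    return {"status": "queued"}
```

The request handler only pays for `jobs.put`, which is why the caller sees an instant response regardless of how slow the provider is.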

What I Missed Early

I profiled endpoints in isolation. Then production traffic patterns exposed cache misses, lock contention, and connection pool exhaustion I never saw in testing.

Load test with realistic traffic patterns, not synthetic uniform requests. Use locust or k6:

# locust test
from locust import HttpUser, task

class APIUser(HttpUser):
    @task(3)
    def list_items(self):
        self.client.get("/items")

    @task(1)
    def create_item(self):
        self.client.post("/items", json={"name": "test"})

This caught that our list endpoint (cached) was fine, but unbatched creates killed the database.

Your Move

Start with async database access. Add caching only when profiling shows repeated queries. Load test before deploying. Measure before and after—don't guess.

That's how CitizenApp handles thousands of concurrent users without infrastructure complexity.
