⚠️
Connection pool, N+1 e o que mata sua API
⏱ 14 min de leitura·+70 XP
APIs lentas raramente têm código Python lento — têm problemas no banco de dados. N+1 queries e falta de connection pool são os dois problemas de performance mais comuns e mais impactantes em aplicações com banco relacional.
N+1: detectar e corrigir
# PROBLEMA: N+1 query com SQLAlchemy ORM
from sqlalchemy.orm import Session
def listar_posts_com_autores(db: Session):
posts = db.query(Post).all() # 1 query: SELECT * FROM posts
for post in posts:
print(post.author.nome) # N queries: SELECT * FROM users WHERE id=?
# Total: 1 + N queries para N posts
# SOLUÇÃO 1: joinedload — JOIN numa única query
from sqlalchemy.orm import joinedload
def listar_posts_com_autores_v2(db: Session):
posts = db.query(Post).options(
joinedload(Post.author) # JOIN users na mesma query
).all()
for post in posts:
print(post.author.nome) # dados já carregados — sem nova query
# Total: 1 query com JOIN
# SOLUÇÃO 2: selectinload — 2 queries mas sem duplicação de dados
from sqlalchemy.orm import selectinload
def listar_posts_com_autores_v3(db: Session):
posts = db.query(Post).options(
selectinload(Post.author) # SELECT users WHERE id IN (1, 2, 3, ...)
).all()
# Total: 2 queries (melhor que JOIN para relações 1:N com duplicação)
# DETECÇÃO em desenvolvimento:
# sqlalchemy-utils tem QueryCounter:
from sqlalchemy import event
query_count = [0]
@event.listens_for(engine, "before_cursor_execute")
def count_queries(conn, cursor, statement, *args):
query_count[0] += 1
# Ou usar SQLAlchemy Echo:
engine = create_engine(url, echo=True) # loga todas as queries
# SQL puro — sempre use JOIN ou IN:
# ❌ N+1:
posts = db.execute("SELECT * FROM posts").fetchall()
for post in posts:
author = db.execute("SELECT * FROM users WHERE id=?", [post.author_id]).fetchone()
# ✅ JOIN:
results = db.execute("""
SELECT p.*, u.nome as author_nome
FROM posts p
JOIN users u ON p.author_id = u.id
""").fetchall()Connection pool: SQLAlchemy e asyncpg
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# Pool de conexões com SQLAlchemy (síncrono)
engine = create_engine(
"postgresql://user:pass@localhost/db",
pool_size=10, # conexões mantidas abertas
max_overflow=20, # conexões extras em pico (máx: pool_size + max_overflow = 30)
pool_timeout=30, # espera máxima por conexão disponível (segundos)
pool_recycle=3600, # recria conexões após 1h (evita conexões mortas)
pool_pre_ping=True, # testa conexão antes de usar (detecta conexões mortas)
)
# Calcular pool_size ideal:
# pool_size ≈ (workers × threads_por_worker) / 2
# Para FastAPI com 4 workers Uvicorn: 4 × 1 = 4 → pool_size=5, max_overflow=10
# Pool assíncrono com SQLAlchemy async
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
async_engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
pool_size=20,
max_overflow=0, # asyncio — não precisa de overflow geralmente
pool_timeout=30,
)
# asyncpg diretamente (sem ORM)
import asyncpg
async def criar_pool():
return await asyncpg.create_pool(
"postgresql://user:pass@localhost/db",
min_size=5, # conexões mínimas abertas
max_size=20, # máximo de conexões
command_timeout=10, # timeout por query
max_inactive_connection_lifetime=300, # fecha conexões ociosas após 5min
)
async def buscar_usuario(pool, user_id: int):
async with pool.acquire() as conn: # pega conexão do pool
return await conn.fetchrow(
"SELECT * FROM users WHERE id = $1", user_id
)
# conexão devolvida ao pool automaticamente ao sair do context managerPgBouncer: connection pooling externo
| Aspecto | Pool na aplicação | PgBouncer |
|---|---|---|
| Escopo | Por processo | Global (todos os processos/serviços) |
| Overhead | Zero (mesmo processo) | Pequeno (processo extra) |
| Escalabilidade | N_workers × pool_size conexões | Pool centralizado, muito mais eficiente |
| Restart de app | Conexões reconectam | Pool persiste, sem overhead de reconexão |
| Quando usar | 1 instância, poucas conexões | Múltiplas instâncias, muitos workers |
# pgbouncer.ini — configuração básica
# [databases]
# mydb = host=localhost port=5432 dbname=mydb
#
# [pgbouncer]
# pool_mode = transaction
# max_client_conn = 10000 # conexões de clientes (aplicação)
# default_pool_size = 25 # conexões reais ao PostgreSQL por database
# min_pool_size = 5
# server_lifetime = 3600
# server_idle_timeout = 600
# listen_port = 5432
# listen_addr = *
# Com PgBouncer, configure a aplicação para usar NullPool
# (PgBouncer já faz o pool, não precisa do pool da aplicação):
from sqlalchemy import NullPool
engine = create_engine(
"postgresql://user:pass@pgbouncer-host:5432/mydb",
poolclass=NullPool, # sem pool na aplicação — PgBouncer faz isso
)
# Métricas para monitorar:
# SHOW POOLS; — estado dos pools
# SHOW STATS; — queries por segundo, duração média
# SHOW CLIENTS; — conexões de clientes
# SHOW SERVERS; — conexões com PostgreSQL✅
Regras de ouro: nunca crie conexão por request sem pool. Use
pool_pre_ping=True para detectar conexões mortas. Configure pool_size baseado no número de workers × CPU do banco (PostgreSQL é ~1 core por conexão ativa). Detecte N+1 em desenvolvimento com echo=True ou SQLAlchemy event hooks. Use joinedload para relações 1:1 e selectinload para 1:N.💡
Você concluiu a trilha SQL & Databases! Próximo: Como o Computador Funciona — CPU, memória, processos e I/O por baixo dos panos.
🧩
Quiz rápido
3 perguntas · Acerte tudo e ganhe o badge 🎯 Gabarito
Continue lendo