LLM APIs em Produção: streaming, structured output, batch e cache
- ⬜🔌 MCP Deep Dive: construindo um servidor profissional(Engenharia AI-Native)
Recomendamos completar os pré-requisitos antes de seguir, mas nada te impede de continuar.
Chamar client.messages.create() no notebook é fácil. Fazer isso em produção com 100k req/dia, SLOs de latência, custo sob controle e sem romper o banco em deploy ruim — é outra história. Este módulo cobre os padrões que separam protótipo de sistema: streaming, structured output, batch API, prompt caching, retry, rate limit e idempotência.
Streaming SSE: reduzir percepção de latência
Server-Sent Events (SSE) é um fluxo HTTP unidirecional onde o servidor manda data: eventos conforme tokens chegam. Em chat UI, o usuário vê a resposta sendo digitada — percepção de latência despenca.
# Streaming com Anthropic SDK
from anthropic import Anthropic
client = Anthropic()
with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": "Explique quantum tunneling."}],
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
final = stream.get_final_message()
# Passando adiante em FastAPI/Hono como SSE para o browser
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.post("/chat")
async def chat(body: dict):
async def gen():
async with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=body["messages"],
) as stream:
async for text in stream.text_stream:
yield f"data: {json.dumps({'delta': text})}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(gen(), media_type="text/event-stream")Structured output: JSON schema no sampler
Em vez de pedir JSON por prompt e torcer, passe o schema para o provider. O decoder aplica gramática (constrained sampling) — a saída é garantidamente válida. Fim do parse com try/except.
# OpenAI Structured Outputs com Pydantic
from openai import OpenAI
from pydantic import BaseModel
from typing import Literal
client = OpenAI()
class Extract(BaseModel):
customer_name: str
sentiment: Literal["positive", "neutral", "negative"]
urgency: int # 1-5
categories: list[str]
resp = client.responses.parse(
model="gpt-5",
input=[
{"role": "system", "content": "Extraia dados estruturados do ticket."},
{"role": "user", "content": ticket_text},
],
text_format=Extract,
)
extracted: Extract = resp.output_parsed # tipado, válido
# Anthropic: usar tool_use como mecanismo (pattern comum)
schema = {
"name": "extract_ticket",
"description": "Extrai dados estruturados do ticket.",
"input_schema": Extract.model_json_schema(),
}
r = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=500,
tools=[schema],
tool_choice={"type": "tool", "name": "extract_ticket"}, # força uso
messages=[{"role": "user", "content": ticket_text}],
)
parsed = next(b for b in r.content if b.type == "tool_use").input📋 Extrair dados estruturados de texto em pipeline ETL
Garantia de validade elimina camada de retry/parse defensivo. Código downstream assume tipos corretos. Reduz bugs invisíveis em dados.
Alt: JSON in prompt + try/except — funciona mas 1-5% quebra; custo de debug acumula
Alt: Function calling "livre" — ok se a API aceita mas structured output é mais direto
Batch API: 50% off em jobs offline
Anthropic, OpenAI e Google têm Batch API: você envia JSONL com N requests, recebe JSONL com N responses em até 24h (tipicamente minutos). Preço de input/output: 50% do normal.
# Anthropic Batch API (Message Batches)
from anthropic import Anthropic
client = Anthropic()
batch = client.messages.batches.create(
requests=[
{
"custom_id": f"item-{i}",
"params": {
"model": "claude-sonnet-4-6",
"max_tokens": 512,
"messages": [{"role": "user", "content": prompt}],
},
}
for i, prompt in enumerate(prompts) # pode ter 10k+ itens
],
)
# Poll até terminar
import time
while True:
status = client.messages.batches.retrieve(batch.id)
if status.processing_status in ("ended", "failed", "canceled"):
break
time.sleep(30)
# Ler resultados (JSONL streamed)
for line in client.messages.batches.results(batch.id):
if line.result.type == "succeeded":
save_result(line.custom_id, line.result.message)| Caso | Batch | Synchronous |
|---|---|---|
| Eval harness (golden set de 1k) | ✓ — desconto compensa | ok para dev |
| Reembedding de 10M chunks | ✓ — economia enorme | × custo x2 |
| Enriquecimento noturno de dados | ✓ | × |
| Resposta em UI | × | ✓ (streaming) |
| Pipeline event-driven em tempo real | × | ✓ |
| Score de risco por request | × | ✓ |
Prompt caching em APIs (revisão rápida)
Coberto em detalhe no módulo de context engineering. O resumo operacional para APIs:
- Anthropic: marque blocos com
cache_control: {type:"ephemeral"}; write 125% / read 10%. - OpenAI: automático para prefixos repetidos ≥1024 tokens; read ~50%.
- Google Gemini: cachedContent API explícita; read ~25% + taxa de storage.
- TTL típico: ~5min. Volume baixo + janelas esparsas perdem o benefício.
Retry, backoff e jitter
# Retry robusto com tenacity
from tenacity import retry, stop_after_attempt, wait_exponential_jitter, retry_if_exception_type
from anthropic import APIStatusError, APITimeoutError, RateLimitError
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential_jitter(initial=1, max=30, jitter=2),
retry=retry_if_exception_type((APITimeoutError, RateLimitError, APIStatusError)),
reraise=True,
)
def llm_call(messages, **kwargs):
try:
return client.messages.create(messages=messages, **kwargs)
except APIStatusError as e:
# Retry só em 5xx e 429; 4xx persistente não resolve
if e.status_code < 500 and e.status_code != 429:
raise StopRetry() from e
raise| Status | Retry? | Por que |
|---|---|---|
| 408 Request Timeout | Sim, backoff | Servidor estourou tempo — pode passar em nova tentativa |
| 429 Too Many Requests | Sim, respeitar retry-after | Rate limit ou quota |
| 500 Internal Server Error | Sim, backoff | Erro transiente do provider |
| 502/503/504 Gateway | Sim, backoff | Upstream transiente |
| 400 Bad Request | Não | Seu request está inválido — retry não corrige |
| 401 Unauthorized | Não | Token inválido/expirado — gere novo fora do retry |
| 403 Forbidden | Não | Permission; abra ticket |
| 404 Not Found | Não | Recurso inexistente |
| Connect timeout | Sim, backoff | Rede — pode passar |
Rate limits e backpressure
Providers têm limits por minuto (RPM) e por tokens/min (TPM). Ultrapassar → 429. Em volume, você precisa de um gateway/cliente que:
- Estime tokens antes de enviar (tiktoken, @anthropic-ai/tokenizer).
- Use token bucket local para não estourar RPM/TPM.
- Aplique backpressure: quando cheio, responda 429 ou enfileire com queue (SQS, Redis).
- Priorize queries críticas (UI) sobre jobs (batch/analytics) — separe por API key se possível.
// Token bucket simples (RPM=60) para não bater 429 no provider
class TokenBucket {
private tokens: number;
private last: number;
constructor(private rate: number, private capacity: number) {
this.tokens = capacity;
this.last = Date.now();
}
async acquire(n = 1): Promise<void> {
while (true) {
this.refill();
if (this.tokens >= n) { this.tokens -= n; return; }
const need = (n - this.tokens) / this.rate;
await new Promise(r => setTimeout(r, need * 1000));
}
}
private refill() {
const now = Date.now();
const add = ((now - this.last) / 1000) * this.rate;
this.tokens = Math.min(this.capacity, this.tokens + add);
this.last = now;
}
}
const bucket = new TokenBucket(60 / 60, 60); // 60 RPM
await bucket.acquire();
const r = await client.messages.create(...);Idempotência e deduplicação
Retry depois de resposta perdida é comum (proxy errou, timeout na última milha). Sem idempotency key, você cobra duas vezes o cliente. LLM APIs usam anthropic-idempotency-key (Anthropic), idempotency-key (OpenAI) — UUID v4 por request lógico, reutilizado em retries.
import uuid
from anthropic import Anthropic
client = Anthropic()
def generate_for_request(request_id: str, prompt: str):
# idempotency_key estável → provider dedupe no seu lado
r = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}],
extra_headers={"anthropic-idempotency-key": request_id},
)
return r
# Em jobs async: persista (request_id, response) no seu lado.
# Antes de chamar LLM, consulte cache; só chama se não tiver.Observability mínima para LLM API
| Métrica | Por que logar | Onde |
|---|---|---|
| input_tokens / output_tokens | Custo e anomalias de tamanho | Por request, agregado por endpoint |
| model | Drift de versão silencioso | Alert se mudar sem release |
| ttft (time to first token) | UX streaming | Em UI, p50/p95 |
| latency_total | SLO | p50/p95/p99 por endpoint |
| stop_reason | Detectar truncation (max_tokens) | Alert se taxa > 5% |
| cache_read / cache_write | Efetividade do prompt caching | Calcular hit ratio |
| finish_reason = "content_filter" | Violação de policy | Investigar casos |
| retry_count | Saúde do provider | Alert se subir persistente |
gen_ai.* — padronização recente (2025).Perguntas típicas
❓ Como escolher entre streaming e non-streaming em um endpoint?
❓ Posso misturar providers no mesmo app?
❓ Preciso mesmo de structured output, ou JSON mode basta?
❓ Batch API é sempre mais barato?
Quiz rápido
4 perguntas · Acerte tudo e ganhe o badge 🎯 Gabarito