🧠FFVAcademy
🚀

LLM APIs em Produção: streaming, structured output, batch e cache

16 min de leitura·+80 XP
Pré-requisitos (0/1)0%

Recomendamos completar os pré-requisitos antes de seguir, mas nada te impede de continuar.

Chamar client.messages.create() no notebook é fácil. Fazer isso em produção com 100k req/dia, SLOs de latência, custo sob controle e sem romper o banco em deploy ruim — é outra história. Este módulo cobre os padrões que separam protótipo de sistema: streaming, structured output, batch API, prompt caching, retry, rate limit e idempotência.

Streaming SSE: reduzir percepção de latência

Server-Sent Events (SSE) é um fluxo HTTP unidirecional onde o servidor manda data: eventos conforme tokens chegam. Em chat UI, o usuário vê a resposta sendo digitada — percepção de latência despenca.

python
# Streaming com Anthropic SDK
from anthropic import Anthropic
client = Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explique quantum tunneling."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    final = stream.get_final_message()

# Passando adiante em FastAPI/Hono como SSE para o browser
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat")
async def chat(body: dict):
    async def gen():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=body["messages"],
        ) as stream:
            async for text in stream.text_stream:
                yield f"data: {json.dumps({'delta': text})}\n\n"
            yield "data: [DONE]\n\n"
    return StreamingResponse(gen(), media_type="text/event-stream")
⚠️
Streaming complica retry: você já entregou parte da resposta quando o stream quebra. Estratégia comum é abortar e reiniciar no cliente (mostrar "tentando novamente"), ou persistir o request_id e reenviar do zero em 5xx no primeiro chunk. Em meio de stream, raramente vale retry.

Structured output: JSON schema no sampler

Em vez de pedir JSON por prompt e torcer, passe o schema para o provider. O decoder aplica gramática (constrained sampling) — a saída é garantidamente válida. Fim do parse com try/except.

python
# OpenAI Structured Outputs com Pydantic
from openai import OpenAI
from pydantic import BaseModel
from typing import Literal

client = OpenAI()

class Extract(BaseModel):
    customer_name: str
    sentiment: Literal["positive", "neutral", "negative"]
    urgency: int   # 1-5
    categories: list[str]

resp = client.responses.parse(
    model="gpt-5",
    input=[
        {"role": "system", "content": "Extraia dados estruturados do ticket."},
        {"role": "user",   "content": ticket_text},
    ],
    text_format=Extract,
)
extracted: Extract = resp.output_parsed     # tipado, válido

# Anthropic: usar tool_use como mecanismo (pattern comum)
schema = {
    "name": "extract_ticket",
    "description": "Extrai dados estruturados do ticket.",
    "input_schema": Extract.model_json_schema(),
}
r = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=500,
    tools=[schema],
    tool_choice={"type": "tool", "name": "extract_ticket"},   # força uso
    messages=[{"role": "user", "content": ticket_text}],
)
parsed = next(b for b in r.content if b.type == "tool_use").input

📋 Extrair dados estruturados de texto em pipeline ETL

Structured output com schema (tool_use ou responses.parse)

Garantia de validade elimina camada de retry/parse defensivo. Código downstream assume tipos corretos. Reduz bugs invisíveis em dados.

Alt: JSON in prompt + try/exceptfunciona mas 1-5% quebra; custo de debug acumula

Alt: Function calling "livre"ok se a API aceita mas structured output é mais direto

Batch API: 50% off em jobs offline

Anthropic, OpenAI e Google têm Batch API: você envia JSONL com N requests, recebe JSONL com N responses em até 24h (tipicamente minutos). Preço de input/output: 50% do normal.

python
# Anthropic Batch API (Message Batches)
from anthropic import Anthropic
client = Anthropic()

batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": f"item-{i}",
            "params": {
                "model": "claude-sonnet-4-6",
                "max_tokens": 512,
                "messages": [{"role": "user", "content": prompt}],
            },
        }
        for i, prompt in enumerate(prompts)      # pode ter 10k+ itens
    ],
)

# Poll até terminar
import time
while True:
    status = client.messages.batches.retrieve(batch.id)
    if status.processing_status in ("ended", "failed", "canceled"):
        break
    time.sleep(30)

# Ler resultados (JSONL streamed)
for line in client.messages.batches.results(batch.id):
    if line.result.type == "succeeded":
        save_result(line.custom_id, line.result.message)
CasoBatchSynchronous
Eval harness (golden set de 1k)✓ — desconto compensaok para dev
Reembedding de 10M chunks✓ — economia enorme× custo x2
Enriquecimento noturno de dados×
Resposta em UI×✓ (streaming)
Pipeline event-driven em tempo real×
Score de risco por request×

Prompt caching em APIs (revisão rápida)

Coberto em detalhe no módulo de context engineering. O resumo operacional para APIs:

  • Anthropic: marque blocos com cache_control: {type:"ephemeral"}; write 125% / read 10%.
  • OpenAI: automático para prefixos repetidos ≥1024 tokens; read ~50%.
  • Google Gemini: cachedContent API explícita; read ~25% + taxa de storage.
  • TTL típico: ~5min. Volume baixo + janelas esparsas perdem o benefício.

Retry, backoff e jitter

python
# Retry robusto com tenacity
from tenacity import retry, stop_after_attempt, wait_exponential_jitter, retry_if_exception_type
from anthropic import APIStatusError, APITimeoutError, RateLimitError

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential_jitter(initial=1, max=30, jitter=2),
    retry=retry_if_exception_type((APITimeoutError, RateLimitError, APIStatusError)),
    reraise=True,
)
def llm_call(messages, **kwargs):
    try:
        return client.messages.create(messages=messages, **kwargs)
    except APIStatusError as e:
        # Retry só em 5xx e 429; 4xx persistente não resolve
        if e.status_code < 500 and e.status_code != 429:
            raise StopRetry() from e
        raise
StatusRetry?Por que
408 Request TimeoutSim, backoffServidor estourou tempo — pode passar em nova tentativa
429 Too Many RequestsSim, respeitar retry-afterRate limit ou quota
500 Internal Server ErrorSim, backoffErro transiente do provider
502/503/504 GatewaySim, backoffUpstream transiente
400 Bad RequestNãoSeu request está inválido — retry não corrige
401 UnauthorizedNãoToken inválido/expirado — gere novo fora do retry
403 ForbiddenNãoPermission; abra ticket
404 Not FoundNãoRecurso inexistente
Connect timeoutSim, backoffRede — pode passar

Rate limits e backpressure

Providers têm limits por minuto (RPM) e por tokens/min (TPM). Ultrapassar → 429. Em volume, você precisa de um gateway/cliente que:

  • Estime tokens antes de enviar (tiktoken, @anthropic-ai/tokenizer).
  • Use token bucket local para não estourar RPM/TPM.
  • Aplique backpressure: quando cheio, responda 429 ou enfileire com queue (SQS, Redis).
  • Priorize queries críticas (UI) sobre jobs (batch/analytics) — separe por API key se possível.
typescript
// Token bucket simples (RPM=60) para não bater 429 no provider
class TokenBucket {
  private tokens: number;
  private last: number;
  constructor(private rate: number, private capacity: number) {
    this.tokens = capacity;
    this.last = Date.now();
  }
  async acquire(n = 1): Promise<void> {
    while (true) {
      this.refill();
      if (this.tokens >= n) { this.tokens -= n; return; }
      const need = (n - this.tokens) / this.rate;
      await new Promise(r => setTimeout(r, need * 1000));
    }
  }
  private refill() {
    const now = Date.now();
    const add = ((now - this.last) / 1000) * this.rate;
    this.tokens = Math.min(this.capacity, this.tokens + add);
    this.last = now;
  }
}

const bucket = new TokenBucket(60 / 60, 60);  // 60 RPM
await bucket.acquire();
const r = await client.messages.create(...);

Idempotência e deduplicação

Retry depois de resposta perdida é comum (proxy errou, timeout na última milha). Sem idempotency key, você cobra duas vezes o cliente. LLM APIs usam anthropic-idempotency-key (Anthropic), idempotency-key (OpenAI) — UUID v4 por request lógico, reutilizado em retries.

python
import uuid
from anthropic import Anthropic
client = Anthropic()

def generate_for_request(request_id: str, prompt: str):
    # idempotency_key estável → provider dedupe no seu lado
    r = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
        extra_headers={"anthropic-idempotency-key": request_id},
    )
    return r

# Em jobs async: persista (request_id, response) no seu lado.
# Antes de chamar LLM, consulte cache; só chama se não tiver.

Observability mínima para LLM API

MétricaPor que logarOnde
input_tokens / output_tokensCusto e anomalias de tamanhoPor request, agregado por endpoint
modelDrift de versão silenciosoAlert se mudar sem release
ttft (time to first token)UX streamingEm UI, p50/p95
latency_totalSLOp50/p95/p99 por endpoint
stop_reasonDetectar truncation (max_tokens)Alert se taxa &gt; 5%
cache_read / cache_writeEfetividade do prompt cachingCalcular hit ratio
finish_reason = "content_filter"Violação de policyInvestigar casos
retry_countSaúde do providerAlert se subir persistente
💡
Ferramentas prontas: Langfuse, LangSmith, Phoenix (Arize), Helicone. Todas instrumentam LLM calls com tags, traces e dashboards. Se preferir rolar no OpenTelemetry, use as semantic conventions paragen_ai.* — padronização recente (2025).

Perguntas típicas

Como escolher entre streaming e non-streaming em um endpoint?

Pela UX e pelo consumo. Chat UI, assistente, geração longa → streaming (melhor TTFT). Extração estruturada, classificação curta, job async → non-streaming (simplifica parse e retry). Regra: se usuário está olhando, streaming; se é pipeline, não.

Posso misturar providers no mesmo app?

Pode e é recomendado para resiliência. Padrão: abstrair atrás de uma interface ("generate(messages)") e fallback entre Anthropic/OpenAI/Google. Cuidado com diferenças de tool schema e prompt caching — cada provider tem particularidades. LiteLLM, OpenRouter, Portkey são gateways que ajudam.

Preciso mesmo de structured output, ou JSON mode basta?

Structured output com schema é mais forte que JSON mode. JSON mode garante JSON válido, mas não garante o schema (campos, tipos). Structured output força ambos. Para extração séria, sempre vá no schema explícito.

Batch API é sempre mais barato?

É ~50% off, mas SLA é até 24h. Se seu job pode aceitar delay, sempre use. Se precisar em minutos consistentes (ex: onboarding de clientes), synchronous com rate-limit controlado pode ser o certo. Não misture — job crítico em batch vira incident quando demora mais que o esperado.
Take-aways. Streaming reduz percepção de latência em UIs. Structured output com schema elimina retry de parse. Batch API dá 50% off em jobs offline. Prompt caching corta custo em agents repetitivos. Retry: exponential backoff + jitter, respeite retry-after, não retry em 4xx. Rate limit é seu problema — token bucket no cliente evita 429. Idempotency key é obrigatório em endpoints que criam estado. Observability (tokens, latência, stop_reason, cache ratio) é a base. Próximo: LLMOps — como operar tudo isso em produção com eval, drift, canary.
🧩

Quiz rápido

4 perguntas · Acerte tudo e ganhe o badge 🎯 Gabarito

Continue lendo