Systems com múltiplos agents são o que permite resolver tarefas que excedem a capacidade de um único contexto ou requerem especialização paralela. Neste módulo, cobrimos os padrões fundamentais de orquestração: quando usar workflow fixo vs agent adaptativo, como implementar handoffs e como compor sistemas multi-agent confiáveis.
Workflows vs Agents: a decisão certa
| Critério | Workflow (código controla) | Agent (LLM controla) |
|---|---|---|
| Processo | Bem definido e estável | Aberto ou variável |
| Debugging | Simples — trace determinístico | Complexo — decisões emergentes |
| Custo | Previsível — N chamadas fixas | Variável — N calls dependem do LLM |
| Flexibilidade | Baixa — mudança requer código | Alta — adapta ao contexto |
| Confiabilidade | Alta — comportamento estável | Moderada — depende do modelo |
| Ideal para | Extração, classificação, geração formatada | Pesquisa, resolução de problemas, análise aberta |
# Exemplo: mesmo objetivo, abordagem diferente
# ─── WORKFLOW (processo fixo) ───────────────────────────
# Classificar sentimento + extrair entidades de reviews
# Processo sempre é: classificar → extrair → formatar
def processar_review_workflow(review: str) -> dict:
# Passo 1: classificação
sentimento = client.messages.create(
model="claude-haiku-4-5-20251001", # modelo menor = mais barato
max_tokens=10,
messages=[{"role": "user",
"content": f"Sentimento (positivo/negativo/neutro):
{review}"}]
).content[0].text.strip()
# Passo 2: extração
entidades = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=100,
messages=[{"role": "user",
"content": f"Entidades mencionadas (JSON list):
{review}"}]
).content[0].text.strip()
return {"sentimento": sentimento, "entidades": json.loads(entidades)}
# ─── AGENT (processo adaptativo) ────────────────────────
# Investigar reclamação de cliente — processo varia por caso
def investigar_reclamacao_agent(reclamacao: str) -> str:
# Claude decide quais tools usar, em que ordem, quantas vezes
messages = [{"role": "user", "content": reclamacao}]
while True:
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=2048,
tools=[buscar_pedido_tool, buscar_historico_tool, consultar_estoque_tool],
messages=messages
)
if response.stop_reason == "end_turn":
return response.content[0].text
# ... processa tool calls e continuaPadrão orquestrador-worker
# Orquestrador delega para workers especializados:
import asyncio
from anthropic import Anthropic
client = Anthropic()
# ─── Workers especializados ──────────────────────────────
async def worker_pesquisa(topico: str) -> str:
"""Worker especializado em pesquisa web."""
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1000,
tools=[search_web_tool], # só tem acesso à pesquisa
system="Você é um pesquisador. Pesquise e retorne fatos verificáveis.",
messages=[{"role": "user", "content": f"Pesquise: {topico}"}]
)
return _run_tool_loop(response) # loop de tool use
async def worker_analise(dados: str, criterios: str) -> str:
"""Worker especializado em análise estruturada."""
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1000,
system="Você é um analista. Avalie os dados segundo os critérios fornecidos.",
messages=[{"role": "user",
"content": f"Dados:
{dados}
Critérios:
{criterios}"}]
)
return response.content[0].text
async def worker_redacao(analises: list[str], formato: str) -> str:
"""Worker especializado em síntese e redação."""
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=2000,
system="Você é um editor. Sintetize as análises em um relatório coeso.",
messages=[{"role": "user",
"content": f"Síntese em {formato}:
" + "
---
".join(analises)}]
)
return response.content[0].text
# ─── Orquestrador ─────────────────────────────────────────
async def orquestrar_due_diligence(empresa: str) -> str:
"""
Orquestrador que coordena workers para due diligence.
O orquestrador não executa pesquisa ou análise — só coordena.
"""
# FASE 1: Pesquisa paralela (sub-tarefas independentes)
topicos = [
f"histórico financeiro e funding de {empresa}",
f"reputação e reviews de {empresa}",
f"litígios e problemas legais de {empresa}",
f"concorrentes diretos de {empresa}",
]
pesquisas = await asyncio.gather(*[
worker_pesquisa(topico) for topico in topicos
])
# FASE 2: Análise paralela (cada pesquisa → análise independente)
criterios = """
- Saúde financeira: crescimento, lucratividade, riscos
- Reputação: NPS, reviews, crises de imagem
- Risco legal: pendências, compliance
- Posição competitiva: market share, diferenciais
"""
analises = await asyncio.gather(*[
worker_analise(pesquisa, criterios)
for pesquisa in pesquisas
])
# FASE 3: Síntese sequencial (depende de todas as análises)
relatorio = await worker_redacao(
analises=analises,
formato="relatório executivo de due diligence em Markdown"
)
return relatorioHandoffs entre agents
# Padrão de handoff: um agent passa contexto para outro
# Caso de uso: pipeline de conteúdo
# Pesquisador → Escritor → Editor → Publicador
class AgentPipeline:
def __init__(self):
self.history = [] # histórico compartilhado entre agents
def executar_agent(self, system: str, task: str,
context: str = "") -> str:
"""Executa um agent com contexto acumulado do pipeline."""
prompt = task
if context:
prompt = f"Contexto das etapas anteriores:
{context}
{task}"
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=2000,
system=system,
messages=[{"role": "user", "content": prompt}]
)
result = response.content[0].text
# Acumula no histórico do pipeline
self.history.append({"agent": system[:50], "output": result})
return result
def gerar_artigo(self, topico: str) -> dict:
pipeline = AgentPipeline()
# Agent 1: Pesquisador
pesquisa = pipeline.executar_agent(
system="Você é um pesquisador especializado. Pesquise com profundidade.",
task=f"Pesquise o tópico: {topico}. Retorne fatos, dados e fontes."
)
# Agent 2: Escritor (recebe pesquisa como contexto)
rascunho = pipeline.executar_agent(
system="Você é um escritor técnico sênior. Escreva com clareza e precisão.",
task="Escreva um artigo técnico baseado na pesquisa. 800-1200 palavras.",
context=f"PESQUISA:
{pesquisa}"
)
# Agent 3: Editor (recebe rascunho como contexto)
revisado = pipeline.executar_agent(
system="Você é um editor exigente. Melhore sem alterar o conteúdo técnico.",
task="Revise: melhore fluxo, corrija erros, garanta consistência de tom.",
context=f"RASCUNHO:
{rascunho}"
)
return {
"topico": topico,
"pesquisa": pesquisa,
"rascunho": rascunho,
"final": revisado,
"pipeline_log": pipeline.history
}Controle de qualidade e checkpointing
# Para sistemas de produção: checkpoints e validação entre etapas
import json
from dataclasses import dataclass
from enum import Enum
class Status(Enum):
PENDENTE = "pendente"
EM_PROGRESSO = "em_progresso"
COMPLETO = "completo"
FALHOU = "falhou"
AGUARDANDO_REVISAO = "aguardando_revisao" # Human-in-the-loop
@dataclass
class Checkpoint:
etapa: str
status: Status
output: str | None = None
erro: str | None = None
class PipelineComCheckpoint:
def __init__(self, task_id: str):
self.task_id = task_id
self.checkpoints: list[Checkpoint] = []
def executar_etapa(self, nome: str, fn, *args,
requer_aprovacao: bool = False) -> str:
checkpoint = Checkpoint(etapa=nome, status=Status.EM_PROGRESSO)
self.checkpoints.append(checkpoint)
try:
resultado = fn(*args)
if requer_aprovacao:
# Pausa para revisão humana (notifica via webhook, email, etc.)
checkpoint.status = Status.AGUARDANDO_REVISAO
checkpoint.output = resultado
self._notificar_revisor(nome, resultado)
aprovado = self._aguardar_aprovacao() # polling ou webhook
if not aprovado:
raise ValueError(f"Etapa '{nome}' rejeitada na revisão")
checkpoint.status = Status.COMPLETO
checkpoint.output = resultado
return resultado
except Exception as e:
checkpoint.status = Status.FALHOU
checkpoint.erro = str(e)
# Persiste o estado — pode ser retomado de onde parou
self._persistir_estado()
raise
def retomar_de_checkpoint(self, etapa_nome: str) -> None:
"""Retoma pipeline a partir de uma etapa específica."""
# Útil quando uma etapa falha e o problema foi corrigido
idx = next(i for i, c in enumerate(self.checkpoints)
if c.etapa == etapa_nome)
self.checkpoints = self.checkpoints[:idx] # remove checkpoints a partir dali| Padrão | Quando usar | Trade-off |
|---|---|---|
| Sequential workflow | Processo definido, etapas dependentes | Previsível mas inflexível |
| Parallel fan-out | Sub-tarefas independentes | Rápido mas requer agregação |
| Orchestrator-worker | Tarefas complexas com especialização | Flexível mas difícil de debugar |
| Pipeline com handoff | Transformação progressiva de conteúdo | Claro mas latência acumulada |
| Human-in-the-loop | Ações com alto impacto ou risco | Seguro mas latência manual |
Sistemas multi-agent são multiplicadores de capacidade — quando bem orquestrados. A chave é clareza de responsabilidade: orquestrador coordena, workers executam, checkpoints garantem qualidade. O erro comum é fazer o orquestrador fazer tudo — isso destrói os benefícios da especialização e paralelismo. Comece simples: um workflow fixo para processo definido. Adicione agents onde o processo precisa ser adaptativo.
Próximo: Claude em produção — custo real por chamada, estratégias de caching para economizar 80%+ em tokens, rate limits e como monitorar sistemas Claude em produção.