Event Sourcing e CQRS: quando eventos são a fonte da verdade
- ⬜🪢 Sagas vs 2PC: transações distribuídas sem perder o sono(Sistemas Distribuídos)
Recomendamos completar os pré-requisitos antes de seguir, mas nada te impede de continuar.
Quase todo sistema que você conhece persiste o estado atual: user.balance = 150. Foi 200, virou 150. A mutação aconteceu, o valor anterior morreu. Event Sourcing inverte essa premissa: grava a sequência de eventos que aconteceram (Credited 200, Debited 50) e deriva o estado dessa lista imutável. Dá pra voltar no tempo, reconstruir qualquer view, auditar tudo — ao preço de complexidade real.
CQRS é o primo frequentemente associado: separar commands (escrita) de queries (leitura) em modelos/bancos/escalas diferentes. Este módulo mostra o que são, como implementar, armadilhas reais (versionamento de eventos é um pesadelo), e quando não usar — porque 80% dos casos são overengineering.
Event Sourcing: eventos como fonte da verdade
CRUD tradicional:
┌──────────────────────────────┐
│ accounts │
│ id | balance | updated_at │
│ 42 | 150.00 | 2026-04-16 │ ← só o estado atual
└──────────────────────────────┘
Event Sourcing:
┌──────────────────────────────────────────────────────────────┐
│ events (append-only) │
│ id | aggregate_id | type | payload | timestamp │
│ 1 | 42 | AccountOpened| {owner:"Ana"} | 2026-01-10 │
│ 2 | 42 | Credited | {amount:200} | 2026-02-03 │
│ 3 | 42 | Debited | {amount:50} | 2026-03-18 │
│ 4 | 42 | Debited | {amount:0.01} | 2026-04-16 │
└──────────────────────────────────────────────────────────────┘
→ estado atual = somar eventos: balance = 149.99
→ história completa, imutável, auditávelPropriedades:
- Append-only: eventos nunca são editados ou deletados. Correções = novo evento (ex: BalanceCorrected).
- Ordenados por aggregate: eventos do mesmo agregado têm sequência monotônica (version 1, 2, 3...).
- Immutable audit log: você tem o "porquê" de cada mudança, não só o "o quê".
- Replay: reconstruir qualquer estado passado aplicando eventos até um timestamp.
Command → Event → Aggregate: o ciclo básico
┌─────────┐ command ┌──────────────┐ validate + produce ┌──────────────┐
│ Cliente │ ──────────────► │ Command │ ────────────────────────► │ Event(s) │
└─────────┘ │ Handler │ └──────────────┘
└──────────────┘ │
▲ │ append
│ load ▼
│ ┌────────────────┐
┌──────────────┐ apply events │ EVENT STORE │
│ Aggregate │◄─────────────────────────│ (append-only) │
│ (in memory) │ └────────────────┘
└──────────────┘
│ stream
▼
┌─────────────────┐
│ PROJECTIONS │
│ (read models) │
└─────────────────┘Exemplo: BankAccount em Python:
# bank_account.py — agregado em Event Sourcing
from dataclasses import dataclass, field
from typing import Literal
# ─── Events (imutáveis) ───
@dataclass(frozen=True)
class AccountOpened:
account_id: str
owner: str
@dataclass(frozen=True)
class Credited:
account_id: str
amount: float
@dataclass(frozen=True)
class Debited:
account_id: str
amount: float
Event = AccountOpened | Credited | Debited
# ─── Aggregate ───
@dataclass
class BankAccount:
account_id: str = ""
owner: str = ""
balance: float = 0.0
version: int = 0 # número do último evento aplicado
_uncommitted: list[Event] = field(default_factory=list)
@classmethod
def load(cls, history: list[Event]) -> "BankAccount":
acc = cls()
for e in history:
acc._apply(e)
acc.version += 1
return acc
def _apply(self, e: Event) -> None:
if isinstance(e, AccountOpened):
self.account_id = e.account_id
self.owner = e.owner
elif isinstance(e, Credited):
self.balance += e.amount
elif isinstance(e, Debited):
self.balance -= e.amount
# ─── Commands (validam + produzem eventos) ───
def open(self, account_id: str, owner: str) -> None:
if self.account_id:
raise ValueError("Already opened")
self._raise(AccountOpened(account_id, owner))
def credit(self, amount: float) -> None:
if amount <= 0:
raise ValueError("Amount must be positive")
self._raise(Credited(self.account_id, amount))
def debit(self, amount: float) -> None:
if amount <= 0:
raise ValueError("Amount must be positive")
if self.balance - amount < 0:
raise ValueError("Insufficient funds") # regra de negócio
self._raise(Debited(self.account_id, amount))
def _raise(self, e: Event) -> None:
self._apply(e)
self._uncommitted.append(e)
self.version += 1
# ─── Repository ───
class AccountRepository:
def __init__(self, event_store):
self.store = event_store
async def load(self, account_id: str) -> BankAccount:
events = await self.store.load_stream(account_id)
return BankAccount.load(events)
async def save(self, acc: BankAccount) -> None:
if not acc._uncommitted:
return
expected_version = acc.version - len(acc._uncommitted)
await self.store.append(
acc.account_id, acc._uncommitted, expected_version
)
acc._uncommitted.clear()expected_versionpro event store. Se dois writers carregaram version=5 e os dois tentam anexar, o segundo falha (o version já virou 6) e precisa reload + retry. É o padrão padrão em ES.CQRS: separar escrita e leitura
CQRS é independente de Event Sourcing, mas se casam lindamente. A ideia: o modelo de escrita (command side) é otimizado pra invariantes; o modelo de leitura (query side) é otimizado pra consultas. Podem morar em bancos diferentes.
CLIENTE
├─ POST /accounts/42/debit ────► ┌─────────────┐
│ (command) │ Command │ ─► event store (Postgres/EventStoreDB)
│ │ Handler │ │
│ └─────────────┘ │ stream
│ ▼
│ ┌───────────────────┐
│ │ PROJECTORS │
│ │ (consumers) │
│ └───────────────────┘
│ ▲ ▲ ▲
│ │ │ │
│ ▼ ▼ ▼
│ ┌──────────┬────────┬────────┐
│ │Read DB #1│ ES#2 │Redis │
│ │(ledger) │(search)│(cache) │
│ └──────────┴────────┴────────┘
│ ▲ ▲ ▲
└─ GET /accounts/42 ───────────────────────────┤ │ │
(query) cada view serve queries específicas| Lado | Otimizado para | Modelos |
|---|---|---|
| Command (write) | Consistency, invariantes, validação | Aggregate rico, DDD |
| Query (read) | Performance de leitura, view-specific | Read models denormalizados |
Projections: as views do mundo
Projeção é um consumer que lê o event stream e mantém uma "view" do estado. Cada projeção é independente — você pode criar quantas quiser, deletar, refazer do zero (replay).
# projections.py — projector async alimentando tabela de leitura
async def account_balance_projector(event_stream):
"""Projeção simples: saldo atual por conta."""
async for event in event_stream:
async with db.begin() as tx:
if isinstance(event, AccountOpened):
await tx.execute(
text("INSERT INTO accounts_view (id, owner, balance) VALUES (:id, :o, 0)"),
{"id": event.account_id, "o": event.owner},
)
elif isinstance(event, Credited):
await tx.execute(
text("UPDATE accounts_view SET balance = balance + :a WHERE id = :id"),
{"id": event.account_id, "a": event.amount},
)
elif isinstance(event, Debited):
await tx.execute(
text("UPDATE accounts_view SET balance = balance - :a WHERE id = :id"),
{"id": event.account_id, "a": event.amount},
)
# ledger de checkpoint: última offset processada (pra replay/resume)
await tx.execute(
text("UPDATE projection_offsets SET last_event_id = :id WHERE name = 'balance'"),
{"id": event.id},
)
async def transactions_history_projector(event_stream):
"""Outra projeção do MESMO stream: histórico de operações."""
async for event in event_stream:
if isinstance(event, (Credited, Debited)):
await db.execute(
text("""INSERT INTO transactions_view
(account_id, type, amount, at)
VALUES (:id, :t, :a, :ts)"""),
{
"id": event.account_id,
"t": type(event).__name__,
"a": getattr(event, "amount", 0),
"ts": event.timestamp,
},
)Snapshots: otimizando o load
Carregar uma conta com 100k eventos pra cada comando vira inviável. Snapshots salvam o estado em momentos específicos (ex: a cada 100 eventos), então o load puxa o snapshot mais recente + só os eventos posteriores.
# snapshot.py — carregamento com snapshot
async def load_with_snapshot(account_id: str) -> BankAccount:
# 1. Busca snapshot mais recente
snapshot = await db.fetchrow(
"SELECT state, at_version FROM snapshots WHERE aggregate_id = $1 ORDER BY at_version DESC LIMIT 1",
account_id,
)
if snapshot:
acc = BankAccount(**snapshot["state"])
acc.version = snapshot["at_version"]
start_version = snapshot["at_version"] + 1
else:
acc = BankAccount()
start_version = 0
# 2. Carrega eventos APÓS o snapshot
events = await load_events(account_id, from_version=start_version)
for e in events:
acc._apply(e)
acc.version += 1
return acc
async def save_snapshot_if_needed(acc: BankAccount):
# A cada 100 eventos, persiste snapshot
if acc.version % 100 == 0:
await db.execute(
"INSERT INTO snapshots (aggregate_id, state, at_version) VALUES ($1, $2, $3)",
acc.account_id, serialize(acc), acc.version,
)Versionamento de eventos: o maior pain point
Eventos são imutáveis pra sempre. Se daqui a 3 anos você precisa adicionar campo, mudar nome, ou quebrar formato — precisa de estratégia.
| Estratégia | Quando usa |
|---|---|
| Adicionar campo opcional | Maior parte dos casos. Ignorar se ausente em eventos antigos. |
| Versionamento do evento (v1, v2) | Quando breaking change inevitável. Consumers deserializam por versão. |
| Upcasters (transformers) | Converter eventos v1 pra v2 ao carregar. Isola a lógica nova da antiga. |
| Copy-and-transform | Replay completo transformando eventos pra nova schema. Custoso mas limpo. |
# upcaster.py — exemplo de upcaster
def upcast(raw: dict) -> Event:
event_type = raw["type"]
version = raw.get("version", 1)
if event_type == "Credited":
if version == 1:
# v1 não tinha currency. Antes, todos BRL.
raw["currency"] = "BRL"
raw["version"] = 2
return Credited(**raw)
if event_type == "Debited":
...EventCorrected novo. A história permanece auditável.Event stores na prática
| Opção | Descrição | Quando usa |
|---|---|---|
| EventStoreDB (Kurrent) | Event store dedicado, streams nativos, projections built-in. | Quando você quer o produto feito pra isso. Setup novo. |
| Postgres append-only table | events (id, aggregate_id, version, type, payload, ts) com UNIQUE(aggregate_id, version). | A escolha mais comum em 2026. ACID local, migrations normais. |
| Kafka | Log imutável por partição. Compaction se precisar. | Escalas massivas, múltiplos consumers independentes. |
| DynamoDB streams | DynamoDB + DDB Streams pra projeções. | AWS native, alto throughput. |
| Marten (C# sobre Postgres) | Framework maduro Postgres+JSON. | Stack .NET. |
Schema Postgres comum:
-- event store simples em Postgres
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY, -- ordem global
aggregate_id UUID NOT NULL,
aggregate_version INT NOT NULL, -- ordem dentro do aggregate
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
metadata JSONB DEFAULT '{}', -- correlação, causação, user
occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(aggregate_id, aggregate_version) -- optimistic concurrency
);
CREATE INDEX ON events (aggregate_id, aggregate_version);
CREATE INDEX ON events (event_type); -- por tipo p/ projeções
CREATE INDEX ON events (occurred_at); -- filtros temporais
-- append com otimista
INSERT INTO events (aggregate_id, aggregate_version, event_type, payload)
VALUES ($1, $2, $3, $4);
-- vai falhar com "unique constraint violation" se outro writer já gravou aquela versãoDecisões reais
📋 Sistema bancário: extrato detalhado, auditoria legal, time travel de saldo
A história ALÉM do estado atual é o produto — extrato é literalmente o event log filtrado. Auditoria vira trivial (cada mudança tem quem/quando/por quê). Compliance (Bacen, LGPD) exige rastreabilidade que ES dá de graça. Complexidade extra compensa.
Alt: CRUD + audit_log table — Funciona, mas audit log sempre diverge do state por bugs/esquecimento.
📋 CRUD de cadastro de produtos (catálogo e-commerce)
Você só precisa do estado atual. A história de edições raramente importa fora de log de auditoria. ES aqui seria 5x mais código, 3x mais operação, zero retorno. Use Postgres + index + cache.
Alt: Event Sourcing — Só se tiver requisito específico de auditoria granular.
📋 E-commerce com múltiplas views do mesmo pedido (dashboard ops, app cliente, BI)
CQRS te dá liberdade pra modelar cada view sob medida. Você pode ter: Postgres pra write, Elasticsearch pra busca, Redis pra cache, data warehouse pra BI — todos alimentados via CDC ou eventos. Event Sourcing opcional; o ganho de CQRS sozinho já é enorme.
Alt: Single DB com views — Se todas as views cabem no mesmo banco sem dor. Mais simples.
📋 Domain complexo: seguros, logística, healthcare workflows
Domínios complexos com muitas transições de estado são onde ES brilha. O evento é uma linguagem ubíqua (domain experts reconhecem 'ClaimApproved', 'PolicyCanceled'). Replay dá poder de análise histórica. Pair com DDD bounded contexts.
Alt: State-based DDD — Tradicional — aggregate salva estado, não eventos. Perde time travel.
Perguntas típicas (Q&A)
Como apagar dados (LGPD right-to-be-forgotten) em event store imutável?
Patterns: (1) crypto-shredding — criptografar campos PII com chave por-usuário, jogar a chave fora quando solicitado (dado vira ilegível); (2) soft delete + tombstone event + reescrita dos snapshots; (3) segregar PII em loja separada referenciada por ID. A crypto-shredding é o padrão moderno.
Projeções podem ficar inconsistentes com o event store?
Sim, transientemente — projeção é eventual consistency. Monitor o lag do projector. Em caso de bug, delete a view, rode replay. Essa é a belezura: o source of truth é sempre os eventos.
Event store e eventos de integração (inter-serviços) são a mesma coisa?
Não! Domain events (internos) vivem no event store. Integration events (publicados pra outros serviços) são um conceito separado. Comum: domain event interno → handler → publica integration event diferente (com schema estável). Mistura os dois e você acopla todo mundo ao seu domínio interno.
ES é bom pra real-time analytics?
Ótimo — cada evento vira input pro streaming. Combine com Kafka + Flink/ksqlDB: projeções viram agregações em tempo real. Data warehouse populado via Kafka Connect. Event Sourcing empurra análise pra ser de primeira classe.
Tamanho do event store explode?
Cresce, mas é append-only (muito eficiente). Compressão JSONB, particionamento por aggregate_id, cold storage pra eventos antigos (S3 + Parquet) se necessário. Raramente é gargalo real — mais comum é projeção mal dimensionada.
- Event Sourcing: persiste eventos (deltas), não estado. Estado atual = fold sobre os eventos.
- CQRS: separa command side (escrita, invariantes) de query side (leitura, otimizada). Ortogonal a ES, combina bem.
- Eventos são imutáveis. Correções = novos eventos, nunca edit. Versionamento via upcasters.
- Snapshots otimizam load, não são source of truth. Podem ser descartados e refeitos.
- Projections alimentam read models múltiplos. Criar uma view nova = novo projector + replay.
- Optimistic concurrency: append falha se aggregate_version já existe. Retry after reload.
- Use ES quando a história importa (audit, time travel, domínio complexo). Evite em CRUD simples.
Próximo módulo: voltando ao single-node, mas com profundidade — Postgres MVCC e isolation levels.
Quiz rápido
4 perguntas · Acerte tudo e ganhe o badge 🎯 Gabarito