🧠FFVAcademy
📜

Event Sourcing e CQRS: quando eventos são a fonte da verdade

17 min de leitura·+85 XP
Pré-requisitos (0/1)0%

Recomendamos completar os pré-requisitos antes de seguir, mas nada te impede de continuar.

Quase todo sistema que você conhece persiste o estado atual: user.balance = 150. Foi 200, virou 150. A mutação aconteceu, o valor anterior morreu. Event Sourcing inverte essa premissa: grava a sequência de eventos que aconteceram (Credited 200, Debited 50) e deriva o estado dessa lista imutável. Dá pra voltar no tempo, reconstruir qualquer view, auditar tudo — ao preço de complexidade real.

CQRS é o primo frequentemente associado: separar commands (escrita) de queries (leitura) em modelos/bancos/escalas diferentes. Este módulo mostra o que são, como implementar, armadilhas reais (versionamento de eventos é um pesadelo), e quando não usar — porque 80% dos casos são overengineering.

Event Sourcing: eventos como fonte da verdade

CRUD tradicional:
  ┌──────────────────────────────┐
  │ accounts                      │
  │ id | balance | updated_at    │
  │ 42 |  150.00 | 2026-04-16    │    ← só o estado atual
  └──────────────────────────────┘

Event Sourcing:
  ┌──────────────────────────────────────────────────────────────┐
  │ events (append-only)                                           │
  │ id | aggregate_id | type         | payload       | timestamp  │
  │ 1  | 42           | AccountOpened| {owner:"Ana"} | 2026-01-10 │
  │ 2  | 42           | Credited     | {amount:200}  | 2026-02-03 │
  │ 3  | 42           | Debited      | {amount:50}   | 2026-03-18 │
  │ 4  | 42           | Debited      | {amount:0.01} | 2026-04-16 │
  └──────────────────────────────────────────────────────────────┘
  → estado atual = somar eventos: balance = 149.99
  → história completa, imutável, auditável

Propriedades:

  • Append-only: eventos nunca são editados ou deletados. Correções = novo evento (ex: BalanceCorrected).
  • Ordenados por aggregate: eventos do mesmo agregado têm sequência monotônica (version 1, 2, 3...).
  • Immutable audit log: você tem o "porquê" de cada mudança, não só o "o quê".
  • Replay: reconstruir qualquer estado passado aplicando eventos até um timestamp.

Command → Event → Aggregate: o ciclo básico

┌─────────┐     command     ┌──────────────┐     validate + produce    ┌──────────────┐
│ Cliente │ ──────────────► │  Command     │ ────────────────────────► │   Event(s)   │
└─────────┘                 │  Handler     │                            └──────────────┘
                            └──────────────┘                                   │
                                    ▲                                          │ append
                                    │ load                                     ▼
                                    │                                 ┌────────────────┐
                            ┌──────────────┐   apply events           │  EVENT STORE   │
                            │  Aggregate   │◄─────────────────────────│ (append-only)  │
                            │  (in memory) │                          └────────────────┘
                            └──────────────┘
                                                                              │ stream
                                                                              ▼
                                                                    ┌─────────────────┐
                                                                    │   PROJECTIONS   │
                                                                    │  (read models)  │
                                                                    └─────────────────┘

Exemplo: BankAccount em Python:

python
# bank_account.py — agregado em Event Sourcing

from dataclasses import dataclass, field
from typing import Literal

# ─── Events (imutáveis) ───
@dataclass(frozen=True)
class AccountOpened:
    account_id: str
    owner: str

@dataclass(frozen=True)
class Credited:
    account_id: str
    amount: float

@dataclass(frozen=True)
class Debited:
    account_id: str
    amount: float

Event = AccountOpened | Credited | Debited


# ─── Aggregate ───
@dataclass
class BankAccount:
    account_id: str = ""
    owner: str = ""
    balance: float = 0.0
    version: int = 0                    # número do último evento aplicado
    _uncommitted: list[Event] = field(default_factory=list)

    @classmethod
    def load(cls, history: list[Event]) -> "BankAccount":
        acc = cls()
        for e in history:
            acc._apply(e)
            acc.version += 1
        return acc

    def _apply(self, e: Event) -> None:
        if isinstance(e, AccountOpened):
            self.account_id = e.account_id
            self.owner = e.owner
        elif isinstance(e, Credited):
            self.balance += e.amount
        elif isinstance(e, Debited):
            self.balance -= e.amount

    # ─── Commands (validam + produzem eventos) ───
    def open(self, account_id: str, owner: str) -> None:
        if self.account_id:
            raise ValueError("Already opened")
        self._raise(AccountOpened(account_id, owner))

    def credit(self, amount: float) -> None:
        if amount <= 0:
            raise ValueError("Amount must be positive")
        self._raise(Credited(self.account_id, amount))

    def debit(self, amount: float) -> None:
        if amount <= 0:
            raise ValueError("Amount must be positive")
        if self.balance - amount < 0:
            raise ValueError("Insufficient funds")   # regra de negócio
        self._raise(Debited(self.account_id, amount))

    def _raise(self, e: Event) -> None:
        self._apply(e)
        self._uncommitted.append(e)
        self.version += 1


# ─── Repository ───
class AccountRepository:
    def __init__(self, event_store):
        self.store = event_store

    async def load(self, account_id: str) -> BankAccount:
        events = await self.store.load_stream(account_id)
        return BankAccount.load(events)

    async def save(self, acc: BankAccount) -> None:
        if not acc._uncommitted:
            return
        expected_version = acc.version - len(acc._uncommitted)
        await self.store.append(
            acc.account_id, acc._uncommitted, expected_version
        )
        acc._uncommitted.clear()
💡
Optimistic concurrency: ao salvar, o repo manda expected_versionpro event store. Se dois writers carregaram version=5 e os dois tentam anexar, o segundo falha (o version já virou 6) e precisa reload + retry. É o padrão padrão em ES.

CQRS: separar escrita e leitura

CQRS é independente de Event Sourcing, mas se casam lindamente. A ideia: o modelo de escrita (command side) é otimizado pra invariantes; o modelo de leitura (query side) é otimizado pra consultas. Podem morar em bancos diferentes.

CLIENTE
  ├─ POST /accounts/42/debit  ────► ┌─────────────┐
  │   (command)                     │  Command    │ ─► event store (Postgres/EventStoreDB)
  │                                 │  Handler    │       │
  │                                 └─────────────┘       │ stream
  │                                                       ▼
  │                                             ┌───────────────────┐
  │                                             │   PROJECTORS      │
  │                                             │   (consumers)     │
  │                                             └───────────────────┘
  │                                                ▲       ▲      ▲
  │                                                │       │      │
  │                                                ▼       ▼      ▼
  │                                        ┌──────────┬────────┬────────┐
  │                                        │Read DB #1│ ES#2   │Redis   │
  │                                        │(ledger)  │(search)│(cache) │
  │                                        └──────────┴────────┴────────┘
  │                                                ▲       ▲      ▲
  └─ GET /accounts/42  ───────────────────────────┤       │      │
     (query)                             cada view serve queries específicas
LadoOtimizado paraModelos
Command (write)Consistency, invariantes, validaçãoAggregate rico, DDD
Query (read)Performance de leitura, view-specificRead models denormalizados
⚠️
CQRS sem Event Sourcing também existe — é só separar modelos de escrita e leitura usando o mesmo banco com views materializadas, ou replicação lógica pra DB de leitura. Muita gente confunde: CQRS + ES são ortogonais.

Projections: as views do mundo

Projeção é um consumer que lê o event stream e mantém uma "view" do estado. Cada projeção é independente — você pode criar quantas quiser, deletar, refazer do zero (replay).

python
# projections.py — projector async alimentando tabela de leitura

async def account_balance_projector(event_stream):
    """Projeção simples: saldo atual por conta."""
    async for event in event_stream:
        async with db.begin() as tx:
            if isinstance(event, AccountOpened):
                await tx.execute(
                    text("INSERT INTO accounts_view (id, owner, balance) VALUES (:id, :o, 0)"),
                    {"id": event.account_id, "o": event.owner},
                )
            elif isinstance(event, Credited):
                await tx.execute(
                    text("UPDATE accounts_view SET balance = balance + :a WHERE id = :id"),
                    {"id": event.account_id, "a": event.amount},
                )
            elif isinstance(event, Debited):
                await tx.execute(
                    text("UPDATE accounts_view SET balance = balance - :a WHERE id = :id"),
                    {"id": event.account_id, "a": event.amount},
                )
            # ledger de checkpoint: última offset processada (pra replay/resume)
            await tx.execute(
                text("UPDATE projection_offsets SET last_event_id = :id WHERE name = 'balance'"),
                {"id": event.id},
            )


async def transactions_history_projector(event_stream):
    """Outra projeção do MESMO stream: histórico de operações."""
    async for event in event_stream:
        if isinstance(event, (Credited, Debited)):
            await db.execute(
                text("""INSERT INTO transactions_view
                        (account_id, type, amount, at)
                        VALUES (:id, :t, :a, :ts)"""),
                {
                    "id": event.account_id,
                    "t": type(event).__name__,
                    "a": getattr(event, "amount", 0),
                    "ts": event.timestamp,
                },
            )
💡
Power move: quer uma nova feature que precisa de uma view diferente (ex: "histórico agregado por mês")? Crie uma nova projeção, rode replay do event store, tabela pronta. Sem migration complexa, sem backfill frágil. Esse é o caso de uso mais vendedor de Event Sourcing.

Snapshots: otimizando o load

Carregar uma conta com 100k eventos pra cada comando vira inviável. Snapshots salvam o estado em momentos específicos (ex: a cada 100 eventos), então o load puxa o snapshot mais recente + só os eventos posteriores.

python
# snapshot.py — carregamento com snapshot

async def load_with_snapshot(account_id: str) -> BankAccount:
    # 1. Busca snapshot mais recente
    snapshot = await db.fetchrow(
        "SELECT state, at_version FROM snapshots WHERE aggregate_id = $1 ORDER BY at_version DESC LIMIT 1",
        account_id,
    )

    if snapshot:
        acc = BankAccount(**snapshot["state"])
        acc.version = snapshot["at_version"]
        start_version = snapshot["at_version"] + 1
    else:
        acc = BankAccount()
        start_version = 0

    # 2. Carrega eventos APÓS o snapshot
    events = await load_events(account_id, from_version=start_version)
    for e in events:
        acc._apply(e)
        acc.version += 1

    return acc


async def save_snapshot_if_needed(acc: BankAccount):
    # A cada 100 eventos, persiste snapshot
    if acc.version % 100 == 0:
        await db.execute(
            "INSERT INTO snapshots (aggregate_id, state, at_version) VALUES ($1, $2, $3)",
            acc.account_id, serialize(acc), acc.version,
        )
⚠️
Snapshot não é source of truth. Se você muda lógica do agregado (ex: adiciona campo), snapshots antigos podem virar inválidos. Solução: versionar snapshots e ter migrador que descarta snapshots antigos (event log é sempre valido).

Versionamento de eventos: o maior pain point

Eventos são imutáveis pra sempre. Se daqui a 3 anos você precisa adicionar campo, mudar nome, ou quebrar formato — precisa de estratégia.

EstratégiaQuando usa
Adicionar campo opcionalMaior parte dos casos. Ignorar se ausente em eventos antigos.
Versionamento do evento (v1, v2)Quando breaking change inevitável. Consumers deserializam por versão.
Upcasters (transformers)Converter eventos v1 pra v2 ao carregar. Isola a lógica nova da antiga.
Copy-and-transformReplay completo transformando eventos pra nova schema. Custoso mas limpo.
python
# upcaster.py — exemplo de upcaster
def upcast(raw: dict) -> Event:
    event_type = raw["type"]
    version = raw.get("version", 1)

    if event_type == "Credited":
        if version == 1:
            # v1 não tinha currency. Antes, todos BRL.
            raw["currency"] = "BRL"
            raw["version"] = 2
        return Credited(**raw)

    if event_type == "Debited":
        ...
🚨
A tentação de editar eventos antigos: NUNCA. Event store é append-only por design. Editar = você quebrou o contrato com todos os consumers e projeções. Se um evento foi gravado errado, grave um EventCorrected novo. A história permanece auditável.

Event stores na prática

OpçãoDescriçãoQuando usa
EventStoreDB (Kurrent)Event store dedicado, streams nativos, projections built-in.Quando você quer o produto feito pra isso. Setup novo.
Postgres append-only tableevents (id, aggregate_id, version, type, payload, ts) com UNIQUE(aggregate_id, version).A escolha mais comum em 2026. ACID local, migrations normais.
KafkaLog imutável por partição. Compaction se precisar.Escalas massivas, múltiplos consumers independentes.
DynamoDB streamsDynamoDB + DDB Streams pra projeções.AWS native, alto throughput.
Marten (C# sobre Postgres)Framework maduro Postgres+JSON.Stack .NET.

Schema Postgres comum:

sql
-- event store simples em Postgres
CREATE TABLE events (
    id                BIGSERIAL PRIMARY KEY,            -- ordem global
    aggregate_id      UUID NOT NULL,
    aggregate_version INT NOT NULL,                     -- ordem dentro do aggregate
    event_type        TEXT NOT NULL,
    payload           JSONB NOT NULL,
    metadata          JSONB DEFAULT '{}',               -- correlação, causação, user
    occurred_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE(aggregate_id, aggregate_version)             -- optimistic concurrency
);

CREATE INDEX ON events (aggregate_id, aggregate_version);
CREATE INDEX ON events (event_type);                    -- por tipo p/ projeções
CREATE INDEX ON events (occurred_at);                   -- filtros temporais

-- append com otimista
INSERT INTO events (aggregate_id, aggregate_version, event_type, payload)
VALUES ($1, $2, $3, $4);
-- vai falhar com "unique constraint violation" se outro writer já gravou aquela versão

Decisões reais

📋 Sistema bancário: extrato detalhado, auditoria legal, time travel de saldo

Event Sourcing é perfect fit

A história ALÉM do estado atual é o produto — extrato é literalmente o event log filtrado. Auditoria vira trivial (cada mudança tem quem/quando/por quê). Compliance (Bacen, LGPD) exige rastreabilidade que ES dá de graça. Complexidade extra compensa.

Alt: CRUD + audit_log tableFunciona, mas audit log sempre diverge do state por bugs/esquecimento.

📋 CRUD de cadastro de produtos (catálogo e-commerce)

NÃO use Event Sourcing — CRUD simples

Você só precisa do estado atual. A história de edições raramente importa fora de log de auditoria. ES aqui seria 5x mais código, 3x mais operação, zero retorno. Use Postgres + index + cache.

Alt: Event SourcingSó se tiver requisito específico de auditoria granular.

📋 E-commerce com múltiplas views do mesmo pedido (dashboard ops, app cliente, BI)

CQRS com projeções dedicadas (não exige ES)

CQRS te dá liberdade pra modelar cada view sob medida. Você pode ter: Postgres pra write, Elasticsearch pra busca, Redis pra cache, data warehouse pra BI — todos alimentados via CDC ou eventos. Event Sourcing opcional; o ganho de CQRS sozinho já é enorme.

Alt: Single DB com viewsSe todas as views cabem no mesmo banco sem dor. Mais simples.

📋 Domain complexo: seguros, logística, healthcare workflows

Event Sourcing + DDD (Domain-Driven Design)

Domínios complexos com muitas transições de estado são onde ES brilha. O evento é uma linguagem ubíqua (domain experts reconhecem 'ClaimApproved', 'PolicyCanceled'). Replay dá poder de análise histórica. Pair com DDD bounded contexts.

Alt: State-based DDDTradicional — aggregate salva estado, não eventos. Perde time travel.

Perguntas típicas (Q&A)

Como apagar dados (LGPD right-to-be-forgotten) em event store imutável?

Patterns: (1) crypto-shredding — criptografar campos PII com chave por-usuário, jogar a chave fora quando solicitado (dado vira ilegível); (2) soft delete + tombstone event + reescrita dos snapshots; (3) segregar PII em loja separada referenciada por ID. A crypto-shredding é o padrão moderno.

Projeções podem ficar inconsistentes com o event store?

Sim, transientemente — projeção é eventual consistency. Monitor o lag do projector. Em caso de bug, delete a view, rode replay. Essa é a belezura: o source of truth é sempre os eventos.

Event store e eventos de integração (inter-serviços) são a mesma coisa?

Não! Domain events (internos) vivem no event store. Integration events (publicados pra outros serviços) são um conceito separado. Comum: domain event interno → handler → publica integration event diferente (com schema estável). Mistura os dois e você acopla todo mundo ao seu domínio interno.

ES é bom pra real-time analytics?

Ótimo — cada evento vira input pro streaming. Combine com Kafka + Flink/ksqlDB: projeções viram agregações em tempo real. Data warehouse populado via Kafka Connect. Event Sourcing empurra análise pra ser de primeira classe.

Tamanho do event store explode?

Cresce, mas é append-only (muito eficiente). Compressão JSONB, particionamento por aggregate_id, cold storage pra eventos antigos (S3 + Parquet) se necessário. Raramente é gargalo real — mais comum é projeção mal dimensionada.

Take-aways:
  • Event Sourcing: persiste eventos (deltas), não estado. Estado atual = fold sobre os eventos.
  • CQRS: separa command side (escrita, invariantes) de query side (leitura, otimizada). Ortogonal a ES, combina bem.
  • Eventos são imutáveis. Correções = novos eventos, nunca edit. Versionamento via upcasters.
  • Snapshots otimizam load, não são source of truth. Podem ser descartados e refeitos.
  • Projections alimentam read models múltiplos. Criar uma view nova = novo projector + replay.
  • Optimistic concurrency: append falha se aggregate_version já existe. Retry after reload.
  • Use ES quando a história importa (audit, time travel, domínio complexo). Evite em CRUD simples.

Próximo módulo: voltando ao single-node, mas com profundidade — Postgres MVCC e isolation levels.

🧩

Quiz rápido

4 perguntas · Acerte tudo e ganhe o badge 🎯 Gabarito

Continue lendo