CQRS e Event Sourcing em Go
CQRS (Command Query Responsibility Segregation) e Event Sourcing são padrões arquiteturais poderosos que, quando combinados, permitem construir sistemas altamente escaláveis, auditáveis e flexíveis. Vamos explorar esses conceitos e ver como implementá-los em Go.
O Que é CQRS?
CQRS separa as operações de leitura (Query) das operações de escrita (Command):
Abordagem Tradicional:
┌─────────────────────────────────────────┐
│ Modelo Único │
│ │
│ Create Read Update Delete │
│ ↓ ↓ ↓ ↓ │
│ ┌───────────────────────────┐ │
│ │ Banco de Dados │ │
│ └───────────────────────────┘ │
└─────────────────────────────────────────┘
CQRS:
┌─────────────────────────────────────────┐
│ CQRS │
│ │
│ Commands Queries │
│ (Create, Update, (Read) │
│ Delete) │
│ ↓ ↓ │
│ ┌──────────┐ ┌──────────┐ │
│ │ Write │───────→│ Read │ │
│ │ Model │ Sync │ Model │ │
│ └──────────┘ └──────────┘ │
└─────────────────────────────────────────┘
O Que é Event Sourcing?
Em vez de armazenar o estado atual, Event Sourcing armazena a sequência de eventos que levaram a esse estado:
Abordagem Tradicional:
┌───────────────────────────────────────┐
│ Conta: { id: 1, saldo: 1000 } │
└───────────────────────────────────────┘
Event Sourcing:
┌───────────────────────────────────────┐
│ Evento 1: ContaCriada { id: 1 } │
│ Evento 2: DepositoRealizado { 500 } │
│ Evento 3: DepositoRealizado { 700 } │
│ Evento 4: SaqueRealizado { 200 } │
│ │
│ Estado Atual = replay de eventos │
│ 0 + 500 + 700 - 200 = 1000 │
└───────────────────────────────────────┘
Por Que Usar Esses Padrões?
Benefícios do Event Sourcing
- Auditoria Completa: Histórico completo de todas as mudanças
- Time Travel: Reconstruir estado em qualquer ponto no tempo
- Debug: Entender exatamente como o sistema chegou ao estado atual
- Replay de Eventos: Corrigir bugs e re-processar dados
- Múltiplas Projeções: Diferentes visões dos mesmos dados
Benefícios do CQRS
- Escalabilidade Independente: Escalar leituras e escritas separadamente
- Otimização: Modelos otimizados para cada operação
- Simplicidade: Cada lado é mais simples que um modelo único
Implementação em Go
Event Store
// domain/events.go
package domain
import "time"
type Event interface {
EventType() string
AggregateID() string
Version() int
Timestamp() time.Time
}
type BaseEvent struct {
ID string
AggregateId string
Ver int
Type string
OccurredAt time.Time
}
func (e BaseEvent) EventType() string { return e.Type }
func (e BaseEvent) AggregateID() string { return e.AggregateId }
func (e BaseEvent) Version() int { return e.Ver }
func (e BaseEvent) Timestamp() time.Time { return e.OccurredAt }
// Eventos específicos
type ContaCriada struct {
BaseEvent
NomeTitular string
SaldoInicial float64
}
type DepositoRealizado struct {
BaseEvent
Valor float64
Descricao string
}
type SaqueRealizado struct {
BaseEvent
Valor float64
Descricao string
}
Event Store Interface
// eventstore/store.go
package eventstore
import (
"context"
"myapp/domain"
)
type EventStore interface {
// Append adiciona eventos com controle de concorrência otimista
Append(ctx context.Context, aggregateID string, expectedVersion int, events []domain.Event) error
// Load carrega eventos de um agregado
Load(ctx context.Context, aggregateID string, fromVersion int) ([]domain.Event, error)
// LoadAll carrega todos os eventos (para projeções)
LoadAll(ctx context.Context, fromPosition int64, limit int) ([]domain.Event, error)
}
Implementação PostgreSQL
// eventstore/postgres.go
package eventstore
import (
"context"
"database/sql"
"encoding/json"
"myapp/domain"
)
type PostgresEventStore struct {
db *sql.DB
}
func (s *PostgresEventStore) Append(ctx context.Context, aggregateID string, expectedVersion int, events []domain.Event) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// Verificar versão atual (controle de concorrência)
var currentVersion int
err = tx.QueryRowContext(ctx, `
SELECT COALESCE(MAX(version), 0) FROM events
WHERE aggregate_id = $1 FOR UPDATE
`, aggregateID).Scan(¤tVersion)
if currentVersion != expectedVersion {
return ErrConcurrencyConflict
}
// Inserir eventos
for i, event := range events {
data, _ := json.Marshal(event)
_, err = tx.ExecContext(ctx, `
INSERT INTO events (aggregate_id, version, event_type, data, timestamp)
VALUES ($1, $2, $3, $4, $5)
`, aggregateID, expectedVersion+i+1, event.EventType(), data, event.Timestamp())
if err != nil {
return err
}
}
return tx.Commit()
}
func (s *PostgresEventStore) Load(ctx context.Context, aggregateID string, fromVersion int) ([]domain.Event, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT event_type, data FROM events
WHERE aggregate_id = $1 AND version > $2
ORDER BY version
`, aggregateID, fromVersion)
if err != nil {
return nil, err
}
defer rows.Close()
var events []domain.Event
for rows.Next() {
var eventType string
var data []byte
if err := rows.Scan(&eventType, &data); err != nil {
return nil, err
}
event := deserializeEvent(eventType, data)
events = append(events, event)
}
return events, nil
}
Agregado (Write Model)
// domain/conta.go
package domain
import "errors"
type Conta struct {
id string
titular string
saldo float64
versao int
eventos []Event
}
func NovaConta(id, titular string, saldoInicial float64) (*Conta, error) {
if saldoInicial < 0 {
return nil, errors.New("saldo inicial não pode ser negativo")
}
conta := &Conta{}
conta.aplicar(ContaCriada{
BaseEvent: BaseEvent{AggregateId: id, Type: "ContaCriada"},
NomeTitular: titular,
SaldoInicial: saldoInicial,
})
return conta, nil
}
func (c *Conta) Depositar(valor float64, descricao string) error {
if valor <= 0 {
return errors.New("valor deve ser positivo")
}
c.aplicar(DepositoRealizado{
BaseEvent: BaseEvent{AggregateId: c.id, Type: "DepositoRealizado"},
Valor: valor,
Descricao: descricao,
})
return nil
}
func (c *Conta) Sacar(valor float64, descricao string) error {
if valor <= 0 {
return errors.New("valor deve ser positivo")
}
if c.saldo < valor {
return errors.New("saldo insuficiente")
}
c.aplicar(SaqueRealizado{
BaseEvent: BaseEvent{AggregateId: c.id, Type: "SaqueRealizado"},
Valor: valor,
Descricao: descricao,
})
return nil
}
func (c *Conta) aplicar(evento Event) {
c.aplicarEvento(evento)
c.eventos = append(c.eventos, evento)
}
func (c *Conta) aplicarEvento(evento Event) {
switch e := evento.(type) {
case ContaCriada:
c.id = e.AggregateId
c.titular = e.NomeTitular
c.saldo = e.SaldoInicial
case DepositoRealizado:
c.saldo += e.Valor
case SaqueRealizado:
c.saldo -= e.Valor
}
c.versao++
}
// CarregarDeEventos reconstrói o agregado a partir de eventos
func CarregarContaDeEventos(eventos []Event) *Conta {
conta := &Conta{}
for _, evento := range eventos {
conta.aplicarEvento(evento)
}
return conta
}
Command Handler
// usecase/conta_commands.go
package usecase
import (
"context"
"myapp/domain"
"myapp/eventstore"
)
type ContaCommandHandler struct {
store eventstore.EventStore
}
type DepositarCommand struct {
ContaID string
Valor float64
Descricao string
}
func (h *ContaCommandHandler) Depositar(ctx context.Context, cmd DepositarCommand) error {
// 1. Carregar eventos
eventos, err := h.store.Load(ctx, cmd.ContaID, 0)
if err != nil {
return err
}
// 2. Reconstruir agregado
conta := domain.CarregarContaDeEventos(eventos)
// 3. Executar comando (gera novos eventos)
if err := conta.Depositar(cmd.Valor, cmd.Descricao); err != nil {
return err
}
// 4. Persistir novos eventos
return h.store.Append(ctx, cmd.ContaID, conta.Versao(), conta.EventosPendentes())
}
Projeções (Read Model)
// projections/saldo_projection.go
package projections
import (
"context"
"database/sql"
"myapp/domain"
)
type SaldoProjection struct {
db *sql.DB
}
func (p *SaldoProjection) Handle(ctx context.Context, evento domain.Event) error {
switch e := evento.(type) {
case domain.ContaCriada:
_, err := p.db.ExecContext(ctx, `
INSERT INTO conta_saldos (conta_id, titular, saldo)
VALUES ($1, $2, $3)
`, e.AggregateID(), e.NomeTitular, e.SaldoInicial)
return err
case domain.DepositoRealizado:
_, err := p.db.ExecContext(ctx, `
UPDATE conta_saldos SET saldo = saldo + $1 WHERE conta_id = $2
`, e.Valor, e.AggregateID())
return err
case domain.SaqueRealizado:
_, err := p.db.ExecContext(ctx, `
UPDATE conta_saldos SET saldo = saldo - $1 WHERE conta_id = $2
`, e.Valor, e.AggregateID())
return err
}
return nil
}
Query Service
// query/conta_queries.go
package query
import (
"context"
"database/sql"
)
type ContaQueryService struct {
db *sql.DB
}
type ContaSaldo struct {
ContaID string
Titular string
Saldo float64
}
func (s *ContaQueryService) ObterSaldo(ctx context.Context, contaID string) (*ContaSaldo, error) {
var saldo ContaSaldo
err := s.db.QueryRowContext(ctx, `
SELECT conta_id, titular, saldo FROM conta_saldos WHERE conta_id = $1
`, contaID).Scan(&saldo.ContaID, &saldo.Titular, &saldo.Saldo)
if err == sql.ErrNoRows {
return nil, nil
}
return &saldo, err
}
func (s *ContaQueryService) ListarContas(ctx context.Context) ([]ContaSaldo, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT conta_id, titular, saldo FROM conta_saldos ORDER BY titular
`)
if err != nil {
return nil, err
}
defer rows.Close()
var contas []ContaSaldo
for rows.Next() {
var c ContaSaldo
rows.Scan(&c.ContaID, &c.Titular, &c.Saldo)
contas = append(contas, c)
}
return contas, nil
}
Quando Usar
Use CQRS + Event Sourcing Quando:
- ✅ Você precisa de auditoria completa
- ✅ Requisitos de leitura e escrita são muito diferentes
- ✅ Você precisa de múltiplas visões dos mesmos dados
- ✅ Eventos são significativos para o negócio
- ✅ Time-travel queries são necessárias
Não Use Quando:
- ❌ CRUD simples é suficiente
- ❌ Consistência forte é obrigatória em todo lugar
- ❌ A equipe não tem experiência com esses padrões
- ❌ É um projeto pequeno ou protótipo
Conclusão
CQRS e Event Sourcing são ferramentas poderosas, mas adicionam complexidade. Use-os quando os benefícios claramente superam os custos. Para muitas aplicações, um modelo tradicional bem estruturado é mais apropriado.
Quando usados corretamente, esses padrões permitem construir sistemas que são não apenas escaláveis, mas também compreensíveis e auditáveis - características valiosas em sistemas de missão crítica.