CQRS and Event Sourcing in Go: Practical Patterns from the Trenches
Building scalable systems often means accepting that the traditional CRUD approach won't cut it. When audit requirements get strict, when read and write patterns diverge, or when you need to answer "how did we get to this state?"—that's when CQRS and Event Sourcing become your best friends.
After implementing these patterns across multiple production systems and diving deep into Three Dots Labs' excellent "Go with the Domain" book, I've learned that Go's explicit nature makes it surprisingly well-suited for these architectural patterns. Let me share what actually works.
Why CQRS and Event Sourcing Matter
Before diving into code, let's be clear about the problems these patterns solve:
CQRS (Command Query Responsibility Segregation) separates your write model from your read model. Your commands focus on business logic and consistency, while your queries optimize for fast, denormalized reads. No more trying to serve both masters with one model.
Event Sourcing stores your state as a sequence of events rather than current state snapshots. Every change becomes an immutable fact—perfect for audit trails, debugging production issues, and even time-travel debugging.
As the Three Dots Labs team puts it in their book: "Implementation details are often just details. Get the domain code right." This philosophy drives everything that follows.
The Command Side: Where Business Logic Lives
Let's start with a practical example from a payment processing system. Here's how we structure commands:
package command
import (
"context"
"fmt"
"time"
"github.com/google/uuid"
)
// ProcessPayment represents the intent to process a payment
type ProcessPayment struct {
PaymentID uuid.UUID
Amount Money
Method PaymentMethod
CustomerID string
IdempotencyKey string
}
// ProcessPaymentHandler handles the business logic for payment processing
type ProcessPaymentHandler struct {
repo PaymentRepository
eventBus EventBus
riskEngine RiskAssessment
}
func (h *ProcessPaymentHandler) Handle(ctx context.Context, cmd ProcessPayment) error {
// Load aggregate (or create if new)
payment, err := h.repo.GetByID(ctx, cmd.PaymentID)
if err != nil {
payment = NewPayment(cmd.PaymentID, cmd.CustomerID)
}
// Check idempotency
if payment.HasProcessedIdempotencyKey(cmd.IdempotencyKey) {
return nil // Already processed
}
// Business logic lives in the domain
riskScore := h.riskEngine.Assess(cmd.CustomerID, cmd.Amount)
events, err := payment.Process(
cmd.Amount,
cmd.Method,
riskScore,
cmd.IdempotencyKey,
)
if err != nil {
return fmt.Errorf("processing payment: %w", err)
}
// Save events
if err := h.repo.Save(ctx, payment); err != nil {
return fmt.Errorf("saving payment: %w", err)
}
// Publish for read model updates and integration
for _, event := range events {
if err := h.eventBus.Publish(ctx, event); err != nil {
// Log but don't fail - events are persisted
log.Printf("failed to publish event: %v", err)
}
}
return nil
}
The beauty here? The handler orchestrates, but the domain makes decisions. This separation becomes crucial as complexity grows.
The Domain Layer: Where Events Are Born
Following Domain-Driven Design principles from "Go with the Domain," our aggregates produce events as the result of business operations:
package payment
import (
"fmt"
"time"
"github.com/google/uuid"
)
// Payment aggregate root
type Payment struct {
id uuid.UUID
customerID string
status PaymentStatus
amount Money
method PaymentMethod
processedKeys map[string]bool
// Event sourcing
version int
pendingEvents []Event
appliedEvents []Event
}
// Process applies business rules and generates events
func (p *Payment) Process(
amount Money,
method PaymentMethod,
riskScore RiskScore,
idempotencyKey string,
) ([]Event, error) {
// Guard against invalid state transitions
if p.status == PaymentStatusCompleted {
return nil, ErrPaymentAlreadyCompleted
}
if p.status == PaymentStatusFailed {
return nil, ErrPaymentAlreadyFailed
}
// Check idempotency
if p.processedKeys[idempotencyKey] {
return nil, nil // Already processed
}
// Business rule: High risk payments need review
if riskScore.IsHigh() && amount.GreaterThan(NewMoney(10000, "USD")) {
event := PaymentFlaggedForReview{
PaymentID: p.id,
Amount: amount,
RiskScore: riskScore,
FlaggedAt: time.Now(),
}
p.apply(event)
return []Event{event}, nil
}
// Business rule: Different limits per method
if err := method.ValidateLimit(amount); err != nil {
event := PaymentFailed{
PaymentID: p.id,
Reason: err.Error(),
FailedAt: time.Now(),
}
p.apply(event)
return []Event{event}, nil
}
// Process payment
event := PaymentProcessed{
PaymentID: p.id,
Amount: amount,
Method: method,
ProcessedAt: time.Now(),
IdempotencyKey: idempotencyKey,
}
p.apply(event)
return []Event{event}, nil
}
// apply updates the aggregate state from an event
func (p *Payment) apply(event Event) {
switch e := event.(type) {
case PaymentProcessed:
p.status = PaymentStatusCompleted
p.amount = e.Amount
p.method = e.Method
p.processedKeys[e.IdempotencyKey] = true
case PaymentFailed:
p.status = PaymentStatusFailed
case PaymentFlaggedForReview:
p.status = PaymentStatusPendingReview
}
p.pendingEvents = append(p.pendingEvents, event)
p.version++
}
// Reconstitute rebuilds aggregate from events (Event Sourcing)
func (p *Payment) Reconstitute(events []Event) {
for _, event := range events {
p.apply(event)
p.appliedEvents = append(p.appliedEvents, event)
}
p.pendingEvents = nil // Clear pending after reconstitution
}
This pattern, heavily inspired by Three Dots Labs' approach, keeps business logic in the domain while maintaining a clear event trail.
Event Storage: The Source of Truth
With Event Sourcing, your events become the source of truth. Here's a production-ready event store implementation:
package eventstore
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
)
type PostgresEventStore struct {
db *sql.DB
}
type StoredEvent struct {
ID uuid.UUID
AggregateID uuid.UUID
AggregateType string
EventType string
EventData json.RawMessage
EventVersion int
Metadata map[string]string
Timestamp time.Time
}
func (s *PostgresEventStore) SaveEvents(
ctx context.Context,
aggregateID uuid.UUID,
events []Event,
expectedVersion int,
) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// Optimistic concurrency control
var currentVersion int
err = tx.QueryRowContext(ctx, `
SELECT COALESCE(MAX(event_version), 0)
FROM events
WHERE aggregate_id = $1
`, aggregateID).Scan(¤tVersion)
if err != nil {
return err
}
if currentVersion != expectedVersion {
return ErrConcurrentModification
}
// Store each event
for i, event := range events {
eventData, err := json.Marshal(event)
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, `
INSERT INTO events (
id, aggregate_id, aggregate_type,
event_type, event_data, event_version,
metadata, timestamp
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`,
uuid.New(),
aggregateID,
event.AggregateType(),
event.EventType(),
eventData,
currentVersion + i + 1,
event.Metadata(),
event.Timestamp(),
)
if err != nil {
return err
}
}
// Update snapshot for read optimization (optional)
if err := s.updateSnapshot(ctx, tx, aggregateID); err != nil {
return err
}
return tx.Commit()
}
func (s *PostgresEventStore) GetEvents(
ctx context.Context,
aggregateID uuid.UUID,
fromVersion int,
) ([]StoredEvent, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT id, aggregate_id, aggregate_type,
event_type, event_data, event_version,
metadata, timestamp
FROM events
WHERE aggregate_id = $1 AND event_version > $2
ORDER BY event_version ASC
`, aggregateID, fromVersion)
if err != nil {
return nil, err
}
defer rows.Close()
var events []StoredEvent
for rows.Next() {
var event StoredEvent
var metadata sql.NullString
err := rows.Scan(
&event.ID,
&event.AggregateID,
&event.AggregateType,
&event.EventType,
&event.EventData,
&event.EventVersion,
&metadata,
&event.Timestamp,
)
if err != nil {
return nil, err
}
if metadata.Valid {
json.Unmarshal([]byte(metadata.String), &event.Metadata)
}
events = append(events, event)
}
return events, nil
}
The Query Side: Optimized for Reads
Now for the query side. As Three Dots Labs demonstrates, your read models can be completely different from your write models:
package query
import (
"context"
"database/sql"
"time"
)
// PaymentSummaryView is optimized for dashboard queries
type PaymentSummaryView struct {
PaymentID string `json:"payment_id"`
CustomerID string `json:"customer_id"`
CustomerName string `json:"customer_name"`
Amount float64 `json:"amount"`
Currency string `json:"currency"`
Status string `json:"status"`
Method string `json:"method"`
ProcessedAt time.Time `json:"processed_at"`
RiskScore int `json:"risk_score"`
RequiresReview bool `json:"requires_review"`
}
type PaymentQueryService struct {
db *sql.DB
}
func (s *PaymentQueryService) GetCustomerPayments(
ctx context.Context,
customerID string,
filter PaymentFilter,
) ([]PaymentSummaryView, error) {
query := `
SELECT
p.payment_id,
p.customer_id,
c.name as customer_name,
p.amount,
p.currency,
p.status,
p.method,
p.processed_at,
p.risk_score,
p.requires_review
FROM payment_summary p
JOIN customers c ON c.id = p.customer_id
WHERE p.customer_id = $1
AND p.processed_at BETWEEN $2 AND $3
ORDER BY p.processed_at DESC
LIMIT $4 OFFSET $5
`
rows, err := s.db.QueryContext(
ctx, query,
customerID,
filter.From,
filter.To,
filter.Limit,
filter.Offset,
)
if err != nil {
return nil, err
}
defer rows.Close()
var payments []PaymentSummaryView
for rows.Next() {
var payment PaymentSummaryView
err := rows.Scan(
&payment.PaymentID,
&payment.CustomerID,
&payment.CustomerName,
&payment.Amount,
&payment.Currency,
&payment.Status,
&payment.Method,
&payment.ProcessedAt,
&payment.RiskScore,
&payment.RequiresReview,
)
if err != nil {
return nil, err
}
payments = append(payments, payment)
}
return payments, nil
}
// GetDailyStatistics provides aggregated views
func (s *PaymentQueryService) GetDailyStatistics(
ctx context.Context,
date time.Time,
) (*DailyStats, error) {
// This query would be expensive with Event Sourcing alone
// But with CQRS, we can maintain optimized read models
query := `
SELECT
COUNT(*) as total_transactions,
SUM(amount) as total_volume,
AVG(amount) as average_amount,
COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed_count,
COUNT(CASE WHEN requires_review THEN 1 END) as review_count
FROM payment_summary
WHERE DATE(processed_at) = DATE($1)
`
var stats DailyStats
err := s.db.QueryRowContext(ctx, query, date).Scan(
&stats.TotalTransactions,
&stats.TotalVolume,
&stats.AverageAmount,
&stats.FailedCount,
&stats.ReviewCount,
)
return &stats, err
}
Projections: Keeping Read Models in Sync
The glue between commands and queries? Projections that listen to events and update read models:
package projection
import (
"context"
"database/sql"
"encoding/json"
"log"
)
type PaymentProjection struct {
db *sql.DB
customerSvc CustomerService
}
func (p *PaymentProjection) Handle(ctx context.Context, event Event) error {
switch e := event.(type) {
case PaymentProcessed:
return p.handlePaymentProcessed(ctx, e)
case PaymentFailed:
return p.handlePaymentFailed(ctx, e)
case PaymentFlaggedForReview:
return p.handlePaymentFlagged(ctx, e)
default:
log.Printf("Unknown event type: %T", e)
return nil
}
}
func (p *PaymentProjection) handlePaymentProcessed(
ctx context.Context,
event PaymentProcessed,
) error {
// Enrich with customer data
customer, err := p.customerSvc.GetByID(ctx, event.CustomerID)
if err != nil {
return err
}
// Update denormalized read model
_, err = p.db.ExecContext(ctx, `
INSERT INTO payment_summary (
payment_id, customer_id, customer_name,
amount, currency, status, method,
processed_at, risk_score, requires_review
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (payment_id) DO UPDATE SET
status = EXCLUDED.status,
processed_at = EXCLUDED.processed_at
`,
event.PaymentID,
event.CustomerID,
customer.Name,
event.Amount.Value,
event.Amount.Currency,
"completed",
event.Method,
event.ProcessedAt,
event.RiskScore,
false,
)
return err
}
func (p *PaymentProjection) handlePaymentFlagged(
ctx context.Context,
event PaymentFlaggedForReview,
) error {
_, err := p.db.ExecContext(ctx, `
UPDATE payment_summary
SET requires_review = true,
status = 'pending_review',
risk_score = $2
WHERE payment_id = $1
`, event.PaymentID, event.RiskScore)
return err
}
Eventual Consistency: Embracing the Reality
One of the key insights from "Go with the Domain" is that eventual consistency isn't a bug—it's a feature. Your projections might lag behind by milliseconds or seconds, but that's fine for most use cases:
// Anti-pattern: Trying to read immediately after write
payment := ProcessPayment(cmd)
summary := GetPaymentSummary(payment.ID) // Might not exist yet!
// Better: Return write model data for immediate feedback
payment := ProcessPayment(cmd)
return PaymentProcessedResponse{
ID: payment.ID,
Status: payment.Status,
// Don't query read model immediately
}
// Best: Design UI for eventual consistency
// Show "Processing..." state and poll or use websockets
Testing Event-Sourced Systems
Testing becomes surprisingly elegant with Event Sourcing:
func TestPaymentProcessing(t *testing.T) {
// Given: Previous events
payment := NewPayment(uuid.New(), "customer-123")
payment.Reconstitute([]Event{
PaymentInitiated{
PaymentID: payment.ID,
CustomerID: "customer-123",
Amount: NewMoney(100, "USD"),
},
})
// When: Business operation
events, err := payment.Process(
NewMoney(100, "USD"),
CreditCard,
LowRisk,
"idempotency-key-456",
)
// Then: Verify events
require.NoError(t, err)
require.Len(t, events, 1)
processed, ok := events[0].(PaymentProcessed)
require.True(t, ok)
assert.Equal(t, NewMoney(100, "USD"), processed.Amount)
assert.Equal(t, CreditCard, processed.Method)
}
// Test projections independently
func TestPaymentProjection(t *testing.T) {
db := setupTestDB(t)
projection := NewPaymentProjection(db, mockCustomerService())
// Apply event
err := projection.Handle(context.Background(), PaymentProcessed{
PaymentID: uuid.New(),
CustomerID: "customer-123",
Amount: NewMoney(100, "USD"),
ProcessedAt: time.Now(),
})
require.NoError(t, err)
// Verify read model updated
var count int
err = db.QueryRow(
"SELECT COUNT(*) FROM payment_summary WHERE customer_id = $1",
"customer-123",
).Scan(&count)
assert.Equal(t, 1, count)
}
Production Considerations
After running CQRS + Event Sourcing in production, here are the critical lessons:
1. Snapshot Strategy
Don't replay thousands of events on every load. Implement snapshots:
func (s *EventStore) LoadAggregate(
ctx context.Context,
id uuid.UUID,
) (*Payment, error) {
// Try loading from snapshot first
snapshot, err := s.loadSnapshot(ctx, id)
if err == nil {
// Load events after snapshot
events, err := s.GetEvents(ctx, id, snapshot.Version)
if err != nil {
return nil, err
}
payment := snapshot.ToAggregate()
payment.Reconstitute(events)
return payment, nil
}
// No snapshot, load all events
events, err := s.GetEvents(ctx, id, 0)
if err != nil {
return nil, err
}
payment := NewPayment(id, "")
payment.Reconstitute(events)
// Create snapshot if many events
if len(events) > 100 {
s.saveSnapshot(ctx, payment)
}
return payment, nil
}
2. Event Schema Evolution
Events are immutable, but requirements change. Plan for it:
type EventUpgrader struct {
upgraders map[string]func(json.RawMessage) (Event, error)
}
func (u *EventUpgrader) Upgrade(eventType string, data json.RawMessage) (Event, error) {
switch eventType {
case "PaymentProcessedV1":
// Convert old format to new
var v1 PaymentProcessedV1
json.Unmarshal(data, &v1)
return PaymentProcessed{
PaymentID: v1.ID, // Map old field names
Amount: NewMoney(v1.Amount, "USD"), // Add defaults
}, nil
default:
// Current version
return u.unmarshal(eventType, data)
}
}
3. Monitoring and Observability
Track projection lag and event processing:
func (p *ProjectionRunner) Run(ctx context.Context) {
for {
select {
case event := <-p.events:
start := time.Now()
err := p.projection.Handle(ctx, event)
p.metrics.RecordProjection(
event.EventType(),
time.Since(start),
err == nil,
)
if err != nil {
p.handleError(event, err)
}
case <-ctx.Done():
return
}
}
}
When to Use (and When Not To)
Three Dots Labs makes an excellent point in their book: these patterns aren't silver bullets. Use them when:
✅ You need audit trails - Every state change is recorded ✅ Complex domain logic - Commands and domain events model it naturally ✅ Read/write patterns diverge - One size doesn't fit all ✅ Time travel debugging - Replay events to any point in time ✅ Multiple read models - Same events, different projections
Avoid them when:
❌ Simple CRUD - Don't overcomplicate ❌ Strong consistency required - Eventual consistency is the default ❌ Team lacks experience - Start with simpler patterns ❌ Low event volume - The overhead might not be worth it
Conclusion
CQRS and Event Sourcing in Go provide a powerful foundation for complex business domains. The patterns force you to think about your domain, separate concerns clearly, and build systems that tell the story of how they got to their current state.
Three Dots Labs' "Go with the Domain" book brilliantly demonstrates these patterns using real, production-ready Go code. Their approach of showing the refactoring journey—from typical Go application to full CQRS + Event Sourcing—makes the concepts tangible and practical.
The key insight? Start with the domain. When you model behaviors instead of data structures, when you think in events rather than state mutations, the implementation flows naturally. Go's simplicity and explicitness make it an excellent choice for these patterns—no framework magic, just clear, testable code.
Remember: CQRS and Event Sourcing are tools. Use them when they solve real problems, not because they're trendy. But when you do need them, Go and the patterns from "Go with the Domain" will serve you well.
Resources
- Go with the Domain - Three Dots Labs - The book that inspired many patterns in this post
- Wild Workouts Example - Complete DDD + CQRS implementation
- Watermill - Three Dots Labs' excellent event-driven library
- Event Sourcing in Production - Real-world experiences from Three Dots Labs