A practical implementation of Event Sourcing and CQRS patterns in Go, demonstrated through a banking domain.
- Event Store: Append-only event log with PostgreSQL backend
- Aggregate Roots: Bank Account aggregate with domain logic
- CQRS: Separate command and query models
- Event Projections: Real-time read models for account balances and transaction history
- Snapshots: Performance optimization for aggregates with many events
- Optimistic Concurrency: Version-based conflict detection
- Event Replay: Rebuild projections from event history
- Temporal Queries: View aggregate state at any point in history
┌───────────────────────────────────────────────────────────────────┐
│                              Client                               │
└───────────────────────────────────────────────────────────────────┘
               │                               │
               │ Commands                      │ Queries
               ▼                               ▼
┌─────────────────────────────┐   ┌─────────────────────────────────┐
│       Command Handler       │   │          Query Service          │
│  (OpenAccount, Deposit,     │   │  (GetAccount, ListAccounts,     │
│   Withdraw, Transfer)       │   │   GetTransactions)              │
└─────────────────────────────┘   └─────────────────────────────────┘
               │                               │
               ▼                               │
┌─────────────────────────────┐                │
│       Aggregate Root        │                │
│        (BankAccount)        │                │
└─────────────────────────────┘                │
               │                               │
               ▼                               │
┌─────────────────────────────┐   ┌────────────┴────────────────────┐
│         Event Store         │──▶│           Projections           │
│      (Append-only log)      │   │  (AccountBalance, Transactions) │
└─────────────────────────────┘   └─────────────────────────────────┘
               │                               │
               └───────────────────────────────┘
                               │
                               ▼
                      ┌─────────────────┐
                      │   PostgreSQL    │
                      └─────────────────┘
| Event | Description |
|---|---|
| `AccountOpened` | A new bank account was created |
| `MoneyDeposited` | Money was deposited into an account |
| `MoneyWithdrawn` | Money was withdrawn from an account |
| `TransferSent` | Money was sent to another account |
| `TransferReceived` | Money was received from another account |
| `AccountClosed` | An account was closed |

# Start PostgreSQL and the application
docker-compose up -d
# Check logs
docker-compose logs -f eventsource

# Start PostgreSQL and run migrations
make dev-db
# Run the server
make dev

curl -X POST http://localhost:8080/api/v1/accounts \
  -H "Content-Type: application/json" \
  -d '{
    "owner_name": "John Doe",
    "initial_deposit": "1000.00"
  }'

Response:

{
  "success": true,
  "data": {
    "account_id": "550e8400-e29b-41d4-a716-446655440000"
  }
}

curl -X POST http://localhost:8080/api/v1/accounts/{account_id}/deposit \
  -H "Content-Type: application/json" \
  -d '{
    "amount": "500.00",
    "description": "Salary deposit"
  }'

curl -X POST http://localhost:8080/api/v1/accounts/{account_id}/withdraw \
  -H "Content-Type: application/json" \
  -d '{
    "amount": "100.00",
    "description": "ATM withdrawal"
  }'

curl -X POST http://localhost:8080/api/v1/accounts/{from_account_id}/transfer \
  -H "Content-Type: application/json" \
  -d '{
    "to_account_id": "destination-account-id",
    "amount": "250.00",
    "description": "Payment for services"
  }'

curl -X POST http://localhost:8080/api/v1/accounts/{account_id}/close \
  -H "Content-Type: application/json" \
  -d '{
    "reason": "Account no longer needed"
  }'

curl "http://localhost:8080/api/v1/accounts?limit=10&offset=0"

curl http://localhost:8080/api/v1/accounts/{account_id}

curl "http://localhost:8080/api/v1/accounts/{account_id}/transactions?limit=50"

curl http://localhost:8080/api/v1/accounts/{account_id}/events

Instead of storing the current state, we store all events that led to that state. The current state is derived by replaying events.
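
To make the replay idea concrete, here is a minimal, self-contained sketch. It is not the repository's code: the `Event` and `Account` types, the string event kinds, and the use of integer cents are all assumptions chosen to keep the example short. The amounts mirror the curl examples above (open with 1000.00, deposit 500.00, withdraw 100.00).

```go
package main

import "fmt"

// Event is a hypothetical, minimal domain event (not the project's real type).
type Event struct {
	Kind   string // e.g. "AccountOpened", "MoneyDeposited", "MoneyWithdrawn"
	Amount int64  // amount in cents; an assumption to avoid float rounding
}

// Account holds state that is derived purely from events.
type Account struct {
	Balance int64
	Version int
}

// Apply folds a single event into the current state.
func (a *Account) Apply(e Event) {
	switch e.Kind {
	case "AccountOpened", "MoneyDeposited":
		a.Balance += e.Amount
	case "MoneyWithdrawn":
		a.Balance -= e.Amount
	}
	a.Version++ // every applied event advances the version by one
}

// Replay rebuilds an account from scratch by applying events in order.
func Replay(history []Event) Account {
	var a Account
	for _, e := range history {
		a.Apply(e)
	}
	return a
}

func main() {
	history := []Event{
		{Kind: "AccountOpened", Amount: 100000},
		{Kind: "MoneyDeposited", Amount: 50000},
		{Kind: "MoneyWithdrawn", Amount: 10000},
	}
	a := Replay(history)
	fmt.Printf("balance=%d version=%d\n", a.Balance, a.Version)
}
```

Because state is only ever a fold over the history, replaying a prefix such as `history[:2]` yields the account as it stood after the second event, which is the basic mechanism behind a temporal query.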
// Events are immutable facts
event := NewMoneyDeposited(accountID, version, amount, "Salary")
// State is rebuilt by applying events
for _, event := range events {
	account.ApplyEvent(event)
}

Commands (writes) and Queries (reads) use different models optimized for their purpose.
- Command Model: Aggregates with business logic
- Query Model: Denormalized read projections
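
The split between the two models can be sketched as follows. This is an illustrative, in-memory example rather than the project's implementation: `Deposit`, `BalanceProjection`, and the `Event` fields are hypothetical names, and amounts are assumed to be integer cents.

```go
package main

import "fmt"

// Event is a hypothetical event produced by the command side.
type Event struct {
	AccountID string
	Kind      string
	Amount    int64 // cents (assumption)
}

// Deposit is the command side: it validates input and, on success,
// returns the event that records what happened.
func Deposit(accountID string, amount int64) (Event, error) {
	if amount <= 0 {
		return Event{}, fmt.Errorf("deposit amount must be positive, got %d", amount)
	}
	return Event{AccountID: accountID, Kind: "MoneyDeposited", Amount: amount}, nil
}

// BalanceProjection is the query side: a denormalized map from
// account ID to balance, kept up to date by consuming events.
type BalanceProjection struct {
	balances map[string]int64
}

// NewBalanceProjection returns an empty read model.
func NewBalanceProjection() *BalanceProjection {
	return &BalanceProjection{balances: make(map[string]int64)}
}

// Handle updates the read model for one event.
func (p *BalanceProjection) Handle(e Event) {
	switch e.Kind {
	case "MoneyDeposited", "TransferReceived":
		p.balances[e.AccountID] += e.Amount
	case "MoneyWithdrawn", "TransferSent":
		p.balances[e.AccountID] -= e.Amount
	}
}

// Balance answers a query from the read model alone; no events are replayed.
func (p *BalanceProjection) Balance(accountID string) int64 {
	return p.balances[accountID]
}

func main() {
	proj := NewBalanceProjection()
	ev, err := Deposit("acc-1", 50000)
	if err != nil {
		panic(err)
	}
	proj.Handle(ev)
	fmt.Println(proj.Balance("acc-1"))
}
```

The design point is that queries never load the aggregate: the write path enforces business rules and emits events, while the read path only consumes them.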
Version-based conflict detection rejects a write when another writer has already advanced the aggregate's version:
-- Pseudocode: the insert succeeds only if the stored version matches the expected one
INSERT INTO events (...) WHERE current_version = expected_version

For aggregates with many events, snapshots cache the state periodically:
// Create snapshot every 100 events
if account.Version() % 100 == 0 {
	snapshot := account.ToSnapshot()
	store.SaveSnapshot(snapshot)
}

| Flag | Environment Variable | Default | Description |
|---|---|---|---|
| `-http-addr` | `HTTP_ADDR` | `:8080` | HTTP server address |
| `-database-url` | `DATABASE_URL` | `postgres://localhost:5432/eventsource` | PostgreSQL connection URL |

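
One way such settings could be wired up with the standard library is sketched below. The flag names, environment variables, and defaults come from the table above; the precedence (flag over environment variable over default) and the `Config`, `envOr`, and `ParseConfig` names are assumptions, not necessarily what the server does.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// Config holds the two settings from the table (hypothetical type).
type Config struct {
	HTTPAddr    string
	DatabaseURL string
}

// envOr returns the environment variable's value, or fallback when it is
// unset or empty.
func envOr(key, fallback string) string {
	if v, ok := os.LookupEnv(key); ok && v != "" {
		return v
	}
	return fallback
}

// ParseConfig resolves each setting as: flag, else environment variable,
// else built-in default. This precedence is an assumption.
func ParseConfig(args []string) (Config, error) {
	var cfg Config
	fs := flag.NewFlagSet("eventsource", flag.ContinueOnError)
	fs.StringVar(&cfg.HTTPAddr, "http-addr",
		envOr("HTTP_ADDR", ":8080"), "HTTP server address")
	fs.StringVar(&cfg.DatabaseURL, "database-url",
		envOr("DATABASE_URL", "postgres://localhost:5432/eventsource"),
		"PostgreSQL connection URL")
	if err := fs.Parse(args); err != nil {
		return Config{}, err
	}
	return cfg, nil
}

func main() {
	cfg, err := ParseConfig(os.Args[1:])
	if err != nil {
		os.Exit(2)
	}
	fmt.Printf("http=%s db=%s\n", cfg.HTTPAddr, cfg.DatabaseURL)
}
```

Using the environment value as the flag's default keeps the resolution in one place: an explicit flag always wins, and the help output shows the value that would be used if the flag were omitted.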
# Run tests
make test
# Run tests with coverage
make test-coverage
# Lint code
make lint
# Build binary
make build

eventsource/
├── cmd/server/          # Application entry point
├── internal/
│   ├── aggregate/       # Domain aggregates (BankAccount)
│   ├── api/             # REST API handlers
│   ├── command/         # Command definitions and handlers
│   ├── event/           # Domain events
│   ├── projection/      # Read model projections
│   └── store/           # Event store implementation
├── pkg/es/              # Reusable event sourcing primitives
├── migrations/          # Database migrations
└── README.md

MIT