Event Sourcing Implementation
Build event-sourced systems that store every state change as an immutable event. Covers event stores, aggregate reconstruction, snapshots, projections, temporal queries, and the patterns that make event sourcing practical in production.
Instead of overwriting the current state, an event-sourced system appends each change to a log as an immutable event and derives the current state by replaying that log. This gives you a complete audit trail, temporal queries, and the ability to reconstruct any past state, at the cost of increased complexity.
Event Sourcing vs Traditional State
Traditional (CRUD):

    UPDATE orders SET status = 'shipped' WHERE id = 123;

Problem: the previous state is lost. Was it 'pending' or 'processing'?

Event Sourcing:

    Event 1: OrderCreated { id: 123, items: [...], total: 99.99 }
    Event 2: PaymentReceived { id: 123, amount: 99.99 }
    Event 3: OrderShipped { id: 123, carrier: "UPS", tracking: "1Z..." }

Current state = replay events 1 → 2 → 3. The state at any earlier point can be rebuilt by replaying a prefix of the log, and the log itself is a complete audit trail.
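
To make "current state = replay" concrete, here is a minimal sketch that treats replay as a left fold over the log. The `Event` dataclass and the `evolve` and `replay` functions are illustrative names, not part of any library, and the state is kept as a plain dict only to keep the example short.

```python
from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class Event:
    type: str   # e.g. "OrderCreated"
    data: dict  # event payload


def evolve(state: dict, event: Event) -> dict:
    """Return the next state without mutating the previous one."""
    if event.type == "OrderCreated":
        return {"id": event.data["id"], "status": "created", "total": event.data["total"]}
    if event.type == "PaymentReceived":
        return {**state, "status": "paid"}
    if event.type == "OrderShipped":
        return {**state, "status": "shipped", "tracking": event.data["tracking"]}
    return state  # unknown event types leave the state unchanged


def replay(events: list) -> dict:
    """Fold the events, in order, into a single state."""
    return reduce(evolve, events, {})


log = [
    Event("OrderCreated", {"id": 123, "total": 99.99}),
    Event("PaymentReceived", {"id": 123, "amount": 99.99}),
    Event("OrderShipped", {"id": 123, "carrier": "UPS", "tracking": "1Z..."}),
]

current = replay(log)            # state after all three events
after_payment = replay(log[:2])  # state before the order shipped
```

Replaying a prefix of the log (`log[:2]`) is all it takes to answer "what did this order look like before it shipped?", which is the question the CRUD `UPDATE` can no longer answer.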
Event Store
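
    class ConcurrencyError(Exception):
        """Raised when the stream is not at the version the writer expected."""


    class EventStore:
        def __init__(self, db):
            # `db` is assumed to be an asyncpg-style connection exposing
            # transaction(), fetchval(), execute(), and fetch().
            self.db = db

        async def append(self, aggregate_id: str, events: list, expected_version: int):
            """Append events with optimistic concurrency."""
            async with self.db.transaction():
                # MAX() is NULL for a brand-new aggregate, so coalesce it to 0;
                # otherwise the first append (expected_version=0) always fails.
                current = await self.db.fetchval(
                    "SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1",
                    aggregate_id
                )
                if current != expected_version:
                    raise ConcurrencyError(
                        f"Expected version {expected_version}, got {current}"
                    )
                # A UNIQUE (aggregate_id, version) constraint should back this
                # check, so two concurrent writers cannot insert the same version.
                for i, event in enumerate(events):
                    version = expected_version + i + 1
                    await self.db.execute(
                        """INSERT INTO events
                           (aggregate_id, version, event_type, data, timestamp)
                           VALUES ($1, $2, $3, $4, NOW())""",
                        aggregate_id, version, event.__class__.__name__,
                        event.to_json()
                    )

        async def get_events(self, aggregate_id: str, after_version: int = 0):
            """Return the aggregate's events with version > after_version, in order."""
            return await self.db.fetch(
                """SELECT * FROM events
                   WHERE aggregate_id = $1 AND version > $2
                   ORDER BY version""",
                aggregate_id, after_version
            )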
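
The optimistic-concurrency check above can be tried without a database server. The following is a rough, synchronous sketch using Python's built-in `sqlite3` module in place of the async driver. The table layout, the `open_store` helper, and the `(event_type, payload)` tuple format are assumptions made for this example. The composite primary key is what rejects a duplicate version if two writers race past the version check.

```python
import json
import sqlite3


class ConcurrencyError(Exception):
    """Raised when the stream is not at the version the writer expected."""


def open_store() -> sqlite3.Connection:
    """Create an in-memory event table (illustrative schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """CREATE TABLE events (
               aggregate_id TEXT    NOT NULL,
               version      INTEGER NOT NULL,
               event_type   TEXT    NOT NULL,
               data         TEXT    NOT NULL,
               PRIMARY KEY (aggregate_id, version)
           )"""
    )
    return conn


def append(conn, aggregate_id, events, expected_version):
    """Append (event_type, payload) pairs atomically, or raise ConcurrencyError."""
    rows = [
        (aggregate_id, expected_version + i + 1, event_type, json.dumps(payload))
        for i, (event_type, payload) in enumerate(events)
    ]
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            current = conn.execute(
                "SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = ?",
                (aggregate_id,),
            ).fetchone()[0]
            if current != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, got {current}"
                )
            conn.executemany("INSERT INTO events VALUES (?, ?, ?, ?)", rows)
    except sqlite3.IntegrityError as exc:
        # Another writer inserted the same (aggregate_id, version) first.
        raise ConcurrencyError(str(exc)) from exc


conn = open_store()
append(conn, "order-123", [("OrderCreated", {"total": 99.99})], expected_version=0)
append(conn, "order-123", [("PaymentReceived", {"amount": 99.99})], expected_version=1)

try:
    # A stale writer still believes the stream is at version 1.
    append(conn, "order-123", [("OrderShipped", {"carrier": "UPS"})], expected_version=1)
    conflict = False
except ConcurrencyError:
    conflict = True

versions = [row[0] for row in conn.execute("SELECT version FROM events ORDER BY version")]
```

The stale third append is rejected and the log keeps exactly two events. A caller that hits `ConcurrencyError` would normally reload the aggregate, re-evaluate its decision against the newer state, and retry.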
Aggregate Reconstruction

    import json


    class OrderAggregate:
        def __init__(self):
            self.id = None
            self.status = None
            self.items = []
            self.total = 0
            self.tracking = None
            self.version = 0

        def apply(self, event):
            # `event` is a row returned by EventStore.get_events, with the
            # columns event_type, data (JSON text), and version.
            handler = getattr(self, f"_on_{event['event_type']}", None)
            if handler:
                handler(json.loads(event["data"]))
            # Advance the version even for event types with no handler.
            self.version = event["version"]

        def _on_OrderCreated(self, data):
            self.id = data["id"]
            self.status = "created"
            self.items = data["items"]
            self.total = data["total"]

        def _on_PaymentReceived(self, data):
            self.status = "paid"

        def _on_OrderShipped(self, data):
            self.status = "shipped"
            self.tracking = data["tracking"]

        @classmethod
        async def load(cls, event_store, aggregate_id):
            """Rebuild the aggregate by replaying its full event stream."""
            aggregate = cls()
            events = await event_store.get_events(aggregate_id)
            for event in events:
                aggregate.apply(event)
            return aggregate
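
Because reconstruction is just replay, a temporal query ("what was this order's state at time T?") only needs a cutoff. The sketch below is a simplified, in-memory illustration: the `Order` class is a cut-down stand-in for the aggregate, events are plain dicts, and the timestamps are made-up sample data.

```python
from datetime import datetime, timezone


class Order:
    """Cut-down aggregate that tracks only status and version."""

    def __init__(self):
        self.status = None
        self.version = 0

    def apply(self, event: dict) -> None:
        handler = getattr(self, f"_on_{event['type']}", None)
        if handler:
            handler(event["data"])
        self.version = event["version"]

    def _on_OrderCreated(self, data):
        self.status = "created"

    def _on_PaymentReceived(self, data):
        self.status = "paid"

    def _on_OrderShipped(self, data):
        self.status = "shipped"


def load_as_of(events: list, as_of: datetime) -> Order:
    """Replay only the events recorded at or before `as_of`."""
    order = Order()
    for event in events:  # assumed to be sorted by version
        if event["timestamp"] > as_of:
            break
        order.apply(event)
    return order


def utc(day: int, hour: int) -> datetime:
    """Helper for the sample data below (January 2024, UTC)."""
    return datetime(2024, 1, day, hour, tzinfo=timezone.utc)


history = [
    {"version": 1, "type": "OrderCreated", "data": {}, "timestamp": utc(1, 9)},
    {"version": 2, "type": "PaymentReceived", "data": {}, "timestamp": utc(1, 10)},
    {"version": 3, "type": "OrderShipped", "data": {}, "timestamp": utc(3, 15)},
]

on_jan_2 = load_as_of(history, utc(2, 0))  # paid, but not yet shipped
latest = load_as_of(history, utc(4, 0))    # all three events applied
```

Against a real store, the same cutoff would more likely be pushed into the query (for example, a `timestamp <= $2` condition) rather than filtered in Python.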
Snapshots
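
    class SnapshotStore:
        def __init__(self, db):
            self.db = db

        async def save_snapshot(self, aggregate_id, aggregate, version):
            """Upsert the latest snapshot; call this every N events for fast reconstruction."""
            # ON CONFLICT requires a unique constraint on snapshots.aggregate_id.
            await self.db.execute(
                """INSERT INTO snapshots (aggregate_id, version, state)
                   VALUES ($1, $2, $3)
                   ON CONFLICT (aggregate_id) DO UPDATE
                   SET version = $2, state = $3""",
                aggregate_id, version, aggregate.to_json()
            )

        async def get_latest_snapshot(self, aggregate_id):
            """Return the snapshot row for this aggregate, or None if there is none."""
            return await self.db.fetchrow(
                "SELECT version, state FROM snapshots WHERE aggregate_id = $1",
                aggregate_id
            )

        async def load_with_snapshot(self, event_store, aggregate_id):
            snapshot = await self.get_latest_snapshot(aggregate_id)
            # OrderAggregate is assumed to provide to_json() and from_snapshot();
            # from_snapshot() must restore the version along with the state.
            if snapshot:
                aggregate = OrderAggregate.from_snapshot(snapshot["state"])
                after_version = snapshot["version"]
            else:
                aggregate = OrderAggregate()
                after_version = 0
            # Replay only events after the snapshot
            events = await event_store.get_events(aggregate_id, after_version)
            for event in events:
                aggregate.apply(event)
            return aggregate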
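
The snapshot code relies on the aggregate being able to serialize itself and be restored. One way that could look is sketched below, together with a check that "snapshot plus tail replay" gives the same result as a full replay. The `Order` class, the `ItemAdded` event type, and the `should_snapshot` policy helper are hypothetical and exist only for this example.

```python
import json


class Order:
    """Cut-down aggregate; `ItemAdded` is a hypothetical event type."""

    def __init__(self):
        self.total = 0
        self.version = 0

    def apply(self, event: dict) -> None:
        if event["type"] == "ItemAdded":
            self.total += event["data"]["price"]
        self.version = event["version"]

    def to_json(self) -> str:
        # The version is stored with the state so replay can resume after it.
        return json.dumps({"total": self.total, "version": self.version})

    @classmethod
    def from_snapshot(cls, state_json: str) -> "Order":
        state = json.loads(state_json)
        order = cls()
        order.total = state["total"]
        order.version = state["version"]
        return order


def should_snapshot(version_before: int, version_after: int, every: int = 100) -> bool:
    """True when an append crossed a multiple of `every`."""
    return version_after // every > version_before // every


def replay(order: Order, events: list) -> Order:
    for event in events:
        order.apply(event)
    return order


events = [
    {"version": v, "type": "ItemAdded", "data": {"price": 5}}
    for v in range(1, 251)
]

full = replay(Order(), events)                            # replays all 250 events
snapshot_json = replay(Order(), events[:200]).to_json()   # snapshot at version 200
restored = replay(Order.from_snapshot(snapshot_json), events[200:])  # replays 50
```

A snapshot is only a cache: it can be deleted and rebuilt from the events at any time, which is also the simplest recovery path when the aggregate's shape changes and old snapshots no longer deserialize.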
Anti-Patterns
| Anti-Pattern | Consequence | Fix |
|---|---|---|
| Mutable events | Audit trail destroyed, replay breaks | Keep events immutable and append-only; correct mistakes by appending new events |
| No snapshots | Every load replays the full stream (e.g. ~30 s for 1M events) | Snapshot every 100-1000 events |
| Event = state dump | Giant events, no meaningful history | Events describe what happened, not the full state |
| No event versioning | Can’t evolve the event schema | Add a schema version field to each event and upcast old versions on read |
| Synchronous projections | Write path blocked by read-model updates | Async projections with eventual consistency |
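
The "upcasters" fix in the table may be the least self-explanatory, so here is a small sketch of the idea: stored events are never rewritten. Each one is converted to the newest schema as it is read. The registry, the `schema_version` field name, and both schema changes to `OrderShipped` are invented for illustration.

```python
from typing import Callable

Upcaster = Callable[[dict], dict]

# (event_type, from_schema_version) -> function producing the next schema version
UPCASTERS: dict[tuple[str, int], Upcaster] = {}


def upcaster(event_type: str, from_version: int):
    """Decorator that registers an upcaster for one schema step."""
    def register(fn: Upcaster) -> Upcaster:
        UPCASTERS[(event_type, from_version)] = fn
        return fn
    return register


@upcaster("OrderShipped", 1)
def shipped_v1_to_v2(data: dict) -> dict:
    # Hypothetical change: v2 nests carrier and tracking under "shipment".
    return {
        "id": data["id"],
        "shipment": {"carrier": data["carrier"], "tracking": data["tracking"]},
    }


@upcaster("OrderShipped", 2)
def shipped_v2_to_v3(data: dict) -> dict:
    # Hypothetical change: v3 adds a field with a default value.
    return {**data, "signature_required": False}


def upcast(event: dict) -> dict:
    """Apply upcasters step by step until none is registered for the version."""
    event_type = event["type"]
    version = event.get("schema_version", 1)
    data = event["data"]
    while (event_type, version) in UPCASTERS:
        data = UPCASTERS[(event_type, version)](data)
        version += 1
    # Return a new dict; the stored event is left untouched.
    return {**event, "schema_version": version, "data": data}


stored = {
    "type": "OrderShipped",
    "schema_version": 1,
    "data": {"id": 123, "carrier": "UPS", "tracking": "1Z..."},
}
latest = upcast(stored)
```

Chaining one-step upcasters means each schema change is written once, and aggregate handlers only ever need to understand the newest shape of each event.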
Event sourcing is not for every system. Use it when an audit trail is critical, when temporal queries are needed, or when the domain is naturally modeled as a sequence of events. Do not use it for simple CRUD applications.