Verified by Garnet Grid

Event Sourcing Implementation

Build event-sourced systems that store every state change as an immutable event. Covers event stores, aggregate reconstruction, snapshots, projections, temporal queries, and the patterns that make event sourcing practical in production.

Event sourcing records every state change as an immutable event. Instead of overwriting the current state, you append events to a log and derive the current state by replaying them. This gives you a complete audit trail, temporal queries, and the ability to reconstruct any past state, at the cost of added complexity in storage, schema evolution, and read models.


Event Sourcing vs Traditional State

Traditional (CRUD):
  UPDATE orders SET status = 'shipped' WHERE id = 123;
  
  Problem: Previous state lost. Was it 'pending' or 'processing'?
  
Event Sourcing:
  Event 1: OrderCreated { id: 123, items: [...], total: 99.99 }
  Event 2: PaymentReceived { id: 123, amount: 99.99 }
  Event 3: OrderShipped { id: 123, carrier: "UPS", tracking: "1Z..." }
  
  Current state = replay events 1 → 2 → 3
  Historical state at any point in time available
  Complete audit trail for free

Event Store

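class ConcurrencyError(Exception):
    """Raised when the stream's version differs from the caller's expected_version."""


class EventStore:
    def __init__(self, db):
        # db is assumed to be an asyncpg-style connection
        # (fetch, fetchval, execute, transaction)
        self.db = db

    async def append(self, aggregate_id: str, events: list, expected_version: int):
        """Append events with optimistic concurrency."""
        async with self.db.transaction():
            # COALESCE maps the NULL returned for a brand-new aggregate to version 0,
            # so the first append can pass expected_version=0
            current = await self.db.fetchval(
                "SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1",
                aggregate_id
            )

            if current != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, got {current}"
                )

            # Two concurrent transactions can both pass the check above, so the
            # table should also have a UNIQUE (aggregate_id, version) constraint.
            for i, event in enumerate(events, start=1):
                version = expected_version + i
                await self.db.execute(
                    """INSERT INTO events 
                       (aggregate_id, version, event_type, data, timestamp)
                       VALUES ($1, $2, $3, $4, NOW())""",
                    aggregate_id, version, event.__class__.__name__,
                    event.to_json()
                )

    async def get_events(self, aggregate_id: str, after_version: int = 0):
        """Return events newer than after_version, oldest first."""
        return await self.db.fetch(
            """SELECT * FROM events 
               WHERE aggregate_id = $1 AND version > $2
               ORDER BY version""",
            aggregate_id, after_version
        )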
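The optimistic concurrency check is easier to see without a database. The sketch below is an in-memory stand-in, not the store above: `InMemoryEventStore` and the tuple layout `(version, event_type, data)` are assumptions made for illustration. Two writers load the same aggregate at version 1; the first append succeeds and the second is rejected.

```python
class ConcurrencyError(Exception):
    pass


class InMemoryEventStore:
    def __init__(self):
        # aggregate_id -> list of (version, event_type, data)
        self._streams = {}

    def append(self, aggregate_id, events, expected_version):
        stream = self._streams.setdefault(aggregate_id, [])
        current = len(stream)  # versions start at 1, so length == latest version
        if current != expected_version:
            raise ConcurrencyError(
                f"Expected version {expected_version}, got {current}"
            )
        for offset, (event_type, data) in enumerate(events, start=1):
            stream.append((expected_version + offset, event_type, data))

    def get_events(self, aggregate_id, after_version=0):
        return [e for e in self._streams.get(aggregate_id, []) if e[0] > after_version]


store = InMemoryEventStore()
store.append("order-123", [("OrderCreated", {"total": 99.99})], expected_version=0)

# Writer A and writer B both read the stream at version 1.
store.append("order-123", [("PaymentReceived", {"amount": 99.99})], expected_version=1)
try:
    store.append("order-123", [("OrderShipped", {"carrier": "UPS"})], expected_version=1)
    conflict = False
except ConcurrencyError:
    conflict = True  # writer B must reload and retry against version 2

print(conflict)
```

The losing writer is expected to reload the aggregate, re-run its decision against the new state, and try again.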

Aggregate Reconstruction

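import json


class OrderAggregate:
    def __init__(self):
        self.id = None
        self.status = None
        self.items = []
        self.total = 0
        self.tracking = None
        self.version = 0

    def apply(self, event):
        # event is a row from EventStore.get_events, so fields are read by column name
        handler = getattr(self, f"_on_{event['event_type']}", None)
        if handler:
            data = event["data"]
            # the data column may come back as a JSON string, depending on the driver
            handler(json.loads(data) if isinstance(data, str) else data)
        self.version = event["version"]

    def _on_OrderCreated(self, data):
        self.id = data["id"]
        self.status = "created"
        self.items = data["items"]
        self.total = data["total"]

    def _on_PaymentReceived(self, data):
        self.status = "paid"

    def _on_OrderShipped(self, data):
        self.status = "shipped"
        self.tracking = data["tracking"]

    def to_json(self):
        """Serialize current state so it can be stored as a snapshot."""
        return json.dumps(self.__dict__)

    @classmethod
    def from_snapshot(cls, snapshot):
        """Rebuild from a snapshot row with 'state' and 'version' columns."""
        aggregate = cls()
        state = snapshot["state"]
        aggregate.__dict__.update(json.loads(state) if isinstance(state, str) else state)
        aggregate.version = snapshot["version"]
        return aggregate

    @classmethod
    async def load(cls, event_store, aggregate_id):
        """Rebuild the aggregate by replaying its full event history."""
        aggregate = cls()
        events = await event_store.get_events(aggregate_id)
        for event in events:
            aggregate.apply(event)
        return aggregate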
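Replay is not limited to rebuilding a single aggregate. A projection applies the same per-event handling to events from many aggregates in order to maintain a read model. The sketch below is one possible shape, under stated assumptions: `OrderStatusProjection`, the `(position, event_type, data)` tuple layout, and the idea of a global position counter are all hypothetical and not part of the event store above.

```python
from collections import Counter


class OrderStatusProjection:
    """Read model: current status per order, plus a processing checkpoint."""

    STATUS_BY_EVENT = {
        "OrderCreated": "created",
        "PaymentReceived": "paid",
        "OrderShipped": "shipped",
    }

    def __init__(self):
        self.status_by_order = {}
        self.position = 0  # last global position processed

    def handle(self, position, event_type, data):
        if position <= self.position:
            return  # already processed, so redelivery is harmless
        status = self.STATUS_BY_EVENT.get(event_type)
        if status is not None:
            self.status_by_order[data["id"]] = status
        self.position = position

    def counts(self):
        """Number of orders currently in each status."""
        return Counter(self.status_by_order.values())


events = [
    (1, "OrderCreated", {"id": 123}),
    (2, "OrderCreated", {"id": 124}),
    (3, "PaymentReceived", {"id": 123}),
    (4, "OrderShipped", {"id": 123}),
]

projection = OrderStatusProjection()
for event in events:
    projection.handle(*event)
projection.handle(*events[1])  # a redelivered event is ignored

print(dict(projection.counts()))
```

Because the projection tracks its own position, it can run asynchronously from the write path and resume after a restart, at the price of the read model lagging slightly behind the log.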

Snapshots

class SnapshotStore:
    def __init__(self, db):
        self.db = db

    async def save_snapshot(self, aggregate_id, aggregate, version):
        """Upsert the latest snapshot; call this every N events to keep replay short."""
        await self.db.execute(
            """INSERT INTO snapshots (aggregate_id, version, state)
               VALUES ($1, $2, $3)
               ON CONFLICT (aggregate_id) DO UPDATE 
               SET version = $2, state = $3""",
            aggregate_id, version, aggregate.to_json()
        )

    async def get_latest_snapshot(self, aggregate_id):
        """Return the snapshot row for this aggregate, or None if there is none."""
        return await self.db.fetchrow(
            "SELECT version, state FROM snapshots WHERE aggregate_id = $1",
            aggregate_id
        )

    async def load_with_snapshot(self, event_store, aggregate_id):
        snapshot = await self.get_latest_snapshot(aggregate_id)
        # from_snapshot is assumed to restore both the state and the version
        aggregate = OrderAggregate.from_snapshot(snapshot) if snapshot else OrderAggregate()

        # Replay only events after snapshot
        after_version = snapshot["version"] if snapshot else 0
        events = await event_store.get_events(aggregate_id, after_version)
        for event in events:
            aggregate.apply(event)

        return aggregate
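The property that makes snapshots safe is that loading from a snapshot plus the remaining events must give the same state as a full replay. The sketch below checks that with a deliberately tiny model. Everything in it is an assumption for illustration: events are `(version, quantity)` pairs, state is a plain dict, and `SNAPSHOT_EVERY` is a hypothetical policy value.

```python
import json

SNAPSHOT_EVERY = 100  # hypothetical policy; the anti-patterns table suggests 100-1000


def replay(events, state=None):
    """Fold (version, quantity) events into a running state dict."""
    state = dict(state) if state else {"quantity": 0, "version": 0}
    for version, quantity in events:
        state["quantity"] += quantity
        state["version"] = version
    return state


def should_snapshot(version, last_snapshot_version):
    return version - last_snapshot_version >= SNAPSHOT_EVERY


events = [(v, 1) for v in range(1, 251)]  # 250 events, one item each

# Write path: apply each event and snapshot whenever the policy says so.
snapshots = {}  # aggregate_id -> serialized state
state = {"quantity": 0, "version": 0}
last_snapshot_version = 0
for event in events:
    state = replay([event], state)
    if should_snapshot(state["version"], last_snapshot_version):
        snapshots["order-123"] = json.dumps(state)
        last_snapshot_version = state["version"]

# Load path: start from the snapshot and replay only the tail.
snapshot = json.loads(snapshots["order-123"])
tail = [e for e in events if e[0] > snapshot["version"]]
from_snapshot = replay(tail, snapshot)

print(snapshot["version"], len(tail), from_snapshot == replay(events))
```

Here the load path replays 50 events instead of 250. A snapshot is only a cache: if the aggregate's apply logic changes, stored snapshots can be discarded and rebuilt from the log.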

Anti-Patterns

| Anti-Pattern | Consequence | Fix |
| --- | --- | --- |
| Mutable events | Audit trail destroyed, replay breaks | Events are immutable, append-only |
| No snapshots | Replay 1M events = 30s to load | Snapshot every 100-1000 events |
| Event = state dump | Giant events, no meaningful history | Events describe what happened, not full state |
| No versioning | Can’t evolve event schema | Event version field + upcasters |
| Synchronous projections | Write path blocked by read model | Async projections with eventual consistency |
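The "event version field + upcasters" fix can be sketched as follows. An upcaster is a pure function that converts an old event payload to the next schema version at read time, so stored events are never rewritten. The schema change shown here (a float `total` replaced by integer `total_cents` plus `currency`) and all function names are hypothetical.

```python
def upcast_order_created_v1_to_v2(data):
    """Hypothetical change: v2 stores integer cents and an explicit currency."""
    upgraded = {k: v for k, v in data.items() if k != "total"}
    upgraded["total_cents"] = round(data["total"] * 100)
    upgraded["currency"] = "USD"  # assumed default for events written before v2
    return upgraded


# (event_type, schema_version) -> function producing the next version's payload
UPCASTERS = {
    ("OrderCreated", 1): upcast_order_created_v1_to_v2,
}


def upcast(event_type, schema_version, data):
    """Apply upcasters in sequence until no newer schema version is registered."""
    while (event_type, schema_version) in UPCASTERS:
        data = UPCASTERS[(event_type, schema_version)](data)
        schema_version += 1
    return schema_version, data


stored = {"id": 123, "total": 99.99}
version, payload = upcast("OrderCreated", 1, stored)
print(version, payload)
```

Aggregates and projections then only need handlers for the latest schema version, while the stored v1 payload stays untouched in the log.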

Event sourcing is not for every system. Use it when an audit trail is critical, when temporal queries are needed, or when the domain is inherently event-modeled. Do not use it for simple CRUD applications.

Jakub Dimitri Rezayev
Founder & Chief Architect • Garnet Grid Consulting

Jakub holds an M.S. in Customer Intelligence & Analytics and a B.S. in Finance & Computer Science from Pace University. With deep expertise spanning D365 F&O, Azure, Power BI, and AI/ML systems, he architects enterprise solutions that bridge legacy systems and modern technology — and has led multi-million dollar ERP implementations for Fortune 500 supply chains.
