Event Sourcing

Core Concept

Level: Advanced
Time: 30-40 minutes
Tags: event-driven-architecture, audit-trail, immutable-data, cqrs, domain-driven-design, microservices

Storing application state as a sequence of immutable events for auditability and rebuilding state

Overview

Event Sourcing is an architectural pattern where application state is stored as a sequence of immutable events rather than just the current state. Instead of updating records in place, each state change is captured as an event and appended to an event store. The current state is derived by replaying these events from the beginning.

This pattern provides complete auditability, enables temporal queries, and allows state to be rebuilt as of any point in time. Companies such as Netflix and Uber, as well as many financial institutions, use event sourcing for critical systems where data lineage, compliance, and the ability to analyze historical behavior patterns are essential.

The main technical challenges this pattern addresses include:

  • Complete audit trail: Every change is recorded with full context and timing
  • Temporal querying: Ability to see system state at any point in time (see the sketch after this list)
  • Event replay: Rebuilding state or creating new projections from historical events
  • Data corruption recovery: Reconstruct state from the immutable event log
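
Because state is derived from events, a point-in-time ("temporal") view is simply a replay of the events known up to that moment. A minimal sketch, using the buildUserState helper defined in the next section:

// Sketch: state "as of" a timestamp is a replay of the event prefix up to that time
function buildUserStateAt(events, asOf) {
  const prefix = events.filter(e => new Date(e.timestamp) <= new Date(asOf));
  return buildUserState(prefix);
}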

Core Principles: Events as Source of Truth

Traditional State Storage vs Event Sourcing

Traditional Approach (State-based):

-- Users table stores current state
CREATE TABLE users (
  id INT PRIMARY KEY,
  email VARCHAR(255),
  name VARCHAR(255),
  status ENUM('active', 'suspended', 'deleted'),
  last_login TIMESTAMP,
  updated_at TIMESTAMP
);

-- Update overwrites previous values
UPDATE users 
SET status = 'suspended', updated_at = NOW() 
WHERE id = 123;

-- Previous state is lost forever
-- No way to know: when was user active? why suspended? who made the change?

Event Sourcing Approach:

// Events capture what happened, when, and why
const events = [
  {
    eventId: 'evt_001',
    eventType: 'UserRegistered',
    aggregateId: 'user_123',
    timestamp: '2024-01-15T10:30:00Z',
    data: {
      email: 'john@example.com',
      name: 'John Doe'
    },
    metadata: {
      userId: 'admin_456',
      correlationId: 'reg_789'
    }
  },
  {
    eventId: 'evt_002',
    eventType: 'UserSuspended',
    aggregateId: 'user_123',
    timestamp: '2024-02-01T14:20:00Z',
    data: {
      reason: 'Suspicious activity detected',
      suspendedBy: 'security_system'
    },
    metadata: {
      userId: 'security_bot',
      correlationId: 'alert_321'
    }
  }
];

// Current state is derived by replaying events
function buildUserState(events) {
  let user = null;
  
  for (const event of events) {
    switch (event.eventType) {
      case 'UserRegistered':
        user = {
          id: event.aggregateId,
          email: event.data.email,
          name: event.data.name,
          status: 'active',
          registeredAt: event.timestamp
        };
        break;
        
      case 'UserSuspended':
        user.status = 'suspended';
        user.suspendedAt = event.timestamp;
        user.suspensionReason = event.data.reason;
        break;
    }
  }
  
  return user;
}

Event Store Architecture

System Architecture Diagram
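
At a high level, the components implemented in the rest of this section fit together roughly as follows (a textual sketch, not the original diagram):

  Command (e.g. "withdraw") ──► Aggregate (BankAccount) ──► new domain events
  New events ──► Event Store (append-only log, per-aggregate versions, optimistic concurrency)
  Event Store ──► Event Bus ──► Projections ──► Read Models (account_summary, account_transactions)
  Event Store ──► Snapshot Store (optional, speeds up aggregate loading)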

Practical Implementation Patterns

Basic Event Store Implementation

class EventStore {
  constructor(database) {
    this.db = database;
    this.initializeSchema();
  }
  
  async initializeSchema() {
    await this.db.query(`
      CREATE TABLE IF NOT EXISTS events (
        id BIGINT PRIMARY KEY AUTO_INCREMENT,
        event_id VARCHAR(255) UNIQUE NOT NULL,
        event_type VARCHAR(255) NOT NULL,
        aggregate_id VARCHAR(255) NOT NULL,
        aggregate_version INT NOT NULL,
        event_data JSON NOT NULL,
        metadata JSON,
        timestamp TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6),
        INDEX idx_aggregate (aggregate_id, aggregate_version),
        INDEX idx_event_type (event_type),
        INDEX idx_timestamp (timestamp),
        UNIQUE KEY unique_version (aggregate_id, aggregate_version)
      )
    `);
  }
  
  async appendEvents(aggregateId, expectedVersion, events) {
    const transaction = await this.db.beginTransaction();
    
    try {
      // Check current version
      const currentVersion = await this.getCurrentVersion(aggregateId);
      
      if (currentVersion !== expectedVersion) {
        throw new Error(
          `Concurrency conflict: expected version ${expectedVersion}, ` +
          `but current version is ${currentVersion}`
        );
      }
      
      // Append events
      for (let i = 0; i < events.length; i++) {
        const event = events[i];
        const version = expectedVersion + i + 1;
        
        await this.db.query(
          `INSERT INTO events 
           (event_id, event_type, aggregate_id, aggregate_version, event_data, metadata)
           VALUES (?, ?, ?, ?, ?, ?)`,
          [
            event.eventId || this.generateEventId(),
            event.eventType,
            aggregateId,
            version,
            JSON.stringify(event.data),
            JSON.stringify(event.metadata || {})
          ],
          { transaction }
        );
      }
      
      await transaction.commit();
      return expectedVersion + events.length;
    } catch (error) {
      await transaction.rollback();
      throw error;
    }
  }
  
  async getEvents(aggregateId, fromVersion = 0) {
    const rows = await this.db.query(
      `SELECT event_id, event_type, aggregate_version, 
              event_data, metadata, timestamp
       FROM events 
       WHERE aggregate_id = ? AND aggregate_version > ?
       ORDER BY aggregate_version ASC`,
      [aggregateId, fromVersion]
    );
    
    return rows.map(row => ({
      eventId: row.event_id,
      eventType: row.event_type,
      aggregateId,
      version: row.aggregate_version,
      data: JSON.parse(row.event_data),
      metadata: JSON.parse(row.metadata),
      timestamp: row.timestamp
    }));
  }
  
  async getCurrentVersion(aggregateId) {
    const result = await this.db.query(
      `SELECT MAX(aggregate_version) as version 
       FROM events 
       WHERE aggregate_id = ?`,
      [aggregateId]
    );
    
    return result[0]?.version || 0;
  }
  
  async getAllEvents(fromTimestamp = null, eventTypes = null) {
    let query = `
      SELECT event_id, event_type, aggregate_id, aggregate_version,
             event_data, metadata, timestamp
      FROM events
    `;
    
    const params = [];
    const conditions = [];
    
    if (fromTimestamp) {
      conditions.push('timestamp >= ?');
      params.push(fromTimestamp);
    }
    
    if (eventTypes && eventTypes.length > 0) {
      conditions.push(`event_type IN (${eventTypes.map(() => '?').join(',')})`);
      params.push(...eventTypes);
    }
    
    if (conditions.length > 0) {
      query += ' WHERE ' + conditions.join(' AND ');
    }
    
    query += ' ORDER BY id ASC';
    
    const rows = await this.db.query(query, params);
    
    return rows.map(row => ({
      eventId: row.event_id,
      eventType: row.event_type,
      aggregateId: row.aggregate_id,
      version: row.aggregate_version,
      data: JSON.parse(row.event_data),
      metadata: JSON.parse(row.metadata),
      timestamp: row.timestamp
    }));
  }
  
  generateEventId() {
    return `evt_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
  }
}
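
A brief usage sketch, assuming db is any client exposing the query() and beginTransaction() methods used above (names and values here are illustrative):

// Inside an async function
const store = new EventStore(db);

// Append the first event for a new aggregate (expectedVersion 0 = empty stream)
await store.appendEvents('account_42', 0, [
  { eventType: 'AccountOpened', data: { initialDeposit: 100, accountHolder: 'Jane Doe' } }
]);

// A writer holding a stale version is rejected (optimistic concurrency)
try {
  await store.appendEvents('account_42', 0, [
    { eventType: 'MoneyDeposited', data: { amount: 50, newBalance: 150 } }
  ]);
} catch (err) {
  // "Concurrency conflict: expected version 0, but current version is 1"
}

// Reload the full stream for replay
const history = await store.getEvents('account_42');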

Domain Aggregate with Event Sourcing

class BankAccount {
  constructor(accountId) {
    this.accountId = accountId;
    this.version = 0;
    this.balance = 0;
    this.status = 'pending';
    this.uncommittedEvents = [];
  }
  
  // Static factory method to create from events
  static fromEvents(accountId, events) {
    const account = new BankAccount(accountId);
    account.replay(events);
    return account;
  }
  
  // Business methods (commands)
  openAccount(initialDeposit, accountHolder) {
    if (this.status !== 'pending') {
      throw new Error('Account already opened');
    }
    
    if (initialDeposit < 0) {
      throw new Error('Initial deposit cannot be negative');
    }
    
    this.applyEvent({
      eventType: 'AccountOpened',
      data: {
        accountId: this.accountId,
        initialDeposit,
        accountHolder
      }
    });
  }
  
  deposit(amount, description) {
    this.validateActiveAccount();
    
    if (amount <= 0) {
      throw new Error('Deposit amount must be positive');
    }
    
    this.applyEvent({
      eventType: 'MoneyDeposited',
      data: {
        amount,
        description,
        newBalance: this.balance + amount
      }
    });
  }
  
  withdraw(amount, description) {
    this.validateActiveAccount();
    
    if (amount <= 0) {
      throw new Error('Withdrawal amount must be positive');
    }
    
    if (this.balance < amount) {
      throw new Error('Insufficient funds');
    }
    
    this.applyEvent({
      eventType: 'MoneyWithdrawn',
      data: {
        amount,
        description,
        newBalance: this.balance - amount
      }
    });
  }
  
  closeAccount(reason) {
    this.validateActiveAccount();
    
    if (this.balance !== 0) {
      throw new Error('Cannot close account with non-zero balance');
    }
    
    this.applyEvent({
      eventType: 'AccountClosed',
      data: {
        reason,
        closedAt: new Date().toISOString()
      }
    });
  }
  
  // Event application
  applyEvent(event) {
    // Add metadata
    const enrichedEvent = {
      ...event,
      eventId: this.generateEventId(),
      aggregateId: this.accountId,
      version: this.version + 1, // aggregate version after this event; read by projections
      timestamp: new Date().toISOString(),
      metadata: {
        ...event.metadata,
        correlationId: event.metadata?.correlationId || this.generateCorrelationId()
      }
    };
    
    // Apply to state
    this.when(enrichedEvent);
    
    // Track for persistence
    this.uncommittedEvents.push(enrichedEvent);
    this.version++;
  }
  
  // State mutations (event handlers)
  when(event) {
    switch (event.eventType) {
      case 'AccountOpened':
        this.balance = event.data.initialDeposit;
        this.status = 'active';
        this.accountHolder = event.data.accountHolder;
        break;
        
      case 'MoneyDeposited':
        this.balance = event.data.newBalance;
        break;
        
      case 'MoneyWithdrawn':
        this.balance = event.data.newBalance;
        break;
        
      case 'AccountClosed':
        this.status = 'closed';
        break;
        
      default:
        console.warn(`Unknown event type: ${event.eventType}`);
    }
  }
  
  // Replay events for reconstruction
  replay(events) {
    for (const event of events) {
      this.when(event);
      this.version++;
    }
    // Clear uncommitted events after replay
    this.uncommittedEvents = [];
  }
  
  // Get uncommitted events for persistence
  getUncommittedEvents() {
    return [...this.uncommittedEvents];
  }
  
  // Mark events as committed
  markEventsAsCommitted() {
    this.uncommittedEvents = [];
  }
  
  validateActiveAccount() {
    if (this.status !== 'active') {
      throw new Error(`Account is ${this.status}, cannot perform operation`);
    }
  }
  
  generateEventId() {
    return `evt_${this.accountId}_${Date.now()}_${Math.random().toString(36).substr(2, 6)}`;
  }
  
  generateCorrelationId() {
    return `corr_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
  }
}
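
A short sketch of using the aggregate on its own, with illustrative values:

// Handle commands on a new aggregate
const account = new BankAccount('account_42');
account.openAccount(100, 'Jane Doe');
account.deposit(50, 'Paycheck');
account.withdraw(30, 'Groceries');

console.log(account.balance);                       // 120
console.log(account.getUncommittedEvents().length); // 3 events awaiting persistence

// The same state can be rebuilt later purely from the recorded events
const restored = BankAccount.fromEvents('account_42', account.getUncommittedEvents());
console.log(restored.balance);                      // 120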

Repository Pattern for Event-Sourced Aggregates

class EventSourcedRepository {
  constructor(eventStore, eventBus, snapshotStore = null) {
    this.eventStore = eventStore;
    this.eventBus = eventBus; // used by publishEvents() below
    this.snapshotStore = snapshotStore;
    this.snapshotFrequency = 10; // Snapshot every 10 events
  }
  
  async getById(aggregateId, AggregateClass) {
    let fromVersion = 0;
    
    // Try to load from snapshot first
    if (this.snapshotStore) {
      const snapshot = await this.snapshotStore.getLatestSnapshot(aggregateId);
      if (snapshot) {
        const aggregate = AggregateClass.fromSnapshot(snapshot);
        fromVersion = snapshot.version;
        
        // Load events since snapshot
        const events = await this.eventStore.getEvents(aggregateId, fromVersion);
        if (events.length > 0) {
          aggregate.replay(events);
        }
        
        return aggregate;
      }
    }
    
    // Load all events
    const events = await this.eventStore.getEvents(aggregateId, fromVersion);
    
    if (events.length === 0) {
      return null; // Aggregate doesn't exist
    }
    
    const aggregate = AggregateClass.fromEvents(aggregateId, events);
    return aggregate;
  }
  
  async save(aggregate) {
    const uncommittedEvents = aggregate.getUncommittedEvents();
    
    if (uncommittedEvents.length === 0) {
      return; // Nothing to save
    }
    
    const expectedVersion = aggregate.version - uncommittedEvents.length;
    
    try {
      // Append events to store
      const newVersion = await this.eventStore.appendEvents(
        aggregate.accountId,
        expectedVersion,
        uncommittedEvents
      );
      
      // Mark events as committed
      aggregate.markEventsAsCommitted();
      
      // Create snapshot if needed
      if (this.snapshotStore && newVersion % this.snapshotFrequency === 0) {
        await this.createSnapshot(aggregate, newVersion);
      }
      
      // Publish events to external systems
      await this.publishEvents(uncommittedEvents);
      
    } catch (error) {
      throw new Error(`Failed to save aggregate: ${error.message}`);
    }
  }
  
  async createSnapshot(aggregate, version) {
    // Matches SnapshotStore.saveSnapshot(aggregateId, aggregateType, version, data) below
    await this.snapshotStore.saveSnapshot(
      aggregate.accountId,
      'BankAccount',
      version,
      {
        balance: aggregate.balance,
        status: aggregate.status,
        accountHolder: aggregate.accountHolder
      }
    );
  }
  
  async publishEvents(events) {
    // Publish to event bus for read model updates and external integrations
    for (const event of events) {
      await this.eventBus.publish(event);
    }
  }
}
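
Putting the pieces together, a command handler might look like the sketch below. The eventStore, eventBus, and snapshotStore instances are assumed to be built from the classes in this section; eventBus only needs a publish(event) method.

const repository = new EventSourcedRepository(eventStore, eventBus, snapshotStore);

// Command handler: load the aggregate, execute the command, persist new events
async function handleWithdrawCommand({ accountId, amount, description }) {
  const account = await repository.getById(accountId, BankAccount);
  if (!account) {
    throw new Error(`Account ${accountId} not found`);
  }

  account.withdraw(amount, description); // may throw, e.g. on insufficient funds
  await repository.save(account);        // appends events, snapshots, publishes
}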

Event Projection for Read Models

class AccountProjection {
  constructor(database) {
    this.db = database;
    this.initializeSchema();
  }
  
  async initializeSchema() {
    await this.db.query(`
      CREATE TABLE IF NOT EXISTS account_summary (
        account_id VARCHAR(255) PRIMARY KEY,
        account_holder VARCHAR(255),
        current_balance DECIMAL(15,2),
        status VARCHAR(50),
        total_deposits DECIMAL(15,2) DEFAULT 0,
        total_withdrawals DECIMAL(15,2) DEFAULT 0,
        transaction_count INT DEFAULT 0,
        opened_at TIMESTAMP,
        last_activity TIMESTAMP,
        version BIGINT DEFAULT 0
      )
    `);
    
    await this.db.query(`
      CREATE TABLE IF NOT EXISTS account_transactions (
        id BIGINT PRIMARY KEY AUTO_INCREMENT,
        account_id VARCHAR(255),
        event_id VARCHAR(255) UNIQUE,
        transaction_type ENUM('deposit', 'withdrawal'),
        amount DECIMAL(15,2),
        description TEXT,
        balance_after DECIMAL(15,2),
        timestamp TIMESTAMP,
        INDEX idx_account_time (account_id, timestamp)
      )
    `);
  }
  
  async handleEvent(event) {
    switch (event.eventType) {
      case 'AccountOpened':
        await this.handleAccountOpened(event);
        break;
      case 'MoneyDeposited':
        await this.handleMoneyDeposited(event);
        break;
      case 'MoneyWithdrawn':
        await this.handleMoneyWithdrawn(event);
        break;
      case 'AccountClosed':
        await this.handleAccountClosed(event);
        break;
    }
  }
  
  async handleAccountOpened(event) {
    await this.db.query(
      `INSERT INTO account_summary 
       (account_id, account_holder, current_balance, status, 
        total_deposits, opened_at, last_activity, version)
       VALUES (?, ?, ?, 'active', ?, ?, ?, ?)`,
      [
        event.aggregateId,
        event.data.accountHolder,
        event.data.initialDeposit,
        event.data.initialDeposit,
        event.timestamp,
        event.timestamp,
        event.version
      ]
    );
    
    if (event.data.initialDeposit > 0) {
      await this.db.query(
        `INSERT INTO account_transactions 
         (account_id, event_id, transaction_type, amount, description, 
          balance_after, timestamp)
         VALUES (?, ?, 'deposit', ?, 'Initial deposit', ?, ?)`,
        [
          event.aggregateId,
          event.eventId,
          event.data.initialDeposit,
          event.data.initialDeposit,
          event.timestamp
        ]
      );
    }
  }
  
  async handleMoneyDeposited(event) {
    await this.db.query(
      `UPDATE account_summary 
       SET current_balance = ?, 
           total_deposits = total_deposits + ?,
           transaction_count = transaction_count + 1,
           last_activity = ?,
           version = ?
       WHERE account_id = ?`,
      [
        event.data.newBalance,
        event.data.amount,
        event.timestamp,
        event.version,
        event.aggregateId
      ]
    );
    
    await this.db.query(
      `INSERT INTO account_transactions 
       (account_id, event_id, transaction_type, amount, description, 
        balance_after, timestamp)
       VALUES (?, ?, 'deposit', ?, ?, ?, ?)`,
      [
        event.aggregateId,
        event.eventId,
        event.data.amount,
        event.data.description,
        event.data.newBalance,
        event.timestamp
      ]
    );
  }
  
  async handleMoneyWithdrawn(event) {
    await this.db.query(
      `UPDATE account_summary 
       SET current_balance = ?, 
           total_withdrawals = total_withdrawals + ?,
           transaction_count = transaction_count + 1,
           last_activity = ?,
           version = ?
       WHERE account_id = ?`,
      [
        event.data.newBalance,
        event.data.amount,
        event.timestamp,
        event.version,
        event.aggregateId
      ]
    );
    
    await this.db.query(
      `INSERT INTO account_transactions 
       (account_id, event_id, transaction_type, amount, description, 
        balance_after, timestamp)
       VALUES (?, ?, 'withdrawal', ?, ?, ?, ?)`,
      [
        event.aggregateId,
        event.eventId,
        event.data.amount,
        event.data.description,
        event.data.newBalance,
        event.timestamp
      ]
    );
  }
  
  async handleAccountClosed(event) {
    await this.db.query(
      `UPDATE account_summary 
       SET status = 'closed', last_activity = ?, version = ?
       WHERE account_id = ?`,
      [event.timestamp, event.version, event.aggregateId]
    );
  }
}
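
The projection has to be subscribed to the events the repository publishes. A minimal in-memory event bus sketch (illustrative only; production systems typically use a message broker such as Kafka or RabbitMQ):

// Minimal in-memory event bus: fans each published event out to all subscribers
class SimpleEventBus {
  constructor() {
    this.handlers = [];
  }

  subscribe(handler) {
    this.handlers.push(handler);
  }

  async publish(event) {
    for (const handler of this.handlers) {
      await handler(event);
    }
  }
}

// Wire the read model to the bus used by the repository
const eventBus = new SimpleEventBus();
const projection = new AccountProjection(db);
eventBus.subscribe(event => projection.handleEvent(event));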

Advanced Patterns and Considerations

Snapshot Optimization

class SnapshotStore {
  constructor(database) {
    this.db = database;
    this.initializeSchema();
  }
  
  async initializeSchema() {
    await this.db.query(`
      CREATE TABLE IF NOT EXISTS snapshots (
        aggregate_id VARCHAR(255),
        aggregate_type VARCHAR(255),
        version BIGINT,
        snapshot_data JSON,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        PRIMARY KEY (aggregate_id, version),
        INDEX idx_latest (aggregate_id, version DESC)
      )
    `);
  }
  
  async saveSnapshot(aggregateId, aggregateType, version, data) {
    await this.db.query(
      `INSERT INTO snapshots 
       (aggregate_id, aggregate_type, version, snapshot_data)
       VALUES (?, ?, ?, ?)
       ON DUPLICATE KEY UPDATE 
       snapshot_data = VALUES(snapshot_data),
       created_at = CURRENT_TIMESTAMP`,
      [aggregateId, aggregateType, version, JSON.stringify(data)]
    );
    
    // Clean old snapshots (keep last 3)
    await this.db.query(
      `DELETE FROM snapshots 
       WHERE aggregate_id = ? AND version < (
         SELECT min_version FROM (
           SELECT version as min_version 
           FROM snapshots 
           WHERE aggregate_id = ? 
           ORDER BY version DESC 
           LIMIT 1 OFFSET 2
         ) as subquery
       )`,
      [aggregateId, aggregateId]
    );
  }
  
  async getLatestSnapshot(aggregateId) {
    const result = await this.db.query(
      `SELECT version, snapshot_data, created_at
       FROM snapshots 
       WHERE aggregate_id = ? 
       ORDER BY version DESC 
       LIMIT 1`,
      [aggregateId]
    );
    
    if (result.length === 0) {
      return null;
    }
    
    return {
      aggregateId,
      version: result[0].version,
      data: JSON.parse(result[0].snapshot_data),
      createdAt: result[0].created_at
    };
  }
}
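
The repository above calls AggregateClass.fromSnapshot, which BankAccount does not define. One possible sketch, matching the shape returned by getLatestSnapshot (an assumption, not part of the original class):

// Hypothetical restore path: seed the aggregate from snapshot data,
// then replay only the events recorded after snapshot.version
BankAccount.fromSnapshot = function (snapshot) {
  const account = new BankAccount(snapshot.aggregateId);
  account.balance = snapshot.data.balance;
  account.status = snapshot.data.status;
  account.accountHolder = snapshot.data.accountHolder;
  account.version = snapshot.version;
  return account;
};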

Event Schema Evolution

class EventUpgrader {
  constructor() {
    this.upgraders = new Map();
    this.setupUpgraders();
  }
  
  setupUpgraders() {
    // V1 to V2: Add correlation ID
    this.upgraders.set('UserRegistered_v1', (event) => ({
      ...event,
      eventType: 'UserRegistered',
      version: 2,
      metadata: {
        ...event.metadata,
        correlationId: event.metadata?.correlationId || `corr_${event.timestamp}_${Math.random().toString(36).substr(2, 6)}`
      }
    }));
    
    // V2 to V3: Restructure user data
    this.upgraders.set('UserRegistered_v2', (event) => ({
      ...event,
      version: 3,
      data: {
        userId: event.data.id,
        profile: {
          email: event.data.email,
          name: event.data.name
        },
        preferences: {
          newsletter: event.data.newsletter || false
        }
      }
    }));
  }
  
  upgradeEvent(event) {
    let currentEvent = { ...event };
    const eventVersion = event.version || 1;
    
    // Apply upgrades sequentially
    let version = eventVersion;
    while (this.upgraders.has(`${event.eventType}_v${version}`)) {
      const upgrader = this.upgraders.get(`${event.eventType}_v${version}`);
      currentEvent = upgrader(currentEvent);
      version++;
    }
    
    return currentEvent;
  }
  
  upgradeEventStream(events) {
    return events.map(event => this.upgradeEvent(event));
  }
}
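
Upgrading a stored v1 event before replay (illustrative data):

const upgrader = new EventUpgrader();

const storedV1Event = {
  eventType: 'UserRegistered',
  version: 1,
  timestamp: '2023-06-01T09:00:00Z',
  data: { id: 'user_123', email: 'john@example.com', name: 'John Doe' },
  metadata: {}
};

const upgraded = upgrader.upgradeEvent(storedV1Event);
// upgraded.version === 3: the correlation ID is backfilled by the v1 upgrader,
// and the v2 upgrader nests the data under profile/preferences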

Temporal Queries

class TemporalQueryService {
  constructor(eventStore, eventUpgrader) {
    this.eventStore = eventStore;
    this.eventUpgrader = eventUpgrader;
  }
  
  async getAggregateAtTime(aggregateId, timestamp, AggregateClass) {
    // Get all events up to the specified time
    const allEvents = await this.eventStore.getEvents(aggregateId);
    
    // Filter events up to timestamp
    const eventsUpToTime = allEvents.filter(event => 
      new Date(event.timestamp) <= new Date(timestamp)
    );
    
    if (eventsUpToTime.length === 0) {
      return null;
    }
    
    // Upgrade events for compatibility
    const upgradedEvents = this.eventUpgrader.upgradeEventStream(eventsUpToTime);
    
    // Reconstruct aggregate state
    const aggregate = AggregateClass.fromEvents(aggregateId, upgradedEvents);
    return aggregate;
  }
  
  async getSystemStateAtTime(timestamp) {
    // Get all events up to timestamp across all aggregates
    const allEvents = await this.eventStore.getAllEvents(null, null);
    
    const eventsUpToTime = allEvents.filter(event => 
      new Date(event.timestamp) <= new Date(timestamp)
    );
    
    // Group by aggregate
    const eventsByAggregate = new Map();
    for (const event of eventsUpToTime) {
      if (!eventsByAggregate.has(event.aggregateId)) {
        eventsByAggregate.set(event.aggregateId, []);
      }
      eventsByAggregate.get(event.aggregateId).push(event);
    }
    
    return eventsByAggregate;
  }
  
  async getEventsBetween(startTime, endTime, eventTypes = null) {
    const allEvents = await this.eventStore.getAllEvents(startTime, eventTypes);
    
    return allEvents.filter(event => {
      const eventTime = new Date(event.timestamp);
      return eventTime >= new Date(startTime) && eventTime <= new Date(endTime);
    });
  }
  
  async analyzeEventPatterns(aggregateId, eventType, timeWindow) {
    const events = await this.eventStore.getEvents(aggregateId);
    
    const filteredEvents = events.filter(event => 
      event.eventType === eventType &&
      new Date(event.timestamp) >= new Date(timeWindow.start) &&
      new Date(event.timestamp) <= new Date(timeWindow.end)
    );
    
    // Analyze patterns
    const hourlyDistribution = new Map();
    const dailyTotals = new Map();
    
    for (const event of filteredEvents) {
      const eventDate = new Date(event.timestamp);
      const hour = eventDate.getHours();
      const day = eventDate.toISOString().split('T')[0];
      
      hourlyDistribution.set(hour, (hourlyDistribution.get(hour) || 0) + 1);
      dailyTotals.set(day, (dailyTotals.get(day) || 0) + 1);
    }
    
    return {
      totalEvents: filteredEvents.length,
      hourlyDistribution: Object.fromEntries(hourlyDistribution),
      dailyTotals: Object.fromEntries(dailyTotals),
      averagePerDay: filteredEvents.length / dailyTotals.size
    };
  }
}
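
Querying an account as it existed at a specific moment, assuming the classes above are wired together:

// Inside an async function
const temporalQueries = new TemporalQueryService(eventStore, new EventUpgrader());

// Account state as of the end of January
const accountAtEndOfJanuary = await temporalQueries.getAggregateAtTime(
  'account_42',
  '2024-01-31T23:59:59Z',
  BankAccount
);

// null if the account did not exist yet; otherwise balance/status as of that time
console.log(accountAtEndOfJanuary?.balance);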

Deep Dive Analysis

Event Sourcing vs Traditional Approaches

Aspect             | Traditional (CRUD)              | Event Sourcing
-------------------|---------------------------------|---------------------------------
Data Storage       | Current state only              | Complete event history
Auditability       | Limited (if tracked separately) | Complete audit trail
Debugging          | Difficult to reproduce issues   | Full event replay available
Complexity         | Lower initial complexity        | Higher initial complexity
Storage Cost       | Lower storage requirements      | Higher storage requirements
Query Performance  | Fast for current state          | Requires projections for queries
Temporal Queries   | Not possible                    | Native support
Schema Changes     | Can be destructive              | Non-destructive with upcasting

Common Pitfalls and Solutions

1. Event Explosion and Storage Growth

// Problem: Too many fine-grained events
const badEvents = [
  { eventType: 'UserFirstNameChanged', data: { firstName: 'John' }},
  { eventType: 'UserLastNameChanged', data: { lastName: 'Doe' }},
  { eventType: 'UserEmailChanged', data: { email: 'john@example.com' }}
];

// Solution: Aggregate related changes
const goodEvent = {
  eventType: 'UserProfileUpdated',
  data: {
    firstName: 'John',
    lastName: 'Doe',
    email: 'john@example.com'
  }
};

2. Event Ordering and Causality

// Solution: process events for each aggregate strictly in version order
class EventProcessor {
  async handleEvent(event) {
    // Ensure events are processed in order for each aggregate
    const lastProcessedVersion = await this.getLastProcessedVersion(event.aggregateId);
    
    if (event.version !== lastProcessedVersion + 1) {
      // Queue for later processing or request missing events
      await this.queueForLaterProcessing(event);
      return;
    }
    
    await this.processEvent(event);
    await this.updateLastProcessedVersion(event.aggregateId, event.version);
  }
}

3. Privacy and GDPR Compliance

// Solution: Crypto-shredding for data deletion
class GDPRCompliantEventStore {
  async storeEvent(event) {
    // Encrypt PII with user-specific key
    if (this.containsPII(event)) {
      const userKey = await this.getUserEncryptionKey(event.userId);
      event.data = await this.encrypt(event.data, userKey);
    }
    
    await this.eventStore.append(event);
  }
  
  async deleteUserData(userId) {
    // Delete encryption key to make data unreadable
    await this.deleteUserEncryptionKey(userId);
    
    // Optionally add tombstone event
    await this.eventStore.append({
      eventType: 'UserDataDeleted',
      aggregateId: userId,
      data: { deletedAt: new Date().toISOString() }
    });
  }
}

Interview-Focused Content

Junior Level (2-4 YOE)

Q: What is Event Sourcing and how is it different from traditional database storage?
A: Event Sourcing stores data as a sequence of immutable events rather than just the current state. Instead of updating records, each change is captured as an event and appended to an event store. The current state is derived by replaying these events. This provides a complete audit trail and the ability to see system state at any point in time.

Q: What are the main benefits of Event Sourcing?
A: Key benefits include:

  • Complete audit trail: Every change is recorded with context
  • Temporal queries: Can see state at any point in time
  • Debugging: Can replay events to reproduce issues
  • Data recovery: Can rebuild state from immutable events
  • Analytics: Rich historical data for business intelligence

Q: What is an event and how is it different from a command?
A: An event represents something that has already happened (past tense), like "UserRegistered" or "OrderPlaced". A command represents an intent to do something (imperative), like "RegisterUser" or "PlaceOrder". Commands can be rejected, but events are facts that have already occurred.
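
A small illustrative contrast (the names and shapes here are hypothetical):

// Command: an intent, named in the imperative; it can still be rejected
const placeOrderCommand = {
  type: 'PlaceOrder',
  orderId: 'order_789',
  items: [{ sku: 'ABC-123', quantity: 2 }]
};

// Event: a fact, named in the past tense; recorded only after validation succeeds
const orderPlacedEvent = {
  eventType: 'OrderPlaced',
  aggregateId: 'order_789',
  timestamp: '2024-03-10T12:00:00Z',
  data: { items: [{ sku: 'ABC-123', quantity: 2 }] }
};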

Senior Level (5-8 YOE)

Q: How do you handle event schema evolution in Event Sourcing?
A: Strategies include:

  • Versioning: Include version numbers in events
  • Upcasting: Transform old events to new format when reading
  • Additive changes: Only add fields, don't remove or change existing ones
  • Event transformation: Convert events during replay
  • Weak schema: Use flexible formats like JSON

Q: What are the challenges of implementing Event Sourcing and how do you address them?
A: Key challenges:

  • Complexity: Higher development overhead - mitigate with good frameworks
  • Performance: Slow query performance - use CQRS with read models
  • Storage growth: Large event stores - implement snapshotting
  • Event ordering: Out-of-order processing - use version numbers and queuing
  • Privacy compliance: GDPR issues - use encryption and crypto-shredding

Q: How do you implement snapshotting in Event Sourcing?
A: Snapshotting stores periodic state snapshots to avoid replaying all events:

  • Frequency: Snapshot every N events (e.g., every 10-100 events)
  • Storage: Store snapshots separately from events
  • Loading: Load latest snapshot + subsequent events
  • Cleanup: Keep a few recent snapshots, delete old ones
  • Consistency: Ensure snapshot represents exact state at that event version

Staff+ Level (8+ YOE)

Q: Design an Event Sourcing system for a global e-commerce platform. How do you handle consistency, performance, and compliance?
A: Design considerations:

  • Partitioning: Partition events by aggregate ID for scalability
  • Regional deployment: Event stores in multiple regions with async replication
  • CQRS integration: Separate read models optimized for different query patterns
  • Saga patterns: Handle distributed transactions across aggregates
  • Compliance: Implement crypto-shredding for GDPR, audit logs for SOX
  • Performance: Use snapshotting, caching, and event compression

Q: How would you migrate an existing CRUD application to Event Sourcing?
A: Migration strategy:

  • Strangler fig pattern: Gradually replace components
  • Event capture: Start capturing events for new changes
  • Historical events: Generate events from current state and change logs
  • Dual writes: Write to both systems during transition
  • Read model migration: Build read models from events
  • Validation: Ensure event replay produces correct state

Q: What are the implications of Event Sourcing for system observability and debugging?
A: Observability benefits:

  • Perfect audit trail: Every change is recorded with full context
  • Reproducible debugging: Can replay exact sequence of events
  • Business analytics: Rich historical data for insights
  • Compliance reporting: Automated audit trails
  • System behavior analysis: Understand patterns and trends
  • Root cause analysis: Trace events leading to issues

Further Reading

Related Concepts

  • cqrs
  • event-driven-architecture
  • saga-pattern
  • domain-driven-design