Event Sourcing
Core Concept
Storing application state as a sequence of immutable events for auditability and rebuilding state
Overview
Event Sourcing is an architectural pattern where application state is stored as a sequence of immutable events rather than just the current state. Instead of updating records in place, each state change is captured as an event and appended to an event store. The current state is derived by replaying these events from the beginning.
This pattern provides a complete audit trail, enables temporal queries, and allows state to be rebuilt as of any point in time. Companies such as Netflix and Uber, as well as financial institutions, use event sourcing for critical systems where data lineage, compliance, and the ability to analyze historical behavior patterns are essential.
The main technical needs this pattern addresses include:
- Complete audit trail: Every change is recorded with full context and timing
- Temporal querying: Ability to see system state at any point in time
- Event replay: Rebuilding state or creating new projections from historical events
- Data corruption recovery: Reconstruct state from immutable event log
Core Principles: Events as Source of Truth
Traditional State Storage vs Event Sourcing
Traditional Approach (State-based):
-- Users table stores current state
CREATE TABLE users (
id INT PRIMARY KEY,
email VARCHAR(255),
name VARCHAR(255),
status ENUM('active', 'suspended', 'deleted'),
last_login TIMESTAMP,
updated_at TIMESTAMP
);
-- Update overwrites previous values
UPDATE users
SET status = 'suspended', updated_at = NOW()
WHERE id = 123;
-- Previous state is lost forever
-- No way to know: when was user active? why suspended? who made the change?
Event Sourcing Approach:
// Events capture what happened, when, and why
const events = [
{
eventId: 'evt_001',
eventType: 'UserRegistered',
aggregateId: 'user_123',
timestamp: '2024-01-15T10:30:00Z',
data: {
email: 'john@example.com',
name: 'John Doe'
},
metadata: {
userId: 'admin_456',
correlationId: 'reg_789'
}
},
{
eventId: 'evt_002',
eventType: 'UserSuspended',
aggregateId: 'user_123',
timestamp: '2024-02-01T14:20:00Z',
data: {
reason: 'Suspicious activity detected',
suspendedBy: 'security_system'
},
metadata: {
userId: 'security_bot',
correlationId: 'alert_321'
}
}
];
// Current state is derived by replaying events
function buildUserState(events) {
let user = null;
for (const event of events) {
switch (event.eventType) {
case 'UserRegistered':
user = {
id: event.aggregateId,
email: event.data.email,
name: event.data.name,
status: 'active',
registeredAt: event.timestamp
};
break;
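case 'UserSuspended':
// Guard against a log that does not start with UserRegistered
if (!user) throw new Error('UserSuspended received before UserRegistered');
user.status = 'suspended';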
user.suspendedAt = event.timestamp;
user.suspensionReason = event.data.reason;
break;
}
}
return user;
}
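Because state is a pure function of the event list, replaying only a prefix of the log yields the state as of that moment. The following is a minimal sketch of that idea rather than a definitive implementation: `applyUserEvent` and `stateAsOf` are hypothetical names not used elsewhere in this article, and the event shape is assumed to match the example above.

```javascript
// Hypothetical helper names; the event shape follows the example above.
function applyUserEvent(user, event) {
  switch (event.eventType) {
    case 'UserRegistered':
      return {
        id: event.aggregateId,
        email: event.data.email,
        name: event.data.name,
        status: 'active',
        registeredAt: event.timestamp
      };
    case 'UserSuspended':
      if (user === null) {
        throw new Error('UserSuspended arrived before UserRegistered');
      }
      return {
        ...user,
        status: 'suspended',
        suspendedAt: event.timestamp,
        suspensionReason: event.data.reason
      };
    default:
      return user; // ignore event types this reader does not know
  }
}

// Replay only the events recorded at or before `asOf` (an ISO-8601 string).
function stateAsOf(events, asOf) {
  const cutoff = new Date(asOf).getTime();
  return events
    .filter(e => new Date(e.timestamp).getTime() <= cutoff)
    .reduce(applyUserEvent, null);
}

const log = [
  { eventType: 'UserRegistered', aggregateId: 'user_123', timestamp: '2024-01-15T10:30:00Z',
    data: { email: 'john@example.com', name: 'John Doe' } },
  { eventType: 'UserSuspended', aggregateId: 'user_123', timestamp: '2024-02-01T14:20:00Z',
    data: { reason: 'Suspicious activity detected' } }
];

const januaryView = stateAsOf(log, '2024-01-31T00:00:00Z');
const februaryView = stateAsOf(log, '2024-02-02T00:00:00Z');
console.log(januaryView.status, februaryView.status);
```

Returning a new object from each handler, instead of mutating a shared one, keeps earlier views such as `januaryView` unaffected by later replays.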
Event Store Architecture
System Architecture Diagram
Practical Implementation Patterns
Basic Event Store Implementation
class EventStore {
constructor(database) {
this.db = database;
// Schema creation is async; await `this.ready` before issuing the first query
this.ready = this.initializeSchema();
}
async initializeSchema() {
await this.db.query(`
CREATE TABLE IF NOT EXISTS events (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
event_id VARCHAR(255) UNIQUE NOT NULL,
event_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
aggregate_version INT NOT NULL,
event_data JSON NOT NULL,
metadata JSON,
timestamp TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6),
INDEX idx_aggregate (aggregate_id, aggregate_version),
INDEX idx_event_type (event_type),
INDEX idx_timestamp (timestamp),
UNIQUE KEY unique_version (aggregate_id, aggregate_version)
)
`);
}
async appendEvents(aggregateId, expectedVersion, events) {
const transaction = await this.db.beginTransaction();
try {
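// Check current version (fast-fail only; under concurrent writers the
// unique_version key on (aggregate_id, aggregate_version) is the real guard)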
const currentVersion = await this.getCurrentVersion(aggregateId);
if (currentVersion !== expectedVersion) {
throw new Error(
`Concurrency conflict: expected version ${expectedVersion}, ` +
`but current version is ${currentVersion}`
);
}
// Append events
for (let i = 0; i < events.length; i++) {
const event = events[i];
const version = expectedVersion + i + 1;
await this.db.query(
`INSERT INTO events
(event_id, event_type, aggregate_id, aggregate_version, event_data, metadata)
VALUES (?, ?, ?, ?, ?, ?)`,
[
event.eventId || this.generateEventId(),
event.eventType,
aggregateId,
version,
JSON.stringify(event.data),
JSON.stringify(event.metadata || {})
],
{ transaction }
);
}
await transaction.commit();
return expectedVersion + events.length;
} catch (error) {
await transaction.rollback();
throw error;
}
}
async getEvents(aggregateId, fromVersion = 0) {
const rows = await this.db.query(
`SELECT event_id, event_type, aggregate_version,
event_data, metadata, timestamp
FROM events
WHERE aggregate_id = ? AND aggregate_version > ?
ORDER BY aggregate_version ASC`,
[aggregateId, fromVersion]
);
return rows.map(row => ({
eventId: row.event_id,
eventType: row.event_type,
aggregateId,
version: row.aggregate_version,
data: JSON.parse(row.event_data),
metadata: JSON.parse(row.metadata),
timestamp: row.timestamp
}));
}
async getCurrentVersion(aggregateId) {
const result = await this.db.query(
`SELECT MAX(aggregate_version) as version
FROM events
WHERE aggregate_id = ?`,
[aggregateId]
);
return result[0]?.version || 0;
}
async getAllEvents(fromTimestamp = null, eventTypes = null) {
let query = `
SELECT event_id, event_type, aggregate_id, aggregate_version,
event_data, metadata, timestamp
FROM events
`;
const params = [];
const conditions = [];
if (fromTimestamp) {
conditions.push('timestamp >= ?');
params.push(fromTimestamp);
}
if (eventTypes && eventTypes.length > 0) {
conditions.push(`event_type IN (${eventTypes.map(() => '?').join(',')})`);
params.push(...eventTypes);
}
if (conditions.length > 0) {
query += ' WHERE ' + conditions.join(' AND ');
}
query += ' ORDER BY id ASC';
const rows = await this.db.query(query, params);
return rows.map(row => ({
eventId: row.event_id,
eventType: row.event_type,
aggregateId: row.aggregate_id,
version: row.aggregate_version,
data: JSON.parse(row.event_data),
metadata: JSON.parse(row.metadata),
timestamp: row.timestamp
}));
}
generateEventId() {
// slice replaces the deprecated substr; same 9 random characters
return `evt_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
}
}
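The optimistic concurrency rule in `appendEvents` can be exercised without a database. The sketch below is an in-memory stand-in, not the store above: `InMemoryEventStore` is a hypothetical class that mirrors only the method names and the expected-version check, and it is synchronous for brevity.

```javascript
// In-memory stand-in for the SQL-backed store; the class name is hypothetical.
class InMemoryEventStore {
  constructor() {
    this.streams = new Map(); // aggregateId -> array of stored events
  }

  getCurrentVersion(aggregateId) {
    return (this.streams.get(aggregateId) || []).length;
  }

  appendEvents(aggregateId, expectedVersion, events) {
    const stream = this.streams.get(aggregateId) || [];
    if (stream.length !== expectedVersion) {
      throw new Error(
        `Concurrency conflict: expected version ${expectedVersion}, ` +
        `but current version is ${stream.length}`
      );
    }
    const stored = events.map((event, i) => ({
      ...event,
      aggregateId,
      version: expectedVersion + i + 1
    }));
    this.streams.set(aggregateId, stream.concat(stored));
    return expectedVersion + events.length;
  }

  getEvents(aggregateId, fromVersion = 0) {
    return (this.streams.get(aggregateId) || []).filter(e => e.version > fromVersion);
  }
}

const store = new InMemoryEventStore();
store.appendEvents('acct_1', 0, [{ eventType: 'AccountOpened', data: { initialDeposit: 100 } }]);

// Two writers both loaded the aggregate at version 1; only the first append succeeds.
store.appendEvents('acct_1', 1, [{ eventType: 'MoneyDeposited', data: { amount: 50 } }]);
let conflictMessage = null;
try {
  store.appendEvents('acct_1', 1, [{ eventType: 'MoneyWithdrawn', data: { amount: 30 } }]);
} catch (error) {
  conflictMessage = error.message; // the second writer must reload and retry
}
console.log(conflictMessage);
```

The losing writer is expected to reload the aggregate, re-run its command against the new state, and try again.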
Domain Aggregate with Event Sourcing
class BankAccount {
constructor(accountId) {
this.accountId = accountId;
this.version = 0;
this.balance = 0;
this.status = 'pending';
this.uncommittedEvents = [];
}
// Static factory method to create from events
static fromEvents(accountId, events) {
const account = new BankAccount(accountId);
account.replay(events);
return account;
}
// Business methods (commands)
openAccount(initialDeposit, accountHolder) {
if (this.status !== 'pending') {
throw new Error('Account already opened');
}
if (initialDeposit < 0) {
throw new Error('Initial deposit cannot be negative');
}
this.applyEvent({
eventType: 'AccountOpened',
data: {
accountId: this.accountId,
initialDeposit,
accountHolder
}
});
}
deposit(amount, description) {
this.validateActiveAccount();
if (amount <= 0) {
throw new Error('Deposit amount must be positive');
}
this.applyEvent({
eventType: 'MoneyDeposited',
data: {
amount,
description,
newBalance: this.balance + amount
}
});
}
withdraw(amount, description) {
this.validateActiveAccount();
if (amount <= 0) {
throw new Error('Withdrawal amount must be positive');
}
if (this.balance < amount) {
throw new Error('Insufficient funds');
}
this.applyEvent({
eventType: 'MoneyWithdrawn',
data: {
amount,
description,
newBalance: this.balance - amount
}
});
}
closeAccount(reason) {
this.validateActiveAccount();
if (this.balance !== 0) {
throw new Error('Cannot close account with non-zero balance');
}
this.applyEvent({
eventType: 'AccountClosed',
data: {
reason,
closedAt: new Date().toISOString()
}
});
}
// Event application
applyEvent(event) {
// Add metadata
const enrichedEvent = {
...event,
eventId: this.generateEventId(),
aggregateId: this.accountId,
timestamp: new Date().toISOString(),
metadata: {
...event.metadata,
correlationId: event.metadata?.correlationId || this.generateCorrelationId()
}
};
// Apply to state
this.when(enrichedEvent);
// Track for persistence
this.uncommittedEvents.push(enrichedEvent);
this.version++;
}
// State mutations (event handlers)
when(event) {
switch (event.eventType) {
case 'AccountOpened':
this.balance = event.data.initialDeposit;
this.status = 'active';
this.accountHolder = event.data.accountHolder;
break;
case 'MoneyDeposited':
this.balance = event.data.newBalance;
break;
case 'MoneyWithdrawn':
this.balance = event.data.newBalance;
break;
case 'AccountClosed':
this.status = 'closed';
break;
default:
console.warn(`Unknown event type: ${event.eventType}`);
}
}
// Replay events for reconstruction
replay(events) {
for (const event of events) {
this.when(event);
this.version++;
}
// Clear uncommitted events after replay
this.uncommittedEvents = [];
}
// Get uncommitted events for persistence
getUncommittedEvents() {
return [...this.uncommittedEvents];
}
// Mark events as committed
markEventsAsCommitted() {
this.uncommittedEvents = [];
}
validateActiveAccount() {
if (this.status !== 'active') {
throw new Error(`Account is ${this.status}, cannot perform operation`);
}
}
generateEventId() {
return `evt_${this.accountId}_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
}
generateCorrelationId() {
return `corr_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
}
}
Repository Pattern for Event-Sourced Aggregates
class EventSourcedRepository {
constructor(eventStore, snapshotStore = null, eventBus = null) {
this.eventStore = eventStore;
this.snapshotStore = snapshotStore;
this.eventBus = eventBus; // used by publishEvents
this.snapshotFrequency = 10; // Snapshot every 10 events
}
async getById(aggregateId, AggregateClass) {
let fromVersion = 0;
// Try to load from snapshot first
if (this.snapshotStore) {
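const snapshot = await this.snapshotStore.getLatestSnapshot(aggregateId);
if (snapshot) {
// AggregateClass must provide a static fromSnapshot(snapshot) that restores state and version
const aggregate = AggregateClass.fromSnapshot(snapshot);
fromVersion = snapshot.version;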
// Load events since snapshot
const events = await this.eventStore.getEvents(aggregateId, fromVersion);
if (events.length > 0) {
aggregate.replay(events);
}
return aggregate;
}
}
// Load all events
const events = await this.eventStore.getEvents(aggregateId, fromVersion);
if (events.length === 0) {
return null; // Aggregate doesn't exist
}
const aggregate = AggregateClass.fromEvents(aggregateId, events);
return aggregate;
}
async save(aggregate) {
const uncommittedEvents = aggregate.getUncommittedEvents();
if (uncommittedEvents.length === 0) {
return; // Nothing to save
}
const expectedVersion = aggregate.version - uncommittedEvents.length;
try {
// Append events to store
const newVersion = await this.eventStore.appendEvents(
aggregate.accountId,
expectedVersion,
uncommittedEvents
);
// Mark events as committed
aggregate.markEventsAsCommitted();
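// Create a snapshot whenever this save crossed a snapshotFrequency boundary
// (a plain modulo check misses boundaries crossed by a multi-event batch)
const crossedBoundary =
Math.floor(newVersion / this.snapshotFrequency) >
Math.floor(expectedVersion / this.snapshotFrequency);
if (this.snapshotStore && crossedBoundary) {
await this.createSnapshot(aggregate, newVersion);
}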
// Publish events to external systems
await this.publishEvents(uncommittedEvents);
} catch (error) {
// Keep the original error (e.g. a concurrency conflict) available to callers
throw new Error(`Failed to save aggregate: ${error.message}`, { cause: error });
}
}
async createSnapshot(aggregate, version) {
const data = {
balance: aggregate.balance,
status: aggregate.status,
accountHolder: aggregate.accountHolder
};
// Matches SnapshotStore.saveSnapshot(aggregateId, aggregateType, version, data);
// the store records created_at itself
await this.snapshotStore.saveSnapshot(
aggregate.accountId,
aggregate.constructor.name,
version,
data
);
}
async publishEvents(events) {
// Publish to event bus for read model updates and external integrations
if (!this.eventBus) {
return; // no bus configured
}
for (const event of events) {
await this.eventBus.publish(event);
}
}
}
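The repository calls `AggregateClass.fromSnapshot(snapshot)`, which the `BankAccount` class above does not define. One possible shape is sketched below. It assumes a snapshot of the form `{ aggregateId, version, data: { balance, status, accountHolder } }`, and `SnapshotableAccount` is a hypothetical, trimmed-down class used only to keep the example self-contained.

```javascript
// Trimmed-down account used only for this sketch; the real aggregate has more behaviour.
class SnapshotableAccount {
  constructor(accountId) {
    this.accountId = accountId;
    this.version = 0;
    this.balance = 0;
    this.status = 'pending';
    this.accountHolder = undefined;
  }

  // Assumed snapshot shape: { aggregateId, version, data: { balance, status, accountHolder } }
  static fromSnapshot(snapshot) {
    const account = new SnapshotableAccount(snapshot.aggregateId);
    account.balance = snapshot.data.balance;
    account.status = snapshot.data.status;
    account.accountHolder = snapshot.data.accountHolder;
    account.version = snapshot.version; // replay continues from this version
    return account;
  }

  replay(events) {
    for (const event of events) {
      if (event.eventType === 'MoneyDeposited' || event.eventType === 'MoneyWithdrawn') {
        this.balance = event.data.newBalance;
      } else if (event.eventType === 'AccountClosed') {
        this.status = 'closed';
      }
      this.version++;
    }
  }
}

const snapshot = {
  aggregateId: 'acct_1',
  version: 10,
  data: { balance: 250, status: 'active', accountHolder: 'Jane Doe' }
};
const eventsAfterSnapshot = [
  { eventType: 'MoneyWithdrawn', version: 11, data: { amount: 50, newBalance: 200 } }
];

const restored = SnapshotableAccount.fromSnapshot(snapshot);
restored.replay(eventsAfterSnapshot);
console.log(restored.balance, restored.version);
```

Setting `version` from the snapshot matters: the repository computes the expected version for the next append from it.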
Event Projection for Read Models
class AccountProjection {
constructor(database) {
this.db = database;
// Schema creation is async; await `this.ready` before handling the first event
this.ready = this.initializeSchema();
}
async initializeSchema() {
await this.db.query(`
CREATE TABLE IF NOT EXISTS account_summary (
account_id VARCHAR(255) PRIMARY KEY,
account_holder VARCHAR(255),
current_balance DECIMAL(15,2),
status VARCHAR(50),
total_deposits DECIMAL(15,2) DEFAULT 0,
total_withdrawals DECIMAL(15,2) DEFAULT 0,
transaction_count INT DEFAULT 0,
opened_at TIMESTAMP,
last_activity TIMESTAMP,
version BIGINT DEFAULT 0
)
`);
await this.db.query(`
CREATE TABLE IF NOT EXISTS account_transactions (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
account_id VARCHAR(255),
event_id VARCHAR(255) UNIQUE,
transaction_type ENUM('deposit', 'withdrawal'),
amount DECIMAL(15,2),
description TEXT,
balance_after DECIMAL(15,2),
timestamp TIMESTAMP,
INDEX idx_account_time (account_id, timestamp)
)
`);
}
async handleEvent(event) {
switch (event.eventType) {
case 'AccountOpened':
await this.handleAccountOpened(event);
break;
case 'MoneyDeposited':
await this.handleMoneyDeposited(event);
break;
case 'MoneyWithdrawn':
await this.handleMoneyWithdrawn(event);
break;
case 'AccountClosed':
await this.handleAccountClosed(event);
break;
}
}
async handleAccountOpened(event) {
await this.db.query(
`INSERT INTO account_summary
(account_id, account_holder, current_balance, status,
total_deposits, opened_at, last_activity, version)
VALUES (?, ?, ?, 'active', ?, ?, ?, ?)`,
[
event.aggregateId,
event.data.accountHolder,
event.data.initialDeposit,
event.data.initialDeposit,
event.timestamp,
event.timestamp,
event.version
]
);
if (event.data.initialDeposit > 0) {
await this.db.query(
`INSERT INTO account_transactions
(account_id, event_id, transaction_type, amount, description,
balance_after, timestamp)
VALUES (?, ?, 'deposit', ?, 'Initial deposit', ?, ?)`,
[
event.aggregateId,
event.eventId,
event.data.initialDeposit,
event.data.initialDeposit,
event.timestamp
]
);
}
}
async handleMoneyDeposited(event) {
await this.db.query(
`UPDATE account_summary
SET current_balance = ?,
total_deposits = total_deposits + ?,
transaction_count = transaction_count + 1,
last_activity = ?,
version = ?
WHERE account_id = ?`,
[
event.data.newBalance,
event.data.amount,
event.timestamp,
event.version,
event.aggregateId
]
);
await this.db.query(
`INSERT INTO account_transactions
(account_id, event_id, transaction_type, amount, description,
balance_after, timestamp)
VALUES (?, ?, 'deposit', ?, ?, ?, ?)`,
[
event.aggregateId,
event.eventId,
event.data.amount,
event.data.description,
event.data.newBalance,
event.timestamp
]
);
}
async handleMoneyWithdrawn(event) {
await this.db.query(
`UPDATE account_summary
SET current_balance = ?,
total_withdrawals = total_withdrawals + ?,
transaction_count = transaction_count + 1,
last_activity = ?,
version = ?
WHERE account_id = ?`,
[
event.data.newBalance,
event.data.amount,
event.timestamp,
event.version,
event.aggregateId
]
);
await this.db.query(
`INSERT INTO account_transactions
(account_id, event_id, transaction_type, amount, description,
balance_after, timestamp)
VALUES (?, ?, 'withdrawal', ?, ?, ?, ?)`,
[
event.aggregateId,
event.eventId,
event.data.amount,
event.data.description,
event.data.newBalance,
event.timestamp
]
);
}
async handleAccountClosed(event) {
await this.db.query(
`UPDATE account_summary
SET status = 'closed', last_activity = ?, version = ?
WHERE account_id = ?`,
[event.timestamp, event.version, event.aggregateId]
);
}
}
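Event buses commonly deliver at-least-once, so a projection may see the same event twice. With the handlers above, a redelivered `MoneyDeposited` would add to `total_deposits` a second time before the duplicate `event_id` insert fails. A minimal sketch of a version-based idempotency guard follows; `InMemoryAccountSummary` and its fields are hypothetical, and an in-memory `Map` stands in for the `account_summary` table.

```javascript
// In-memory projection; class and field names are hypothetical.
class InMemoryAccountSummary {
  constructor() {
    this.rows = new Map(); // accountId -> summary row
  }

  // Returns true when the event was applied, false when it was skipped.
  handleEvent(event) {
    const row = this.rows.get(event.aggregateId) ||
      { balance: 0, totalDeposits: 0, transactionCount: 0, version: 0 };

    // Idempotency guard: ignore anything already applied to this row.
    if (event.version <= row.version) {
      return false;
    }

    if (event.eventType === 'AccountOpened') {
      row.balance = event.data.initialDeposit;
      row.totalDeposits = event.data.initialDeposit;
    } else if (event.eventType === 'MoneyDeposited') {
      row.balance = event.data.newBalance;
      row.totalDeposits += event.data.amount;
      row.transactionCount += 1;
    }
    row.version = event.version;
    this.rows.set(event.aggregateId, row);
    return true;
  }
}

const projection = new InMemoryAccountSummary();
const opened = { eventType: 'AccountOpened', aggregateId: 'acct_1', version: 1,
  data: { initialDeposit: 100 } };
const deposited = { eventType: 'MoneyDeposited', aggregateId: 'acct_1', version: 2,
  data: { amount: 25, newBalance: 125 } };

projection.handleEvent(opened);
projection.handleEvent(deposited);
const appliedAgain = projection.handleEvent(deposited); // simulated redelivery
const summary = projection.rows.get('acct_1');
console.log(appliedAgain, summary.totalDeposits);
```

In the SQL version, the same effect could be approximated by adding `AND version < ?` to each `UPDATE` and running the update and insert in one transaction.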
Advanced Patterns and Considerations
Snapshot Optimization
class SnapshotStore {
constructor(database) {
this.db = database;
// Schema creation is async; await `this.ready` before the first read or write
this.ready = this.initializeSchema();
}
async initializeSchema() {
await this.db.query(`
CREATE TABLE IF NOT EXISTS snapshots (
aggregate_id VARCHAR(255),
aggregate_type VARCHAR(255),
version BIGINT,
snapshot_data JSON,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (aggregate_id, version),
INDEX idx_latest (aggregate_id, version DESC)
)
`);
}
async saveSnapshot(aggregateId, aggregateType, version, data) {
await this.db.query(
`INSERT INTO snapshots
(aggregate_id, aggregate_type, version, snapshot_data)
VALUES (?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
snapshot_data = VALUES(snapshot_data),
created_at = CURRENT_TIMESTAMP`,
[aggregateId, aggregateType, version, JSON.stringify(data)]
);
// Clean old snapshots (keep last 3)
await this.db.query(
`DELETE FROM snapshots
WHERE aggregate_id = ? AND version < (
SELECT min_version FROM (
SELECT version as min_version
FROM snapshots
WHERE aggregate_id = ?
ORDER BY version DESC
LIMIT 1 OFFSET 2 -- third-newest snapshot; OFFSET 3 would keep four
) as subquery
)`,
[aggregateId, aggregateId]
);
}
async getLatestSnapshot(aggregateId) {
const result = await this.db.query(
`SELECT version, snapshot_data, created_at
FROM snapshots
WHERE aggregate_id = ?
ORDER BY version DESC
LIMIT 1`,
[aggregateId]
);
if (result.length === 0) {
return null;
}
return {
aggregateId,
version: result[0].version,
data: JSON.parse(result[0].snapshot_data),
createdAt: result[0].created_at
};
}
}
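The retention rule (keep only the newest few snapshots per aggregate) is easy to get off by one, so it helps to check it in isolation. The sketch below is an in-memory analogue under assumed names (`InMemorySnapshotStore`, `keep`); it mirrors the `saveSnapshot` and `getLatestSnapshot` signatures above but is synchronous.

```javascript
// In-memory snapshot store with a retention limit; names are hypothetical.
class InMemorySnapshotStore {
  constructor(keep = 3) {
    this.keep = keep;
    this.snapshots = new Map(); // aggregateId -> snapshots sorted by ascending version
  }

  saveSnapshot(aggregateId, aggregateType, version, data) {
    const existing = (this.snapshots.get(aggregateId) || [])
      .filter(s => s.version !== version); // same version overwrites, like the upsert
    existing.push({ aggregateId, aggregateType, version, data });
    existing.sort((a, b) => a.version - b.version);
    // Retain only the newest `keep` snapshots.
    this.snapshots.set(aggregateId, existing.slice(-this.keep));
  }

  getLatestSnapshot(aggregateId) {
    const list = this.snapshots.get(aggregateId) || [];
    return list.length > 0 ? list[list.length - 1] : null;
  }
}

const snapshots = new InMemorySnapshotStore(3);
for (const version of [10, 20, 30, 40, 50]) {
  snapshots.saveSnapshot('acct_1', 'BankAccount', version, { balance: version * 10 });
}
const retainedVersions = snapshots.snapshots.get('acct_1').map(s => s.version);
console.log(retainedVersions, snapshots.getLatestSnapshot('acct_1').version);
```

Deleting old snapshots is safe because they are a cache: any snapshot can be rebuilt from the event log.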
Event Schema Evolution
class EventUpgrader {
constructor() {
this.upgraders = new Map();
this.setupUpgraders();
}
setupUpgraders() {
// V1 to V2: Add correlation ID
this.upgraders.set('UserRegistered_v1', (event) => ({
...event,
eventType: 'UserRegistered',
version: 2,
metadata: {
...event.metadata,
// Optional chaining: v1 events may have been stored without metadata
correlationId: event.metadata?.correlationId || `corr_${event.timestamp}_${Math.random().toString(36).slice(2, 8)}`
}
}));
// V2 to V3: Restructure user data
this.upgraders.set('UserRegistered_v2', (event) => ({
...event,
version: 3,
data: {
userId: event.data.id,
profile: {
email: event.data.email,
name: event.data.name
},
preferences: {
newsletter: event.data.newsletter || false
}
}
}));
}
upgradeEvent(event) {
let currentEvent = { ...event };
// `version` here is the event schema version (default 1). The EventStore above
// also exposes the aggregate version as `version`, so keep the two apart in practice.
// Apply upgrades sequentially
let version = event.version || 1;
while (this.upgraders.has(`${event.eventType}_v${version}`)) {
const upgrader = this.upgraders.get(`${event.eventType}_v${version}`);
currentEvent = upgrader(currentEvent);
version++;
}
return currentEvent;
}
upgradeEventStream(events) {
return events.map(event => this.upgradeEvent(event));
}
}
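A short, self-contained run of the upcasting chain shows how a v1 event ends up in the v3 shape. This is a sketch under one stated assumption: the schema version lives in a separate `schemaVersion` field (a name introduced here, not used above) so that it cannot be confused with the aggregate version. The placeholder correlation ID `'unknown'` is also illustrative.

```javascript
// Upcasters keyed by `${eventType}_v${schemaVersion}`; each returns the next schema version.
// `schemaVersion` is an assumed field name, kept separate from the aggregate version.
const upcasters = new Map([
  ['UserRegistered_v1', event => ({
    ...event,
    schemaVersion: 2,
    metadata: { ...event.metadata, correlationId: event.metadata?.correlationId ?? 'unknown' }
  })],
  ['UserRegistered_v2', event => ({
    ...event,
    schemaVersion: 3,
    data: {
      userId: event.data.id,
      profile: { email: event.data.email, name: event.data.name },
      preferences: { newsletter: event.data.newsletter ?? false }
    }
  })]
]);

function upcast(event) {
  let current = { ...event, schemaVersion: event.schemaVersion ?? 1 };
  let upcaster = upcasters.get(`${current.eventType}_v${current.schemaVersion}`);
  while (upcaster) {
    const next = upcaster(current);
    if (next.schemaVersion <= current.schemaVersion) {
      throw new Error('Upcaster must increase schemaVersion'); // prevents an endless loop
    }
    current = next;
    upcaster = upcasters.get(`${current.eventType}_v${current.schemaVersion}`);
  }
  return current;
}

const legacyEvent = {
  eventType: 'UserRegistered',
  data: { id: 'user_123', email: 'john@example.com', name: 'John Doe' }
};
const upgraded = upcast(legacyEvent);
console.log(upgraded.schemaVersion, upgraded.data.profile.email);
```

The stored event is never rewritten; upcasting happens on read, which keeps the log immutable.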
Temporal Queries
class TemporalQueryService {
constructor(eventStore, eventUpgrader) {
this.eventStore = eventStore;
this.eventUpgrader = eventUpgrader;
}
async getAggregateAtTime(aggregateId, timestamp, AggregateClass) {
// Get all events up to the specified time
const allEvents = await this.eventStore.getEvents(aggregateId);
// Filter events up to timestamp
const eventsUpToTime = allEvents.filter(event =>
new Date(event.timestamp) <= new Date(timestamp)
);
if (eventsUpToTime.length === 0) {
return null;
}
// Upgrade events for compatibility
const upgradedEvents = this.eventUpgrader.upgradeEventStream(eventsUpToTime);
// Reconstruct aggregate state
const aggregate = AggregateClass.fromEvents(aggregateId, upgradedEvents);
return aggregate;
}
async getSystemStateAtTime(timestamp) {
// Get all events up to timestamp across all aggregates
const allEvents = await this.eventStore.getAllEvents(null, null);
const eventsUpToTime = allEvents.filter(event =>
new Date(event.timestamp) <= new Date(timestamp)
);
// Group by aggregate
const eventsByAggregate = new Map();
for (const event of eventsUpToTime) {
if (!eventsByAggregate.has(event.aggregateId)) {
eventsByAggregate.set(event.aggregateId, []);
}
eventsByAggregate.get(event.aggregateId).push(event);
}
return eventsByAggregate;
}
async getEventsBetween(startTime, endTime, eventTypes = null) {
const allEvents = await this.eventStore.getAllEvents(startTime, eventTypes);
return allEvents.filter(event => {
const eventTime = new Date(event.timestamp);
return eventTime >= new Date(startTime) && eventTime <= new Date(endTime);
});
}
async analyzeEventPatterns(aggregateId, eventType, timeWindow) {
const events = await this.eventStore.getEvents(aggregateId);
const filteredEvents = events.filter(event =>
event.eventType === eventType &&
new Date(event.timestamp) >= new Date(timeWindow.start) &&
new Date(event.timestamp) <= new Date(timeWindow.end)
);
// Analyze patterns
const hourlyDistribution = new Map();
const dailyTotals = new Map();
for (const event of filteredEvents) {
const eventDate = new Date(event.timestamp);
const hour = eventDate.getUTCHours(); // UTC, to match the UTC day key below
const day = eventDate.toISOString().split('T')[0];
hourlyDistribution.set(hour, (hourlyDistribution.get(hour) || 0) + 1);
dailyTotals.set(day, (dailyTotals.get(day) || 0) + 1);
}
return {
totalEvents: filteredEvents.length,
hourlyDistribution: Object.fromEntries(hourlyDistribution),
dailyTotals: Object.fromEntries(dailyTotals),
// Avoid NaN (0 / 0) when no events fall inside the window
averagePerDay: dailyTotals.size > 0 ? filteredEvents.length / dailyTotals.size : 0
};
}
}
Deep Dive Analysis
Event Sourcing vs Traditional Approaches
Aspect | Traditional (CRUD) | Event Sourcing |
---|---|---|
Data Storage | Current state only | Complete event history |
Auditability | Limited (if tracked separately) | Complete audit trail |
Debugging | Difficult to reproduce issues | Full event replay available |
Complexity | Lower initial complexity | Higher initial complexity |
Storage Cost | Lower storage requirements | Higher storage requirements |
Query Performance | Fast for current state | Requires projection for queries |
Temporal Queries | Needs extra history or audit tables | Native support |
Schema Changes | Can be destructive | Non-destructive with upcasting |
Common Pitfalls and Solutions
1. Event Explosion and Storage Growth
// Problem: Too many fine-grained events
const badEvents = [
{ eventType: 'UserFirstNameChanged', data: { firstName: 'John' }},
{ eventType: 'UserLastNameChanged', data: { lastName: 'Doe' }},
{ eventType: 'UserEmailChanged', data: { email: 'john@example.com' }}
];
// Solution: Aggregate related changes
const goodEvent = {
eventType: 'UserProfileUpdated',
data: {
firstName: 'John',
lastName: 'Doe',
email: 'john@example.com'
}
};
2. Event Ordering and Causality
// Problem: Events processed out of order
class EventProcessor {
async handleEvent(event) {
// Ensure events are processed in order for each aggregate
const lastProcessedVersion = await this.getLastProcessedVersion(event.aggregateId);
if (event.version !== lastProcessedVersion + 1) {
// Queue for later processing or request missing events
await this.queueForLaterProcessing(event);
return;
}
await this.processEvent(event);
await this.updateLastProcessedVersion(event.aggregateId, event.version);
}
}
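The "queue for later processing" step can be made concrete with a small per-aggregate buffer. The sketch below is one possible approach, not the processor above: `OrderedEventBuffer` and its `apply` callback are hypothetical names, state is held in memory, and versions are assumed to start at 1 with no gaps.

```javascript
// Buffers out-of-order events per aggregate and releases them in version order.
// The class name and the `apply` callback are hypothetical.
class OrderedEventBuffer {
  constructor(apply) {
    this.apply = apply;
    this.lastVersion = new Map(); // aggregateId -> last applied version
    this.pending = new Map();     // aggregateId -> Map(version -> event)
  }

  handleEvent(event) {
    const id = event.aggregateId;
    const last = this.lastVersion.get(id) || 0;
    if (event.version <= last) return; // duplicate, already applied

    if (!this.pending.has(id)) this.pending.set(id, new Map());
    const waiting = this.pending.get(id);
    waiting.set(event.version, event);

    // Drain every consecutive version that is now available.
    let next = last + 1;
    while (waiting.has(next)) {
      this.apply(waiting.get(next));
      waiting.delete(next);
      this.lastVersion.set(id, next);
      next++;
    }
  }
}

const appliedVersions = [];
const buffer = new OrderedEventBuffer(event => appliedVersions.push(event.version));

buffer.handleEvent({ aggregateId: 'acct_1', version: 2 }); // held: version 1 is missing
buffer.handleEvent({ aggregateId: 'acct_1', version: 3 }); // held
buffer.handleEvent({ aggregateId: 'acct_1', version: 1 }); // releases 1, 2, 3
buffer.handleEvent({ aggregateId: 'acct_1', version: 2 }); // duplicate, ignored
console.log(appliedVersions);
```

A production version would also need a timeout or a re-fetch from the event store for gaps that never fill.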
3. Privacy and GDPR Compliance
// Solution: Crypto-shredding for data deletion
class GDPRCompliantEventStore {
async storeEvent(event) {
// Encrypt PII with user-specific key
if (this.containsPII(event)) {
const userKey = await this.getUserEncryptionKey(event.userId);
event.data = await this.encrypt(event.data, userKey);
}
await this.eventStore.append(event);
}
async deleteUserData(userId) {
// Delete encryption key to make data unreadable
await this.deleteUserEncryptionKey(userId);
// Optionally add tombstone event
await this.eventStore.append({
eventType: 'UserDataDeleted',
aggregateId: userId,
data: { deletedAt: new Date().toISOString() }
});
}
}
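To illustrate why deleting a key is enough, here is a minimal crypto-shredding sketch using Node's built-in `crypto` module with AES-256-GCM. It is a sketch under stated assumptions: `userKeys`, `encryptForUser`, and `decryptForUser` are hypothetical names, and an in-memory `Map` stands in for what would normally be a managed key store.

```javascript
const nodeCrypto = require('node:crypto');

// Per-user keys live apart from the event log; deleting a key "shreds" that user's PII.
// All names here are hypothetical, and a real system would use a managed key store.
const userKeys = new Map();

function encryptForUser(userId, data) {
  if (!userKeys.has(userId)) userKeys.set(userId, nodeCrypto.randomBytes(32));
  const iv = nodeCrypto.randomBytes(12); // fresh nonce for every encryption
  const cipher = nodeCrypto.createCipheriv('aes-256-gcm', userKeys.get(userId), iv);
  const ciphertext = Buffer.concat([cipher.update(JSON.stringify(data), 'utf8'), cipher.final()]);
  return {
    iv: iv.toString('base64'),
    tag: cipher.getAuthTag().toString('base64'),
    ciphertext: ciphertext.toString('base64')
  };
}

function decryptForUser(userId, payload) {
  const key = userKeys.get(userId);
  if (!key) return null; // key was shredded: the payload is unrecoverable
  const decipher = nodeCrypto.createDecipheriv('aes-256-gcm', key, Buffer.from(payload.iv, 'base64'));
  decipher.setAuthTag(Buffer.from(payload.tag, 'base64'));
  const plaintext = Buffer.concat([
    decipher.update(Buffer.from(payload.ciphertext, 'base64')),
    decipher.final()
  ]);
  return JSON.parse(plaintext.toString('utf8'));
}

const storedEvent = {
  eventType: 'UserRegistered',
  aggregateId: 'user_123',
  data: encryptForUser('user_123', { email: 'john@example.com' })
};

const beforeDeletion = decryptForUser('user_123', storedEvent.data);
userKeys.delete('user_123'); // the event itself stays in the immutable log
const afterDeletion = decryptForUser('user_123', storedEvent.data);
console.log(beforeDeletion, afterDeletion);
```

Only PII fields need encrypting; keeping non-personal fields in the clear lets projections continue to work after a key is deleted.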
Interview-Focused Content
Junior Level (2-4 YOE)
Q: What is Event Sourcing and how is it different from traditional database storage?
A: Event Sourcing stores data as a sequence of immutable events rather than only the current state. Instead of updating records in place, each change is captured as an event and appended to an event store, and the current state is derived by replaying those events. This provides a complete audit trail and the ability to see system state at any point in time.
Q: What are the main benefits of Event Sourcing?
A: Key benefits include:
- Complete audit trail: Every change is recorded with context
- Temporal queries: Can see state at any point in time
- Debugging: Can replay events to reproduce issues
- Data recovery: Can rebuild state from immutable events
- Analytics: Rich historical data for business intelligence
Q: What is an event and how is it different from a command?
A: An event represents something that has already happened (past tense) like "UserRegistered" or "OrderPlaced". A command represents an intent to do something (imperative) like "RegisterUser" or "PlaceOrder". Commands can be rejected, but events are facts that have already occurred.
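The command/event distinction can be sketched in a few lines. The function names (`handleWithdraw`, `applyEvent`) and the `WithdrawMoney` command type are hypothetical and chosen only for illustration.

```javascript
// A command handler decides; an event records. Function names are hypothetical.
function handleWithdraw(state, command) {
  // Commands can be rejected before anything is recorded.
  if (command.amount <= 0) throw new Error('Withdrawal amount must be positive');
  if (state.balance < command.amount) throw new Error('Insufficient funds');
  // Accepted: emit a past-tense fact.
  return [{ eventType: 'MoneyWithdrawn', data: { amount: command.amount } }];
}

// Applying an event performs no validation; it only folds a fact into state.
function applyEvent(state, event) {
  if (event.eventType === 'MoneyWithdrawn') {
    return { ...state, balance: state.balance - event.data.amount };
  }
  return state;
}

let account = { balance: 100 };
const accepted = handleWithdraw(account, { type: 'WithdrawMoney', amount: 40 });
account = accepted.reduce(applyEvent, account);

let rejection = null;
try {
  handleWithdraw(account, { type: 'WithdrawMoney', amount: 500 });
} catch (error) {
  rejection = error.message; // rejected commands leave no event behind
}
console.log(account.balance, rejection);
```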
Senior Level (5-8 YOE)
Q: How do you handle event schema evolution in Event Sourcing?
A: Strategies include:
- Versioning: Include version numbers in events
- Upcasting: Transform old events to new format when reading
- Additive changes: Only add fields, don't remove or change existing ones
- Event transformation: Convert events during replay
- Weak schema: Use flexible formats like JSON
Q: What are the challenges of implementing Event Sourcing and how do you address them?
A: Key challenges:
- Complexity: Higher development overhead - mitigate with good frameworks
- Performance: Slow query performance - use CQRS with read models
- Storage growth: Large event stores - archive or tier old events; snapshotting shortens replay time but does not shrink the log
- Event ordering: Out-of-order processing - use version numbers and queuing
- Privacy compliance: GDPR issues - use encryption and crypto-shredding
Q: How do you implement snapshotting in Event Sourcing?
A: Snapshotting stores periodic state snapshots to avoid replaying all events:
- Frequency: Snapshot every N events (e.g., every 10-100 events)
- Storage: Store snapshots separately from events
- Loading: Load latest snapshot + subsequent events
- Cleanup: Keep few recent snapshots, delete old ones
- Consistency: Ensure snapshot represents exact state at that event version
Staff+ Level (8+ YOE)
Q: Design an Event Sourcing system for a global e-commerce platform. How do you handle consistency, performance, and compliance?
A: Design considerations:
- Partitioning: Partition events by aggregate ID for scalability
- Regional deployment: Event stores in multiple regions with async replication
- CQRS integration: Separate read models optimized for different query patterns
- Saga patterns: Handle distributed transactions across aggregates
- Compliance: Implement crypto-shredding for GDPR, audit logs for SOX
- Performance: Use snapshotting, caching, and event compression
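As one illustration of partitioning by aggregate ID, the sketch below hashes the ID to a stable partition index so that all events for an aggregate land on the same partition and keep their relative order. `partitionFor` is a hypothetical helper, and SHA-256 is chosen here only because it ships with Node; any stable hash would serve.

```javascript
const nodeCryptoForPartitioning = require('node:crypto');

// Maps an aggregate ID to a stable partition index in [0, partitionCount).
// Hypothetical helper; the hash function is an arbitrary but stable choice.
function partitionFor(aggregateId, partitionCount) {
  const digest = nodeCryptoForPartitioning.createHash('sha256').update(aggregateId).digest();
  return digest.readUInt32BE(0) % partitionCount;
}

const first = partitionFor('user_123', 8);
const second = partitionFor('user_123', 8);
console.log(first === second); // the same aggregate always maps to the same partition
```

Plain modulo hashing remaps most aggregates when `partitionCount` changes; consistent hashing or a fixed, over-provisioned partition count are common ways to limit that.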
Q: How would you migrate an existing CRUD application to Event Sourcing?
A: Migration strategy:
- Strangler fig pattern: Gradually replace components
- Event capture: Start capturing events for new changes
- Historical events: Generate events from current state and change logs
- Dual writes: Write to both systems during transition
- Read model migration: Build read models from events
- Validation: Ensure event replay produces correct state
Q: What are the implications of Event Sourcing for system observability and debugging?
A: Observability benefits:
- Perfect audit trail: Every change is recorded with full context
- Reproducible debugging: Can replay exact sequence of events
- Business analytics: Rich historical data for insights
- Compliance reporting: Automated audit trails
- System behavior analysis: Understand patterns and trends
- Root cause analysis: Trace events leading to issues