Problem Statement
Design a distributed caching system that:
- Stores key-value pairs in memory for fast access
- Supports horizontal scaling across multiple nodes
- Provides high availability and fault tolerance
- Implements efficient eviction policies
- Handles millions of operations per second
Requirements
Functional Requirements
- Put(key, value): Store a key-value pair with optional TTL
- Get(key): Retrieve value for a given key
- Delete(key): Remove a key-value pair
- Eviction: Automatically remove least recently/frequently used items
- TTL Support: Auto-expire keys after specified time
Non-Functional Requirements
- Low Latency: Sub-millisecond read/write operations
- High Throughput: Handle millions of requests per second
- Scalability: Horizontal scaling by adding nodes
- High Availability: 99.99% uptime with replication
- Consistency: Configurable (eventual vs strong)
Extended Requirements
- Atomic operations (increment, decrement)
- Pub/Sub messaging
- Data persistence options
- Cluster management
Capacity Estimation
Memory Requirements
- Average key size: 50 bytes
- Average value size: 1 KB
- Total entries: 1 billion keys
- Total Memory: 1B × (1 KB value + 50 B key) ≈ 1.05 TB across the cluster, before replication and per-entry metadata overhead
QPS (Queries Per Second)
- Read Operations: 10 million/sec
- Write Operations: 1 million/sec
- Read:Write Ratio: 10:1
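Putting the memory and QPS figures together gives a rough cluster size (the 64 GB of usable memory per node is an assumption, not a number from the estimates above):
- Data: 1B × (1 KB + 50 B) ≈ 1.05 TB; with a replication factor of 2, ≈ 2.1 TB
- Nodes needed for memory: 2.1 TB ÷ 64 GB ≈ 33 nodes
- Throughput sanity check: 11M ops/sec ÷ 33 nodes ≈ 330K ops/sec per node, a plausible load for an in-memory store, so memory rather than CPU drives the node count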
System APIs
from typing import Any, Optional

class CacheAPI:
    def put(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """
        Store a key-value pair with an optional TTL.

        Args:
            key: Cache key
            value: Value to store (serialized)
            ttl: Time-to-live in seconds

        Returns:
            Success status
        """
        pass

    def get(self, key: str) -> Optional[Any]:
        """
        Retrieve the value for a given key.

        Returns:
            Cached value, or None if not found
        """
        pass

    def delete(self, key: str) -> bool:
        """Delete a key from the cache"""
        pass

    def exists(self, key: str) -> bool:
        """Check if a key exists"""
        pass

    def increment(self, key: str, delta: int = 1) -> int:
        """Atomically increment a numeric value"""
        pass
High-Level Architecture
                    ┌──────────────────┐
                    │  Load Balancer   │
                    └─────────┬────────┘
                              │
        ┌─────────────────────┼─────────────────────┐
        │                     │                     │
   ┌────▼─────┐          ┌────▼─────┐          ┌────▼─────┐
   │  Cache   │          │  Cache   │          │  Cache   │
   │  Node 1  │◄────────►│  Node 2  │◄────────►│  Node 3  │
   │ (Master) │          │ (Master) │          │ (Master) │
   └────┬─────┘          └────┬─────┘          └────┬─────┘
        │                     │                     │
   ┌────▼─────┐          ┌────▼─────┐          ┌────▼─────┐
   │  Cache   │          │  Cache   │          │  Cache   │
   │  Node 4  │          │  Node 5  │          │  Node 6  │
   │ (Replica)│          │ (Replica)│          │ (Replica)│
   └──────────┘          └──────────┘          └──────────┘
Core Components
1. Consistent Hashing
Distribute keys across nodes to minimize data movement when nodes are added/removed.
import hashlib
from bisect import bisect_right
from typing import List, Optional

class ConsistentHash:
    def __init__(self, nodes: List[str], virtual_nodes: int = 150):
        """
        Initialize the consistent hash ring.

        Args:
            nodes: List of cache node identifiers
            virtual_nodes: Number of virtual nodes per physical node
        """
        self.virtual_nodes = virtual_nodes
        self.ring = {}         # hash -> node mapping
        self.sorted_keys = []  # sorted hashes, for binary search
        for node in nodes:
            self.add_node(node)

    def _hash(self, key: str) -> int:
        """Hash a key onto the ring (MD5 for its uniform spread, not for security)"""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node: str):
        """Add a node to the hash ring"""
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:{i}"
            hash_value = self._hash(virtual_key)
            self.ring[hash_value] = node
            self.sorted_keys.append(hash_value)
        self.sorted_keys.sort()

    def remove_node(self, node: str):
        """Remove a node from the hash ring"""
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:{i}"
            hash_value = self._hash(virtual_key)
            del self.ring[hash_value]
            self.sorted_keys.remove(hash_value)

    def get_node(self, key: str) -> Optional[str]:
        """Get the node responsible for a key"""
        if not self.ring:
            return None
        hash_value = self._hash(key)
        # First virtual node clockwise from the key's position
        idx = bisect_right(self.sorted_keys, hash_value)
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around to the first node
        return self.ring[self.sorted_keys[idx]]
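A quick look at ring behavior (node names are illustrative):

ring = ConsistentHash(["cache-1", "cache-2", "cache-3"])

owner = ring.get_node("user:1001")  # stable for a fixed ring, e.g. "cache-2"
ring.add_node("cache-4")            # only keys near cache-4's virtual nodes move
ring.remove_node("cache-2")         # cache-2's keys redistribute to clockwise neighbors
print(owner, "->", ring.get_node("user:1001"))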
2. Cache Node Implementation
import time
from collections import OrderedDict
from threading import Lock
from typing import Any, Optional

class CacheNode:
    def __init__(self, capacity: int, eviction_policy: str = "LRU"):
        """
        Initialize a cache node.

        Args:
            capacity: Maximum number of items
            eviction_policy: "LRU", "LFU", or "FIFO" (LRU shown below)
        """
        self.capacity = capacity
        self.eviction_policy = eviction_policy
        self.cache = OrderedDict()  # access order doubles as the LRU list
        self.ttl_map = {}           # key -> expiration timestamp
        self.lock = Lock()

    def get(self, key: str) -> Optional[Any]:
        """Get a value, updating its LRU position"""
        with self.lock:
            # Lazy expiration: purge the key if its TTL has passed
            if key in self.ttl_map and time.time() > self.ttl_map[key]:
                self._evict(key)
                return None
            if key not in self.cache:
                return None
            # Move to end (most recently used)
            self.cache.move_to_end(key)
            return self.cache[key]

    def put(self, key: str, value: Any, ttl: Optional[int] = None):
        """Store a value with an optional TTL"""
        with self.lock:
            if key in self.cache:
                # Update existing key
                self.cache.move_to_end(key)
                self.cache[key] = value
            else:
                # Evict before inserting if at capacity
                if len(self.cache) >= self.capacity:
                    self._evict_lru()
                self.cache[key] = value
            if ttl is not None:
                self.ttl_map[key] = time.time() + ttl
            else:
                # Overwriting without a TTL clears any stale expiration
                self.ttl_map.pop(key, None)

    def delete(self, key: str) -> bool:
        """Remove a key from the cache"""
        with self.lock:
            if key in self.cache:
                del self.cache[key]
                self.ttl_map.pop(key, None)
                return True
            return False

    def _evict_lru(self):
        """Evict the least recently used item"""
        if self.cache:
            oldest_key = next(iter(self.cache))  # front of the OrderedDict
            self._evict(oldest_key)

    def _evict(self, key: str):
        """Evict a specific key"""
        self.cache.pop(key, None)
        self.ttl_map.pop(key, None)
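The node above expires keys only lazily, on reads, so an entry that is never touched again holds memory until capacity eviction reclaims it. A minimal background sweeper is sketched below (an alternative is the sampling approach Redis uses, expiring a random subset of keys per cycle):

import threading
import time

def start_ttl_sweeper(node: CacheNode, interval: float = 1.0) -> threading.Thread:
    """Periodically purge expired keys so unread entries don't linger."""
    def sweep():
        while True:
            time.sleep(interval)
            now = time.time()
            with node.lock:
                # Collect first, then evict, to avoid mutating during iteration
                expired = [k for k, deadline in node.ttl_map.items() if deadline < now]
                for k in expired:
                    node._evict(k)
    t = threading.Thread(target=sweep, daemon=True)
    t.start()
    return t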
3. Distributed Cache Client
from typing import Any, List, Optional

class DistributedCache:
    def __init__(self, nodes: List[str]):
        """
        Initialize the distributed cache client.

        Args:
            nodes: List of cache node addresses
        """
        self.consistent_hash = ConsistentHash(nodes)
        self.node_clients = {}  # node -> client connection
        self.replication_factor = 2
        for node in nodes:
            self.node_clients[node] = self._create_connection(node)

    def put(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Store a key-value pair with replication"""
        try:
            # Route to the primary node for this key
            primary_node = self.consistent_hash.get_node(key)
            client = self.node_clients[primary_node]
            client.put(key, value, ttl)
            # Replicate to backups asynchronously (eventual consistency)
            self._replicate_async(key, value, ttl, primary_node)
            return True
        except Exception as e:
            print(f"Put failed: {e}")
            return False

    def get(self, key: str) -> Optional[Any]:
        """Retrieve a value from the cache"""
        try:
            node = self.consistent_hash.get_node(key)
            return self.node_clients[node].get(key)
        except Exception:
            # Primary unreachable: fall back to a replica
            return self._get_from_replica(key)

    def delete(self, key: str) -> bool:
        """Delete a key from the primary and all replicas"""
        node = self.consistent_hash.get_node(key)
        client = self.node_clients[node]
        success = client.delete(key)
        self._delete_from_replicas(key, node)
        return success

    def _create_connection(self, node: str):
        """Open a client connection to a node (transport-specific, stubbed)"""
        pass

    def _get_from_replica(self, key: str) -> Optional[Any]:
        """Fallback to replica nodes"""
        # Implementation to query replica nodes
        pass

    def _replicate_async(self, key: str, value: Any,
                         ttl: Optional[int], primary_node: str):
        """Asynchronously replicate to backup nodes"""
        # Get the next N nodes in the hash ring as replicas
        # and write to them in the background
        pass

    def _delete_from_replicas(self, key: str, primary_node: str):
        """Propagate a delete to the replica nodes (stubbed)"""
        pass
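The replica helpers are left as stubs above. One plausible way to fill two of them, sketched against the internals of the ConsistentHash class (this treats replication_factor as the number of backup copies; it is a sketch, not a hardened implementation):

from bisect import bisect_right

# Possible bodies for the corresponding stubs in DistributedCache above
def _get_replica_nodes(self, key: str, primary_node: str) -> List[str]:
    """Walk the ring clockwise from the key's hash and collect the
    next distinct physical nodes after the primary."""
    ch = self.consistent_hash
    idx = bisect_right(ch.sorted_keys, ch._hash(key))
    replicas = []
    for step in range(len(ch.sorted_keys)):
        node = ch.ring[ch.sorted_keys[(idx + step) % len(ch.sorted_keys)]]
        if node != primary_node and node not in replicas:
            replicas.append(node)
            if len(replicas) == self.replication_factor:
                break
    return replicas

def _get_from_replica(self, key: str) -> Optional[Any]:
    """Fall back to replicas when the primary is unreachable."""
    primary = self.consistent_hash.get_node(key)
    for node in self._get_replica_nodes(key, primary):
        try:
            return self.node_clients[node].get(key)
        except Exception:
            continue  # this replica failed too; try the next one
    return None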
Eviction Policies
LRU (Least Recently Used)
from collections import OrderedDict
from typing import Any, Optional

class LRUCache:
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        if key not in self.cache:
            return None
        self.cache.move_to_end(key)  # mark as most recently used
        return self.cache[key]

    def put(self, key: str, value: Any):
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)  # drop the least recently used
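A short trace of the eviction order:

lru = LRUCache(capacity=2)
lru.put("a", 1)
lru.put("b", 2)
lru.get("a")         # touching "a" makes "b" the least recently used
lru.put("c", 3)      # over capacity: "b" is evicted
print(lru.get("b"))  # None
print(lru.get("a"))  # 1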
LFU (Least Frequently Used)
from collections import OrderedDict, defaultdict
from typing import Any, Optional

class LFUCache:
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.cache = {}                             # key -> value
        self.freq = defaultdict(int)                # key -> access frequency
        self.freq_list = defaultdict(OrderedDict)   # freq -> ordered set of keys
        self.min_freq = 0

    def get(self, key: str) -> Optional[Any]:
        if key not in self.cache:
            return None
        self._update_freq(key)
        return self.cache[key]

    def put(self, key: str, value: Any):
        if self.capacity == 0:
            return
        if key in self.cache:
            self.cache[key] = value
            self._update_freq(key)
        else:
            if len(self.cache) >= self.capacity:
                self._evict()
            self.cache[key] = value
            self.freq[key] = 1
            self.freq_list[1][key] = None  # OrderedDict used as an ordered set
            self.min_freq = 1

    def _update_freq(self, key: str):
        freq = self.freq[key]
        del self.freq_list[freq][key]
        # If this emptied the lowest-frequency bucket, bump min_freq
        if not self.freq_list[freq] and freq == self.min_freq:
            self.min_freq += 1
        self.freq[key] += 1
        self.freq_list[self.freq[key]][key] = None

    def _evict(self):
        # Oldest key in the lowest-frequency bucket (LRU tie-break)
        key, _ = self.freq_list[self.min_freq].popitem(last=False)
        del self.cache[key]
        del self.freq[key]
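The same trace under LFU shows the difference: eviction follows access frequency, with LRU order breaking ties:

lfu = LFUCache(capacity=2)
lfu.put("a", 1)
lfu.put("b", 2)
lfu.get("a")         # freq("a") = 2, freq("b") = 1
lfu.put("c", 3)      # "b" has the lowest frequency and is evicted
print(lfu.get("b"))  # None
print(lfu.get("a"))  # 1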
Replication Strategy
Master-Slave Replication
from typing import List

class ReplicationManager:
    def __init__(self, master_node: str, slave_nodes: List[str]):
        self.master = master_node
        self.slaves = slave_nodes
        self.replication_log = []  # ordered history of write operations

    def replicate_write(self, operation: dict):
        """
        Replicate a write operation to the slaves.

        Args:
            operation: {
                'type': 'PUT' | 'DELETE',
                'key': str,
                'value': Any,
                'ttl': int
            }
        """
        # Log the operation so lagging slaves can catch up later
        self.replication_log.append(operation)
        # Async replication to slaves
        for slave in self.slaves:
            self._send_to_slave(slave, operation)

    def _send_to_slave(self, slave: str, operation: dict):
        """Send the operation to a slave node (async)"""
        # Network call to the slave; failures handled with retry logic
        pass
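_send_to_slave is stubbed above; a sketch of the retry logic it alludes to, with exponential backoff (the transport.send call is a hypothetical stand-in for the actual RPC layer):

import time

def send_with_retry(transport, slave: str, operation: dict,
                    max_retries: int = 3, base_delay: float = 0.05) -> bool:
    """Retry a replication call with exponential backoff before giving up."""
    for attempt in range(max_retries):
        try:
            transport.send(slave, operation)  # hypothetical RPC call
            return True
        except ConnectionError:
            time.sleep(base_delay * (2 ** attempt))  # 50ms, 100ms, 200ms, ...
    # The operation remains in the replication log, so a slave that
    # stays down can catch up when it rejoins.
    return False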
Data Persistence
AOF (Append-Only File)
import json

class PersistenceManager:
    def __init__(self, filepath: str):
        self.filepath = filepath
        self.file = open(filepath, 'a')  # append-only log handle

    def log_operation(self, operation: dict):
        """Append an operation to the log file"""
        self.file.write(json.dumps(operation) + '\n')
        self.file.flush()  # flush every write; fsync would be more durable but slower

    def restore(self) -> dict:
        """Rebuild cache state by replaying the log file"""
        cache = {}
        with open(self.filepath, 'r') as f:
            for line in f:
                op = json.loads(line)
                if op['type'] == 'PUT':
                    cache[op['key']] = op['value']
                elif op['type'] == 'DELETE':
                    cache.pop(op['key'], None)
        return cache
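The intended round trip, with operation dicts in the same shape the ReplicationManager uses (the file path is illustrative):

pm = PersistenceManager("/tmp/cache.aof")
pm.log_operation({"type": "PUT", "key": "user:1", "value": "alice"})
pm.log_operation({"type": "PUT", "key": "user:2", "value": "bob"})
pm.log_operation({"type": "DELETE", "key": "user:2"})

print(pm.restore())  # {'user:1': 'alice'} -- replaying the log rebuilds state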
Monitoring and Metrics
class CacheMetrics:
    def __init__(self):
        self.hit_count = 0
        self.miss_count = 0
        self.eviction_count = 0

    def record_hit(self):
        self.hit_count += 1

    def record_miss(self):
        self.miss_count += 1

    def record_eviction(self):
        self.eviction_count += 1

    def get_hit_rate(self) -> float:
        total = self.hit_count + self.miss_count
        if total == 0:
            return 0.0
        return self.hit_count / total

    def get_stats(self) -> dict:
        return {
            'hits': self.hit_count,
            'misses': self.miss_count,
            'hit_rate': self.get_hit_rate(),
            'evictions': self.eviction_count
        }
Scaling Considerations
Horizontal Scaling
- Add nodes to hash ring
- Use consistent hashing to minimize data movement
- Rebalance only affected key ranges, typically ~1/N of keys (verified in the sketch below)
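The ~1/N figure is easy to check with the ConsistentHash class from earlier (key and node names are illustrative):

keys = [f"key:{i}" for i in range(100_000)]
ring = ConsistentHash([f"cache-{i}" for i in range(9)])
before = {k: ring.get_node(k) for k in keys}

ring.add_node("cache-9")  # grow the cluster from 9 to 10 nodes
moved = sum(1 for k in keys if ring.get_node(k) != before[k])
print(f"moved: {moved / len(keys):.1%}")  # ≈ 10%, i.e. about 1/N of the keys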
Sharding Strategy
- Hash-based sharding (consistent hashing)
- Range-based sharding (ordered keys)
- Geographic sharding (latency optimization)
Hot Key Problem
- Replicate popular keys to multiple nodes
- Use local cache tier (L1 cache)
- Implement read-through cache pattern
Trade-offs
Consistency vs Availability
- Strong Consistency: Synchronous replication, higher latency
- Eventual Consistency: Async replication, better performance
Memory vs Persistence
- Pure In-Memory: Fastest but data loss on crash
- Hybrid: Periodic snapshots + AOF logs
- Full Persistence: Slower but durable
Eviction Policy Choice
- LRU: Good for temporal locality, O(1) operations
- LFU: Better for frequency patterns, more memory overhead
- TTL-based: Predictable expiration, requires timestamp management
Summary
Key design decisions:
- Consistent hashing for data distribution
- Master-slave replication for high availability
- LRU eviction for memory management
- Optional persistence with AOF/snapshots
- Partition tolerance over strong consistency (AP in CAP)
This design provides a highly scalable, low-latency distributed cache similar to Redis/Memcached.