Distributed Cache

Learn how to design a highly available, distributed caching system with consistent hashing, replication, and eviction policies.

Problem Statement

Design a distributed caching system that:

  • Stores key-value pairs in memory for fast access
  • Supports horizontal scaling across multiple nodes
  • Provides high availability and fault tolerance
  • Implements efficient eviction policies
  • Handles millions of operations per second

Requirements

Functional Requirements

  1. Put(key, value): Store a key-value pair with optional TTL
  2. Get(key): Retrieve value for a given key
  3. Delete(key): Remove a key-value pair
  4. Eviction: Automatically remove least recently/frequently used items
  5. TTL Support: Auto-expire keys after specified time

Non-Functional Requirements

  1. Low Latency: Sub-millisecond read/write operations
  2. High Throughput: Handle millions of requests per second
  3. Scalability: Horizontal scaling by adding nodes
  4. High Availability: 99.99% uptime with replication
  5. Consistency: Configurable (eventual vs strong)

Extended Requirements

  • Atomic operations (increment, decrement)
  • Pub/Sub messaging
  • Data persistence options
  • Cluster management

Capacity Estimation

Memory Requirements

  • Average key size: 50 bytes
  • Average value size: 1 KB
  • Total entries: 1 billion keys
  • Total Memory: 1B × (1 KB value + 50 B key) ≈ 1.05 TB across the cluster, before replication overhead

QPS (Queries Per Second)

  • Read Operations: 10 million/sec
  • Write Operations: 1 million/sec
  • Read:Write Ratio: 10:1
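
These figures can be sanity-checked with quick back-of-envelope arithmetic. Note the 16-node cluster size and replication factor of 2 below are illustrative assumptions, not requirements of the design:

```python
# Back-of-envelope sizing from the estimates above.
KEYS = 1_000_000_000        # 1 billion entries
KEY_BYTES = 50              # average key size
VALUE_BYTES = 1024          # average value size

raw_bytes = KEYS * (KEY_BYTES + VALUE_BYTES)
print(f"raw data:     {raw_bytes / 1e12:.2f} TB")      # 1.07 TB

replicated = raw_bytes * 2  # assumed replication factor of 2
print(f"with replica: {replicated / 1e12:.2f} TB")     # 2.15 TB

nodes = 16                  # hypothetical cluster size
per_node_gib = replicated / nodes / 2**30
print(f"per node:     {per_node_gib:.0f} GiB across {nodes} nodes")  # 125 GiB
```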

System APIs

from typing import Any, Optional

class CacheAPI:
    def put(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """
        Store key-value pair with optional TTL.
        
        Args:
            key: Cache key
            value: Value to store (serialized)
            ttl: Time-to-live in seconds
            
        Returns:
            Success status
        """
        pass
    
    def get(self, key: str) -> Optional[Any]:
        """
        Retrieve value for given key.
        
        Returns:
            Cached value or None if not found
        """
        pass
    
    def delete(self, key: str) -> bool:
        """Delete key from cache"""
        pass
    
    def exists(self, key: str) -> bool:
        """Check if key exists"""
        pass
    
    def increment(self, key: str, delta: int = 1) -> int:
        """Atomically increment value"""
        pass

High-Level Architecture

                    ┌──────────────────┐
                    │  Load Balancer   │
                    └─────────┬────────┘
                              │
        ┌─────────────────────┼─────────────────────┐
        │                     │                     │
   ┌────▼─────┐         ┌────▼─────┐         ┌────▼─────┐
   │  Cache   │         │  Cache   │         │  Cache   │
   │  Node 1  │◄───────►│  Node 2  │◄───────►│  Node 3  │
   │ (Master) │         │ (Master) │         │ (Master) │
   └────┬─────┘         └────┬─────┘         └────┬─────┘
        │                    │                    │
   ┌────▼─────┐         ┌────▼─────┐         ┌────▼─────┐
   │  Cache   │         │  Cache   │         │  Cache   │
   │  Node 4  │         │  Node 5  │         │  Node 6  │
   │ (Replica)│         │ (Replica)│         │ (Replica)│
   └──────────┘         └──────────┘         └──────────┘

Core Components

1. Consistent Hashing

Distribute keys across nodes to minimize data movement when nodes are added/removed.

import hashlib
from typing import List, Optional
from bisect import bisect_right

class ConsistentHash:
    def __init__(self, nodes: List[str], virtual_nodes: int = 150):
        """
        Initialize consistent hash ring.
        
        Args:
            nodes: List of cache node identifiers
            virtual_nodes: Number of virtual nodes per physical node
        """
        self.virtual_nodes = virtual_nodes
        self.ring = {}  # hash -> node mapping
        self.sorted_keys = []
        
        for node in nodes:
            self.add_node(node)
    
    def _hash(self, key: str) -> int:
        """Generate hash for key"""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)
    
    def add_node(self, node: str):
        """Add node to hash ring"""
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:{i}"
            hash_value = self._hash(virtual_key)
            self.ring[hash_value] = node
            self.sorted_keys.append(hash_value)
        
        self.sorted_keys.sort()
    
    def remove_node(self, node: str):
        """Remove node from hash ring"""
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:{i}"
            hash_value = self._hash(virtual_key)
            del self.ring[hash_value]
            self.sorted_keys.remove(hash_value)
    
    def get_node(self, key: str) -> Optional[str]:
        """Get node responsible for key"""
        if not self.ring:
            return None
        
        hash_value = self._hash(key)
        idx = bisect_right(self.sorted_keys, hash_value)
        
        # Wrap around to first node
        if idx == len(self.sorted_keys):
            idx = 0
        
        return self.ring[self.sorted_keys[idx]]
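
As a sanity check of the "minimal data movement" claim, the following standalone experiment (hypothetical node names, duplicating the ring logic so it is self-contained) measures how many of 10,000 keys change owner when a fourth node joins a three-node ring:

```python
import hashlib
from bisect import bisect_right

def _hash(s: str) -> int:
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

def build_ring(nodes, virtual_nodes=150):
    # Sorted (hash, node) pairs, one per virtual node, plus a parallel hash list
    ring = sorted((_hash(f"{n}:{i}"), n) for n in nodes for i in range(virtual_nodes))
    hashes = [hv for hv, _ in ring]
    return hashes, ring

def owner(hashes, ring, key):
    # First virtual node clockwise from the key's hash, wrapping around
    idx = bisect_right(hashes, _hash(key)) % len(ring)
    return ring[idx][1]

keys = [f"user:{i}" for i in range(10_000)]
h3, r3 = build_ring(["node1", "node2", "node3"])
h4, r4 = build_ring(["node1", "node2", "node3", "node4"])
moved = sum(owner(h3, r3, k) != owner(h4, r4, k) for k in keys)
print(f"{moved / len(keys):.1%} of keys moved")  # roughly a quarter; the rest stay put
```

With naive modulo hashing (`hash(key) % num_nodes`), nearly every key would move instead.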

2. Cache Node Implementation

from collections import OrderedDict
from threading import Lock
from typing import Any, Optional
import time

class CacheNode:
    def __init__(self, capacity: int, eviction_policy: str = "LRU"):
        """
        Initialize cache node.
        
        Args:
            capacity: Maximum number of items
            eviction_policy: "LRU", "LFU", or "FIFO"
        """
        self.capacity = capacity
        self.eviction_policy = eviction_policy
        self.cache = OrderedDict()  # For LRU
        self.ttl_map = {}  # key -> expiration timestamp
        self.lock = Lock()
    
    def get(self, key: str) -> Optional[Any]:
        """Get value with LRU update"""
        with self.lock:
            # Check if expired
            if key in self.ttl_map:
                if time.time() > self.ttl_map[key]:
                    self._evict(key)
                    return None
            
            if key not in self.cache:
                return None
            
            # Move to end (most recently used)
            self.cache.move_to_end(key)
            return self.cache[key]
    
    def put(self, key: str, value: Any, ttl: Optional[int] = None):
        """Store value with optional TTL"""
        with self.lock:
            # Update existing key
            if key in self.cache:
                self.cache.move_to_end(key)
                self.cache[key] = value
            else:
                # Evict if at capacity
                if len(self.cache) >= self.capacity:
                    self._evict_lru()
                
                self.cache[key] = value
            
            # Refresh TTL; clear any stale TTL when none is supplied
            if ttl is not None:
                self.ttl_map[key] = time.time() + ttl
            else:
                self.ttl_map.pop(key, None)
    
    def delete(self, key: str) -> bool:
        """Remove key from cache"""
        with self.lock:
            if key in self.cache:
                del self.cache[key]
                if key in self.ttl_map:
                    del self.ttl_map[key]
                return True
            return False
    
    def _evict_lru(self):
        """Evict least recently used item"""
        if self.cache:
            oldest_key = next(iter(self.cache))
            self._evict(oldest_key)
    
    def _evict(self, key: str):
        """Evict specific key"""
        if key in self.cache:
            del self.cache[key]
        if key in self.ttl_map:
            del self.ttl_map[key]
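
The lazy-expiry pattern in get() above — check the TTL on read, delete on the spot — can be exercised with a stripped-down sketch (TinyTTLCache is illustrative only, not part of the design):

```python
import time

class TinyTTLCache:
    """Illustrative miniature cache showing lazy TTL expiry."""
    def __init__(self):
        self.data = {}
        self.expires = {}  # key -> absolute expiry timestamp

    def put(self, key, value, ttl=None):
        self.data[key] = value
        if ttl is not None:
            self.expires[key] = time.time() + ttl
        else:
            self.expires.pop(key, None)  # clear any stale TTL

    def get(self, key):
        exp = self.expires.get(key)
        if exp is not None and time.time() > exp:
            # Expired: delete lazily on access, as CacheNode.get does
            self.data.pop(key, None)
            self.expires.pop(key, None)
            return None
        return self.data.get(key)

c = TinyTTLCache()
c.put("session", "abc", ttl=0.05)
print(c.get("session"))  # 'abc' while the TTL has not elapsed
time.sleep(0.1)
print(c.get("session"))  # None once the TTL has passed
```

Production systems typically pair this lazy check with a background sweeper thread so that untouched expired keys do not occupy memory indefinitely.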

3. Distributed Cache Client

class DistributedCache:
    def __init__(self, nodes: List[str]):
        """
        Initialize distributed cache client.
        
        Args:
            nodes: List of cache node addresses
        """
        self.consistent_hash = ConsistentHash(nodes)
        self.node_clients = {}  # node -> client connection
        self.replication_factor = 2
        
        for node in nodes:
            self.node_clients[node] = self._create_connection(node)
    
    def put(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Store key-value pair with replication"""
        try:
            # Get primary node
            primary_node = self.consistent_hash.get_node(key)
            
            # Write to primary
            client = self.node_clients[primary_node]
            client.put(key, value, ttl)
            
            # Async replication to replicas
            self._replicate_async(key, value, ttl, primary_node)
            
            return True
        except Exception as e:
            print(f"Put failed: {e}")
            return False
    
    def get(self, key: str) -> Optional[Any]:
        """Retrieve value from cache"""
        try:
            # Get node for key
            node = self.consistent_hash.get_node(key)
            client = self.node_clients[node]
            
            return client.get(key)
        except Exception:
            # Primary unreachable; fall back to a replica
            return self._get_from_replica(key)
    
    def delete(self, key: str) -> bool:
        """Delete key from all replicas"""
        node = self.consistent_hash.get_node(key)
        client = self.node_clients[node]
        
        # Delete from primary and replicas
        success = client.delete(key)
        self._delete_from_replicas(key, node)
        
        return success
    
    def _get_from_replica(self, key: str) -> Optional[Any]:
        """Fallback to replica nodes"""
        # Implementation to query replica nodes
        pass
    
    def _replicate_async(self, key: str, value: Any, 
                        ttl: int, primary_node: str):
        """Asynchronously replicate to backup nodes"""
        # Get next N nodes in hash ring as replicas
        # Write to replicas in background
        pass
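
The `_replicate_async` stub assumes replicas are the next N distinct physical nodes clockwise on the hash ring. A minimal standalone sketch of that selection (hypothetical node names, repeating the ring construction so it is self-contained):

```python
import hashlib
from bisect import bisect_right

def _hash(s: str) -> int:
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

def replica_nodes(ring, hashes, key, n):
    """Walk clockwise from the key's position, collecting n distinct physical nodes."""
    start = bisect_right(hashes, _hash(key)) % len(ring)
    picked = []
    for step in range(len(ring)):
        node = ring[(start + step) % len(ring)][1]
        if node not in picked:
            picked.append(node)
            if len(picked) == n:
                break
    return picked

nodes = ["cache-a", "cache-b", "cache-c"]
ring = sorted((_hash(f"{n}:{i}"), n) for n in nodes for i in range(150))
hashes = [hv for hv, _ in ring]

replicas = replica_nodes(ring, hashes, "user:123", n=2)
print(replicas)  # the primary first, then the next distinct node clockwise
```

Skipping over virtual nodes that map to an already-chosen physical node is what guarantees the replicas land on different machines.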

Eviction Policies

LRU (Least Recently Used)

class LRUCache:
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.cache = OrderedDict()
    
    def get(self, key: str) -> Optional[Any]:
        if key not in self.cache:
            return None
        self.cache.move_to_end(key)
        return self.cache[key]
    
    def put(self, key: str, value: Any):
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)
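
The OrderedDict mechanics can be traced by hand: with capacity 2, inserting a and b, re-writing a, then inserting c evicts b, because b is now the least recently used entry:

```python
from collections import OrderedDict

capacity = 2
cache = OrderedDict()
for k, v in [("a", 1), ("b", 2), ("a", 10), ("c", 3)]:
    if k in cache:
        cache.move_to_end(k)       # re-access refreshes recency
    cache[k] = v
    if len(cache) > capacity:
        cache.popitem(last=False)  # drop the least recently used entry
print(list(cache))  # ['a', 'c'] -- 'b' was least recently used and got evicted
```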

LFU (Least Frequently Used)

from collections import defaultdict

class LFUCache:
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.cache = {}  # key -> value
        self.freq = defaultdict(int)  # key -> frequency
        self.min_freq = 0
        self.freq_list = defaultdict(OrderedDict)  # freq -> {keys}
    
    def get(self, key: str) -> Optional[Any]:
        if key not in self.cache:
            return None
        
        # Update frequency
        self._update_freq(key)
        return self.cache[key]
    
    def put(self, key: str, value: Any):
        if self.capacity == 0:
            return
        
        if key in self.cache:
            self.cache[key] = value
            self._update_freq(key)
        else:
            if len(self.cache) >= self.capacity:
                self._evict()
            
            self.cache[key] = value
            self.freq[key] = 1
            self.freq_list[1][key] = None
            self.min_freq = 1
    
    def _update_freq(self, key: str):
        freq = self.freq[key]
        del self.freq_list[freq][key]
        
        if not self.freq_list[freq] and freq == self.min_freq:
            self.min_freq += 1
        
        self.freq[key] += 1
        self.freq_list[self.freq[key]][key] = None
    
    def _evict(self):
        key, _ = self.freq_list[self.min_freq].popitem(last=False)
        del self.cache[key]
        del self.freq[key]

Replication Strategy

Master-Slave Replication

class ReplicationManager:
    def __init__(self, master_node: str, slave_nodes: List[str]):
        self.master = master_node
        self.slaves = slave_nodes
        self.replication_log = []
    
    def replicate_write(self, operation: dict):
        """
        Replicate write operation to slaves.
        
        Args:
            operation: {
                'type': 'PUT' | 'DELETE',
                'key': str,
                'value': Any,
                'ttl': int
            }
        """
        # Log operation
        self.replication_log.append(operation)
        
        # Async replication to slaves
        for slave in self.slaves:
            self._send_to_slave(slave, operation)
    
    def _send_to_slave(self, slave: str, operation: dict):
        """Send operation to slave node (async)"""
        # Network call to slave
        # Handle failures with retry logic
        pass
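
`_send_to_slave` leaves the retry logic open. One common approach is bounded retries with exponential backoff; the sketch below (with a simulated flaky connection) illustrates that idea and is not a prescribed implementation:

```python
import time

def send_with_retry(send_fn, operation, retries=3, base_delay=0.01):
    """Hypothetical retry wrapper: exponential backoff, re-raises after the last attempt."""
    for attempt in range(retries):
        try:
            return send_fn(operation)
        except ConnectionError:
            if attempt == retries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))

# Simulated flaky slave: fails twice, then accepts the write.
calls = []
def flaky_send(op):
    calls.append(op)
    if len(calls) < 3:
        raise ConnectionError("slave unreachable")
    return "ACK"

result = send_with_retry(flaky_send, {"type": "PUT", "key": "k", "value": 1})
print(result, len(calls))  # ACK 3
```

For truly asynchronous replication this would run on a background thread or task queue so the primary write path is never blocked by a slow slave.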

Data Persistence

AOF (Append-Only File)

import json

class PersistenceManager:
    def __init__(self, filepath: str):
        self.filepath = filepath
        # Append mode preserves existing log entries across restarts
        self.file = open(filepath, 'a')
    
    def log_operation(self, operation: dict):
        """Append operation to log file"""
        self.file.write(json.dumps(operation) + '\n')
        self.file.flush()
    
    def restore(self) -> dict:
        """Restore cache from log file"""
        cache = {}
        with open(self.filepath, 'r') as f:
            for line in f:
                op = json.loads(line)
                if op['type'] == 'PUT':
                    cache[op['key']] = op['value']
                elif op['type'] == 'DELETE':
                    cache.pop(op['key'], None)
        return cache
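
The AOF write/replay cycle can be demonstrated end to end with a throwaway file (the path and operations below are illustrative):

```python
import json
import os
import tempfile

ops = [
    {"type": "PUT", "key": "a", "value": 1},
    {"type": "PUT", "key": "b", "value": 2},
    {"type": "DELETE", "key": "a"},
]

# Append each operation as one JSON line, the same format logged above.
path = os.path.join(tempfile.mkdtemp(), "cache.aof")
with open(path, "a") as f:
    for op in ops:
        f.write(json.dumps(op) + "\n")

# Replay the log from the start to rebuild in-memory state.
cache = {}
with open(path) as f:
    for line in f:
        op = json.loads(line)
        if op["type"] == "PUT":
            cache[op["key"]] = op["value"]
        else:
            cache.pop(op["key"], None)
print(cache)  # {'b': 2} -- the PUT of 'a' is cancelled by the later DELETE
```

Because the log grows without bound, real systems periodically compact it (or snapshot the live state) so restore time stays proportional to the data set, not to the write history.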

Monitoring and Metrics

class CacheMetrics:
    def __init__(self):
        self.hit_count = 0
        self.miss_count = 0
        self.eviction_count = 0
    
    def record_hit(self):
        self.hit_count += 1
    
    def record_miss(self):
        self.miss_count += 1
    
    def record_eviction(self):
        self.eviction_count += 1
    
    def get_hit_rate(self) -> float:
        total = self.hit_count + self.miss_count
        if total == 0:
            return 0.0
        return self.hit_count / total
    
    def get_stats(self) -> dict:
        return {
            'hits': self.hit_count,
            'misses': self.miss_count,
            'hit_rate': self.get_hit_rate(),
            'evictions': self.eviction_count
        }

Scaling Considerations

Horizontal Scaling

  • Add nodes to hash ring
  • Use consistent hashing to minimize data movement
  • Rebalance only affected key ranges (typically ~1/N keys)

Sharding Strategy

  • Hash-based sharding (consistent hashing)
  • Range-based sharding (ordered keys)
  • Geographic sharding (latency optimization)

Hot Key Problem

  • Replicate popular keys to multiple nodes
  • Use local cache tier (L1 cache)
  • Implement read-through cache pattern

Trade-offs

Consistency vs Availability

  • Strong Consistency: Synchronous replication, higher latency
  • Eventual Consistency: Async replication, better performance

Memory vs Persistence

  • Pure In-Memory: Fastest but data loss on crash
  • Hybrid: Periodic snapshots + AOF logs
  • Full Persistence: Slower but durable

Eviction Policy Choice

  • LRU: Good for temporal locality, O(1) operations
  • LFU: Better for frequency patterns, more memory overhead
  • TTL-based: Predictable expiration, requires timestamp management

Summary

Key design decisions:

  • Consistent hashing for data distribution
  • Master-slave replication for high availability
  • LRU eviction for memory management
  • Optional persistence with AOF/snapshots
  • Partition tolerance over strong consistency (AP in CAP)

This design provides a highly scalable, low-latency distributed cache similar to Redis/Memcached.