Leaderless Replication

Core Concept

advanced
25-30 minutes
replication, dynamo, quorums, eventual-consistency, availability, cassandra

Dynamo-style replication with quorums and eventual consistency

Leaderless Replication

Overview

Leaderless replication eliminates the concept of a designated leader node, allowing any replica to accept both read and write operations. This approach, popularized by Amazon's Dynamo paper, uses quorum-based reads and writes to provide tunable consistency while favoring high availability and partition tolerance.

This pattern is ideal for systems that prioritize availability and can tolerate eventual consistency, making it popular in distributed databases like Cassandra, Riak, and DynamoDB.

Core Concepts

Quorum-Based Operations

import time

class LeaderlessReplication:
    def __init__(self, node_id, replicas, n, r, w):
        self.node_id = node_id
        self.replicas = replicas  # List of all replica nodes
        self.n = n  # Total replicas that store each key
        self.r = r  # Read quorum: responses required for a read
        self.w = w  # Write quorum: acknowledgments required for a write
        self.data = {}
        self.vector_clock = {}  # node_id -> logical counter
    
    def write(self, key, value):
        """Write to all N replicas; succeed once W acknowledge"""
        # Determine replicas responsible for this key
        responsible_replicas = self.get_replicas_for_key(key, self.n)
        
        # Create versioned entry
        entry = {
            'key': key,
            'value': value,
            'timestamp': time.time(),
            'vector_clock': self.increment_vector_clock()
        }
        
        # Send the write to every responsible replica (a production
        # coordinator fans these out in parallel) and count the acks
        successful_writes = 0
        for replica in responsible_replicas:
            try:
                replica.store(key, entry)
                successful_writes += 1
            except Exception:
                continue  # Replica unavailable; others may still ack
        
        # The write succeeds iff at least W replicas acknowledged it
        return successful_writes >= self.w
    
    def read(self, key):
        """Read from R replicas and resolve conflicts"""
        responsible_replicas = self.get_replicas_for_key(key, self.n)
        
        responses = []
        successful_reads = 0
        
        for replica in responsible_replicas:
            try:
                response = replica.get(key)
                if response:
                    # Track which replica answered so read repair can
                    # push the winning version back to stale nodes
                    responses.append({**response, 'replica': replica})
                successful_reads += 1
                
                if successful_reads >= self.r:
                    break  # Read quorum reached
            except Exception:
                continue
        
        if successful_reads < self.r:
            raise Exception("Not enough replicas available for read")
        
        # Resolve conflicts and return the winning value
        return self.resolve_read_conflicts(responses)
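
The class above leaves two helpers undefined: get_replicas_for_key, which the consistent-hashing section below implements, and increment_vector_clock. A minimal sketch of the latter, assuming the clock is a plain dict of per-node counters:

def increment_vector_clock(self):
    """Bump this node's counter; return a snapshot to version the write"""
    self.vector_clock[self.node_id] = self.vector_clock.get(self.node_id, 0) + 1
    return dict(self.vector_clock)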

Consistency Models

Eventual Consistency with Read Repair

def resolve_read_conflicts(self, responses):
    """Resolve conflicts using vector clocks and read repair"""
    if not responses:
        return None
    
    if len(responses) == 1:
        return responses[0]['value']
    
    # Find the most recent version using vector clocks. Concurrent
    # (causally incomparable) versions are not handled here; Dynamo-style
    # systems return them as siblings for the client to merge.
    latest_response = responses[0]
    
    for response in responses[1:]:
        if self.is_newer(response['vector_clock'], latest_response['vector_clock']):
            latest_response = response
    
    # Perform read repair - update stale replicas with the winning value
    self.read_repair(responses, latest_response)
    
    return latest_response['value']

def read_repair(self, all_responses, latest_response):
    """Update outdated replicas with latest value"""
    for response in all_responses:
        if response['vector_clock'] != latest_response['vector_clock']:
            # This replica is outdated, send it the latest value
            replica = response['replica']
            replica.store(latest_response['key'], latest_response)
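
is_newer is also left undefined. A minimal sketch of vector-clock dominance, under the same assumption that clocks are dicts of per-node counters: clock A is newer than clock B if it is at least as large on every node and strictly larger on at least one.

def is_newer(self, clock_a, clock_b):
    """True if clock_a causally dominates clock_b"""
    strictly_greater = False
    for node in set(clock_a) | set(clock_b):
        a, b = clock_a.get(node, 0), clock_b.get(node, 0)
        if a < b:
            return False  # clock_b saw an event that clock_a missed
        if a > b:
            strictly_greater = True
    return strictly_greater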

Hinted Handoff for Temporary Failures

import time

class HintedHandoff:
    def __init__(self):
        self.hints = {}  # target node -> list of hints awaiting redelivery
    
    def store_hint(self, target_node, key, value, metadata):
        """Store hint when target node is unavailable"""
        if target_node not in self.hints:
            self.hints[target_node] = []
        
        hint = {
            'key': key,
            'value': value,
            'metadata': metadata,
            'timestamp': time.time(),
            'target_node': target_node
        }
        
        self.hints[target_node].append(hint)
    
    def replay_hints(self, recovered_node):
        """Replay stored hints when node recovers"""
        if recovered_node not in self.hints:
            return
        
        hints_to_replay = self.hints[recovered_node]
        successful_hints = []
        
        for hint in hints_to_replay:
            try:
                recovered_node.store(hint['key'], hint['value'], hint['metadata'])
                successful_hints.append(hint)
            except Exception:
                # Keep hint for later retry
                continue
        
        # Remove successfully replayed hints
        for hint in successful_hints:
            self.hints[recovered_node].remove(hint)
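
A usage sketch; ReplicaStub is a hypothetical in-memory node that exists only for this example and matches the three-argument store call that replay_hints makes:

class ReplicaStub:
    """Hypothetical replica used only to illustrate hinted handoff"""
    def __init__(self, name):
        self.name = name
        self.data = {}
    
    def store(self, key, value, metadata):
        self.data[key] = (value, metadata)

node_b = ReplicaStub('node-b')
handoff = HintedHandoff()

# While node_b is unreachable, the coordinator parks the write as a hint
handoff.store_hint(node_b, 'user:42', {'name': 'Ada'}, {'coordinator': 'node-a'})

# Once failure detection sees node_b again, its hints are replayed
handoff.replay_hints(node_b)
assert 'user:42' in node_b.data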

Anti-Entropy and Merkle Trees

import hashlib

class MerkleTree:
    def __init__(self, data_ranges):
        self.data_ranges = data_ranges
        self.tree = self.build_tree()
    
    def hash_range(self, data_range):
        """Hash the contents of one key range into a single digest"""
        digest = hashlib.sha256()
        for item in sorted(data_range, key=repr):
            digest.update(repr(item).encode())
        return digest.hexdigest()
    
    def build_tree(self):
        """Build Merkle tree for efficient comparison"""
        # Each leaf carries the hash of one data range
        leaves = [{'hash': self.hash_range(r)} for r in self.data_ranges]
        return self.build_tree_recursive(leaves)
    
    def build_tree_recursive(self, nodes):
        """Pairwise-combine nodes level by level until one root remains"""
        if len(nodes) <= 1:
            return nodes[0] if nodes else {'hash': ''}
        parents = []
        for i in range(0, len(nodes), 2):
            children = nodes[i:i + 2]
            combined = ''.join(child['hash'] for child in children)
            parents.append({
                'hash': hashlib.sha256(combined.encode()).hexdigest(),
                'children': children
            })
        return self.build_tree_recursive(parents)
    
    def compare_with_peer(self, peer_merkle_tree):
        """Find differences with peer's Merkle tree"""
        differences = []
        self.compare_nodes(self.tree, peer_merkle_tree.tree, differences, [])
        return differences
    
    def compare_nodes(self, local_node, peer_node, differences, path):
        """Recursively compare tree nodes"""
        if local_node['hash'] == peer_node['hash']:
            return  # Subtrees are identical
        
        if 'children' not in local_node:
            # Leaf node - this range differs
            differences.append(path)
            return
        
        # Compare children
        for i, (local_child, peer_child) in enumerate(zip(
            local_node['children'], peer_node['children']
        )):
            self.compare_nodes(local_child, peer_child, differences, path + [i])

def anti_entropy_repair(replica1, replica2):
    """Perform anti-entropy repair between two replicas"""
    # Build Merkle trees
    tree1 = replica1.build_merkle_tree()
    tree2 = replica2.build_merkle_tree()
    
    # Find differences
    differences = tree1.compare_with_peer(tree2)
    
    # Sync different ranges
    for diff_range in differences:
        data1 = replica1.get_range_data(diff_range)
        data2 = replica2.get_range_data(diff_range)
        
        # Merge and update both replicas
        merged_data = merge_range_data(data1, data2)
        replica1.update_range(diff_range, merged_data)
        replica2.update_range(diff_range, merged_data)

Consistent Hashing for Data Placement

import bisect
import hashlib

class ConsistentHashRing:
    def __init__(self, nodes, replicas_per_node=3):
        self.nodes = nodes
        self.replicas_per_node = replicas_per_node  # Virtual nodes per physical node
        self.ring = {}  # ring position -> physical node
        self.sorted_keys = []
        self.build_ring()
    
    def build_ring(self):
        """Build consistent hash ring"""
        for node in self.nodes:
            for i in range(self.replicas_per_node):
                key = self.hash(f"{node}:{i}")
                self.ring[key] = node
        
        self.sorted_keys = sorted(self.ring.keys())
    
    def hash(self, key):
        """Hash function for ring placement"""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)
    
    def find_next_node_index(self, key_hash):
        """Binary-search the first ring position clockwise of the key"""
        return bisect.bisect(self.sorted_keys, key_hash)
    
    def get_replicas_for_key(self, key, n_replicas):
        """Get N replicas responsible for a key"""
        if not self.ring:
            return []
        
        key_hash = self.hash(str(key))
        
        # Find first node clockwise from key position
        idx = self.find_next_node_index(key_hash)
        
        replicas = []
        seen_nodes = set()
        
        # Collect N unique replicas moving clockwise
        while len(replicas) < n_replicas and len(seen_nodes) < len(self.nodes):
            ring_key = self.sorted_keys[idx % len(self.sorted_keys)]
            node = self.ring[ring_key]
            
            if node not in seen_nodes:
                replicas.append(node)
                seen_nodes.add(node)
            
            idx += 1
        
        return replicas
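
A usage sketch; the node names are illustrative, and which replicas come back depends on where MD5 places them on the ring:

ring = ConsistentHashRing(['node-a', 'node-b', 'node-c', 'node-d'])

# The three replicas that own this key, in clockwise preference order
print(ring.get_replicas_for_key('user:42', 3))
# e.g. ['node-c', 'node-a', 'node-d']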

Real-World Implementation: Cassandra

# Cassandra-style configuration
CASSANDRA_CONFIG = {
    'replication_factor': 3,        # N = 3
    'consistency_level': {
        'write': 'QUORUM',          # W = 2
        'read': 'QUORUM'            # R = 2
    },
    'hinted_handoff_enabled': True,
    'read_repair_chance': 0.1
}

# CQL examples
"""
-- Create keyspace with replication
CREATE KEYSPACE mykeyspace
WITH REPLICATION = {
    'class': 'SimpleStrategy',
    'replication_factor': 3
};

-- In cqlsh the consistency level is set per session, not per
-- statement (the old USING CONSISTENCY syntax was removed in CQL 3)
CONSISTENCY QUORUM;

INSERT INTO users (id, name, email) VALUES (1, 'John', 'john@example.com');

SELECT * FROM users WHERE id = 1;
"""

Tunable Consistency

When R + W > N, every read quorum overlaps every write quorum, so a read is guaranteed to contact at least one replica that holds the latest acknowledged write:

# Strong consistency: R + W > N
# Example: N=3, R=2, W=2 (2 + 2 > 3)

# Eventual consistency: R + W ≤ N  
# Example: N=3, R=1, W=1 (1 + 1 ≤ 3)

class TunableConsistency:
    def __init__(self, n, r, w):
        self.n = n  # Total replicas
        self.r = r  # Read quorum
        self.w = w  # Write quorum
        
        self.consistency_level = self.determine_consistency()
    
    def determine_consistency(self):
        if self.r + self.w > self.n:
            return "STRONG"
        else:
            return "EVENTUAL"
    
    def availability_score(self):
        """Calculate availability based on quorum sizes"""
        # Higher quorums = lower availability but stronger consistency
        max_failures_read = self.n - self.r
        max_failures_write = self.n - self.w
        
        return {
            'max_read_failures': max_failures_read,
            'max_write_failures': max_failures_write,
            'consistency_level': self.consistency_level
        }
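
A quick check of the common N=3, R=2, W=2 configuration:

config = TunableConsistency(n=3, r=2, w=2)
print(config.availability_score())
# {'max_read_failures': 1, 'max_write_failures': 1, 'consistency_level': 'STRONG'}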

Performance Characteristics

Write Performance:

  • Latency: With parallel requests, set by the W-th fastest replica to acknowledge
  • Throughput: Bounded by collecting W acknowledgments per write
  • Availability: Can tolerate N-W replica failures

Read Performance:

  • Latency: Set by the R-th fastest replica to respond
  • Consistency: Strong if R + W > N, otherwise eventual
  • Availability: Can tolerate N-R replica failures
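
A small illustration of why latency tracks the quorum-th fastest replica rather than the slowest one (the latency figures are made up):

def quorum_latency(replica_latencies_ms, quorum):
    """With requests fanned out in parallel, the operation completes
    as soon as the quorum-th fastest replica responds"""
    return sorted(replica_latencies_ms)[quorum - 1]

# N=3 replicas respond in 12 ms, 45 ms, and 230 ms (one straggler)
print(quorum_latency([12, 45, 230], 2))  # 45: a W=2 write ignores the straggler
print(quorum_latency([12, 45, 230], 3))  # 230: W=3 waits for every replica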

Trade-offs

Advantages:

  • No single point of failure
  • High availability
  • Tunable consistency
  • Scales horizontally
  • Partition tolerant

Disadvantages:

  • Eventual consistency complexity
  • Read/write latency from quorums
  • Conflict resolution overhead
  • Complex failure scenarios
  • Network partition challenges

Leaderless replication provides excellent availability and partition tolerance, making it a strong fit for globally distributed systems that can tolerate eventual consistency.

Related Concepts

quorum-systems
eventual-consistency
vector-clocks
gossip-protocols

Used By

amazon-dynamo, cassandra, riak, voldemort