Conflict Resolution Strategies
Core Concept
Techniques for handling concurrent writes and conflicts in distributed systems
Overview
Conflict resolution is crucial in distributed systems where multiple nodes can accept concurrent writes to the same data. When replicas diverge because of network partitions or concurrent updates, the system needs a strategy to detect these conflicts and resolve them so that every replica ends up in the same, consistent state. The strategy acts like a referee deciding what happens when two players claim the same ball.
This concept is fundamental to multi-leader replication systems and eventually consistent databases.
Common Conflict Resolution Strategies
1. Last-Write-Wins (LWW)
The simplest approach uses timestamps to resolve conflicts, much like keeping whichever copy of a document was modified most recently. Each write is assigned a timestamp, and the version with the most recent timestamp wins. It is easy to implement, but because node clocks can drift, the write with the largest timestamp is not necessarily the one that actually happened last.
How it works: When multiple versions of the same data exist, the system compares timestamps and keeps the version with the latest timestamp. If timestamps are identical, it uses a deterministic tie-breaker like node ID to ensure consistent resolution across all replicas.
Trade-offs: This approach is simple and fast, but it silently discards every concurrent write except the winner, so important data can be lost, including cases where the winning write was based on stale data. It is suitable for systems where losing concurrent updates is acceptable or where timestamps reliably reflect the true order of operations.
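The comparison rule described above fits in a single merge function. This is a minimal sketch, assuming each version carries the writer's wall-clock timestamp and a unique node ID; `LWWValue` and `lww_merge` are hypothetical names, not the API of any particular database.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class LWWValue:
    value: Any
    timestamp: float  # assigned by the writing node's clock (assumed wall-clock seconds)
    node_id: str      # deterministic tie-breaker when timestamps are equal


def lww_merge(a: LWWValue, b: LWWValue) -> LWWValue:
    """Keep the version with the larger (timestamp, node_id) pair.

    Comparing the tuple makes the result independent of argument order,
    so every replica picks the same winner.
    """
    return a if (a.timestamp, a.node_id) >= (b.timestamp, b.node_id) else b


# Tie on timestamp: "node-b" > "node-a", so node-b's write wins on every replica.
tie_a = LWWValue("blue", 100.0, "node-a")
tie_b = LWWValue("green", 100.0, "node-b")
print(lww_merge(tie_a, tie_b).value, lww_merge(tie_b, tie_a).value)  # green green

# Clock skew: node-b's clock runs 5 s fast. Its write happens first in real time
# but carries the larger timestamp, so node-a's later write is silently discarded.
early_skewed = LWWValue("red", 105.0, "node-b")      # real time ~100
later_correct = LWWValue("yellow", 102.0, "node-a")  # real time ~102
print(lww_merge(early_skewed, later_correct).value)  # red
```

The second case shows the trade-off in action: the merge is deterministic, but "latest timestamp" and "latest write" are not the same thing once clocks disagree.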
2. Multi-Value Approach
Keep all conflicting values and let the application resolve them:
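```python
import time


class MultiValue:
    def __init__(self):
        self.data = {}  # key -> list of concurrent (sibling) entries

    def write(self, key, value, context=None):
        """Store a value; `context` is a vector clock dict (node_id -> counter)"""
        if key not in self.data:
            self.data[key] = []
        entry = {
            'value': value,
            'context': context,
            'timestamp': time.time()
        }
        # Drop existing values that the new write causally supersedes
        self.data[key] = [v for v in self.data[key] if not self.supersedes(entry, v)]
        # Keep the new value unless an existing value already supersedes it
        if not any(self.supersedes(v, entry) for v in self.data[key]):
            self.data[key].append(entry)

    def read(self, key):
        """Return all concurrent values"""
        return self.data.get(key, [])

    def supersedes(self, entry1, entry2):
        """Check if entry1 supersedes entry2 using vector clocks"""
        if not entry1['context'] or not entry2['context']:
            return False  # without causal context, treat the values as concurrent
        return self.vector_clock_dominates(entry1['context'], entry2['context'])

    @staticmethod
    def vector_clock_dominates(clock1, clock2):
        """True if clock1 >= clock2 on every node and > on at least one"""
        nodes = set(clock1) | set(clock2)
        at_least = all(clock1.get(n, 0) >= clock2.get(n, 0) for n in nodes)
        strictly = any(clock1.get(n, 0) > clock2.get(n, 0) for n in nodes)
        return at_least and strictly
```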
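The MultiValue store returns every concurrent value but leaves the "let the application resolve" step to the caller. A minimal sketch of that step, assuming contexts are vector clocks stored as dicts of node ID to counter (the shape `supersedes` compares): merge the siblings' values with an application-supplied function, take the pointwise maximum of their clocks, and increment the resolving node's entry. The resulting clock dominates every sibling, so writing the merged value back with it replaces every sibling that carries a context. `merge_clocks`, `resolve_siblings`, and `merge_fn` are hypothetical names.

```python
from typing import Any, Callable, Dict, List, Tuple

Clock = Dict[str, int]


def merge_clocks(clocks: List[Clock]) -> Clock:
    """Pointwise maximum: the smallest clock that is >= every input."""
    merged: Clock = {}
    for clock in clocks:
        for node, counter in clock.items():
            merged[node] = max(merged.get(node, 0), counter)
    return merged


def resolve_siblings(
    siblings: List[dict],
    merge_fn: Callable[[List[Any]], Any],
    node_id: str,
) -> Tuple[Any, Clock]:
    """Collapse concurrent siblings into one value plus a dominating context.

    `siblings` has the shape returned by MultiValue.read(): dicts with
    'value' and 'context' keys (hypothetical usage, not a library API).
    """
    value = merge_fn([s['value'] for s in siblings])
    clock = merge_clocks([s['context'] or {} for s in siblings])
    clock[node_id] = clock.get(node_id, 0) + 1  # count the resolution as a new event
    return value, clock


siblings = [
    {'value': {'milk'}, 'context': {'a': 2, 'b': 1}},
    {'value': {'eggs'}, 'context': {'a': 1, 'b': 2}},
]
value, context = resolve_siblings(siblings, lambda vs: set().union(*vs), node_id='a')
print(sorted(value), context)  # ['eggs', 'milk'] {'a': 3, 'b': 2}
```

Incrementing the resolver's own counter matters: the pointwise maximum alone would be greater than or equal to each sibling but could equal one of them, and an equal clock does not supersede.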
3. Application-Specific Resolution
Custom logic based on data semantics:
```python
class ShoppingCartResolver:
    def resolve_cart_conflict(self, carts):
        """Merge divergent replicas of one cart by taking the union of items"""
        merged_items = {}
        for cart in carts:
            for item_id, quantity in cart['items'].items():
                if item_id in merged_items:
                    # Keep the larger quantity; summing would double-count items
                    # that both replicas inherited from a common earlier version
                    merged_items[item_id] = max(merged_items[item_id], quantity)
                else:
                    merged_items[item_id] = quantity
        # Note: a union merge can bring back an item that one replica had removed
        return {
            'items': merged_items,
            'user_id': carts[0]['user_id'],  # assumed identical across all replicas
            'timestamp': max(cart['timestamp'] for cart in carts)
        }
```
```python
class BankAccountResolver:
    def resolve_balance_conflict(self, operations, initial_balance=0):
        """Resolve account operations by replaying them in timestamp order"""
        # Sort by timestamp so every replica replays the same sequence.
        # sorted() is stable, so ties keep input order; replicas must agree on it.
        sorted_ops = sorted(operations, key=lambda op: op['timestamp'])
        balance = initial_balance
        applied_ops = []
        rejected_ops = []
        for op in sorted_ops:
            if op['type'] == 'deposit':
                balance += op['amount']
                applied_ops.append(op)
            elif op['type'] == 'withdrawal':
                if balance >= op['amount']:
                    balance -= op['amount']
                    applied_ops.append(op)
                else:
                    # Insufficient funds at this point in the replay: reject it
                    rejected_ops.append(op)
        return {
            'balance': balance,
            'operations': applied_ops,
            'rejected': rejected_ops
        }
```
Vector Clocks for Conflict Detection
Each node keeps one counter per node it has heard from. Comparing two clocks tells whether one write causally follows the other or whether they are concurrent, which is a true conflict:

```python
class VectorClock:
    def __init__(self, node_id):
        self.node_id = node_id
        self.clock = {}  # node_id -> number of events seen from that node

    def increment(self):
        """Increment local node's clock (call before each local event or send)"""
        self.clock[self.node_id] = self.clock.get(self.node_id, 0) + 1
        return self.clock.copy()

    def update(self, other_clock):
        """Merge a received clock (pointwise max), then count the receive event"""
        for node, timestamp in other_clock.items():
            self.clock[node] = max(self.clock.get(node, 0), timestamp)
        # Increment local clock
        self.increment()

    def compare(self, other_clock):
        """Compare this clock with another clock dict"""
        all_nodes = set(self.clock.keys()) | set(other_clock.keys())
        self_dominates = True   # self >= other on every node
        other_dominates = True  # other >= self on every node
        for node in all_nodes:
            self_val = self.clock.get(node, 0)
            other_val = other_clock.get(node, 0)
            if self_val < other_val:
                self_dominates = False
            if self_val > other_val:
                other_dominates = False
        if self_dominates and other_dominates:
            # Equal on every node (missing entries count as 0, so this also
            # holds when one dict lists a node with value 0 and the other omits it)
            return "EQUAL"
        elif self_dominates:
            return "DOMINATES"
        elif other_dominates:
            return "DOMINATED"
        else:
            return "CONCURRENT"
```
Conflict-Free Replicated Data Types (CRDTs)
```python
class GSetCRDT:
    """Grow-only set CRDT"""
    def __init__(self):
        self.elements = set()

    def add(self, element):
        """Add element to set"""
        self.elements.add(element)

    def contains(self, element):
        """Check if element exists"""
        return element in self.elements

    def merge(self, other):
        """Merge with another G-Set (conflict-free)"""
        result = GSetCRDT()
        result.elements = self.elements | other.elements
        return result
```
```python
class ORSetCRDT:
    """Observed-Remove set CRDT"""
    def __init__(self, node_id):
        self.node_id = node_id
        self.added = {}  # element -> set of unique tags
        self.removed = set()  # set of removed tags (tombstones)
        self.tag_counter = 0

    def add(self, element):
        """Add element with a tag that is unique across replicas"""
        tag = f"{self.node_id}:{self.tag_counter}"
        self.tag_counter += 1
        if element not in self.added:
            self.added[element] = set()
        self.added[element].add(tag)
        return tag

    def remove(self, element):
        """Remove element by marking all locally observed tags as removed.

        A concurrent add on another replica creates a tag this remove never
        saw, so after merging, the add wins.
        """
        if element in self.added:
            self.removed.update(self.added[element])

    def contains(self, element):
        """Element exists if it has tags not in removed set"""
        if element not in self.added:
            return False
        return bool(self.added[element] - self.removed)

    def merge(self, other):
        """Merge with another OR-Set"""
        result = ORSetCRDT(self.node_id)
        # Carry the counter forward so later adds on `result` never reuse a tag
        result.tag_counter = self.tag_counter
        # Merge added elements
        all_elements = set(self.added.keys()) | set(other.added.keys())
        for element in all_elements:
            self_tags = self.added.get(element, set())
            other_tags = other.added.get(element, set())
            result.added[element] = self_tags | other_tags
        # Merge removed tags
        result.removed = self.removed | other.removed
        return result
```
Operational Transformation (OT)
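OT rewrites each operation against a concurrent one so that both sites converge on the same text regardless of the order in which they apply the pair:

```python
class TextOperation:
    def __init__(self, op_type, position, content=""):
        self.type = op_type  # 'insert' or 'delete' ('retain' is treated as a no-op)
        self.position = position
        self.content = content  # text to insert, or the exact text being deleted

    def apply(self, doc):
        """Apply this operation to a string and return the result"""
        if self.type == 'insert':
            return doc[:self.position] + self.content + doc[self.position:]
        if self.type == 'delete':
            return doc[:self.position] + doc[self.position + len(self.content):]
        return doc

    def __repr__(self):
        return f"{self.type}({self.position}, '{self.content}')"


class OperationalTransform:
    def transform_operations(self, op1, op2):
        """Return (op1', op2') so that op1 then op2' equals op2 then op1'"""
        if op1.type == 'insert' and op2.type == 'insert':
            return self.transform_insert_insert(op1, op2)
        elif op1.type == 'insert' and op2.type == 'delete':
            return self.transform_insert_delete(op1, op2)
        elif op1.type == 'delete' and op2.type == 'insert':
            return self.transform_delete_insert(op1, op2)
        elif op1.type == 'delete' and op2.type == 'delete':
            return self.transform_delete_delete(op1, op2)
        return op1, op2

    def transform_insert_insert(self, op1, op2):
        """Transform two concurrent inserts; on a tie, op1's text goes first"""
        if op1.position <= op2.position:
            # op1 comes before op2, adjust op2's position
            return op1, TextOperation('insert', op2.position + len(op1.content), op2.content)
        # op2 comes before op1, adjust op1's position
        return TextOperation('insert', op1.position + len(op2.content), op1.content), op2

    def transform_insert_delete(self, ins, dele):
        """Transform an insert against a concurrent delete"""
        end = dele.position + len(dele.content)
        if ins.position <= dele.position:  # insert lands before the deleted range
            return ins, TextOperation('delete', dele.position + len(ins.content), dele.content)
        if ins.position >= end:  # insert lands after the deleted range
            return TextOperation('insert', ins.position - len(dele.content), ins.content), dele
        # Insert inside the deleted range: simplified "delete wins" policy, so the
        # delete also removes the inserted text and the insert becomes a no-op
        cut = ins.position - dele.position
        merged = dele.content[:cut] + ins.content + dele.content[cut:]
        return TextOperation('insert', dele.position, ''), TextOperation('delete', dele.position, merged)

    def transform_delete_insert(self, op1, op2):
        new_op2, new_op1 = self.transform_insert_delete(op2, op1)  # mirror case
        return new_op1, new_op2

    def transform_delete_delete(self, op1, op2):
        # Each delete skips the characters the other delete already removed
        return self._exclude(op1, op2), self._exclude(op2, op1)

    @staticmethod
    def _exclude(op, other):
        """Rewrite delete `op` so it applies after delete `other`"""
        start, end = other.position, other.position + len(other.content)
        kept = ''.join(c for i, c in enumerate(op.content) if not start <= op.position + i < end)
        shift = max(0, min(op.position, end) - start)  # chars of `other` before op
        return TextOperation('delete', op.position - shift, kept)
```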
Best Practices
- Choose Strategy Based on Use Case:
  - LWW: simple systems where losing some concurrent writes is acceptable
  - Multi-value: the application or user can resolve conflicts on read
  - CRDTs: replicas must converge automatically, with merges that are commutative, associative, and idempotent
  - Custom: domain-specific requirements (e.g., shopping carts, account balances)
- Design for Conflicts:
  - Minimize conflicting operations
  - Use commutative operations when possible
  - Partition data to reduce conflicts
- Monitor and Measure:
  - Track conflict rates
  - Measure resolution latency
  - Alert when conflict rates spike
- Test Thoroughly:
  - Simulate network partitions
  - Test concurrent write scenarios
  - Validate resolution correctness
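Validating resolution correctness can be made concrete for state-based merges: a merge that is idempotent, commutative, and associative lets replicas converge no matter how often or in what order they exchange state. This is a minimal sketch, assuming replica states are plain Python values; `check_merge_properties` and `gset_merge` are hypothetical helpers, and checking a small random sample is a sanity check, not a proof.

```python
import itertools
import random
from typing import Callable, List, TypeVar

T = TypeVar("T")


def check_merge_properties(merge: Callable[[T, T], T], states: List[T]) -> None:
    """Raise AssertionError unless merge is idempotent, commutative and associative."""
    for a in states:
        if merge(a, a) != a:
            raise AssertionError("not idempotent")
    for a, b in itertools.product(states, repeat=2):
        if merge(a, b) != merge(b, a):
            raise AssertionError("not commutative")
    for a, b, c in itertools.product(states, repeat=3):
        if merge(merge(a, b), c) != merge(a, merge(b, c)):
            raise AssertionError("not associative")


def gset_merge(a: frozenset, b: frozenset) -> frozenset:
    """Same rule as GSetCRDT.merge: set union."""
    return a | b


# Random replica states drawn from a fixed seed so any failure is reproducible
rng = random.Random(42)
states = [frozenset(rng.sample(range(10), rng.randint(0, 5))) for _ in range(8)]
check_merge_properties(gset_merge, states)  # passes: union satisfies all three
check_merge_properties(max, [1, 5, 3])      # passes: max-merge, as in the cart resolver

# Summing quantities is not idempotent: re-delivering the same state changes it
try:
    check_merge_properties(lambda a, b: a + b, [1, 2, 3])
except AssertionError as err:
    print("sum merge rejected:", err)  # sum merge rejected: not idempotent
```

The failing case explains why the cart resolver takes the maximum quantity instead of adding: a merge that double-counts on redelivery cannot converge.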
Conflict resolution is a critical aspect of distributed systems that directly impacts data consistency and user experience.