Find Median from Data Stream

Design a data structure that finds the median of a continuous stream of integers.

Problem Statement

The median is the middle value in an ordered integer list. If the size of the list is even, there is no middle value, and the median is the mean of the two middle values.
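
As a quick illustration of the definition, the sketch below uses Python's standard statistics.median on two small lists chosen for this example, one with an odd count and one with an even count.

```python
import statistics

# Odd count: the single middle value of the sorted list.
odd = statistics.median([5, 1, 3])       # sorted: [1, 3, 5]

# Even count: the mean of the two middle values.
even = statistics.median([4, 1, 3, 2])   # sorted: [1, 2, 3, 4]

print(odd, even)  # 3 2.5
```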

Design a data structure that supports the following two operations:

  • addNum(num): Add an integer from the data stream to the data structure
  • findMedian(): Return the median of all elements so far

Input/Output Specifications

  • addNum(num):
    • Input: Integer num
    • Output: None (void)
  • findMedian():
    • Input: None
    • Output: Double representing the median

Constraints

  • -10^5 <= num <= 10^5
  • There will be at least one element before calling findMedian
  • At most 5 * 10^4 calls will be made to addNum and findMedian

Examples

Example 1:

Input:
["MedianFinder", "addNum", "addNum", "findMedian", "addNum", "findMedian"]
[[], [1], [2], [], [3], []]
Output:
[null, null, null, 1.5, null, 2.0]

Explanation:
MedianFinder medianFinder = new MedianFinder();
medianFinder.addNum(1);    // arr = [1]
medianFinder.addNum(2);    // arr = [1, 2]
medianFinder.findMedian(); // return 1.5 (i.e., (1 + 2) / 2)
medianFinder.addNum(3);    // arr = [1, 2, 3]
medianFinder.findMedian(); // return 2.0

Solution Approaches

Approach 1: Two Heaps (Optimal)

Algorithm Explanation:

  1. Use two heaps: max-heap for smaller half, min-heap for larger half
  2. Maintain invariant: max-heap size = min-heap size or max-heap size = min-heap size + 1
  3. addNum: Push onto the max-heap, move its top to the min-heap if it exceeds the min-heap top, then rebalance sizes
  4. findMedian: Return max-heap top (odd total) or average of both tops (even total)

Implementation:

Python:

import heapq

class MedianFinder:
    """
    Find median from data stream using two heaps
    addNum: O(log n), findMedian: O(1)
    Space: O(n)
    """
    
    def __init__(self):
        # Max-heap for smaller half (use negative values)
        self.max_heap = []  # Smaller half
        # Min-heap for larger half
        self.min_heap = []  # Larger half
    
    def addNum(self, num: int) -> None:
        # Add to max-heap first (smaller half)
        heapq.heappush(self.max_heap, -num)
        
        # Ensure max-heap top <= min-heap top
        if (self.min_heap and 
            -self.max_heap[0] > self.min_heap[0]):
            # Move from max-heap to min-heap
            val = -heapq.heappop(self.max_heap)
            heapq.heappush(self.min_heap, val)
        
        # Balance heap sizes
        if len(self.max_heap) > len(self.min_heap) + 1:
            # Move from max-heap to min-heap
            val = -heapq.heappop(self.max_heap)
            heapq.heappush(self.min_heap, val)
        elif len(self.min_heap) > len(self.max_heap):
            # Move from min-heap to max-heap
            val = heapq.heappop(self.min_heap)
            heapq.heappush(self.max_heap, -val)
    
    def findMedian(self) -> float:
        if len(self.max_heap) > len(self.min_heap):
            return float(-self.max_heap[0])
        else:
            return (-self.max_heap[0] + self.min_heap[0]) / 2.0

# Same algorithm with shorter attribute names
class MedianFinderV2:
    def __init__(self):
        self.small = []  # Max heap (smaller half)
        self.large = []  # Min heap (larger half)
    
    def addNum(self, num: int) -> None:
        # Always add to small heap first
        heapq.heappush(self.small, -num)
        
        # Move largest from small to large if necessary
        if (self.large and 
            -self.small[0] > self.large[0]):
            val = -heapq.heappop(self.small)
            heapq.heappush(self.large, val)
        
        # Balance sizes: small can have at most 1 extra element
        if len(self.small) > len(self.large) + 1:
            val = -heapq.heappop(self.small)
            heapq.heappush(self.large, val)
        elif len(self.large) > len(self.small):
            val = heapq.heappop(self.large)
            heapq.heappush(self.small, -val)
    
    def findMedian(self) -> float:
        if len(self.small) > len(self.large):
            return float(-self.small[0])
        return (-self.small[0] + self.large[0]) / 2.0

Java:

import java.util.*;

class MedianFinder {
    private PriorityQueue<Integer> maxHeap; // Smaller half
    private PriorityQueue<Integer> minHeap; // Larger half
    
    /**
     * Initialize data structure
     */
    public MedianFinder() {
        maxHeap = new PriorityQueue<>(Collections.reverseOrder());
        minHeap = new PriorityQueue<>();
    }
    
    /**
     * Add number to data structure
     * Time: O(log n)
     */
    public void addNum(int num) {
        // Add to max heap first
        maxHeap.offer(num);
        
        // Ensure max heap top <= min heap top
        if (!minHeap.isEmpty() && maxHeap.peek() > minHeap.peek()) {
            minHeap.offer(maxHeap.poll());
        }
        
        // Balance heap sizes
        if (maxHeap.size() > minHeap.size() + 1) {
            minHeap.offer(maxHeap.poll());
        } else if (minHeap.size() > maxHeap.size()) {
            maxHeap.offer(minHeap.poll());
        }
    }
    
    /**
     * Find median of all elements
     * Time: O(1)
     */
    public double findMedian() {
        if (maxHeap.size() > minHeap.size()) {
            return maxHeap.peek();
        } else {
            return (maxHeap.peek() + minHeap.peek()) / 2.0;
        }
    }
}

Go:

import (
    "container/heap"
)

type MaxHeap []int
func (h MaxHeap) Len() int           { return len(h) }
func (h MaxHeap) Less(i, j int) bool { return h[i] > h[j] }
func (h MaxHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *MaxHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
func (h *MaxHeap) Pop() interface{} {
    old := *h
    n := len(old)
    x := old[n-1]
    *h = old[0 : n-1]
    return x
}

type MinHeap []int
func (h MinHeap) Len() int           { return len(h) }
func (h MinHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h MinHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *MinHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
func (h *MinHeap) Pop() interface{} {
    old := *h
    n := len(old)
    x := old[n-1]
    *h = old[0 : n-1]
    return x
}

type MedianFinder struct {
    maxHeap *MaxHeap // Smaller half
    minHeap *MinHeap // Larger half
}

func Constructor() MedianFinder {
    maxHeap := &MaxHeap{}
    minHeap := &MinHeap{}
    heap.Init(maxHeap)
    heap.Init(minHeap)
    return MedianFinder{maxHeap, minHeap}
}

func (this *MedianFinder) AddNum(num int) {
    // Add to max heap first
    heap.Push(this.maxHeap, num)
    
    // Ensure ordering
    if this.minHeap.Len() > 0 && (*this.maxHeap)[0] > (*this.minHeap)[0] {
        val := heap.Pop(this.maxHeap).(int)
        heap.Push(this.minHeap, val)
    }
    
    // Balance sizes
    if this.maxHeap.Len() > this.minHeap.Len()+1 {
        val := heap.Pop(this.maxHeap).(int)
        heap.Push(this.minHeap, val)
    } else if this.minHeap.Len() > this.maxHeap.Len() {
        val := heap.Pop(this.minHeap).(int)
        heap.Push(this.maxHeap, val)
    }
}

func (this *MedianFinder) FindMedian() float64 {
    if this.maxHeap.Len() > this.minHeap.Len() {
        return float64((*this.maxHeap)[0])
    }
    return float64((*this.maxHeap)[0]+(*this.minHeap)[0]) / 2.0
}

JavaScript:

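// Note: JavaScript has no built-in heap. MaxHeap and MinHeap are assumed to be
// user-supplied classes exposing push(), pop(), peek(), and size().
class MedianFinder {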
    constructor() {
        this.maxHeap = new MaxHeap(); // Smaller half
        this.minHeap = new MinHeap(); // Larger half
    }
    
    /**
     * Add number to data structure
     * Time: O(log n)
     */
    addNum(num) {
        // Add to max heap first
        this.maxHeap.push(num);
        
        // Ensure ordering
        if (this.minHeap.size() > 0 && 
            this.maxHeap.peek() > this.minHeap.peek()) {
            const val = this.maxHeap.pop();
            this.minHeap.push(val);
        }
        
        // Balance sizes
        if (this.maxHeap.size() > this.minHeap.size() + 1) {
            const val = this.maxHeap.pop();
            this.minHeap.push(val);
        } else if (this.minHeap.size() > this.maxHeap.size()) {
            const val = this.minHeap.pop();
            this.maxHeap.push(val);
        }
    }
    
    /**
     * Find median of all elements
     * Time: O(1)
     */
    findMedian() {
        if (this.maxHeap.size() > this.minHeap.size()) {
            return this.maxHeap.peek();
        } else {
            return (this.maxHeap.peek() + this.minHeap.peek()) / 2.0;
        }
    }
}

Approach 2: Sorted Array with Binary Search (Alternative)

Implementation:

Python:

import bisect

class MedianFinderArray:
    """
    Find median using sorted array
    addNum: O(n), findMedian: O(1)
    Space: O(n)
    """
    
    def __init__(self):
        self.nums = []
    
    def addNum(self, num: int) -> None:
        bisect.insort(self.nums, num)
    
    def findMedian(self) -> float:
        n = len(self.nums)
        if n % 2 == 1:
            return float(self.nums[n // 2])
        else:
            return (self.nums[n // 2 - 1] + self.nums[n // 2]) / 2.0

Key Insights

  • Two-Heap Strategy: Maintain balance between smaller and larger halves
  • Size Invariant: Max-heap size = min-heap size, or min-heap size + 1
  • Efficient Operations: O(log n) insertion, O(1) median retrieval
  • Balance Maintenance: Critical for correct median calculation

Edge Cases

  1. Single Element: First element becomes median
  2. Two Elements: Median is average of both
  3. All Same Values: Median equals any element
  4. Increasing/Decreasing Sequence: Heaps handle automatically
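  5. Large Numbers: In fixed-width integer languages such as Java, summing the two heap tops can overflow near the type's limits; the stated constraints (|num| <= 10^5) rule this out here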
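
One way to exercise the edge cases above is to compare a two-heap finder against a reference median after every insertion. The sketch below is only an illustration: TwoHeapMedian and agrees_with_reference are hypothetical names, the class uses a compact push-then-move variant of the two-heap idea rather than the exact branches shown earlier, and the streams are made up for this check.

```python
import heapq
import random
import statistics

class TwoHeapMedian:
    """Compact two-heap median finder used only for this check."""
    def __init__(self):
        self.small = []  # max-heap via negated values (smaller half)
        self.large = []  # min-heap (larger half)

    def add(self, num):
        # Push onto small, move its largest to large, then fix sizes.
        heapq.heappush(self.small, -num)
        heapq.heappush(self.large, -heapq.heappop(self.small))
        if len(self.large) > len(self.small):
            heapq.heappush(self.small, -heapq.heappop(self.large))

    def median(self):
        if len(self.small) > len(self.large):
            return float(-self.small[0])
        return (-self.small[0] + self.large[0]) / 2.0

def agrees_with_reference(stream):
    """Check the running median against statistics.median at every step."""
    finder, seen = TwoHeapMedian(), []
    for num in stream:
        finder.add(num)
        seen.append(num)
        if finder.median() != statistics.median(seen):
            return False
    return True

rng = random.Random(0)  # fixed seed so the run is repeatable
streams = {
    "single": [7],
    "pair": [1, 2],
    "all_same": [4] * 9,
    "increasing": list(range(50)),
    "decreasing": list(range(50, 0, -1)),
    "random": [rng.randint(-10**5, 10**5) for _ in range(500)],
}
results = {name: agrees_with_reference(s) for name, s in streams.items()}
print(results)
```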

Test Cases

def test_median_finder():
    mf = MedianFinder()
    
    # Test case 1: Basic operations
    mf.addNum(1)
    assert mf.findMedian() == 1.0
    
    mf.addNum(2)
    assert mf.findMedian() == 1.5
    
    mf.addNum(3)
    assert mf.findMedian() == 2.0
    
    # Test case 2: Negative numbers
    mf2 = MedianFinder()
    mf2.addNum(-1)
    mf2.addNum(-2)
    assert mf2.findMedian() == -1.5
    
    # Test case 3: Mixed positive and negative
    mf3 = MedianFinder()
    mf3.addNum(-1)
    mf3.addNum(0)
    mf3.addNum(1)
    assert mf3.findMedian() == 0.0
    
    print("All tests passed!")

test_median_finder()

Interview Tips

  1. Recognize Pattern: This is the classic two-heap median problem
  2. Balance Strategy: Explain heap size maintenance clearly
  3. Invariant Explanation: Max-heap contains smaller half, min-heap larger half
  4. Alternative Approaches: Mention sorted array, but explain time complexity trade-offs
  5. Follow-up Ready: Be prepared for extensions like kth smallest
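
For the kth smallest extension mentioned in the last tip, one possible starting point (assuming k is fixed up front) is a single max-heap capped at k elements. KthSmallest is a hypothetical name introduced here, not part of the original problem.

```python
import heapq

class KthSmallest:
    """Hypothetical helper: tracks the k-th smallest value of a stream."""
    def __init__(self, k):
        self.k = k
        self.small = []  # max-heap (negated) holding the k smallest values

    def add(self, num):
        heapq.heappush(self.small, -num)
        if len(self.small) > self.k:
            heapq.heappop(self.small)  # discard the largest of the k + 1

    def kth(self):
        if len(self.small) < self.k:
            return None  # fewer than k values seen so far
        return -self.small[0]

tracker = KthSmallest(3)
for n in [9, 2, 7, 4, 1]:
    tracker.add(n)
print(tracker.kth())  # 4, the 3rd smallest of [1, 2, 4, 7, 9]
```

Each add costs O(log k) and the heap never grows beyond k entries, which is why a second heap is unnecessary when k does not change.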

Follow-up Questions

  1. Kth Smallest: Design for finding kth smallest element
  2. Range Queries: Find median in a given range
  3. Memory Constraint: What if we can’t store all numbers?
  4. Approximate Median: Allow some error tolerance
  5. Distributed System: How to handle across multiple machines?
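
For the memory-constraint question, one commonly discussed direction relies on the bounded value range in the constraints (-10^5 to 10^5): keep one counter per possible value instead of storing every number. The sketch below is an assumption-laden illustration; CountingMedian and its helper are hypothetical names.

```python
class CountingMedian:
    """Hypothetical sketch: exact median using one counter per possible value."""
    def __init__(self, lo=-10**5, hi=10**5):
        self.lo = lo
        self.counts = [0] * (hi - lo + 1)
        self.total = 0

    def addNum(self, num):
        self.counts[num - self.lo] += 1
        self.total += 1

    def _value_at(self, rank):
        # Return the value at 0-based position `rank` in sorted order.
        seen = 0
        for offset, count in enumerate(self.counts):
            seen += count
            if seen > rank:
                return self.lo + offset
        raise IndexError("rank out of range")

    def findMedian(self):
        if self.total == 0:
            raise ValueError("no elements added yet")
        mid = self.total // 2
        if self.total % 2 == 1:
            return float(self._value_at(mid))
        return (self._value_at(mid - 1) + self._value_at(mid)) / 2.0

cm = CountingMedian()
cm.addNum(1)
cm.addNum(2)
print(cm.findMedian())  # 1.5
cm.addNum(3)
print(cm.findMedian())  # 2.0
```

Memory stays at 200,001 counters regardless of stream length, at the cost of a linear scan over the value range in findMedian.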

Common Mistakes

  1. Wrong Balance: Not maintaining proper heap size invariant
  2. Heap Type Confusion: Using wrong heap types for smaller/larger halves
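  3. Integer Overflow: Summing the two heap tops in a fixed-width integer type before dividing; harmless under the stated constraints, but a risk if the value range grows
  4. Empty Check: Calling findMedian before any addNum; the constraints rule this out, but unguarded code fails on an empty heap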
  5. Rebalancing Logic: Incorrect rebalancing after insertions
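
On heap type confusion in Python specifically: heapq only provides a min-heap, so a max-heap is usually simulated by storing negated values. The short sketch below, with made-up sample values, shows both directions side by side.

```python
import heapq

values = [3, 1, 4, 1, 5]

min_heap = []
max_heap = []  # stores negated values so the largest original sits at index 0
for v in values:
    heapq.heappush(min_heap, v)
    heapq.heappush(max_heap, -v)

smallest = min_heap[0]
largest = -max_heap[0]   # negate again when reading

print(smallest, largest)  # 1 5
```

Forgetting either negation (on push or on read) is an easy way to end up with the two halves swapped.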

Concept Explanations

Two-Heap Median Pattern: The key insight is to maintain two heaps where one contains the smaller half of elements (max-heap) and the other contains the larger half (min-heap). The median is always at the top of one or both heaps.

Balance Invariant: We maintain that the max-heap size equals the min-heap size (even total count) or is exactly one larger (odd total count). This ensures the median is always easily accessible.
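
The invariant can be checked mechanically while debugging. The sketch below is a hypothetical helper (check_invariants is not part of the solution) that asserts both the size rule and the ordering rule after every insertion; the insertion loop uses a compact push-then-move variant rather than the exact branches in the implementations above.

```python
import heapq

def check_invariants(small, large):
    """small: max-heap stored as negated values; large: min-heap."""
    # Size rule: small holds the same number of items as large, or one more.
    assert len(small) - len(large) in (0, 1), "size invariant broken"
    # Order rule: every item in small is <= every item in large.
    if small and large:
        assert -small[0] <= large[0], "order invariant broken"
    return True

small, large = [], []
for num in [5, 15, 1, 3]:
    heapq.heappush(small, -num)
    # Move the largest of the smaller half across, then rebalance sizes.
    heapq.heappush(large, -heapq.heappop(small))
    if len(large) > len(small):
        heapq.heappush(small, -heapq.heappop(large))
    check_invariants(small, large)

print(-small[0], large[0])  # 3 5, so the median of [1, 3, 5, 15] is 4.0
```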

When to Apply: This pattern is fundamental for any problem involving dynamic median calculation, streaming data analysis, or when you need to efficiently find middle elements in a continuously changing dataset. It’s also the foundation for sliding window median problems.
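
To give a feel for the sliding window variant, here is a deliberately simple sketch that keeps the current window in a sorted list via bisect instead of two heaps. It is a swapped-in technique chosen for brevity (O(k) per step), not the two-heap solution usually expected for that problem; sliding_window_median is a hypothetical name.

```python
import bisect

def sliding_window_median(nums, k):
    """Return the median of every length-k window, left to right."""
    if k <= 0 or k > len(nums):
        return []
    window = sorted(nums[:k])
    medians = []
    for i in range(k, len(nums) + 1):
        if k % 2 == 1:
            medians.append(float(window[k // 2]))
        else:
            medians.append((window[k // 2 - 1] + window[k // 2]) / 2.0)
        if i == len(nums):
            break
        # Slide: drop the element leaving the window, insert the new one.
        window.pop(bisect.bisect_left(window, nums[i - k]))
        bisect.insort(window, nums[i])
    return medians

print(sliding_window_median([1, 3, -1, -3, 5, 3, 6, 7], 3))
# [1.0, -1.0, -1.0, 3.0, 5.0, 6.0]
```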