Column-Oriented Storage

Core Concept

intermediate
25-35 minutes
storage-formats, analytics, compression, data-warehousing, olap, performance

How columnar storage optimizes analytical workloads and compression

Overview

Column-oriented storage, also known as columnar storage, organizes data by storing values from each column together, rather than storing entire rows together (row-oriented storage). This fundamental difference in data organization enables significant optimizations for analytical workloads, compression, and query performance.

While traditional OLTP databases use row-oriented storage for transactional workloads, modern analytical systems such as Snowflake, Amazon Redshift, Google BigQuery, and ClickHouse, along with file formats like Apache Parquet, leverage columnar storage to achieve orders-of-magnitude better performance for analytical queries.

Row-Oriented vs Column-Oriented Storage

Row-Oriented Storage (Traditional)

Row-oriented storage stores complete rows together, like organizing a filing cabinet where each folder contains all the documents for one person. This approach is optimized for transactional workloads where you typically need to access all columns of a row at once, similar to how you might need all the information about a customer when processing their order.

Example Table Structure: Imagine a sales table with columns for ID, Product, Category, Price, and Date. In row-oriented storage, all the data for each sale is stored together in blocks, making it efficient to retrieve complete records but inefficient to analyze specific columns across many rows.

Column-Oriented Storage

Column-oriented storage stores all values from each column together, like organizing a library by subject where all books about history are in one section, all books about science in another. This approach is optimized for analytical workloads where you typically need to analyze specific columns across many rows, similar to how you might want to calculate the average price of all products.

Example Table Structure: The same sales table would be stored with all ID values together, all Product names together, all Categories together, all Prices together, and all Dates together. This makes it very efficient to calculate statistics like average price or count products by category.
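
A toy sketch in plain Python (purely illustrative, with made-up values matching the example table) of how the same records end up laid out under each model:

# Three sales records from the example table (illustrative data)
rows = [
    {'id': 1, 'product': 'iPhone',  'category': 'Phone',  'price': 999,  'date': '2023-01-15'},
    {'id': 2, 'product': 'MacBook', 'category': 'Laptop', 'price': 1299, 'date': '2023-01-16'},
    {'id': 3, 'product': 'iPad',    'category': 'Tablet', 'price': 599,  'date': '2023-01-17'},
]

# Row-oriented layout: each record's fields are stored contiguously
row_layout = [tuple(row.values()) for row in rows]
# [(1, 'iPhone', 'Phone', 999, '2023-01-15'), (2, 'MacBook', 'Laptop', 1299, '2023-01-16'), ...]

# Column-oriented layout: each column's values are stored contiguously
column_layout = {key: [row[key] for row in rows] for key in rows[0]}
# {'id': [1, 2, 3], 'product': ['iPhone', 'MacBook', 'iPad'], 'price': [999, 1299, 599], ...}

# AVG(price) only has to touch one contiguous array in the columnar layout
avg_price = sum(column_layout['price']) / len(column_layout['price'])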

Core Advantages

1. I/O Efficiency for Analytics

Example Query:

SELECT AVG(price) FROM sales WHERE category = 'Phone';

Row-Oriented:

  • Must read entire rows just to reach the category and price values
  • Reads: ID, Product, Category, Price, Date (all 5 columns)
  • Most of the scanned data is never used by the query

Column-Oriented:

  • Reads only the Category and Price columns (2 of 5)
  • Roughly 2.5x less data scanned for this schema
  • The gap widens with wider tables; see the rough byte-level sketch below
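
A rough back-of-the-envelope sketch of the difference, using assumed per-value widths (the byte sizes below are illustrative, not measured):

# Assumed average on-disk width per value, in bytes (illustrative)
column_widths = {'id': 4, 'product': 20, 'category': 8, 'price': 8, 'date': 8}
rows = 10_000_000

row_store_bytes = rows * sum(column_widths.values())  # a row store scans every column
column_store_bytes = rows * (column_widths['category'] + column_widths['price'])  # only 2 columns

print(row_store_bytes / column_store_bytes)  # 3.0 -> ~3x less data scanned with these widths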

2. Superior Compression

Values in the same column are often similar, enabling better compression:

# Price column example
prices = [999, 999, 1299, 999, 1299, 599, 999, 1299]

# Row storage: mixed data types are interleaved, which compresses poorly
row_data = "1,iPhone,Phone,999,2023-01-15,2,MacBook,Laptop,1299,2023-01-16..."

# Column storage: homogeneous data compresses much better
price_column = "999,999,1299,999,1299,599,999,1299"
# Run-length encoding of this sequence: "999×2,1299×1,999×1,1299×1,599×1,999×1,1299×1"
# Stored sorted (as many column stores do within a row group), the runs get much longer:
#   599,999,999,999,999,1299,1299,1299 -> "599×1,999×4,1299×3"

3. Vectorized Processing

Modern CPUs can process arrays of the same type very efficiently:

# Column-oriented allows vectorized operations
import numpy as np

prices = np.array([999, 999, 1299, 999, 1299, 599, 999, 1299])
categories = np.array(['Phone', 'Phone', 'Laptop', 'Phone', 'Laptop', 'Tablet', 'Phone', 'Laptop'])

# Vectorized filtering and aggregation
phone_mask = (categories == 'Phone')
phone_prices = prices[phone_mask]  # [999, 999, 999, 999]
avg_phone_price = np.mean(phone_prices)  # Single vectorized operation

Columnar Storage Implementation

Basic Column Store Structure

from typing import Dict, List, Any, Optional
import struct
import gzip
from abc import ABC, abstractmethod

class ColumnEncoder(ABC):
    @abstractmethod
    def encode(self, values: List[Any]) -> bytes:
        pass
    
    @abstractmethod
    def decode(self, data: bytes) -> List[Any]:
        pass

class RunLengthEncoder(ColumnEncoder):
    def encode(self, values: List[Any]) -> bytes:
        """Run-length encoding for repeated values"""
        if not values:
            return b''
        
        encoded = []
        current_value = values[0]
        count = 1
        
        for i in range(1, len(values)):
            if values[i] == current_value:
                count += 1
            else:
                encoded.append((current_value, count))
                current_value = values[i]
                count = 1
        
        encoded.append((current_value, count))
        
        # Serialize to bytes
        result = b''
        for value, count in encoded:
            value_bytes = str(value).encode('utf-8')
            result += struct.pack('I', len(value_bytes))  # Length prefix
            result += value_bytes
            result += struct.pack('I', count)
        
        return result
    
    def decode(self, data: bytes) -> List[Any]:
        """Decode run-length encoded data (values round-trip as strings)"""
        result = []
        offset = 0
        
        while offset < len(data):
            # Read value length
            value_len = struct.unpack('I', data[offset:offset+4])[0]
            offset += 4
            
            # Read value
            value = data[offset:offset+value_len].decode('utf-8')
            offset += value_len
            
            # Read count
            count = struct.unpack('I', data[offset:offset+4])[0]
            offset += 4
            
            # Expand run
            result.extend([value] * count)
        
        return result

class DeltaEncoder(ColumnEncoder):
    def encode(self, values: List[int]) -> bytes:
        """Delta encoding for sorted integer sequences"""
        if not values:
            return b''
        
        deltas = [values[0]]  # First value as-is
        for i in range(1, len(values)):
            delta = values[i] - values[i-1]
            deltas.append(delta)
        
        # Use variable-length encoding for deltas
        result = b''
        for delta in deltas:
            result += self._encode_varint(delta)
        
        return result
    
    def _encode_varint(self, value: int) -> bytes:
        """Variable-length integer encoding (assumes non-negative values)"""
        result = []
        while value >= 128:
            result.append((value & 127) | 128)
            value >>= 7
        result.append(value)
        return bytes(result)
    
    def _decode_varint(self, data: bytes, offset: int) -> tuple:
        """Decode one varint starting at offset; returns (value, new_offset)"""
        value = 0
        shift = 0
        while True:
            byte = data[offset]
            offset += 1
            value |= (byte & 127) << shift
            if byte < 128:
                break
            shift += 7
        return value, offset
    
    def decode(self, data: bytes) -> List[int]:
        """Decode delta-encoded integers"""
        deltas = []
        offset = 0
        
        while offset < len(data):
            delta, new_offset = self._decode_varint(data, offset)
            deltas.append(delta)
            offset = new_offset
        
        # Reconstruct original values
        if not deltas:
            return []
        
        result = [deltas[0]]
        for i in range(1, len(deltas)):
            result.append(result[-1] + deltas[i])
        
        return result

class ColumnStore:
    def __init__(self):
        self.columns: Dict[str, 'Column'] = {}
        self.row_count = 0
    
    def add_column(self, name: str, encoder: ColumnEncoder):
        """Add a new column with specified encoder"""
        self.columns[name] = Column(name, encoder)
    
    def insert_batch(self, data: List[Dict[str, Any]]):
        """Insert batch of rows"""
        if not data:
            return
        
        # Group data by column
        column_data = {}
        for col_name in self.columns:
            column_data[col_name] = [row.get(col_name) for row in data]
        
        # Store each column
        for col_name, values in column_data.items():
            if col_name in self.columns:
                self.columns[col_name].append_values(values)
        
        self.row_count += len(data)
    
    def select_columns(self, column_names: List[str], 
                      where_clause: Optional[Dict] = None) -> List[Dict]:
        """Select specific columns with optional filtering"""
        if not column_names or not any(col in self.columns for col in column_names):
            return []
        
        # Get row indices that match where clause
        if where_clause:
            matching_rows = self._apply_where_clause(where_clause)
        else:
            matching_rows = list(range(self.row_count))
        
        # Fetch requested columns for matching rows
        result = []
        column_values = {}
        
        for col_name in column_names:
            if col_name in self.columns:
                column_values[col_name] = self.columns[col_name].get_values(matching_rows)
        
        # Reconstruct rows
        for i in range(len(matching_rows)):
            row = {}
            for col_name in column_names:
                if col_name in column_values:
                    row[col_name] = column_values[col_name][i]
            result.append(row)
        
        return result
    
    def aggregate(self, column: str, operation: str, 
                  where_clause: Optional[Dict] = None) -> float:
        """Perform aggregation on a column"""
        if column not in self.columns:
            return 0.0
        
        # Get matching row indices
        if where_clause:
            matching_rows = self._apply_where_clause(where_clause)
        else:
            matching_rows = list(range(self.row_count))
        
        # Get column values for matching rows
        values = self.columns[column].get_values(matching_rows)
        numeric_values = [float(v) for v in values if v is not None]
        
        if not numeric_values:
            return 0.0
        
        if operation.upper() == 'SUM':
            return sum(numeric_values)
        elif operation.upper() == 'AVG':
            return sum(numeric_values) / len(numeric_values)
        elif operation.upper() == 'COUNT':
            return len(numeric_values)
        elif operation.upper() == 'MIN':
            return min(numeric_values)
        elif operation.upper() == 'MAX':
            return max(numeric_values)
        else:
            return 0.0
    
    def _apply_where_clause(self, where_clause: Dict) -> List[int]:
        """Return row indices matching simple equality predicates,
        e.g. {'category': 'Phone'} keeps rows whose category equals 'Phone'."""
        matching = list(range(self.row_count))
        for col_name, expected in where_clause.items():
            if col_name not in self.columns:
                return []
            values = self.columns[col_name].get_values(matching)
            matching = [row for row, value in zip(matching, values) if value == expected]
        return matching

class Column:
    def __init__(self, name: str, encoder: ColumnEncoder):
        self.name = name
        self.encoder = encoder
        self.compressed_data = b''
        self.values_cache = None
        self.dirty = False
    
    def append_values(self, values: List[Any]):
        """Append values to column"""
        # Decompress existing data if needed (compress() gzips the encoded bytes)
        if self.compressed_data and self.values_cache is None:
            self.values_cache = self.encoder.decode(gzip.decompress(self.compressed_data))
        elif self.values_cache is None:
            self.values_cache = []
        
        # Append new values
        self.values_cache.extend(values)
        self.dirty = True
    
    def get_values(self, row_indices: List[int]) -> List[Any]:
        """Get values for specific row indices"""
        if self.values_cache is None:
            data = gzip.decompress(self.compressed_data) if self.compressed_data else b''
            self.values_cache = self.encoder.decode(data)
        
        return [self.values_cache[i] for i in row_indices if i < len(self.values_cache)]
    
    def compress(self):
        """Compress column data"""
        if self.dirty and self.values_cache:
            self.compressed_data = gzip.compress(self.encoder.encode(self.values_cache))
            self.values_cache = None  # Clear cache to save memory
            self.dirty = False
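
A minimal usage sketch for the classes above (illustrative only; the where clause is a plain dict of column-to-value equality filters, matching the simple semantics of this example implementation):

store = ColumnStore()
store.add_column('category', RunLengthEncoder())
store.add_column('price', RunLengthEncoder())

store.insert_batch([
    {'category': 'Phone',  'price': 999},
    {'category': 'Phone',  'price': 999},
    {'category': 'Laptop', 'price': 1299},
])

# Average price of phones: only the category and price columns are touched
avg_phone_price = store.aggregate('price', 'AVG', where_clause={'category': 'Phone'})
print(avg_phone_price)  # 999.0

# Compress columns once the batch is loaded (run-length encoding + gzip in this sketch)
for column in store.columns.values():
    column.compress()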

Advanced Compression Techniques

1. Dictionary Encoding

class DictionaryEncoder(ColumnEncoder):
    def encode(self, values: List[str]) -> bytes:
        """Dictionary compression for string columns"""
        # Build dictionary
        unique_values = list(set(values))
        dictionary = {value: idx for idx, value in enumerate(unique_values)}
        
        # Encode values as indices
        indices = [dictionary[value] for value in values]
        
        # Serialize dictionary and indices
        dict_data = b''.join(f"{value}\n".encode('utf-8') for value in unique_values)
        dict_size = struct.pack('I', len(dict_data))
        
        indices_data = b''.join(struct.pack('H', idx) for idx in indices)  # 2-byte indices
        
        return dict_size + dict_data + indices_data
    
    def decode(self, data: bytes) -> List[str]:
        """Decode dictionary-compressed data"""
        # Read dictionary size
        dict_size = struct.unpack('I', data[:4])[0]
        
        # Read dictionary
        dict_data = data[4:4+dict_size].decode('utf-8')
        dictionary = dict_data.strip().split('\n')
        
        # Read indices
        indices_data = data[4+dict_size:]
        indices = []
        for i in range(0, len(indices_data), 2):
            idx = struct.unpack('H', indices_data[i:i+2])[0]
            indices.append(idx)
        
        # Reconstruct values
        return [dictionary[idx] for idx in indices]
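
A quick round-trip with the encoder above, just to build intuition about the size savings (the figures are for this particular sample):

values = ['Phone', 'Laptop', 'Phone', 'Phone', 'Tablet'] * 1000
encoder = DictionaryEncoder()

encoded = encoder.encode(values)
assert encoder.decode(encoded) == values

raw_bytes = sum(len(v) for v in values)  # ~27 KB of raw string bytes
print(len(encoded), raw_bytes)           # ~10 KB (2-byte indices + a tiny dictionary) vs ~27 KB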

2. Bit Packing

class BitPackedEncoder(ColumnEncoder):
    def encode(self, values: List[int]) -> bytes:
        """Bit packing for small integer ranges"""
        if not values:
            return b''
        
        # Determine bits needed (at least 1 so an all-zero column still round-trips)
        max_value = max(values)
        bits_needed = max(max_value.bit_length(), 1)
        
        # Pack values
        packed_data = []
        current_byte = 0
        bits_in_current_byte = 0
        
        for value in values:
            # Add value to current accumulation
            current_byte |= (value << bits_in_current_byte)
            bits_in_current_byte += bits_needed
            
            # Flush complete bytes
            while bits_in_current_byte >= 8:
                packed_data.append(current_byte & 0xFF)
                current_byte >>= 8
                bits_in_current_byte -= 8
        
        # Flush remaining bits
        if bits_in_current_byte > 0:
            packed_data.append(current_byte & 0xFF)
        
        # Prepend metadata
        metadata = struct.pack('II', bits_needed, len(values))
        return metadata + bytes(packed_data)
    
    def decode(self, data: bytes) -> List[int]:
        """Decode bit-packed data"""
        if len(data) < 8:
            return []
        
        # Read metadata
        bits_needed, count = struct.unpack('II', data[:8])
        packed_data = data[8:]
        
        # Unpack values
        values = []
        bit_buffer = 0
        bits_in_buffer = 0
        data_index = 0
        
        mask = (1 << bits_needed) - 1
        
        for _ in range(count):
            # Ensure we have enough bits
            while bits_in_buffer < bits_needed and data_index < len(packed_data):
                bit_buffer |= (packed_data[data_index] << bits_in_buffer)
                bits_in_buffer += 8
                data_index += 1
            
            # Extract value
            value = bit_buffer & mask
            values.append(value)
            
            # Remove used bits
            bit_buffer >>= bits_needed
            bits_in_buffer -= bits_needed
        
        return values
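
And a small round-trip with the bit-packing encoder above; values in the range 0-7 need only 3 bits each, versus 32 bits for a typical fixed-width integer:

encoder = BitPackedEncoder()
values = [3, 7, 0, 5, 1, 6, 2, 4] * 1000  # max value 7 -> 3 bits per value

encoded = encoder.encode(values)
assert encoder.decode(encoded) == values

# 8000 values * 3 bits = 3000 bytes of payload (+ 8 bytes of metadata),
# versus 32,000 bytes stored as 4-byte integers
print(len(encoded))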

Query Processing Optimizations

Vectorized Execution

import numpy as np
from typing import Optional, Union

class VectorizedProcessor:
    @staticmethod
    def filter_column(column_data: np.ndarray, 
                     condition: str, 
                     value: Union[int, float, str]) -> np.ndarray:
        """Apply vectorized filtering"""
        if condition == '=':
            return column_data == value
        elif condition == '>':
            return column_data > value
        elif condition == '<':
            return column_data < value
        elif condition == '>=':
            return column_data >= value
        elif condition == '<=':
            return column_data <= value
        elif condition == '!=':
            return column_data != value
        else:
            return np.ones(len(column_data), dtype=bool)
    
    @staticmethod
    def aggregate_column(column_data: np.ndarray, 
                        operation: str, 
                        mask: Optional[np.ndarray] = None) -> float:
        """Vectorized aggregation"""
        if mask is not None:
            column_data = column_data[mask]
        
        if operation.upper() == 'SUM':
            return np.sum(column_data)
        elif operation.upper() == 'AVG':
            return np.mean(column_data)
        elif operation.upper() == 'COUNT':
            return len(column_data)
        elif operation.upper() == 'MIN':
            return np.min(column_data)
        elif operation.upper() == 'MAX':
            return np.max(column_data)
        else:
            return 0.0

# Example usage
def process_analytical_query():
    # Sample data: 1M rows
    prices = np.random.normal(1000, 200, 1000000)
    categories = np.random.choice(['Phone', 'Laptop', 'Tablet'], 1000000)
    dates = np.random.choice(['2023-01', '2023-02', '2023-03'], 1000000)
    
    # Query: Average price of phones in 2023-01
    phone_mask = VectorizedProcessor.filter_column(categories, '=', 'Phone')
    date_mask = VectorizedProcessor.filter_column(dates, '=', '2023-01')
    combined_mask = phone_mask & date_mask
    
    avg_price = VectorizedProcessor.aggregate_column(prices, 'AVG', combined_mask)
    return avg_price

Late Materialization

class ColumnStoreQuery:
    def __init__(self, column_store: ColumnStore):
        self.store = column_store
        self.selection_predicates = []
        self.projection_columns = []
    
    def where(self, column: str, operator: str, value: Any):
        """Add selection predicate"""
        self.selection_predicates.append((column, operator, value))
        return self
    
    def select(self, *columns: str):
        """Add projection columns"""
        self.projection_columns.extend(columns)
        return self
    
    def execute(self) -> List[Dict]:
        """Execute query with late materialization"""
        # Step 1: Apply all predicates to get qualifying row indices
        qualifying_rows = None
        
        for column, operator, value in self.selection_predicates:
            if column in self.store.columns:
                matching = self._apply_predicate(column, operator, value)
                
                if qualifying_rows is None:
                    qualifying_rows = matching
                else:
                    # Intersect with previous results (preserving row order)
                    matching_set = set(matching)
                    qualifying_rows = [row for row in qualifying_rows if row in matching_set]
        
        if qualifying_rows is None:
            qualifying_rows = list(range(self.store.row_count))
        
        # Step 2: Materialize only the requested columns, and only for qualifying rows
        column_values = {
            col: self.store.columns[col].get_values(qualifying_rows)
            for col in self.projection_columns if col in self.store.columns
        }
        
        return [
            {col: values[i] for col, values in column_values.items()}
            for i in range(len(qualifying_rows))
        ]
    
    def _apply_predicate(self, column: str, operator: str, value: Any) -> List[int]:
        """Apply predicate to column and return matching row indices"""
        column_obj = self.store.columns[column]
        all_values = column_obj.get_values(list(range(self.store.row_count)))
        
        return [i for i, col_value in enumerate(all_values)
                if self._evaluate_condition(col_value, operator, value)]
    
    def _evaluate_condition(self, col_value: Any, operator: str, value: Any) -> bool:
        """Evaluate a single comparison against one column value"""
        if col_value is None:
            return False
        if operator == '=':
            return col_value == value
        elif operator == '>':
            return col_value > value
        elif operator == '<':
            return col_value < value
        elif operator == '>=':
            return col_value >= value
        elif operator == '<=':
            return col_value <= value
        elif operator == '!=':
            return col_value != value
        return False
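
A brief usage sketch, assuming a ColumnStore populated as in the earlier example; only the price column is materialized, and only for the rows that survive the predicate:

query = (ColumnStoreQuery(store)
         .where('category', '=', 'Phone')
         .select('price'))

print(query.execute())  # [{'price': 999}, {'price': 999}] (values come back as strings once the column is compressed)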

Performance Characteristics

Compression Ratios

Data Type        | Row Store (per value) | Column Store (per value) | Improvement
-----------------|-----------------------|--------------------------|------------
Integers         | 4 bytes               | 0.8 bytes                | 5x
Strings (dict)   | 20 bytes              | 2 bytes                  | 10x
Timestamps       | 8 bytes               | 1.2 bytes                | 6.7x
Decimals         | 8 bytes               | 1.5 bytes                | 5.3x

Overall typical compression: 3-10x better than row stores

Query Performance

Illustrative analytical workload comparison:
Query Type             | Row Store | Column Store | Improvement
-----------------------|-----------|--------------|------------
Simple aggregation     | 2.5s      | 0.3s         | 8.3x
Filtered aggregation   | 4.1s      | 0.4s         | 10.3x
Multi-column analysis  | 6.8s      | 0.9s         | 7.6x
Group by queries       | 5.2s      | 0.7s         | 7.4x

Real-World Implementations

Apache Parquet

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Write data in Parquet format (columnar)
def write_parquet_example():
    data = {
        'id': range(1000000),
        'price': np.random.normal(1000, 200, 1000000),
        'category': np.random.choice(['A', 'B', 'C'], 1000000),
        'date': pd.date_range('2023-01-01', periods=1000000, freq='H')
    }
    
    df = pd.DataFrame(data)
    
    # Write with optimizations
    table = pa.Table.from_pandas(df)
    pq.write_table(table, 'sales_data.parquet',
                   compression='snappy',
                   use_dictionary=['category'],
                   row_group_size=50000)

# Read with column pruning and predicate pushdown
def read_parquet_optimized():
    # Only read required columns
    columns = ['price', 'category']
    
    # Apply predicate pushdown
    filters = [('category', '=', 'A')]
    
    df = pd.read_parquet('sales_data.parquet',
                        columns=columns,
                        filters=filters)
    
    return df['price'].mean()

Amazon Redshift

-- Redshift uses columnar storage with compression
CREATE TABLE sales (
    id INTEGER ENCODE DELTA,
    product VARCHAR(100) ENCODE LZO,
    category VARCHAR(20) ENCODE BYTEDICT,  -- Dictionary compression
    price DECIMAL(10,2) ENCODE DELTA32K,
    sale_date DATE ENCODE DELTA32K
)
DISTKEY(category)  -- Distribution key for parallel processing
SORTKEY(sale_date); -- Sort key for range pruning

ClickHouse

-- ClickHouse MergeTree with columnar storage
CREATE TABLE sales (
    id UInt64,
    product String,
    category LowCardinality(String),  -- Dictionary encoding
    price Decimal(10, 2),
    sale_date Date
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(sale_date)  -- Partition pruning
ORDER BY (category, sale_date)    -- Primary key for sorting
SETTINGS index_granularity = 8192; -- Index granularity

Trade-offs and Considerations

Advantages

  • Analytical performance: 5-100x better for OLAP queries
  • Compression: 3-10x better compression ratios
  • I/O efficiency: Only read required columns
  • Vectorization: CPU-friendly processing

Disadvantages

  • OLTP performance: Poor for transactional workloads
  • Row reconstruction: Expensive to rebuild full rows
  • Update complexity: Modifications require rewriting columns
  • Memory usage: May require more memory during processing

When to Use Column Stores

Ideal for:

  • Data warehousing and analytics
  • OLAP workloads
  • Read-heavy scenarios
  • Time-series data
  • Reporting and business intelligence

Avoid for:

  • OLTP systems
  • Frequent updates/deletes
  • Applications requiring full row access
  • Small datasets (overhead not justified)

Column-oriented storage represents a fundamental shift in how we think about data organization, trading OLTP performance for massive gains in analytical processing capabilities.

Related Concepts

data-warehousing
compression
olap
parquet
big-data

Used By

snowflake, redshift, bigquery, clickhouse, apache-spark, databricks