Column-Oriented Storage
Core Concept
How columnar storage optimizes analytical workloads and compression
Overview
Column-oriented storage, also known as columnar storage, organizes data by storing values from each column together, rather than storing entire rows together (row-oriented storage). This fundamental difference in data organization enables significant optimizations for analytical workloads, compression, and query performance.
While traditional OLTP databases use row-oriented storage for transactional workloads, modern analytical systems like Snowflake, Amazon Redshift, Google BigQuery, and Apache Parquet leverage columnar storage to achieve orders of magnitude better performance for analytical queries.
Row-Oriented vs Column-Oriented Storage
Row-Oriented Storage (Traditional)
Row-oriented storage stores complete rows together, like organizing a filing cabinet where each folder contains all the documents for one person. This approach is optimized for transactional workloads where you typically need to access all columns of a row at once, similar to how you might need all the information about a customer when processing their order.
Example Table Structure: Imagine a sales table with columns for ID, Product, Category, Price, and Date. In row-oriented storage, all the data for each sale is stored together in blocks, making it efficient to retrieve complete records but inefficient to analyze specific columns across many rows.
Column-Oriented Storage
Column-oriented storage stores all values from each column together, like organizing a library by subject where all books about history are in one section, all books about science in another. This approach is optimized for analytical workloads where you typically need to analyze specific columns across many rows, similar to how you might want to calculate the average price of all products.
Example Table Structure: The same sales table would be stored with all ID values together, all Product names together, all Categories together, all Prices together, and all Dates together. This makes it very efficient to calculate statistics like average price or count products by category.
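To make the two layouts concrete, here is a minimal Python sketch of the same sales table stored both ways (the specific rows are illustrative, not from a real dataset):
# The same three sales, stored row-wise and column-wise.
rows = [
    (1, "iPhone", "Phone", 999, "2023-01-15"),
    (2, "MacBook", "Laptop", 1299, "2023-01-16"),
    (3, "iPad", "Tablet", 599, "2023-01-17"),
]

# Row-oriented: each complete record sits together on disk.
row_store = rows

# Column-oriented: each column's values sit together on disk.
column_store = {
    "id": [1, 2, 3],
    "product": ["iPhone", "MacBook", "iPad"],
    "category": ["Phone", "Laptop", "Tablet"],
    "price": [999, 1299, 599],
    "date": ["2023-01-15", "2023-01-16", "2023-01-17"],
}

# An analytical question touches a single list in the columnar layout...
avg_price = sum(column_store["price"]) / len(column_store["price"])

# ...but must walk every field of every record in the row layout.
avg_price_from_rows = sum(r[3] for r in row_store) / len(row_store)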
Core Advantages
1. I/O Efficiency for Analytics
Example Query:
SELECT AVG(price) FROM sales WHERE category = 'Phone';
Row-Oriented:
- Must read entire rows to access the category and price columns
- Reads: ID, Product, Category, Price, Date (5 columns)
- I/O amplification: reads 5 columns when only 2 are needed
Column-Oriented:
- Reads only the category and price columns
- Reads: Category, Price (2 columns)
- I/O reduction: roughly 2.5x less data scanned (a byte-level sketch follows below)
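A rough byte-count sketch of the same query (the per-value sizes are illustrative assumptions; the real saving depends on how wide the skipped columns are, not just on how many there are):
# Back-of-the-envelope I/O for: SELECT AVG(price) FROM sales WHERE category = 'Phone';
ROWS = 100_000_000
bytes_per_value = {"id": 4, "product": 20, "category": 8, "price": 8, "date": 4}  # assumed sizes

row_store_scan = ROWS * sum(bytes_per_value.values())                                # whole rows
column_store_scan = ROWS * (bytes_per_value["category"] + bytes_per_value["price"])  # two columns

print(f"row store scans    ~{row_store_scan / 1e9:.1f} GB")
print(f"column store scans ~{column_store_scan / 1e9:.1f} GB")
print(f"reduction: {row_store_scan / column_store_scan:.1f}x")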
2. Superior Compression
Values in the same column are often similar, enabling better compression:
# Price column example
prices = [999, 999, 1299, 999, 1299, 599, 999, 1299]
# Row storage: mixed data types, poor compression
row_data = "1,iPhone,Phone,999,2023-01-15,2,MacBook,Laptop,1299,2023-01-16..."
# Column storage: homogeneous data, excellent compression
price_column = "999,999,1299,999,1299,599,999,1299"
# Run-length encoding pays off once the column is sorted or clustered:
# sorted prices "599,999,999,999,999,1299,1299,1299" encode as "599×1,999×4,1299×3"
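Even a general-purpose compressor benefits from the columnar grouping. A quick illustrative sketch (not a benchmark) that gzips the same synthetic records serialized both ways:
import gzip
import random

# Synthetic sales: identical records, serialized row-wise and column-wise.
random.seed(0)
products = ["iPhone", "MacBook", "iPad"]
categories = {"iPhone": "Phone", "MacBook": "Laptop", "iPad": "Tablet"}
prices = {"iPhone": 999, "MacBook": 1299, "iPad": 599}
sales = [(i, random.choice(products)) for i in range(100_000)]

row_layout = "\n".join(
    f"{i},{p},{categories[p]},{prices[p]},2023-01-15" for i, p in sales
)
column_layout = "\n".join([
    ",".join(str(i) for i, _ in sales),          # id column
    ",".join(p for _, p in sales),               # product column
    ",".join(categories[p] for _, p in sales),   # category column
    ",".join(str(prices[p]) for _, p in sales),  # price column
    ",".join("2023-01-15" for _ in sales),       # date column
])

# Grouping similar values usually lets the compressor find longer repeats,
# so the column layout tends to compress smaller (exact ratios vary by data).
print(len(gzip.compress(row_layout.encode())), "compressed bytes (row layout)")
print(len(gzip.compress(column_layout.encode())), "compressed bytes (column layout)")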
3. Vectorized Processing
Modern CPUs can process arrays of the same type very efficiently:
# Column-oriented allows vectorized operations
import numpy as np
prices = np.array([999, 999, 1299, 999, 1299, 599, 999, 1299])
categories = np.array(['Phone', 'Phone', 'Laptop', 'Phone', 'Laptop', 'Tablet', 'Phone', 'Laptop'])
# Vectorized filtering and aggregation
phone_mask = (categories == 'Phone')
phone_prices = prices[phone_mask] # [999, 999, 999, 999]
avg_phone_price = np.mean(phone_prices) # Single vectorized operation
Columnar Storage Implementation
Basic Column Store Structure
from typing import Dict, List, Any, Optional
import struct
import gzip
from abc import ABC, abstractmethod
class ColumnEncoder(ABC):
@abstractmethod
def encode(self, values: List[Any]) -> bytes:
pass
@abstractmethod
def decode(self, data: bytes) -> List[Any]:
pass
class RunLengthEncoder(ColumnEncoder):
def encode(self, values: List[Any]) -> bytes:
"""Run-length encoding for repeated values"""
if not values:
return b''
encoded = []
current_value = values[0]
count = 1
for i in range(1, len(values)):
if values[i] == current_value:
count += 1
else:
encoded.append((current_value, count))
current_value = values[i]
count = 1
encoded.append((current_value, count))
# Serialize to bytes
result = b''
for value, count in encoded:
value_bytes = str(value).encode('utf-8')
result += struct.pack('I', len(value_bytes)) # Length prefix
result += value_bytes
result += struct.pack('I', count)
return result
def decode(self, data: bytes) -> List[Any]:
"""Decode run-length encoded data"""
result = []
offset = 0
while offset < len(data):
# Read value length
value_len = struct.unpack('I', data[offset:offset+4])[0]
offset += 4
# Read value
value = data[offset:offset+value_len].decode('utf-8')
offset += value_len
# Read count
count = struct.unpack('I', data[offset:offset+4])[0]
offset += 4
# Expand run
result.extend([value] * count)
return result
class DeltaEncoder(ColumnEncoder):
def encode(self, values: List[int]) -> bytes:
"""Delta encoding for sorted integer sequences"""
if not values:
return b''
deltas = [values[0]] # First value as-is
for i in range(1, len(values)):
delta = values[i] - values[i-1]
deltas.append(delta)
# Use variable-length encoding for deltas
result = b''
for delta in deltas:
result += self._encode_varint(delta)
return result
    def _encode_varint(self, value: int) -> bytes:
        """Variable-length integer encoding (assumes non-negative values)"""
        result = []
        while value >= 128:
            result.append((value & 127) | 128)
            value >>= 7
        result.append(value)
        return bytes(result)
    def _decode_varint(self, data: bytes, offset: int) -> tuple:
        """Decode one variable-length integer, returning (value, new_offset)"""
        value = 0
        shift = 0
        while True:
            byte = data[offset]
            offset += 1
            value |= (byte & 127) << shift
            if byte < 128:
                break
            shift += 7
        return value, offset
    def decode(self, data: bytes) -> List[int]:
        """Decode delta-encoded integers"""
        deltas = []
        offset = 0
        while offset < len(data):
            delta, new_offset = self._decode_varint(data, offset)
            deltas.append(delta)
            offset = new_offset
        # Reconstruct original values by accumulating deltas
        if not deltas:
            return []
        result = [deltas[0]]
        for i in range(1, len(deltas)):
            result.append(result[-1] + deltas[i])
        return result
class ColumnStore:
def __init__(self):
self.columns: Dict[str, 'Column'] = {}
self.row_count = 0
def add_column(self, name: str, encoder: ColumnEncoder):
"""Add a new column with specified encoder"""
self.columns[name] = Column(name, encoder)
def insert_batch(self, data: List[Dict[str, Any]]):
"""Insert batch of rows"""
if not data:
return
# Group data by column
column_data = {}
for col_name in self.columns:
column_data[col_name] = [row.get(col_name) for row in data]
# Store each column
for col_name, values in column_data.items():
if col_name in self.columns:
self.columns[col_name].append_values(values)
self.row_count += len(data)
def select_columns(self, column_names: List[str],
where_clause: Optional[Dict] = None) -> List[Dict]:
"""Select specific columns with optional filtering"""
if not column_names or not any(col in self.columns for col in column_names):
return []
# Get row indices that match where clause
if where_clause:
matching_rows = self._apply_where_clause(where_clause)
else:
matching_rows = list(range(self.row_count))
# Fetch requested columns for matching rows
result = []
column_values = {}
for col_name in column_names:
if col_name in self.columns:
column_values[col_name] = self.columns[col_name].get_values(matching_rows)
# Reconstruct rows
for i in range(len(matching_rows)):
row = {}
for col_name in column_names:
if col_name in column_values:
row[col_name] = column_values[col_name][i]
result.append(row)
return result
    def aggregate(self, column: str, operation: str,
                  where_clause: Optional[Dict] = None) -> float:
        """Perform aggregation on a column"""
        if column not in self.columns:
            return 0.0
        # Get matching row indices
        if where_clause:
            matching_rows = self._apply_where_clause(where_clause)
        else:
            matching_rows = list(range(self.row_count))
        # Get column values for matching rows
        values = self.columns[column].get_values(matching_rows)
        numeric_values = [float(v) for v in values if v is not None]
        if not numeric_values:
            return 0.0
        operation = operation.upper()
        if operation == 'SUM':
            return sum(numeric_values)
        elif operation == 'AVG':
            return sum(numeric_values) / len(numeric_values)
        elif operation == 'COUNT':
            return float(len(numeric_values))
        elif operation == 'MIN':
            return min(numeric_values)
        elif operation == 'MAX':
            return max(numeric_values)
        else:
            return 0.0
    def _apply_where_clause(self, where_clause: Dict[str, Any]) -> List[int]:
        """Return indices of rows matching every equality predicate,
        e.g. {'category': 'Phone'} keeps rows whose category equals 'Phone'"""
        matching_rows = list(range(self.row_count))
        for col_name, expected in where_clause.items():
            if col_name not in self.columns:
                return []
            values = self.columns[col_name].get_values(matching_rows)
            matching_rows = [row for row, value in zip(matching_rows, values)
                             if value == expected]
        return matching_rows
class Column:
def __init__(self, name: str, encoder: ColumnEncoder):
self.name = name
self.encoder = encoder
self.compressed_data = b''
self.values_cache = None
self.dirty = False
    def append_values(self, values: List[Any]):
        """Append values to column"""
        # Decompress existing data if needed
        if self.compressed_data and self.values_cache is None:
            self.values_cache = self._decompress()
        elif self.values_cache is None:
            self.values_cache = []
        # Append new values
        self.values_cache.extend(values)
        self.dirty = True
    def get_values(self, row_indices: List[int]) -> List[Any]:
        """Get values for specific row indices"""
        if self.values_cache is None:
            self.values_cache = self._decompress()
        return [self.values_cache[i] for i in row_indices if i < len(self.values_cache)]
    def _decompress(self) -> List[Any]:
        """Reverse compress(): gunzip the stored bytes, then run the column decoder"""
        if not self.compressed_data:
            return []
        return self.encoder.decode(gzip.decompress(self.compressed_data))
def compress(self):
"""Compress column data"""
if self.dirty and self.values_cache:
self.compressed_data = gzip.compress(self.encoder.encode(self.values_cache))
self.values_cache = None # Clear cache to save memory
self.dirty = False
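A minimal end-to-end sketch of the ColumnStore defined above, using the running sales example (the equality-only where-clause helper shown earlier drives the filter):
# Build a tiny column store for the running sales example.
store = ColumnStore()
store.add_column("id", DeltaEncoder())            # sorted integer ids
store.add_column("category", RunLengthEncoder())  # repetitive strings
store.add_column("price", RunLengthEncoder())     # few distinct values
store.insert_batch([
    {"id": 1, "category": "Phone", "price": 999},
    {"id": 2, "category": "Laptop", "price": 1299},
    {"id": 3, "category": "Phone", "price": 999},
    {"id": 4, "category": "Tablet", "price": 599},
])

# Filtered aggregation only touches the category and price columns.
print(store.aggregate("price", "AVG", where_clause={"category": "Phone"}))  # 999.0

# Compress each column once the batch has been loaded.
for column in store.columns.values():
    column.compress()

# Reads after compression transparently gunzip and decode the column again
# (note: RunLengthEncoder round-trips its values as strings).
print(store.select_columns(["id", "price"], where_clause={"category": "Phone"}))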
Advanced Compression Techniques
1. Dictionary Encoding
class DictionaryEncoder(ColumnEncoder):
def encode(self, values: List[str]) -> bytes:
"""Dictionary compression for string columns"""
# Build dictionary
unique_values = list(set(values))
dictionary = {value: idx for idx, value in enumerate(unique_values)}
# Encode values as indices
indices = [dictionary[value] for value in values]
# Serialize dictionary and indices
dict_data = b''.join(f"{value}\n".encode('utf-8') for value in unique_values)
dict_size = struct.pack('I', len(dict_data))
indices_data = b''.join(struct.pack('H', idx) for idx in indices) # 2-byte indices
return dict_size + dict_data + indices_data
def decode(self, data: bytes) -> List[str]:
"""Decode dictionary-compressed data"""
# Read dictionary size
dict_size = struct.unpack('I', data[:4])[0]
# Read dictionary
dict_data = data[4:4+dict_size].decode('utf-8')
dictionary = dict_data.strip().split('\n')
# Read indices
indices_data = data[4+dict_size:]
indices = []
for i in range(0, len(indices_data), 2):
idx = struct.unpack('H', indices_data[i:i+2])[0]
indices.append(idx)
# Reconstruct values
return [dictionary[idx] for idx in indices]
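A quick round-trip check of the dictionary encoder above (the sample column is made up):
# A repetitive string column shrinks to a small dictionary plus 2-byte indices.
encoder = DictionaryEncoder()
column = ["Phone", "Laptop", "Phone", "Tablet", "Phone", "Laptop"] * 1000
encoded = encoder.encode(column)
print(len(",".join(column)), "raw bytes vs", len(encoded), "encoded bytes")
assert encoder.decode(encoded) == column  # lossless round trip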
2. Bit Packing
class BitPackedEncoder(ColumnEncoder):
def encode(self, values: List[int]) -> bytes:
"""Bit packing for small integer ranges"""
if not values:
return b''
        # Determine bits needed (at least 1; assumes non-negative integers)
        max_value = max(values)
        bits_needed = max(1, max_value.bit_length())
# Pack values
packed_data = []
current_byte = 0
bits_in_current_byte = 0
for value in values:
# Add value to current accumulation
current_byte |= (value << bits_in_current_byte)
bits_in_current_byte += bits_needed
# Flush complete bytes
while bits_in_current_byte >= 8:
packed_data.append(current_byte & 0xFF)
current_byte >>= 8
bits_in_current_byte -= 8
# Flush remaining bits
if bits_in_current_byte > 0:
packed_data.append(current_byte & 0xFF)
# Prepend metadata
metadata = struct.pack('II', bits_needed, len(values))
return metadata + bytes(packed_data)
def decode(self, data: bytes) -> List[int]:
"""Decode bit-packed data"""
if len(data) < 8:
return []
# Read metadata
bits_needed, count = struct.unpack('II', data[:8])
packed_data = data[8:]
# Unpack values
values = []
bit_buffer = 0
bits_in_buffer = 0
data_index = 0
mask = (1 << bits_needed) - 1
for _ in range(count):
# Ensure we have enough bits
while bits_in_buffer < bits_needed and data_index < len(packed_data):
bit_buffer |= (packed_data[data_index] << bits_in_buffer)
bits_in_buffer += 8
data_index += 1
# Extract value
value = bit_buffer & mask
values.append(value)
# Remove used bits
bit_buffer >>= bits_needed
bits_in_buffer -= bits_needed
return values
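And a round trip with the bit-packed encoder (again a small made-up column; the encoder assumes non-negative integers):
# Small integers (here 0-6) need only 3 bits each instead of a full 32-bit word.
encoder = BitPackedEncoder()
ratings = [3, 1, 4, 1, 5, 0, 2, 6] * 1000
packed = encoder.encode(ratings)
print(len(ratings) * 4, "bytes as 32-bit ints vs", len(packed), "bytes bit-packed")
assert encoder.decode(packed) == ratings  # lossless round trip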
Query Processing Optimizations
Vectorized Execution
import numpy as np
from typing import Optional, Union
class VectorizedProcessor:
@staticmethod
def filter_column(column_data: np.ndarray,
condition: str,
value: Union[int, float, str]) -> np.ndarray:
"""Apply vectorized filtering"""
if condition == '=':
return column_data == value
elif condition == '>':
return column_data > value
elif condition == '<':
return column_data < value
elif condition == '>=':
return column_data >= value
elif condition == '<=':
return column_data <= value
elif condition == '!=':
return column_data != value
else:
return np.ones(len(column_data), dtype=bool)
@staticmethod
def aggregate_column(column_data: np.ndarray,
operation: str,
mask: Optional[np.ndarray] = None) -> float:
"""Vectorized aggregation"""
if mask is not None:
column_data = column_data[mask]
if operation.upper() == 'SUM':
return np.sum(column_data)
elif operation.upper() == 'AVG':
return np.mean(column_data)
elif operation.upper() == 'COUNT':
return len(column_data)
elif operation.upper() == 'MIN':
return np.min(column_data)
elif operation.upper() == 'MAX':
return np.max(column_data)
else:
return 0.0
# Example usage
def process_analytical_query():
# Sample data: 1M rows
prices = np.random.normal(1000, 200, 1000000)
categories = np.random.choice(['Phone', 'Laptop', 'Tablet'], 1000000)
dates = np.random.choice(['2023-01', '2023-02', '2023-03'], 1000000)
# Query: Average price of phones in 2023-01
phone_mask = VectorizedProcessor.filter_column(categories, '=', 'Phone')
date_mask = VectorizedProcessor.filter_column(dates, '=', '2023-01')
combined_mask = phone_mask & date_mask
avg_price = VectorizedProcessor.aggregate_column(prices, 'AVG', combined_mask)
return avg_price
Late Materialization
class ColumnStoreQuery:
def __init__(self, column_store: ColumnStore):
self.store = column_store
self.selection_predicates = []
self.projection_columns = []
def where(self, column: str, operator: str, value: Any):
"""Add selection predicate"""
self.selection_predicates.append((column, operator, value))
return self
def select(self, *columns: str):
"""Add projection columns"""
self.projection_columns.extend(columns)
return self
    def execute(self) -> List[Dict]:
        """Execute query with late materialization"""
        # Step 1: Apply all predicates to get qualifying row IDs
        qualifying_rows = None
        for column, operator, value in self.selection_predicates:
            if column in self.store.columns:
                column_matches = set(self._apply_predicate(column, operator, value))
                if qualifying_rows is None:
                    qualifying_rows = column_matches
                else:
                    # Intersect with previous results
                    qualifying_rows &= column_matches
        if qualifying_rows is None:
            qualifying_rows = set(range(self.store.row_count))
        row_ids = sorted(qualifying_rows)
        # Step 2: Materialize only the requested columns for qualifying rows
        columns = [c for c in self.projection_columns if c in self.store.columns]
        column_values = {c: self.store.columns[c].get_values(row_ids) for c in columns}
        return [{c: column_values[c][i] for c in columns} for i in range(len(row_ids))]
    def _apply_predicate(self, column: str, operator: str, value: Any) -> List[int]:
        """Apply predicate to column and return matching row indices"""
        column_obj = self.store.columns[column]
        all_values = column_obj.get_values(list(range(self.store.row_count)))
        return [i for i, col_value in enumerate(all_values)
                if self._evaluate_condition(col_value, operator, value)]
    def _evaluate_condition(self, col_value: Any, operator: str, value: Any) -> bool:
        """Evaluate a single comparison between a column value and a constant"""
        if col_value is None:
            return False
        if operator == '=':
            return col_value == value
        elif operator == '!=':
            return col_value != value
        elif operator == '>':
            return col_value > value
        elif operator == '<':
            return col_value < value
        elif operator == '>=':
            return col_value >= value
        elif operator == '<=':
            return col_value <= value
        return False
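A short usage sketch of the query class above (the tiny store below is illustrative and left uncompressed):
# Predicates run column-at-a-time over row ids; full rows are only
# assembled for the survivors.
store = ColumnStore()
store.add_column("product", RunLengthEncoder())
store.add_column("category", RunLengthEncoder())
store.add_column("price", DeltaEncoder())
store.insert_batch([
    {"product": "iPhone", "category": "Phone", "price": 999},
    {"product": "MacBook", "category": "Laptop", "price": 1299},
    {"product": "iPad", "category": "Tablet", "price": 599},
    {"product": "iPhone", "category": "Phone", "price": 999},
])

rows = (ColumnStoreQuery(store)
        .select("product", "price")
        .where("category", "=", "Phone")
        .where("price", ">", 500)
        .execute())
print(rows)  # [{'product': 'iPhone', 'price': 999}, {'product': 'iPhone', 'price': 999}]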
Performance Characteristics
Compression Ratios
Representative per-value storage sizes (typical figures; actual ratios depend on the data and the encodings used):
Data Type          | Row Store | Column Store | Improvement
-------------------|-----------|--------------|------------
Integers           | 4 bytes   | 0.8 bytes    | 5x
Strings (dict)     | 20 bytes  | 2 bytes      | 10x
Timestamps         | 8 bytes   | 1.2 bytes    | 6.7x
Decimals           | 8 bytes   | 1.5 bytes    | 5.3x
Overall, columnar formats typically achieve 3-10x better compression than row stores.
Query Performance
Analytical Workload Comparison (illustrative timings):
Query Type | Row Store | Column Store | Improvement
------------------------|-----------|--------------|------------
Simple aggregation | 2.5s | 0.3s | 8.3x
Filtered aggregation | 4.1s | 0.4s | 10.3x
Multi-column analysis | 6.8s | 0.9s | 7.6x
Group by queries | 5.2s | 0.7s | 7.4x
Real-World Implementations
Apache Parquet
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
# Write data in Parquet format (columnar)
def write_parquet_example():
data = {
'id': range(1000000),
'price': np.random.normal(1000, 200, 1000000),
'category': np.random.choice(['A', 'B', 'C'], 1000000),
'date': pd.date_range('2023-01-01', periods=1000000, freq='H')
}
df = pd.DataFrame(data)
# Write with optimizations
table = pa.Table.from_pandas(df)
pq.write_table(table, 'sales_data.parquet',
compression='snappy',
use_dictionary=['category'],
row_group_size=50000)
# Read with column pruning and predicate pushdown
def read_parquet_optimized():
# Only read required columns
columns = ['price', 'category']
# Apply predicate pushdown
filters = [('category', '=', 'A')]
df = pd.read_parquet('sales_data.parquet',
columns=columns,
filters=filters)
return df['price'].mean()
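To see the columnar layout and the per-column statistics that make predicate pushdown work, you can inspect the file metadata (assuming the sales_data.parquet file written above exists):
# Each row group stores one chunk per column, with min/max statistics
# that let readers skip row groups entirely (zone-map-style pruning).
def inspect_parquet_layout():
    metadata = pq.ParquetFile('sales_data.parquet').metadata
    print(metadata)  # number of row groups, columns, compression
    stats = metadata.row_group(0).column(0).statistics
    print(stats.min, stats.max)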
Amazon Redshift
-- Redshift uses columnar storage with compression
CREATE TABLE sales (
id INTEGER ENCODE DELTA,
product VARCHAR(100) ENCODE LZO,
category VARCHAR(20) ENCODE BYTEDICT, -- Dictionary compression
price DECIMAL(10,2) ENCODE DELTA32K,
sale_date DATE ENCODE DELTA32K
)
DISTKEY(category)     -- Distribution key (in practice, prefer a high-cardinality column to avoid skew)
SORTKEY(sale_date); -- Sort key for range pruning
ClickHouse
-- ClickHouse MergeTree with columnar storage
CREATE TABLE sales (
id UInt64,
product String,
category LowCardinality(String), -- Dictionary encoding
price Decimal(10, 2),
sale_date Date
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(sale_date) -- Partition pruning
ORDER BY (category, sale_date) -- Primary key for sorting
SETTINGS index_granularity = 8192; -- Index granularity
Trade-offs and Considerations
Advantages
- Analytical performance: 5-100x better for OLAP queries
- Compression: 3-10x better compression ratios
- I/O efficiency: Only read required columns
- Vectorization: CPU-friendly processing
Disadvantages
- OLTP performance: Poor for transactional workloads
- Row reconstruction: Expensive to rebuild full rows
- Update complexity: Modifications require rewriting columns
- Memory usage: May require more memory during processing
When to Use Column Stores
Ideal for:
- Data warehousing and analytics
- OLAP workloads
- Read-heavy scenarios
- Time-series data
- Reporting and business intelligence
Avoid for:
- OLTP systems
- Frequent updates/deletes
- Applications requiring full row access
- Small datasets (overhead not justified)
Column-oriented storage represents a fundamental shift in how we think about data organization, trading OLTP performance for massive gains in analytical processing capabilities.