Inverted Indexes
Core Concept: Full-text search and information retrieval using inverted indexes
Overview
An inverted index is a data structure that maps each unique term in a document collection to a list of documents containing that term. It's the fundamental building block behind modern search engines and full-text search systems, enabling fast text retrieval across massive document collections.
Originally developed for library cataloging and early document retrieval systems in the 1950s, inverted indexes became the cornerstone of web search engines such as AltaVista and Google, and of modern search systems like Elasticsearch and Solr. The "inverted" name comes from inverting the usual document → terms relationship into a terms → documents mapping.
Core Architecture
Basic Structure
An inverted index consists of two main components:
1. Dictionary (Term Dictionary)
- Contains all unique terms in the collection
- Usually implemented as a sorted array or B-tree for fast lookups
- Each term points to its posting list
2. Posting Lists
- For each term, maintains a list of documents containing that term
- Often includes additional metadata such as term frequency and term positions
Term Dictionary:
"algorithm" → [doc1, doc5, doc12, doc23]
"database" → [doc3, doc7, doc12, doc18, doc25]
"search" → [doc1, doc3, doc8, doc15, doc20]
Advanced Posting List Structure
Modern inverted indexes store rich metadata in posting lists:
// Basic posting list entry
{
  docId: 12,
  termFreq: 3,               // How many times the term appears in the document
  positions: [45, 127, 203]  // Token positions of the term within the document
}

// With additional metadata
{
  docId: 12,
  termFreq: 3,
  positions: [45, 127, 203],
  fieldId: "title",          // Which field contains the term
  proximity: 0.8,            // Distance from other query terms
  boost: 1.5                 // Field-specific boost factor
}
Implementation Approaches
1. In-Memory Implementation
import re

class InvertedIndex:
    def __init__(self):
        self.index = {}      # term -> posting list
        self.documents = {}  # docId -> original document content

    def tokenize(self, content):
        """Lowercase the text and split it into alphanumeric tokens."""
        return re.findall(r"[a-z0-9]+", content.lower())

    def find_posting(self, term, doc_id):
        """Return the existing posting for this term/document pair, if any."""
        for posting in self.index.get(term, []):
            if posting['docId'] == doc_id:
                return posting
        return None

    def add_document(self, doc_id, content):
        """Add a document to the index."""
        self.documents[doc_id] = content
        terms = self.tokenize(content)
        for position, term in enumerate(terms):
            if term not in self.index:
                self.index[term] = []
            # Find or create the posting for this document
            posting = self.find_posting(term, doc_id)
            if posting:
                posting['positions'].append(position)
                posting['termFreq'] += 1
            else:
                self.index[term].append({
                    'docId': doc_id,
                    'termFreq': 1,
                    'positions': [position]
                })

    def search(self, query_terms):
        """Search for documents containing all query terms."""
        if not query_terms:
            return []
        # Get posting lists for all query terms
        posting_lists = []
        for term in query_terms:
            if term in self.index:
                posting_lists.append(self.index[term])
            else:
                return []  # Term not found anywhere in the collection
        # Intersect posting lists to find common documents
        return self.intersect_posting_lists(posting_lists)

    def intersect_posting_lists(self, lists):
        """Find documents present in all posting lists."""
        if not lists:
            return []
        result = set(p['docId'] for p in lists[0])
        for posting_list in lists[1:]:
            doc_ids = set(p['docId'] for p in posting_list)
            result = result.intersection(doc_ids)
        return list(result)
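
A quick usage example of the class above (documents and IDs are made up for illustration; note that without stemming, "indexes" and "index" are distinct terms):

idx = InvertedIndex()
idx.add_document(1, "Inverted indexes power full-text search")
idx.add_document(2, "A B-tree index speeds up range search")
idx.add_document(3, "Posting lists store document identifiers")

print(idx.search(["search"]))           # documents 1 and 2 (order not guaranteed)
print(idx.search(["search", "index"]))  # document 2 only
print(idx.search(["missing"]))          # [] -- term not in the collection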
2. Disk-Based Implementation
For large collections, indexes are stored on disk with careful optimization:
class DiskBasedIndex:
    def __init__(self, index_path):
        self.index_path = index_path
        # term -> (offset, length) into postings.dat; built at index time
        self.term_dict = self.load_dictionary()

    def search_term(self, term):
        """Load a posting list from disk."""
        if term not in self.term_dict:
            return []
        offset, length = self.term_dict[term]
        with open(f"{self.index_path}/postings.dat", 'rb') as f:
            f.seek(offset)
            compressed_data = f.read(length)
        return self.decompress_posting_list(compressed_data)

    def compress_posting_list(self, postings):
        """Compress a posting list using delta encoding of document IDs."""
        if not postings:
            return b''
        # Sort by document ID so deltas are small and non-negative
        postings.sort(key=lambda x: x['docId'])
        # Delta-encode document IDs, varint-encode the results
        compressed = []
        prev_doc_id = 0
        for posting in postings:
            delta = posting['docId'] - prev_doc_id
            compressed.append(self.encode_varint(delta))
            compressed.append(self.encode_varint(posting['termFreq']))
            prev_doc_id = posting['docId']
        return b''.join(compressed)
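
The decompression side is not shown above. A minimal sketch of that method, assuming the exact layout produced by compress_posting_list (a varint-encoded docId delta followed by a varint-encoded term frequency; positions are not kept in this compact format) and the decode_varint helper defined under Performance Optimizations below:

    def decompress_posting_list(self, data):
        """Rebuild postings from delta-encoded varints: (docId delta, termFreq) pairs."""
        postings = []
        offset = 0
        prev_doc_id = 0
        while offset < len(data):
            delta, offset = decode_varint(data, offset)      # docId delta
            term_freq, offset = decode_varint(data, offset)  # term frequency
            prev_doc_id += delta
            postings.append({'docId': prev_doc_id, 'termFreq': term_freq})
        return postings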
Search Query Processing
Boolean Queries
class BooleanQueryProcessor:
    def __init__(self, index):
        # `index` is assumed to expose get_posting_list(term) -> list of postings
        self.index = index

    def process_and_query(self, terms):
        """Process AND query: all terms must be present"""
        posting_lists = [self.index.get_posting_list(term) for term in terms]
        return self.intersect_all(posting_lists)

    def intersect_all(self, posting_lists):
        """Return doc IDs present in every posting list"""
        if not posting_lists or any(not pl for pl in posting_lists):
            return []
        result = set(p['docId'] for p in posting_lists[0])
        for postings in posting_lists[1:]:
            result &= set(p['docId'] for p in postings)
        return list(result)

    def process_or_query(self, terms):
        """Process OR query: any term can be present"""
        all_docs = set()
        for term in terms:
            postings = self.index.get_posting_list(term)
            all_docs.update(p['docId'] for p in postings)
        return list(all_docs)

    def process_phrase_query(self, phrase_terms):
        """Process phrase query: terms must appear in sequence"""
        if len(phrase_terms) < 2:
            return self.process_and_query(phrase_terms)
        # Get documents containing all terms
        candidate_docs = self.process_and_query(phrase_terms)
        # Keep only documents where the terms appear consecutively
        phrase_matches = []
        for doc_id in candidate_docs:
            if self.has_phrase_match(doc_id, phrase_terms):
                phrase_matches.append(doc_id)
        return phrase_matches

    def has_phrase_match(self, doc_id, terms):
        """Check if the document contains the terms as a consecutive phrase"""
        term_positions = {}
        # Get token positions for each term in this document
        for term in terms:
            postings = self.index.get_posting_list(term)
            for posting in postings:
                if posting['docId'] == doc_id:
                    term_positions[term] = posting['positions']
                    break
        # Check whether the terms appear at consecutive positions
        first_term_positions = term_positions[terms[0]]
        for start_pos in first_term_positions:
            is_phrase_match = True
            for i, term in enumerate(terms[1:], 1):
                expected_pos = start_pos + i
                if expected_pos not in term_positions[term]:
                    is_phrase_match = False
                    break
            if is_phrase_match:
                return True
        return False
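
To make the position check concrete: for the phrase "inverted index", if "inverted" occurs at token positions [3, 17] and "index" at [4, 25] in the same document, the phrase matches because position 4 immediately follows position 3. A standalone sketch of that check (positions are illustrative):

def positions_form_phrase(positions_per_term):
    """positions_per_term: one sorted list of token positions per phrase term."""
    first, rest = positions_per_term[0], positions_per_term[1:]
    for start in first:
        if all(start + i + 1 in positions for i, positions in enumerate(rest)):
            return True
    return False

print(positions_form_phrase([[3, 17], [4, 25]]))   # True  (3 is followed by 4)
print(positions_form_phrase([[3, 17], [25, 40]]))  # False (no consecutive pair)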
Ranked Retrieval
import math

class RankedRetrieval:
    def __init__(self, index, doc_count):
        # `index` exposes get_posting_list(term); per-document lengths are assumed
        # to be collected at indexing time (get_doc_length, calculate_avg_doc_length)
        self.index = index
        self.doc_count = doc_count

    def tf_idf_search(self, query_terms, max_results=10):
        """Search using TF-IDF scoring"""
        doc_scores = {}
        for term in query_terms:
            postings = self.index.get_posting_list(term)
            if not postings:
                continue
            # Calculate IDF (Inverse Document Frequency)
            df = len(postings)  # Document frequency
            idf = math.log(self.doc_count / df)
            for posting in postings:
                doc_id = posting['docId']
                tf = posting['termFreq']
                # TF-IDF score for this term in this document
                score = (1 + math.log(tf)) * idf
                if doc_id in doc_scores:
                    doc_scores[doc_id] += score
                else:
                    doc_scores[doc_id] = score
        # Sort by score and return the top results
        ranked_results = sorted(doc_scores.items(),
                                key=lambda x: x[1], reverse=True)
        return ranked_results[:max_results]

    def bm25_search(self, query_terms, k1=1.5, b=0.75):
        """BM25 ranking algorithm"""
        doc_scores = {}
        avg_doc_length = self.calculate_avg_doc_length()
        for term in query_terms:
            postings = self.index.get_posting_list(term)
            if not postings:
                continue
            df = len(postings)
            idf = math.log((self.doc_count - df + 0.5) / (df + 0.5))
            for posting in postings:
                doc_id = posting['docId']
                tf = posting['termFreq']
                doc_length = self.get_doc_length(doc_id)
                # BM25 term score: saturating TF, normalized by document length
                score = idf * (tf * (k1 + 1)) / (
                    tf + k1 * (1 - b + b * (doc_length / avg_doc_length))
                )
                if doc_id in doc_scores:
                    doc_scores[doc_id] += score
                else:
                    doc_scores[doc_id] = score
        return sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
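
A small worked example of the TF-IDF scoring above, with illustrative numbers: in a 1,000-document collection, a term that appears in 10 documents has idf = ln(1000/10) ≈ 4.61, and a document containing it 3 times gains (1 + ln 3) × 4.61 ≈ 9.66 for that term.

import math

doc_count, df, tf = 1000, 10, 3
idf = math.log(doc_count / df)    # inverse document frequency, ≈ 4.61
score = (1 + math.log(tf)) * idf  # per-term contribution, ≈ 9.66
print(round(idf, 2), round(score, 2))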
Index Construction
Batch Construction
class IndexBuilder:
    def __init__(self, memory_limit=1_000_000):  # ~1 MB of postings per chunk
        self.memory_limit = memory_limit
        self.temp_indexes = []

    def build_index(self, document_stream):
        """Build an index from a large document collection"""
        # tokenize_and_process / write_temp_index / read_* are disk-I/O helpers
        # omitted here; tokenize_and_process returns {term: [postings]}
        current_index = {}
        current_memory = 0
        for doc_id, content in document_stream:
            terms = self.tokenize_and_process(content)
            for term, postings in terms.items():
                if term not in current_index:
                    current_index[term] = []
                current_index[term].extend(postings)
                current_memory += len(term) + len(postings) * 32  # Rough estimate
            # Spill the partial index to disk when the memory limit is reached
            if current_memory > self.memory_limit:
                temp_file = self.write_temp_index(current_index)
                self.temp_indexes.append(temp_file)
                current_index = {}
                current_memory = 0
        # Handle remaining in-memory data
        if current_index:
            temp_file = self.write_temp_index(current_index)
            self.temp_indexes.append(temp_file)
        # Merge all temporary indexes into the final index
        return self.merge_temp_indexes()

    def merge_temp_indexes(self):
        """Merge multiple sorted index files"""
        merged_index = {}
        # Open all temp files and read the first term from each
        file_handles = [open(f, 'r') for f in self.temp_indexes]
        current_terms = [self.read_next_term(f) for f in file_handles]
        while any(term is not None for term in current_terms):
            # Find the lexicographically smallest term still pending
            min_term = min(term for term in current_terms if term is not None)
            # Merge posting lists for this term from all files that contain it
            merged_postings = []
            for i, term in enumerate(current_terms):
                if term == min_term:
                    postings = self.read_posting_list(file_handles[i])
                    merged_postings.extend(postings)
                    current_terms[i] = self.read_next_term(file_handles[i])
            # Sort and combine postings that refer to the same document
            merged_postings.sort(key=lambda x: x['docId'])
            merged_index[min_term] = self.merge_duplicate_postings(merged_postings)
        # Close files and clean up
        for f in file_handles:
            f.close()
        return merged_index
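
The merge_duplicate_postings helper referenced above combines postings that landed in different temporary indexes for the same document. A minimal sketch of that method, assuming each posting carries docId, termFreq, and optionally positions:

    def merge_duplicate_postings(self, postings):
        """Combine postings that share a docId (input is sorted by docId)."""
        merged = []
        for posting in postings:
            if merged and merged[-1]['docId'] == posting['docId']:
                merged[-1]['termFreq'] += posting['termFreq']
                merged[-1]['positions'].extend(posting.get('positions', []))
            else:
                copy = dict(posting)  # copy so the input postings are not mutated
                copy['positions'] = list(posting.get('positions', []))
                merged.append(copy)
        return merged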
Performance Optimizations
1. Compression Techniques
Variable Byte Encoding:
def encode_varint(value):
    """Variable-length integer encoding: 7 bits per byte, high bit = continuation"""
    result = []
    while value >= 128:
        result.append((value & 127) | 128)  # low 7 bits with continuation bit set
        value >>= 7
    result.append(value)
    return bytes(result)

def decode_varint(data, offset):
    """Decode a variable-length integer; returns (value, next_offset)"""
    result = 0
    shift = 0
    i = offset
    while i < len(data):
        byte_val = data[i]
        result |= (byte_val & 127) << shift
        if (byte_val & 128) == 0:  # continuation bit clear: last byte of this value
            break
        shift += 7
        i += 1
    return result, i + 1
Delta Compression:
def compress_doc_ids(doc_ids):
    """Delta compress sorted document IDs"""
    if not doc_ids:
        return []
    compressed = [doc_ids[0]]  # First ID stored as-is
    for i in range(1, len(doc_ids)):
        delta = doc_ids[i] - doc_ids[i - 1]
        compressed.append(delta)
    return compressed
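
Combining the two techniques: delta-encode the sorted document IDs, then varint-encode each (now small) delta. A round-trip sketch using the helpers defined above:

doc_ids = [3, 7, 12, 18, 25]

# Delta + varint encode
deltas = compress_doc_ids(doc_ids)                    # [3, 4, 5, 6, 7]
encoded = b''.join(encode_varint(d) for d in deltas)  # 5 bytes total

# Decode back by accumulating the deltas
decoded, offset, prev = [], 0, 0
while offset < len(encoded):
    delta, offset = decode_varint(encoded, offset)
    prev += delta
    decoded.append(prev)

assert decoded == doc_ids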
2. Skip Lists for Fast Intersection
class SkipList:
    def __init__(self, postings, skip_distance=10):
        self.postings = postings  # sorted by docId
        self.skip_distance = skip_distance
        self.skip_pointers = self.build_skip_pointers()

    def build_skip_pointers(self):
        """Build skip pointers for faster intersection"""
        skip_pointers = {}
        for i in range(0, len(self.postings), self.skip_distance):
            target = i + self.skip_distance
            if target < len(self.postings):  # only keep pointers that advance
                skip_pointers[i] = target
        return skip_pointers

    def intersect(self, other):
        """Fast intersection using skip pointers"""
        result = []
        i = j = 0
        while i < len(self.postings) and j < len(other.postings):
            doc1 = self.postings[i]['docId']
            doc2 = other.postings[j]['docId']
            if doc1 == doc2:
                result.append(self.postings[i])
                i += 1
                j += 1
            elif doc1 < doc2:
                # Follow the skip pointer if it does not overshoot doc2
                if i in self.skip_pointers and self.postings[self.skip_pointers[i]]['docId'] <= doc2:
                    i = self.skip_pointers[i]
                else:
                    i += 1
            else:
                # Follow the skip pointer if it does not overshoot doc1
                if j in other.skip_pointers and other.postings[other.skip_pointers[j]]['docId'] <= doc1:
                    j = other.skip_pointers[j]
                else:
                    j += 1
        return result
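
A quick check of the intersection on two synthetic posting lists (document IDs are made up):

a = SkipList([{'docId': d} for d in range(0, 2000, 2)])  # even IDs
b = SkipList([{'docId': d} for d in range(0, 2000, 3)])  # multiples of 3
common = a.intersect(b)
print(len(common), common[:3])  # 334 [{'docId': 0}, {'docId': 6}, {'docId': 12}]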
Real-World Applications
Elasticsearch Implementation
Elasticsearch uses inverted indexes with several optimizations:
{
  "mappings": {
    "properties": {
      "title": {
        "type": "text",
        "analyzer": "standard",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "content": {
        "type": "text",
        "analyzer": "english"
      }
    }
  },
  "settings": {
    "analysis": {
      "analyzer": {
        "custom_analyzer": {
          "tokenizer": "standard",
          "filter": ["lowercase", "stop", "snowball"]
        }
      }
    }
  }
}
Database Full-Text Search
PostgreSQL's full-text search uses inverted indexes:
-- Create GIN index for full-text search
CREATE INDEX idx_documents_content ON documents
USING GIN (to_tsvector('english', content));
-- Search query, ranked by relevance
SELECT title, ts_rank_cd(to_tsvector('english', content), query) AS rank
FROM documents, plainto_tsquery('english', 'database indexing') query
WHERE to_tsvector('english', content) @@ query
ORDER BY rank DESC;
Performance Characteristics
Time Complexity:
- Index construction: O(N × M) where N = documents, M = avg terms per document
- Term lookup: O(log V) where V = vocabulary size
- Boolean AND query: O(k₁ + k₂ + ... + kₙ) where kᵢ = posting list size
- Phrase query: O(k × P) where k = candidate documents and P = average number of positions per term
Space Complexity:
- Dictionary: O(V) where V = unique terms
- Posting lists: O(N × M) total postings
- With compression: 30-50% reduction typically
Illustrative benchmark (figures vary with hardware and configuration):
Collection: 1M documents, 100K unique terms
Index size: 2.5GB uncompressed, 1.2GB compressed
Single term lookup: 0.1ms
Two-term AND query: 1.5ms
Complex phrase query: 15ms
Index construction time: 45 minutes
Trade-offs and Considerations
Advantages
- Fast text search: Sub-millisecond term lookups
- Flexible queries: Boolean, phrase, proximity searches
- Scalable: Handles millions of documents efficiently
- Space efficient: With compression techniques
Disadvantages
- Storage overhead: 50-200% of original document size
- Update complexity: Adding/removing documents requires index maintenance
- Memory requirements: Dictionary must fit in memory for good performance
- Phrase queries: More expensive than simple term queries
Design Decisions
Memory vs Disk Trade-offs:
- In-memory: Faster but limited by RAM
- Disk-based: Scales better but requires careful optimization
- Hybrid: Hot terms in memory, cold terms on disk
Update Strategies:
- Batch updates: Rebuild portions periodically
- Incremental updates: Maintain delta indexes
- Log-structured: Append-only with periodic compaction
Best Practices
1. Text Processing Pipeline:
   - Proper tokenization (handle punctuation, Unicode)
   - Normalization (lowercase, stemming)
   - Stop word removal for common terms
2. Index Design:
   - Choose appropriate compression for your access patterns
   - Use skip lists for frequent intersection operations
   - Consider field-specific indexes for structured data
3. Query Optimization:
   - Process the most selective terms first
   - Cache frequent queries
   - Use query expansion for better recall
4. Monitoring and Maintenance:
   - Track index fragmentation
   - Monitor query performance
   - Optimize the index regularly
Inverted indexes are the foundation of modern information retrieval, enabling fast, flexible search across massive document collections while maintaining reasonable storage requirements.