1. Requirements & Scope (5 min)
Functional Requirements
- Sort a dataset that is significantly larger than available RAM (e.g., 10TB of data with 64GB RAM)
- Support configurable sort keys (single column, composite keys, ascending/descending)
- Produce a single sorted output file (or set of sorted partitions for distributed consumers)
- Support pluggable input/output formats (CSV, Parquet, binary records)
- Provide progress reporting and the ability to resume after failures (checkpointing)
Non-Functional Requirements
- Availability: Batch system — not always-on, but must complete within an SLA (e.g., 10TB sorted within 4 hours)
- Latency: Optimize for total wall-clock time, not per-record latency. I/O throughput is the bottleneck.
- Consistency: Output must be perfectly sorted. No approximate or lossy sorting.
- Scale: Handle datasets from 1GB to 100TB. Single-machine for up to ~5TB, distributed (MapReduce-style) beyond that.
- Durability: Intermediate state (sorted chunks) persisted to disk. If the process crashes, restart from the last completed chunk, not from scratch.
2. Estimation (3 min)
Single Machine Scenario
- Dataset: 1TB, RAM: 64GB (usable for sort: ~50GB after OS and buffers)
- Record size: 100 bytes average, sort key: 20 bytes
- Total records: 1TB / 100B = 10 billion records
- Number of sorted chunks: 1TB / 50GB = 20 chunks
- Each chunk: 50GB, ~500M records, sorted in-memory using quicksort/timsort
I/O Analysis
- Disk sequential read/write: 500 MB/s (NVMe SSD) or 200 MB/s (HDD)
- Phase 1 (chunk creation): Read 1TB + write 1TB of sorted chunks = 2TB I/O
- SSD: 2TB / 500 MB/s = ~67 minutes
- HDD: 2TB / 200 MB/s = ~167 minutes
- Phase 2 (K-way merge): Read 1TB of sorted chunks + write 1TB final output = 2TB I/O
- Same time as Phase 1
- Total: ~2.2 hours SSD, ~5.5 hours HDD (I/O dominated)
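The arithmetic above can be sketched as a quick calculation (a back-of-envelope helper, assuming the 2-pass / 4× data-moved model and the throughput figures used here):

```python
def sort_io_hours(data_tb: float, disk_mb_s: float) -> float:
    """Each phase reads and writes the full dataset once: 2 passes -> 4x data moved."""
    total_bytes = 4 * data_tb * 1e12
    seconds = total_bytes / (disk_mb_s * 1e6)
    return seconds / 3600

print(f"SSD: {sort_io_hours(1, 500):.1f} h")  # ~2.2 h
print(f"HDD: {sort_io_hours(1, 200):.1f} h")  # ~5.6 h
```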
Distributed Scenario (MapReduce)
- Dataset: 100TB across HDFS
- 1000 mappers, each sorts 100GB locally
- Shuffle phase: each reducer pulls its key range from all mappers
- 100 reducers, each merge-sorts 1TB from 1000 sorted inputs
- Network bandwidth: 10Gbps per node → 100TB shuffle in ~2.2 hours with pipelining
Key Insight
External sort is I/O bound, not CPU bound. Every optimization must target reducing I/O passes, maximizing sequential I/O, and minimizing random access. The CPU cost of comparisons is negligible compared to disk read/write time.
3. API Design (3 min)
Single-Machine API
// Sort a file on disk
POST /sort
Body: {
"input_path": "/data/unsorted/events.parquet",
"output_path": "/data/sorted/events_sorted.parquet",
"sort_keys": [
{ "column": "timestamp", "order": "asc" },
{ "column": "user_id", "order": "asc" }
],
"memory_limit_gb": 50,
"temp_dir": "/scratch/sort_temp/",
"input_format": "parquet",
"output_format": "parquet",
"compression": "snappy",
"parallelism": 8 // concurrent I/O threads
}
Response 202: {
"job_id": "sort-abc123",
"status": "running",
"estimated_completion": "2024-02-22T16:30:00Z"
}
// Check progress
GET /sort/sort-abc123
Response 200: {
"status": "merging",
"phase": "merge",
"chunks_created": 20,
"chunks_merged": 12,
"progress_pct": 78.5,
"bytes_processed": "850GB",
"elapsed_seconds": 5400
}
Distributed API (MapReduce-style)
POST /distributed-sort
Body: {
"input_path": "hdfs:///data/unsorted/",
"output_path": "hdfs:///data/sorted/",
"sort_keys": [ { "column": "timestamp", "order": "asc" } ],
"num_partitions": 100, // output partitions (range-partitioned)
"sample_rate": 0.01, // for partition boundary estimation
"cluster": "sort-cluster-01"
}
Key Decisions
- Temp directory must be on fast local storage (NVMe), not network-attached
- Compression during sort reduces I/O at the cost of CPU — almost always worth it (Snappy gives 2-4x compression at 500MB/s throughput)
- Parallelism for overlapping I/O with CPU: while one chunk is being read, the previous one is being sorted
4. Data Model (3 min)
Chunk Metadata (tracked in memory + checkpoint file)
Table: sort_chunks (stored as JSON checkpoint on disk)
chunk_id | int -- sequential chunk number
status | enum -- 'created', 'sorted', 'merged'
file_path | string -- /scratch/sort_temp/chunk_007.bin
record_count | int64
byte_size | int64
min_key | bytes -- minimum sort key in chunk
max_key | bytes -- maximum sort key in chunk
checksum | uint32 -- CRC32 for integrity verification
Sort Run State (for checkpointing)
{
"job_id": "sort-abc123",
"phase": "merge", // "chunk_creation" or "merge"
"input_bytes_read": 850000000000,
"chunks": [ ... ], // list of chunk metadata
"merge_state": {
"output_bytes_written": 420000000000,
"active_merge_level": 2,
"completed_merges": [ [0,1,2,3], [4,5,6,7] ]
}
}
On-Disk Format for Chunks
- Binary format: [4-byte key length][key bytes][4-byte value length][value bytes], repeated
- Or: columnar (Parquet) for each chunk — better compression, supports column pruning
- Chunks are individually compressed (Snappy or LZ4) in 64KB blocks for random access within a chunk during merge
Why These Choices
- No database needed — this is a batch processing system. All state is file-based.
- Checkpoint file enables crash recovery. On restart, read checkpoint, skip completed chunks, resume.
- Min/max keys per chunk enable optimization during merge (can skip chunks that don’t overlap with the current merge range).
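A minimal sketch of the min/max pruning idea (field names are illustrative, matching the chunk metadata table above):

```python
def chunks_for_range(chunks, lo: bytes, hi: bytes):
    """Return only the chunks whose [min_key, max_key] overlaps [lo, hi)."""
    return [c for c in chunks if c["max_key"] >= lo and c["min_key"] < hi]

chunks = [
    {"chunk_id": 0, "min_key": b"a", "max_key": b"f"},
    {"chunk_id": 1, "min_key": b"g", "max_key": b"m"},
    {"chunk_id": 2, "min_key": b"n", "max_key": b"z"},
]
# Only chunk 1 can contain keys in [h, k) — chunks 0 and 2 are skipped entirely.
print([c["chunk_id"] for c in chunks_for_range(chunks, b"h", b"k")])  # [1]
```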
5. High-Level Design (12 min)
Two-Phase External Merge Sort
Phase 1: Chunk Creation (Sort Phase)
═══════════════════════════════════════
Input File (1TB on disk)
→ Read 50GB into RAM
→ Sort in-memory (quicksort on sort key)
→ Write sorted chunk to temp dir
→ Repeat until entire input is consumed
→ Result: 20 sorted chunks, each 50GB
Phase 2: K-Way Merge
═══════════════════════════════════════
20 sorted chunks on disk
→ Open all 20 files simultaneously
→ Read buffer: allocate 50GB / 20 = 2.5GB per chunk
→ Min-heap of size 20 (one entry per chunk, keyed by sort key)
→ Repeatedly:
1. Pop minimum from heap → write to output buffer
2. Read next record from that chunk → push to heap
3. When output buffer full → flush to output file
→ Result: single sorted output file (1TB)
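The two phases above can be demonstrated end-to-end with a toy runnable sketch — line-oriented records, a tiny artificial "RAM" limit so the mechanics are visible, and `heapq.merge` standing in for the K-way min-heap merge:

```python
import heapq
import os
import shutil
import tempfile

def external_sort(records, chunk_size=3):
    tmp = tempfile.mkdtemp()
    chunk_paths = []
    # Phase 1: read RAM-sized pieces, sort in memory, write sorted chunks.
    for i in range(0, len(records), chunk_size):
        path = os.path.join(tmp, f"chunk_{i // chunk_size:03d}.txt")
        with open(path, "w") as f:
            f.writelines(line + "\n" for line in sorted(records[i:i + chunk_size]))
        chunk_paths.append(path)
    # Phase 2: K-way merge of the sorted chunks via a min-heap (heapq.merge).
    files = [open(p) for p in chunk_paths]
    merged = [line.rstrip("\n") for line in heapq.merge(*files)]
    for f in files:
        f.close()
    shutil.rmtree(tmp)
    return merged

print(external_sort(["pear", "kiwi", "apple", "fig", "plum", "date", "cherry"]))
```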
Optimized Pipeline with I/O Overlap
┌─────────────────┐
│ Input File │
└────────┬────────┘
│ sequential read
┌────────▼────────┐
│ Read Buffer │ (double-buffered)
│ (2 × 25GB) │
└────────┬────────┘
│
┌──────────────▼──────────────┐
│ In-Memory Sort (CPU) │
│ quicksort / radix sort │
└──────────────┬──────────────┘
│
┌────────▼────────┐
│ Write Buffer │ (double-buffered)
│ (2 × 25GB) │
└────────┬────────┘
│ sequential write
┌────────▼────────┐
│ Sorted Chunks │
│ (temp dir) │
└─────────────────┘
Double buffering: while buffer A is being sorted, buffer B is reading the next 25GB from disk. CPU and I/O work in parallel.
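A sketch of the double-buffered pipeline, with an I/O thread reading chunk N+1 while the main thread sorts chunk N (`read_chunk` is a stand-in for a 25GB sequential disk read):

```python
from concurrent.futures import ThreadPoolExecutor

def read_chunk(data, i, size):
    return data[i * size:(i + 1) * size]  # stand-in for a sequential disk read

def pipelined_sort_chunks(data, size):
    out = []
    with ThreadPoolExecutor(max_workers=1) as io:
        pending = io.submit(read_chunk, data, 0, size)
        i = 0
        while True:
            chunk = pending.result()                            # wait for read of chunk i
            if not chunk:
                break
            pending = io.submit(read_chunk, data, i + 1, size)  # prefetch chunk i+1
            out.append(sorted(chunk))                           # sort i while i+1 is read
            i += 1
    return out

print(pipelined_sort_chunks([5, 2, 9, 1, 7, 3], 2))  # [[2, 5], [1, 9], [3, 7]]
```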
Multi-Level Merge (when K is too large)
If chunks = 200 and RAM = 50GB:
Buffer per chunk = 50GB / 200 = 256MB → too small for efficient sequential I/O
Solution: Multi-level merge
Level 1: Merge chunks [0..19] → temp_merge_0
Merge chunks [20..39] → temp_merge_1
... (10 merges of 20 chunks each)
Level 2: Merge temp_merge_0..9 → final output
Rule of thumb: K ≤ RAM / (4 × block_size)
With 50GB RAM and 4MB blocks: K ≤ 50GB / 16MB ≈ 3200
So for most cases, single-pass merge works fine.
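Planning the merge levels for a given fan-in K is a small computation; this sketch groups runs level by level and agrees with the ceil(log_K(R)) pass count:

```python
import math

def plan_merge_levels(num_chunks, k):
    """Group runs into K-way merges per level until one run remains."""
    levels, runs = [], list(range(num_chunks))
    while len(runs) > 1:
        groups = [runs[i:i + k] for i in range(0, len(runs), k)]
        levels.append(groups)
        runs = list(range(len(groups)))
    return levels

levels = plan_merge_levels(200, 20)
print(len(levels))                    # 2 levels: 10 merges of 20, then 1 merge of 10
print(math.ceil(math.log(200, 20)))  # matches ceil(log_K(R)) = 2
```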
Distributed External Sort (MapReduce)
Step 1: Sample (determine partition boundaries)
→ Read 1% of input → sort → pick 99 evenly-spaced keys
→ These 99 keys define 100 key ranges (partitions)
Step 2: Map Phase (1000 mappers)
→ Each mapper reads its input split (~100GB)
→ Sorts locally
→ Partitions output into 100 buckets by key range
→ Writes 100 files to distributed storage
Step 3: Shuffle
→ Each reducer pulls its partition from all 1000 mappers
→ Reducer 0 gets all records with key < boundary[0]
→ Reducer 50 gets records with boundary[49] ≤ key < boundary[50]
Step 4: Reduce Phase (100 reducers)
→ Each reducer does K-way merge of 1000 sorted inputs
→ Writes sorted partition to output
→ Final output: 100 sorted partitions (globally sorted across partitions)
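Step 1 (sampling for partition boundaries) can be sketched as follows — sample keys, sort the sample, pick evenly spaced boundary keys (parameters here are illustrative):

```python
import random

def partition_boundaries(keys, num_partitions, sample_rate=0.01, seed=42):
    rng = random.Random(seed)
    sample = sorted(k for k in keys if rng.random() < sample_rate)
    # num_partitions - 1 boundaries split the key space into num_partitions ranges
    step = len(sample) / num_partitions
    return [sample[int(step * i)] for i in range(1, num_partitions)]

keys = list(range(100_000))
bounds = partition_boundaries(keys, 4, sample_rate=0.05)
print(bounds)  # three boundary keys, roughly near 25000, 50000, 75000
```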
Components
- Chunk Creator: Reads input in RAM-sized pieces, sorts in-memory, writes to temp. Handles double-buffering and compression.
- K-Way Merger: Opens all sorted chunks, uses min-heap for merge, manages read/write buffers. Supports multi-level merge if needed.
- Checkpoint Manager: Writes progress to checkpoint file after each chunk/merge completion. Enables crash recovery.
- I/O Scheduler: Manages read-ahead and write-behind buffers. Uses direct I/O to bypass OS page cache (we manage our own buffers).
- Partition Sampler (distributed): Samples input to determine range partition boundaries. Uses reservoir sampling for uniform distribution.
6. Deep Dives (15 min)
Deep Dive 1: Optimizing I/O — The Real Bottleneck
Problem: External sort is I/O bound. A naive implementation does 4 passes over the data (read → write chunks → read chunks → write output). Can we do better?
Optimization 1: Replacement selection (generate longer runs)
Instead of reading exactly RAM-sized chunks and sorting:
1. Fill RAM with records (50GB)
2. Build a min-heap
3. While input is not exhausted:
a. Pop minimum record → write to current run
b. Read next record from input
c. If new record ≥ last written record:
Push to heap (same run)
Else:
Mark as "next run" (don't include in current heap)
d. When heap is empty of current-run records, start new run
4. On average, this produces runs of size 2 × RAM = 100GB
→ 10 chunks instead of 20 → simpler merge
Expected run length is 2× RAM for random input, potentially much longer for partially sorted data.
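A runnable sketch of replacement selection with a toy "RAM" of `mem_size` records; heap entries are tagged with a run number so a record that arrives smaller than the last emitted one is deferred to the next run:

```python
import heapq

def replacement_selection(records, mem_size):
    it = iter(records)
    heap = [(0, r) for _, r in zip(range(mem_size), it)]  # fill "RAM"
    heapq.heapify(heap)
    runs, run_out, current = [], [], 0
    for nxt in it:
        run, rec = heapq.heappop(heap)
        if run != current:                   # heap exhausted of current-run records
            runs.append(run_out); run_out = []; current = run
        run_out.append(rec)                  # emit minimum to current run
        # If the incoming record can still extend this run, keep it in the same
        # run; otherwise tag it for the next run.
        heapq.heappush(heap, (run, nxt) if nxt >= rec else (run + 1, nxt))
    while heap:                              # drain remaining records
        run, rec = heapq.heappop(heap)
        if run != current:
            runs.append(run_out); run_out = []; current = run
        run_out.append(rec)
    runs.append(run_out)
    return runs

# 9 records, "RAM" of 3 -> 2 runs instead of 3 fixed-size chunks
print(replacement_selection([5, 1, 8, 2, 9, 3, 7, 4, 6], 3))
```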
Optimization 2: Minimize I/O passes
If runs = R and we can merge K-way in one pass:
Passes = 1 (create runs) + ceil(log_K(R)) (merge levels)
With 50GB RAM, 1TB input:
Replacement selection: R = 10 runs of ~100GB
K = 10 (each gets 5GB buffer — plenty for sequential I/O)
Merge passes = ceil(log_10(10)) = 1
Total: 2 passes (create + merge) = 4× I/O = 4TB transferred
Without replacement selection: R = 20
K = 20, merge passes = 1
Total: still 2 passes, but 20-way merge has smaller buffers
Optimization 3: Compression
Snappy compression ratio: ~2.5× for typical data
Without compression: 4TB of I/O at 500MB/s = 8000 seconds
With compression: 4TB / 2.5 = 1.6TB of I/O at 500MB/s = 3200 seconds
Compression CPU cost: ~500MB/s (Snappy) — matches disk speed
Net effect: ~2× faster (I/O reduced, CPU is "free" since it's idle during I/O)
LZ4 is even faster (compression speed ~800MB/s) with slightly lower ratio.
Use LZ4 for temp chunks, Snappy or Zstd for final output.
Optimization 4: Direct I/O + aligned buffers
- Bypass the OS page cache (O_DIRECT flag) — we manage our own buffers and don’t want the OS caching data we’ll never re-read
- Align buffers to 4KB page boundaries (required by O_DIRECT)
- Use large sequential reads (4-16MB) to maximize disk throughput
- On NVMe SSDs, use io_uring for asynchronous I/O with minimal syscall overhead
Deep Dive 2: Handling Skewed Data in Distributed Sort
The problem: In MapReduce-style distributed sort, partition boundaries are determined by sampling. If the key distribution is skewed (e.g., 50% of records have the same country code), some reducers get overwhelmed while others sit idle.
Example skew scenario:
Sorting 100TB of user events by country:
US: 40% of records = 40TB
Ideal: 100 reducers × 1TB each
With naive partitioning: Reducer for "US" gets 40TB, others get 600GB
Solution 1: Histogram-based partitioning
1. Sample phase: build a histogram of key frequencies (not just boundaries)
- Sample 0.1% of data → 100M records
- Build frequency histogram with 10,000 buckets
2. Partition assignment: assign buckets to reducers to equalize data volume
- If "US" spans 40% of data, assign it across 40 reducers
- Sub-partition by a secondary key (user_id hash) within "US"
3. Result: each reducer gets ~1TB regardless of key distribution
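A minimal sketch of the assignment step, assuming the histogram has already been built: each bucket gets a reducer count proportional to its share of the data, so a hot bucket like "US" spans many reducers (to be sub-partitioned by a secondary key hash):

```python
def reducers_per_bucket(histogram, num_reducers):
    """Map each key bucket to the number of reducers it should span."""
    total = sum(histogram.values())
    target = total / num_reducers  # ideal data volume per reducer
    return {key: max(1, round(size / target)) for key, size in histogram.items()}

hist = {"US": 40, "IN": 15, "BR": 10, "DE": 8, "JP": 7, "other": 20}
plan = reducers_per_bucket(hist, 100)
print(plan["US"])  # 40 — the "US" bucket is spread across 40 reducers
```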
Solution 2: Dynamic splitting
During shuffle phase, monitor data received per reducer.
If reducer R has received 2× the average:
1. Split R's key range in half
2. Spawn a new reducer for the upper half
3. Redirect future shuffle data to the new reducer
4. Reducer R re-partitions data already received
Solution 3: Salted keys (for extreme skew)
For known hot keys (e.g., country = "US"):
Mapper appends random salt: key = "US_3" (salt ∈ [0,9])
→ Distributes "US" records across 10 reducers
Each reducer sorts its portion
Final step: concatenate the 10 sorted "US" partitions (already globally sorted by salt)
Consumer must know to strip the salt when reading
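A sketch of the salt/strip round trip (hot-key set and salt count are illustrative; assumes base keys don’t themselves end in an underscore-digit suffix):

```python
import random

_rng = random.Random(7)

def salt_key(key, hot_keys, num_salts=10):
    """Append a random salt suffix to known hot keys; leave others untouched."""
    return f"{key}_{_rng.randrange(num_salts)}" if key in hot_keys else key

def strip_salt(key, hot_keys):
    """Consumer side: remove the salt suffix if the base key is a known hot key."""
    base = key.rsplit("_", 1)[0]
    return base if base in hot_keys else key

k = salt_key("US", {"US"})
print(k, "->", strip_salt(k, {"US"}))  # e.g. "US_<salt> -> US"
```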
Recommendation: Histogram-based partitioning handles most skew without complicating the output format. Dynamic splitting is good for unpredictable skew. Salted keys are a last resort.
Deep Dive 3: K-Way Merge with Min-Heap — Implementation Details
The merge is the most critical operation. A poorly implemented merge bottlenecks the entire sort.
Min-Heap approach:
Min-heap approach (Python-style; chunk objects are assumed to expose read_next(), the output buffer write/is_full/flush):
import heapq

def k_way_merge(chunks, output_buffer):
    heap = []
    # Initialize: read the first record from each chunk. The chunk index sits
    # second in the tuple so ties on sort_key never fall through to comparing
    # raw records (which may not be comparable).
    for i, chunk in enumerate(chunks):
        record = chunk.read_next()
        if record is not None:
            heapq.heappush(heap, (record.sort_key, i, record))
    # Merge loop: pop the global minimum, refill from the chunk it came from.
    while heap:
        _, i, record = heapq.heappop(heap)
        output_buffer.write(record)
        nxt = chunks[i].read_next()
        if nxt is not None:
            heapq.heappush(heap, (nxt.sort_key, i, nxt))
        if output_buffer.is_full():
            output_buffer.flush()
Optimizations:
- Tournament tree (loser tree) instead of binary heap:
  - Binary heap: 2 × log₂(K) comparisons per pop+push
  - Loser tree: log₂(K) comparisons per replacement
  - For K=100: loser tree does 7 comparisons instead of 14
- Buffered reads per chunk:
  Each chunk has a read buffer of size B = RAM / (K + 1), with one buffer reserved for output.
  When a chunk's buffer is empty, refill it with the next B bytes from disk.
  This converts random I/O into sequential I/O.
- Prefetching:
  When a chunk buffer is 25% full, trigger an async read for the next block.
  This overlaps I/O with merge computation.
  With K=20 chunks, at any time ~5 chunks are being prefetched.
- Key-prefix optimization:
  Store the first 8 bytes of the sort key directly in the heap entry.
  Compare these 8 bytes first (a single 64-bit integer comparison).
  Only dereference the full key if the prefixes match.
  This reduces cache misses dramatically.
7. Extensions (2 min)
- External merge join: Two sorted files can be joined in a single pass (merge join). Sort both inputs, then scan in parallel matching keys. This is how most DBMS implement sort-merge joins.
- Columnar sort: Instead of sorting full records, sort only the (key, record_pointer) pairs. After sorting, rewrite records in sorted order using the pointers. Reduces memory pressure when records are large but keys are small.
- Incremental sort (online): New data arrives continuously. Maintain sorted runs and periodically merge. Use a tiered compaction strategy (like LSM trees) — merge small runs into larger ones.
- GPU-accelerated sort: For in-memory chunks, use GPU radix sort (e.g., CUB library). GPUs can sort 1 billion 32-bit keys in ~100ms. Useful when the bottleneck shifts from I/O to CPU (e.g., complex comparison functions).
- Verified sort: For critical applications (financial data), produce a proof that the output is a permutation of the input and is sorted. Use checksums (XOR of all record hashes must match before and after) plus sortedness verification in a single pass.