1. Requirements & Scope (5 min)

Functional Requirements

  1. Sort a dataset that is significantly larger than available RAM (e.g., 10TB of data with 64GB RAM)
  2. Support configurable sort keys (single column, composite keys, ascending/descending)
  3. Produce a single sorted output file (or set of sorted partitions for distributed consumers)
  4. Support pluggable input/output formats (CSV, Parquet, binary records)
  5. Provide progress reporting and the ability to resume after failures (checkpointing)

Non-Functional Requirements

  • Availability: Batch system — not always-on, but must complete within an SLA (e.g., 10TB sorted within 4 hours)
  • Latency: Optimize for total wall-clock time, not per-record latency. I/O throughput is the bottleneck.
  • Consistency: Output must be perfectly sorted. No approximate or lossy sorting.
  • Scale: Handle datasets from 1GB to 100TB. Single-machine for up to ~5TB, distributed (MapReduce-style) beyond that.
  • Durability: Intermediate state (sorted chunks) persisted to disk. If the process crashes, restart from the last completed chunk, not from scratch.

2. Estimation (3 min)

Single Machine Scenario

  • Dataset: 1TB, RAM: 64GB (usable for sort: ~50GB after OS and buffers)
  • Record size: 100 bytes average, sort key: 20 bytes
  • Total records: 1TB / 100B = 10 billion records
  • Number of sorted chunks: 1TB / 50GB = 20 chunks
  • Each chunk: 50GB, ~500M records, sorted in-memory using quicksort/timsort

I/O Analysis

  • Disk sequential read/write: 500 MB/s (NVMe SSD) or 200 MB/s (HDD)
  • Phase 1 (chunk creation): Read 1TB + write 1TB of sorted chunks = 2TB I/O
    • SSD: 2TB / 500 MB/s = ~67 minutes
    • HDD: 2TB / 200 MB/s = ~167 minutes
  • Phase 2 (K-way merge): Read 1TB of sorted chunks + write 1TB final output = 2TB I/O
    • Same time as Phase 1
  • Total: ~2.2 hours SSD, ~5.6 hours HDD (I/O dominated)
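
The phase arithmetic above is worth sanity-checking in code (a throwaway calculation using the same figures):

```python
# Sanity-check the 1TB single-machine I/O estimates above.
TB = 10**12
MB = 10**6

io_per_phase = 2 * 1 * TB            # each phase reads 1TB and writes 1TB

def phase_minutes(throughput_mb_per_s):
    return io_per_phase / (throughput_mb_per_s * MB) / 60

ssd_phase = phase_minutes(500)       # ~67 min per phase on NVMe SSD
hdd_phase = phase_minutes(200)       # ~167 min per phase on HDD
ssd_total_hours = 2 * ssd_phase / 60 # ~2.2 hours
hdd_total_hours = 2 * hdd_phase / 60 # ~5.6 hours
```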

Distributed Scenario (MapReduce)

  • Dataset: 100TB across HDFS
  • 1000 mappers, each sorts 100GB locally
  • Shuffle phase: each reducer pulls its key range from all mappers
  • 100 reducers, each merge-sorts 1TB from 1000 sorted inputs
  • Network bandwidth: 10Gbps per node → each reducer pulls ~1TB; at an effective ~1Gbps per reducer (incast and contention eat most of the line rate during shuffle), that is ~2.2 hours, overlapped with the merge via pipelining

Key Insight

External sort is I/O bound, not CPU bound. Every optimization must target reducing I/O passes, maximizing sequential I/O, and minimizing random access. The CPU cost of comparisons is negligible compared to disk read/write time.


3. API Design (3 min)

Single-Machine API

// Sort a file on disk
POST /sort
  Body: {
    "input_path": "/data/unsorted/events.parquet",
    "output_path": "/data/sorted/events_sorted.parquet",
    "sort_keys": [
      { "column": "timestamp", "order": "asc" },
      { "column": "user_id", "order": "asc" }
    ],
    "memory_limit_gb": 50,
    "temp_dir": "/scratch/sort_temp/",
    "input_format": "parquet",
    "output_format": "parquet",
    "compression": "snappy",
    "parallelism": 8                   // concurrent I/O threads
  }
  Response 202: {
    "job_id": "sort-abc123",
    "status": "running",
    "estimated_completion": "2024-02-22T16:30:00Z"
  }

// Check progress
GET /sort/sort-abc123
  Response 200: {
    "status": "merging",
    "phase": "merge",
    "chunks_created": 20,
    "chunks_merged": 12,
    "progress_pct": 78.5,
    "bytes_processed": "850GB",
    "elapsed_seconds": 5400
  }

Distributed API (MapReduce-style)

POST /distributed-sort
  Body: {
    "input_path": "hdfs:///data/unsorted/",
    "output_path": "hdfs:///data/sorted/",
    "sort_keys": [ { "column": "timestamp", "order": "asc" } ],
    "num_partitions": 100,            // output partitions (range-partitioned)
    "sample_rate": 0.01,              // for partition boundary estimation
    "cluster": "sort-cluster-01"
  }

Key Decisions

  • Temp directory must be on fast local storage (NVMe), not network-attached
  • Compression during sort reduces I/O at the cost of CPU — almost always worth it (Snappy gives 2-4x compression at 500MB/s throughput)
  • Parallelism for overlapping I/O with CPU: while one chunk is being read, the previous one is being sorted

4. Data Model (3 min)

Chunk Metadata (tracked in memory + checkpoint file)

Table: sort_chunks (stored as JSON checkpoint on disk)
  chunk_id            | int             -- sequential chunk number
  status              | enum            -- 'created', 'sorted', 'merged'
  file_path           | string          -- /scratch/sort_temp/chunk_007.bin
  record_count        | int64
  byte_size           | int64
  min_key             | bytes           -- minimum sort key in chunk
  max_key             | bytes           -- maximum sort key in chunk
  checksum            | uint32          -- CRC32 for integrity verification

Sort Run State (for checkpointing)

{
  "job_id": "sort-abc123",
  "phase": "merge",                    // "chunk_creation" or "merge"
  "input_bytes_read": 850000000000,
  "chunks": [ ... ],                   // list of chunk metadata
  "merge_state": {
    "output_bytes_written": 420000000000,
    "active_merge_level": 2,
    "completed_merges": [ [0,1,2,3], [4,5,6,7] ]
  }
}

On-Disk Format for Chunks

  • Binary format: [4-byte key length][key bytes][4-byte value length][value bytes] repeated
  • Or: columnar (Parquet) for each chunk — better compression, supports column pruning
  • Chunks are individually compressed (Snappy or LZ4) in 64KB blocks for random access within a chunk during merge

Why These Choices

  • No database needed — this is a batch processing system. All state is file-based.
  • Checkpoint file enables crash recovery. On restart, read checkpoint, skip completed chunks, resume.
  • Min/max keys per chunk enable optimization during merge (can skip chunks that don’t overlap with the current merge range).

5. High-Level Design (12 min)

Two-Phase External Merge Sort

Phase 1: Chunk Creation (Sort Phase)
═══════════════════════════════════════
Input File (1TB on disk)
  → Read 50GB into RAM
  → Sort in-memory (quicksort on sort key)
  → Write sorted chunk to temp dir
  → Repeat until entire input is consumed
  → Result: 20 sorted chunks, each 50GB

Phase 2: K-Way Merge
═══════════════════════════════════════
20 sorted chunks on disk
  → Open all 20 files simultaneously
  → Read buffer: allocate 50GB / 20 = 2.5GB per chunk
  → Min-heap of size 20 (one entry per chunk, keyed by sort key)
  → Repeatedly:
       1. Pop minimum from heap → write to output buffer
       2. Read next record from that chunk → push to heap
       3. When output buffer full → flush to output file
  → Result: single sorted output file (1TB)
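
The two phases map almost directly onto code. A toy-scale sketch (integers instead of records, `chunk_records` standing in for the 50GB memory budget, `heapq.merge` providing the heap-based K-way merge of Phase 2):

```python
import heapq
import os
import tempfile

def read_run(path):
    # Lazily stream one sorted run back from disk.
    with open(path) as f:
        for line in f:
            yield int(line)

def external_sort(records, chunk_records, out_path):
    """Two-phase external merge sort: spill sorted runs, then K-way merge."""
    tmp = tempfile.mkdtemp()
    runs = []
    # Phase 1: sort memory-sized batches and spill each as a sorted run.
    for i in range(0, len(records), chunk_records):
        chunk = sorted(records[i:i + chunk_records])
        path = os.path.join(tmp, f"chunk_{len(runs):03d}.txt")
        with open(path, "w") as f:
            f.writelines(f"{r}\n" for r in chunk)
        runs.append(path)
    # Phase 2: heap-based K-way merge of all runs into the output file.
    with open(out_path, "w") as out:
        for r in heapq.merge(*(read_run(p) for p in runs)):
            out.write(f"{r}\n")
    return len(runs)
```

At real scale the same structure holds; only the record format, buffering, and compression change.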

Optimized Pipeline with I/O Overlap

                    ┌─────────────────┐
                    │   Input File    │
                    └────────┬────────┘
                             │ sequential read
                    ┌────────▼────────┐
                    │  Read Buffer    │ (double-buffered)
                    │  (2 × 25GB)     │
                    └────────┬────────┘
                             │
              ┌──────────────▼──────────────┐
              │    In-Memory Sort (CPU)      │
              │    quicksort / radix sort    │
              └──────────────┬──────────────┘
                             │
                    ┌────────▼────────┐
                    │  Write Buffer   │ (double-buffered)
                    │  (2 × 25GB)     │
                    └────────┬────────┘
                             │ sequential write
                    ┌────────▼────────┐
                    │  Sorted Chunks  │
                    │  (temp dir)     │
                    └─────────────────┘

Double buffering: while buffer A is being sorted, buffer B is reading the next 25GB from disk. CPU and I/O work in parallel.
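
A minimal sketch of the double-buffering idea (the `read_chunk` and `write_sorted` callbacks are placeholders; in real code the reader thread blocks on disk I/O, which releases Python's GIL, so reading genuinely overlaps sorting):

```python
import threading
from queue import Queue

def pipelined_chunk_sort(read_chunk, num_chunks, write_sorted):
    """Double buffering: a reader thread fetches chunk N+1 while the main
    thread sorts and writes chunk N. Queue depth 1 caps memory at 2 buffers."""
    q = Queue(maxsize=1)

    def reader():
        for i in range(num_chunks):
            q.put(read_chunk(i))   # blocks while one chunk is already waiting
        q.put(None)                # sentinel: input exhausted

    threading.Thread(target=reader, daemon=True).start()
    while (chunk := q.get()) is not None:
        write_sorted(sorted(chunk))
```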

Multi-Level Merge (when K is too large)

If chunks = 200 and RAM = 50GB:
  Buffer per chunk = 50GB / 200 = 256MB → too small for efficient sequential I/O

Solution: Multi-level merge
  Level 1: Merge chunks [0..19] → temp_merge_0
           Merge chunks [20..39] → temp_merge_1
           ... (10 merges of 20 chunks each)
  Level 2: Merge temp_merge_0..9 → final output

Rule of thumb: K ≤ RAM / (4 × block_size)
  With 50GB RAM and 4MB blocks: K ≤ 50GB / 16MB ≈ 3200
  So for most cases, single-pass merge works fine.
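
The rule of thumb is easy to encode (a quick check; `merge_plan` is illustrative only):

```python
import math

def merge_plan(ram_bytes, block_bytes, num_runs):
    """Max merge fan-in K per the rule K <= RAM / (4 * block_size), and the
    number of merge passes ceil(log_K(R)) that fan-in implies."""
    k = ram_bytes // (4 * block_bytes)
    if num_runs <= 1:
        return k, 0                    # already a single sorted run
    return k, math.ceil(math.log(num_runs, k))

# 50GB RAM, 4MB blocks: K = 3125, so 20 runs (or even 200) merge in one pass.
```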

Distributed External Sort (MapReduce)

Step 1: Sample (determine partition boundaries)
  → Read 1% of input → sort → pick 99 evenly-spaced keys
  → These 99 keys define 100 key ranges (partitions)
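
Step 1 can be sketched as follows (`pick_boundaries` and `partition_of` are hypothetical helper names; the bisect lookup implements the boundary[p-1] ≤ key < boundary[p] rule used in the shuffle):

```python
import bisect

def pick_boundaries(sample_keys, num_partitions):
    """Pick num_partitions - 1 evenly spaced keys from a sorted sample."""
    s = sorted(sample_keys)
    step = len(s) / num_partitions
    return [s[int(step * i)] for i in range(1, num_partitions)]

def partition_of(key, boundaries):
    """Partition p holds keys with boundaries[p-1] <= key < boundaries[p]."""
    return bisect.bisect_right(boundaries, key)
```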

Step 2: Map Phase (1000 mappers)
  → Each mapper reads its input split (~100GB)
  → Sorts locally
  → Partitions output into 100 buckets by key range
  → Writes 100 files to distributed storage

Step 3: Shuffle
  → Each reducer pulls its partition from all 1000 mappers
  → Reducer 0 gets all records with key < boundary[0]
  → Reducer 50 gets records with boundary[49] ≤ key < boundary[50]

Step 4: Reduce Phase (100 reducers)
  → Each reducer does K-way merge of 1000 sorted inputs
  → Writes sorted partition to output
  → Final output: 100 sorted partitions (globally sorted across partitions)

Components

  1. Chunk Creator: Reads input in RAM-sized pieces, sorts in-memory, writes to temp. Handles double-buffering and compression.
  2. K-Way Merger: Opens all sorted chunks, uses min-heap for merge, manages read/write buffers. Supports multi-level merge if needed.
  3. Checkpoint Manager: Writes progress to checkpoint file after each chunk/merge completion. Enables crash recovery.
  4. I/O Scheduler: Manages read-ahead and write-behind buffers. Uses direct I/O to bypass OS page cache (we manage our own buffers).
  5. Partition Sampler (distributed): Samples input to determine range partition boundaries. Uses reservoir sampling for uniform distribution.

6. Deep Dives (15 min)

Deep Dive 1: Optimizing I/O — The Real Bottleneck

Problem: External sort is I/O bound. A naive two-phase implementation already moves 4× the dataset through the disk (read input → write sorted chunks, then read chunks → write output). Can we do better?

Optimization 1: Replacement Selection (generate longer runs)

Instead of reading exactly RAM-sized chunks and sorting:

1. Fill RAM with records (50GB)
2. Build a min-heap
3. While input is not exhausted:
   a. Pop minimum record → write to current run
   b. Read next record from input
   c. If new record ≥ last written record:
        Push to heap (same run)
      Else:
        Mark as "next run" (don't include in current heap)
   d. When heap is empty of current-run records, start new run
4. On average, this produces runs of size 2 × RAM = 100GB
   → 10 chunks instead of 20 → simpler merge

Expected run length is 2× RAM for random input, potentially much longer for partially sorted data.
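
The run-generation loop above, as a runnable sketch (toy scale: records held directly in a heap and runs collected in lists; a disk-backed version would stream each run out as it is produced):

```python
import heapq

def replacement_selection(records, memory_records):
    """Replacement selection: emit sorted runs averaging ~2x the memory size.
    A record smaller than the last one written is deferred to the next run."""
    it = iter(records)
    heap = [r for r in (next(it, None) for _ in range(memory_records))
            if r is not None]
    heapq.heapify(heap)
    pending = []                # records deferred to the next run
    runs, run = [], []
    while heap:
        r = heapq.heappop(heap)
        run.append(r)           # write minimum to the current run
        nxt = next(it, None)
        if nxt is not None:
            if nxt >= r:
                heapq.heappush(heap, nxt)   # still fits in the current run
            else:
                pending.append(nxt)         # must wait for the next run
        if not heap:                        # current run exhausted
            runs.append(run)
            run = []
            heap = pending
            heapq.heapify(heap)
            pending = []
    if run:                                 # defensive; normally empty here
        runs.append(run)
    return runs
```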

Optimization 2: Minimize I/O passes

If runs = R and we can merge K-way in one pass:
  Passes = 1 (create runs) + ceil(log_K(R)) (merge levels)

With 50GB RAM, 1TB input:
  Replacement selection: R = 10 runs of ~100GB
  K = 10 (each gets 5GB buffer — plenty for sequential I/O)
  Merge passes = ceil(log_10(10)) = 1
  Total: 2 passes (create + merge) = 4× I/O = 4TB transferred

Without replacement selection: R = 20
  K = 20, merge passes = 1
  Total: still 2 passes, but 20-way merge has smaller buffers

Optimization 3: Compression

Snappy compression ratio: ~2.5× for typical data
  Without compression: 4TB of I/O at 500MB/s = 8000 seconds
  With compression: 4TB / 2.5 = 1.6TB of I/O at 500MB/s = 3200 seconds
  Compression CPU cost: ~500MB/s (Snappy) — matches disk speed
  Net effect: ~2× faster (I/O reduced, CPU is "free" since it's idle during I/O)

LZ4 is even faster (compression speed ~800MB/s) with slightly lower ratio.
Use LZ4 for temp chunks, Snappy or Zstd for final output.

Optimization 4: Direct I/O + aligned buffers

  • Bypass OS page cache (O_DIRECT flag) — we manage our own buffers and don’t want the OS caching data we’ll never re-read
  • Align buffers to 4KB page boundaries
  • Use large sequential reads (4-16MB) to maximize disk throughput
  • On NVMe SSDs, use io_uring for asynchronous I/O with minimal syscall overhead (batched submission and completion)

Deep Dive 2: Handling Skewed Data in Distributed Sort

The problem: In MapReduce-style distributed sort, partition boundaries are determined by sampling. If the key distribution is skewed (e.g., 50% of records have the same country code), some reducers get overwhelmed while others sit idle.

Example skew scenario:

Sorting 100TB of user events by country:
  US: 40% of records = 40TB
  Ideal: 100 reducers × 1TB each
  With naive partitioning: Reducer for "US" gets 40TB, others get 600GB

Solution 1: Histogram-based partitioning

1. Sample phase: build a histogram of key frequencies (not just boundaries)
   - Sample 0.01% of data → ~100M records (100TB at ~100B/record is ~1 trillion records)
   - Build frequency histogram with 10,000 buckets
2. Partition assignment: assign buckets to reducers to equalize data volume
   - If "US" spans 40% of data, assign it across 40 reducers
   - Sub-partition by a secondary key (user_id hash) within "US"
3. Result: each reducer gets ~1TB regardless of key distribution
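
A sketch of the bucket-to-reducer assignment (greedy and contiguous in key order, which preserves global sort order across reducers; `assign_buckets` is illustrative only — splitting a single oversized bucket additionally needs the secondary-key sub-partitioning described above):

```python
def assign_buckets(bucket_sizes, num_reducers):
    """Walk histogram buckets in key order, cutting over to the next reducer
    whenever the running total reaches the target share per reducer."""
    target = sum(bucket_sizes) / num_reducers
    assignment = []             # assignment[i] = reducer for histogram bucket i
    reducer, acc = 0, 0.0
    for size in bucket_sizes:
        if acc >= target and reducer < num_reducers - 1:
            reducer += 1        # this reducer has its share; start the next
            acc = 0.0
        assignment.append(reducer)
        acc += size
    return assignment
```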

Solution 2: Dynamic splitting

During shuffle phase, monitor data received per reducer.
If reducer R has received 2× the average:
  1. Split R's key range in half
  2. Spawn a new reducer for the upper half
  3. Redirect future shuffle data to the new reducer
  4. Reducer R re-partitions data already received

Solution 3: Salted keys (for extreme skew)

For known hot keys (e.g., country = "US"):
  Mapper appends random salt: key = "US_3" (salt ∈ [0,9])
  → Distributes "US" records across 10 reducers
  Each reducer sorts its portion
  Final step: concatenate the 10 sorted "US_*" partitions — safe because every salted record shares the same base key, so order within "US" is unconstrained
  Consumer must know to strip the salt when reading

Recommendation: Histogram-based partitioning handles most skew without complicating the output format. Dynamic splitting is good for unpredictable skew. Salted keys are a last resort.

Deep Dive 3: K-Way Merge with Min-Heap — Implementation Details

The merge is the most critical operation. A poorly implemented merge bottlenecks the entire sort.

Min-Heap approach:

struct HeapEntry {
  sort_key: bytes,
  record: bytes,
  chunk_index: int       // which chunk this came from
}

heap = MinHeap of size K (one entry per chunk)

// Initialize: read first record from each chunk, push to heap
for i in 0..K:
  record = read_next(chunks[i])
  heap.push(HeapEntry { key: record.sort_key, record, chunk_index: i })

// Merge loop
while heap is not empty:
  entry = heap.pop()
  output_buffer.write(entry.record)

  next_record = read_next(chunks[entry.chunk_index])
  if next_record is not None:
    heap.push(HeapEntry { key: next_record.sort_key, record: next_record, chunk_index: entry.chunk_index })

  if output_buffer.is_full():
    flush(output_buffer)

Optimizations:

  1. Tournament tree (loser tree) instead of binary heap:

    • Binary heap: 2 × log₂(K) comparisons per pop+push
    • Loser tree: log₂(K) comparisons per replacement
    • For K=100: loser tree does 7 comparisons instead of 14
  2. Buffered reads per chunk:

    Each chunk has a read buffer of size B = RAM / (K + 1)
    (reserve one buffer for output)
    When chunk buffer is empty, refill with next B bytes from disk
    This converts random I/O into sequential I/O
    
  3. Prefetching:

    When chunk buffer is 25% full, trigger async read for next block
    Overlaps I/O with merge computation
    With K=20 chunks, at any time ~5 chunks are being prefetched
    
  4. Key-prefix optimization:

    Store first 8 bytes of sort key directly in heap entry
    Compare these 8 bytes first (single 64-bit integer comparison)
    Only dereference full key if prefix matches
    Reduces cache misses dramatically
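
The prefix trick in miniature (big-endian byte order is what makes an integer compare agree with lexicographic byte order; short keys are zero-padded, which preserves that order):

```python
def key_prefix(key: bytes) -> int:
    """First 8 bytes of the key as a big-endian u64; big-endian preserves
    lexicographic order, so prefixes compare the same way full keys do."""
    return int.from_bytes(key[:8].ljust(8, b"\x00"), "big")

def compare(a: bytes, b: bytes) -> int:
    pa, pb = key_prefix(a), key_prefix(b)
    if pa != pb:                       # one integer compare, no pointer chase
        return -1 if pa < pb else 1
    return (a > b) - (a < b)           # rare fallback: full key comparison
```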
    

7. Extensions (2 min)

  • External merge join: Two sorted files can be joined in a single pass (merge join). Sort both inputs, then scan in parallel matching keys. This is how most DBMS implement sort-merge joins.
  • Columnar sort: Instead of sorting full records, sort only the (key, record_pointer) pairs. After sorting, rewrite records in sorted order using the pointers. Reduces memory pressure when records are large but keys are small.
  • Incremental sort (online): New data arrives continuously. Maintain sorted runs and periodically merge. Use a tiered compaction strategy (like LSM trees) — merge small runs into larger ones.
  • GPU-accelerated sort: For in-memory chunks, use GPU radix sort (e.g., CUB library). GPUs can sort 1 billion 32-bit keys in ~100ms. Useful when the bottleneck shifts from I/O to CPU (e.g., complex comparison functions).
  • Verified sort: For critical applications (financial data), produce a proof that the output is a permutation of the input and is sorted. Use checksums (XOR of all record hashes must match before and after) plus sortedness verification in a single pass.