1. Requirements & Scope (5 min)

Functional Requirements

  1. put(key, value) — store a key-value pair (upsert semantics)
  2. get(key) — retrieve the value for a given key
  3. delete(key) — remove a key-value pair
  4. Support large value sizes (up to 1MB per value, keys up to 256 bytes)
  5. Automatic data partitioning across nodes with rebalancing on cluster resize

Non-Functional Requirements

  • Availability: 99.99% — the store must remain writable even during node failures and network partitions (AP system by default, tunable toward CP)
  • Latency: < 5ms p99 for both reads and writes (single datacenter)
  • Consistency: Tunable — eventual consistency by default (R=1, W=1), strong consistency optional (R=W=quorum)
  • Scale: 100TB+ total data, 500K+ operations/sec, hundreds of nodes, linear horizontal scaling
  • Durability: No data loss for acknowledged writes. Replication factor of 3 across failure domains.

2. Estimation (3 min)

Traffic

  • Write QPS: 200K ops/sec
  • Read QPS: 300K ops/sec (1.5:1 read-to-write ratio)
  • Average value size: 10KB, average key size: 100 bytes
  • Write bandwidth: 200K x 10KB = 2 GB/sec
  • Read bandwidth: 300K x 10KB = 3 GB/sec

Storage

  • 100 billion key-value pairs in steady state
  • Average size per pair: 10KB value + 100 bytes key + 200 bytes metadata = ~10.3KB
  • Total data: 100B x 10.3KB = 1 PB before replication
  • With RF=3: 3 PB total
  • Per node (assuming 100 nodes): 30TB per node → 8 x 4TB SSDs each

Memory

  • Each node holds ~3B key copies (100B keys x RF=3, spread over 100 nodes)
  • In-memory index (key → file offset): 100 bytes key + 16 bytes offset = 116 bytes per key
  • Index per node: 3B x 116 bytes = ~350 GB — far too large for RAM
  • Solution: Bloom filters + sparse index (see Deep Dive)
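These estimates can be sanity-checked in a few lines; every input below is an assumption from this section, not a measured value:

```python
# Back-of-envelope check for the estimation section (all inputs are this
# section's own assumptions).

TOTAL_PAIRS = 100e9                       # 100 billion key-value pairs
AVG_PAIR_BYTES = 10 * 1024 + 100 + 200    # value + key + metadata
RF = 3                                    # replication factor
NODES = 100

raw_bytes = TOTAL_PAIRS * AVG_PAIR_BYTES
replicated_bytes = raw_bytes * RF
per_node_bytes = replicated_bytes / NODES

index_entry_bytes = 100 + 16              # key + file offset
keys_per_node = TOTAL_PAIRS * RF / NODES  # includes replica copies
index_bytes = keys_per_node * index_entry_bytes

print(f"raw data:       {raw_bytes / 1e15:.2f} PB")
print(f"replicated:     {replicated_bytes / 1e15:.2f} PB")
print(f"per node:       {per_node_bytes / 1e12:.1f} TB")
print(f"index per node: {index_bytes / 1e9:.0f} GB")
```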

3. API Design (3 min)

// Write a key-value pair
PUT /kv/{key}
  Headers: {
    "X-Consistency": "quorum",      // optional: one, quorum, all
    "X-TTL": 86400,                 // optional: TTL in seconds
    "If-Match": "<vector_clock>"    // optional: conditional write (CAS)
  }
  Body: <raw bytes>
  Response 200: {
    "key": "user:123:profile",
    "version": {"node1": 3, "node2": 1},  // vector clock
    "timestamp": 1708632060000
  }

// Read a key
GET /kv/{key}
  Headers: {
    "X-Consistency": "quorum"       // optional: one, quorum, all
  }
  Response 200: {
    "key": "user:123:profile",
    "value": "<base64-encoded>",
    "version": {"node1": 3, "node2": 1},
    "timestamp": 1708632060000
  }
  // If conflicting versions exist (eventual consistency):
  Response 200: {
    "key": "user:123:profile",
    "values": [
      {"value": "...", "version": {"node1": 3}},
      {"value": "...", "version": {"node2": 2}}
    ],
    "conflict": true
  }

// Delete a key
DELETE /kv/{key}
  Response 200: { "deleted": true }
  // Internally: tombstone with TTL (not immediate physical delete)

// Scan keys by prefix
GET /kv?prefix=user:123:&limit=100
  Response 200: { "pairs": [...], "cursor": "..." }

Key Decisions

  • Client receives vector clock on write, passes it back on subsequent writes for conflict detection
  • Conflicts surfaced to client (Dynamo-style) rather than hidden behind last-write-wins
  • Delete uses tombstones — physical deletion happens during compaction
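The conditional-write (If-Match) flow implied by the API can be sketched end to end. `KVStub` below is a hypothetical in-memory stand-in for the cluster, used only to show the read-modify-write cycle; it is not part of the design:

```python
# Sketch of the read-modify-write cycle: read a key, take the returned vector
# clock, and pass it back (If-Match) so the server can reject stale writes.

class KVStub:
    def __init__(self):
        self.data = {}  # key -> (value, vector_clock)

    def get(self, key):
        value, clock = self.data.get(key, (None, {}))
        return {"key": key, "value": value, "version": dict(clock)}

    def put(self, key, value, if_match=None, node="node1"):
        _, current = self.data.get(key, (None, {}))
        if if_match is not None and if_match != current:
            return {"error": "version_conflict", "version": dict(current)}
        new_clock = dict(current)
        new_clock[node] = new_clock.get(node, 0) + 1   # bump coordinator entry
        self.data[key] = (value, new_clock)
        return {"key": key, "version": dict(new_clock)}

store = KVStub()
v1 = store.put("user:123:profile", b"alice")            # first write
resp = store.get("user:123:profile")
v2 = store.put("user:123:profile", b"alice-v2",
               if_match=resp["version"])                # CAS with fresh clock
stale = store.put("user:123:profile", b"oops",
                  if_match=v1["version"])               # stale clock: rejected
print(v2["version"], stale)
```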

4. Data Model (3 min)

On-Disk Storage (LSM-Tree per node)

SSTable (Sorted String Table):
  ┌──────────────────────────────────┐
  │ Data Block 1 (4KB, sorted KVs)   │
  │ Data Block 2                     │
  │ ...                              │
  │ Data Block N                     │
  │ Index Block (key → block offset) │
  │ Bloom Filter Block               │
  │ Footer (offsets to index/filter) │
  └──────────────────────────────────┘

Each KV entry:
  key             | bytes (up to 256B)
  value           | bytes (up to 1MB)
  timestamp       | int64
  vector_clock    | map<node_id, counter>
  tombstone       | boolean
  ttl_expiry      | int64 (0 = no expiry)

In-Memory Structures (per node)

Memtable (Red-Black Tree or Skip List):
  - All recent writes, sorted by key
  - Flushed to SSTable when size exceeds 64MB

Write-Ahead Log (WAL):
  - Sequential append of every write
  - Replayed on crash recovery
  - Truncated after memtable flush

Bloom Filters:
  - One per SSTable, loaded in memory
  - 10 bits per key → ~1% false positive rate
  - Avoids reading SSTables that definitely don't contain the key
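A minimal Bloom filter at the stated parameters (10 bits per key) might look like this; the blake2b-based hashing is an illustrative choice only, since real engines use faster non-cryptographic hashes (xxHash, MurmurHash):

```python
# Minimal Bloom filter sketch: 10 bits/key, k=7 hash functions (roughly the
# optimum for ~1% false positives). Hashes derived from salted blake2b.
import hashlib

class BloomFilter:
    def __init__(self, num_keys, bits_per_key=10, num_hashes=7):
        self.m = num_keys * bits_per_key          # total bits
        self.k = num_hashes
        self.bits = bytearray((self.m + 7) // 8)

    def _positions(self, key: bytes):
        for i in range(self.k):
            h = hashlib.blake2b(key, digest_size=8, salt=bytes([i] * 8))
            yield int.from_bytes(h.digest(), "big") % self.m

    def add(self, key: bytes):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key: bytes) -> bool:
        # False means "definitely absent"; True means "maybe present".
        return all(self.bits[p // 8] & (1 << (p % 8))
                   for p in self._positions(key))

bf = BloomFilter(num_keys=1000)
bf.add(b"user:123:profile")
print(bf.might_contain(b"user:123:profile"))   # always True once added
print(bf.might_contain(b"user:999:missing"))   # almost certainly False
```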

Why LSM-Tree (not B-Tree)

  • Write-optimized: All writes are sequential appends (memtable → flush → SSTable). No random I/O on writes.
  • Trade-off: Reads may check multiple SSTables (L0, L1, …, Ln). Mitigated by bloom filters, level compaction, and caching.
  • Compaction: Background process merges SSTables, removes tombstones, reduces read amplification.
  • Space amplification: Leveled compaction keeps it at ~1.1x; size-tiered compaction can be ~2x but has better write throughput.
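The write/read asymmetry above shows up even in a toy single-node engine (a sketch: WAL, bloom filters, and compaction omitted; the 4-entry memtable limit is purely illustrative):

```python
# Toy LSM engine: writes only touch the in-memory memtable; reads may have to
# probe several immutable sorted runs ("SSTables"), newest first.
import bisect

class ToyLSM:
    def __init__(self, memtable_limit=4):
        self.memtable = {}         # recent writes
        self.sstables = []         # sorted [(key, value)] runs, newest last
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self._flush()

    def _flush(self):
        # Flush the memtable as one immutable sorted run.
        self.sstables.append(sorted(self.memtable.items()))
        self.memtable = {}

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for run in reversed(self.sstables):       # newest SSTable first
            keys = [k for k, _ in run]
            i = bisect.bisect_left(keys, key)     # binary search within a run
            if i < len(keys) and keys[i] == key:
                return run[i][1]
        return None

db = ToyLSM()
for i in range(10):
    db.put(f"k{i}", f"v{i}")
print(db.get("k3"), db.get("missing"))
```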

5. High-Level Design (12 min)

Architecture

Client SDK (consistent hashing + preference list)
  │
  │  Route to appropriate node(s)
  ▼
┌──────────────────────────────────────────────┐
│           Distributed KV Cluster             │
│                                              │
│  Node A (tokens: 0-90)                       │
│  ┌────────────────────────┐                  │
│  │ Request Handler        │                  │
│  │ ├─ Coordinator logic   │                  │
│  │ ├─ Replication manager │                  │
│  │ └─ Read repair         │                  │
│  │                        │                  │
│  │ Storage Engine         │                  │
│  │ ├─ Memtable (64MB)     │                  │
│  │ ├─ WAL                 │                  │
│  │ ├─ SSTables (L0..Ln)   │                  │
│  │ └─ Bloom filters       │                  │
│  └────────────────────────┘                  │
│                                              │
│  Node B (tokens: 91-180)   ...  Node F       │
│  (similar structure)                         │
│                                              │
│  Cross-cutting:                              │
│  ├─ Gossip Protocol (membership, failure)    │
│  ├─ Anti-Entropy (Merkle trees)              │
│  └─ Hinted Handoff (temporary failed nodes)  │
└──────────────────────────────────────────────┘

Write Path

Client
  → SDK hashes key → identifies coordinator node (owns token range)
  → Sends PUT to coordinator
  → Coordinator:
    1. Determine preference list: [Node A (primary), Node B (replica), Node C (replica)]
    2. Forward write to all 3 nodes in parallel
    3. Each node:
       a. Append to WAL (fsync for durability)
       b. Insert into Memtable
       c. Respond to coordinator
    4. Coordinator waits for W responses (configurable: W=1, W=2 quorum, W=3 all)
    5. Respond to client with vector clock
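Steps 2-5 above can be sketched as a coordinator loop. The replica callables and the ConnectionError simulation are stand-ins for real RPCs, and the loop is sequential here where a real coordinator fans out in parallel:

```python
# Quorum-write sketch: fan out to all replicas, acknowledge the client once
# W replicas have responded.

def quorum_write(replicas, key, value, w):
    """replicas: list of callables node(key, value) -> bool (ack)."""
    acks = 0
    for node in replicas:              # in practice, sent in parallel
        try:
            if node(key, value):
                acks += 1
        except ConnectionError:
            pass                       # dead replica: candidate for a hint
        if acks >= w:
            return True                # W acks reached: write acknowledged
    return False                       # W not reached: client sees an error

healthy = lambda key, value: True

def down(key, value):
    raise ConnectionError("node unreachable")

print(quorum_write([healthy, down, healthy], "k", "v", w=2))  # True
print(quorum_write([down, down, healthy], "k", "v", w=2))     # False
```

With W=2 the write survives one dead replica; with two dead replicas the quorum fails and the coordinator would fall back to hinted handoff.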

Read Path

Client
  → SDK hashes key → identifies coordinator
  → Sends GET to coordinator
  → Coordinator:
    1. Send read to R nodes from preference list (R=1, R=2 quorum, R=3 all)
    2. Each node:
       a. Check Memtable → if found, return
       b. Check Bloom filters for each SSTable (L0 first, then L1, L2...)
       c. If bloom filter says "maybe present" → read SSTable index → read data block
       d. Return latest version (by vector clock)
    3. Coordinator:
       a. Compare versions from R responses
       b. If all match → return to client
       c. If conflict → return multiple values (client resolves) OR last-write-wins
       d. Trigger read repair (send latest version to stale nodes)

Components

  1. Nodes (100+): Each node runs the storage engine (LSM-tree), handles coordinator logic, participates in gossip, and manages local replication.
  2. Consistent Hash Ring: 256 virtual nodes per physical node for balanced distribution. Token assignment stored in gossip state.
  3. Gossip Protocol: Every node pings 3 random nodes every second, exchanging membership state (heartbeats, token assignments, failure suspicions).
  4. Anti-Entropy Service: Background process comparing Merkle trees between replica nodes to detect and repair divergence.
  5. Hinted Handoff Store: When a target replica is down, the coordinator stores the write locally as a “hint” and replays it when the node recovers.
  6. Compaction Manager: Background threads running leveled or size-tiered compaction to merge SSTables and remove dead data.

6. Deep Dives (15 min)

Deep Dive 1: Consistent Hashing and Data Partitioning

The problem: How to distribute data across N nodes so that adding/removing a node only moves ~1/N of the data (not a full reshuffle)?

Basic consistent hashing:

Hash ring: 0 to 2^128 - 1
Each node assigned a position on the ring: hash(node_id)
Each key assigned to the first node clockwise from hash(key)

Adding Node D between A and B:
  Before: keys in range (A, B] → Node B
  After:  keys in range (A, D] → Node D, keys in range (D, B] → Node B
  Only keys in (A, D] move — approximately 1/N of total keys

Problem with basic approach: Non-uniform distribution (some nodes get more keys).

Virtual nodes (vnodes):

Each physical node gets V virtual positions on the ring (V = 256)
Node A → {hash("A-0"), hash("A-1"), ..., hash("A-255")}

Benefits:
1. Uniform distribution (law of large numbers)
2. Heterogeneous hardware: powerful node gets 512 vnodes, weak node gets 128
3. When a node fails, its load spreads across ALL other nodes (not just one neighbor)
4. Faster rebalancing: move vnodes one at a time

Cost: Larger routing table (N x V entries), but trivial at 100 nodes x 256 vnodes = 25,600 entries

Preference list (replication):

For key K with primary node A:
  Preference list = [A, B, C] (next 3 distinct physical nodes clockwise)
  - "Distinct physical" — skip vnodes belonging to the same physical node
  - Ensures replicas are on different machines (and ideally different racks/AZs)
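The ring, vnodes, and preference-list walk can be sketched as follows (md5 is an illustrative choice of 128-bit hash, matching the 2^128 ring above; V is reduced to 8 for readability):

```python
# Consistent hash ring with virtual nodes and preference-list selection that
# skips vnodes of an already-chosen physical node.
import hashlib
from bisect import bisect_right

def h(s: str) -> int:
    # 128-bit hash onto the ring [0, 2^128 - 1]
    return int.from_bytes(hashlib.md5(s.encode()).digest(), "big")

class Ring:
    def __init__(self, nodes, vnodes=8):
        self.ring = sorted((h(f"{n}-{i}"), n)
                           for n in nodes for i in range(vnodes))

    def preference_list(self, key, rf=3):
        start = bisect_right(self.ring, (h(key), ""))
        owners, seen = [], set()
        for j in range(len(self.ring)):              # walk clockwise
            _, node = self.ring[(start + j) % len(self.ring)]
            if node not in seen:                     # distinct physical nodes
                seen.add(node)
                owners.append(node)
                if len(owners) == rf:
                    break
        return owners

ring = Ring(["A", "B", "C", "D", "E"])
print(ring.preference_list("user:123:profile"))  # 3 distinct physical nodes
```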

Deep Dive 2: Conflict Resolution — Vector Clocks and Convergence

The problem: With eventual consistency, concurrent writes to the same key on different nodes create conflicting versions. How do we detect and resolve conflicts?

Vector clocks:

A vector clock is a map: {node_id: counter}

Example:
  Client 1 writes key "X" to Node A:
    Version: {A: 1}

  Client 2 reads version {A: 1}, writes update to Node A:
    Version: {A: 2}

  Network partition: Client 3 writes key "X" to Node B:
    Version: {A: 1, B: 1}  (B saw {A:1} via replication before partition)

  Partition heals. Node A has {A: 2}, Node B has {A: 1, B: 1}
  Are these in causal order? NO.
    {A: 2} is NOT >= {A: 1, B: 1} (missing B component)
    {A: 1, B: 1} is NOT >= {A: 2} (A component is less)
  → CONFLICT. Both versions are returned to the client.
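The causality check in this example reduces to a pairwise counter comparison:

```python
# Vector clock comparison: clock a dominates b iff every counter in b is
# matched or exceeded in a (missing entries count as 0). If neither clock
# dominates, the writes are concurrent: a conflict.

def dominates(a: dict, b: dict) -> bool:
    return all(a.get(node, 0) >= count for node, count in b.items())

def compare(a: dict, b: dict) -> str:
    if dominates(a, b) and dominates(b, a):
        return "equal"
    if dominates(a, b):
        return "a_newer"
    if dominates(b, a):
        return "b_newer"
    return "conflict"

print(compare({"A": 2}, {"A": 1, "B": 1}))   # the partitioned example above
print(compare({"A": 2}, {"A": 1}))           # simple causal descent
```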

Resolution strategies:

1. Client-side resolution (Amazon shopping cart):
   - Return both versions to client
   - Client merges (e.g., union of cart items)
   - Client writes merged result with new vector clock

2. Last-Write-Wins (LWW):
   - Each version has a wall-clock timestamp
   - Highest timestamp wins
   - Simple but can lose data (clock skew, concurrent writes)
   - Acceptable for caches, session stores — NOT for shopping carts

3. CRDTs (Conflict-free Replicated Data Types):
   - Data structures that mathematically converge (G-Counter, OR-Set)
   - No conflicts by construction
   - Limited to specific data types

Vector clock pruning:

Problem: Vector clocks grow unbounded (one entry per node that ever wrote)
Solution: Truncate entries older than a threshold (e.g., 7 days)
  - Risk: loss of causal ordering for ancient writes → false conflicts
  - In practice: acceptable because very old concurrent writes are rare
  - Amazon Dynamo: truncate when clock exceeds 10 entries

Deep Dive 3: Anti-Entropy, Merkle Trees, and Repair

The problem: After a node recovers from temporary downtime (or a network partition heals), how do we efficiently find and repair all divergent keys between two replicas?

Naive approach: Compare all keys between two nodes — O(N) network transfer, unacceptable for billions of keys.

Merkle tree approach:

Each node maintains a Merkle tree per token range:

Level 0 (leaves): hash of each key-value pair
Level 1: hash(child_left + child_right)
...
Root: single hash

Comparison:
  Node A and Node B exchange root hashes
  If roots match → data is identical, done (O(1))
  If roots differ → recurse:
    Compare left/right children
    Descend only into subtrees that differ
    Eventually reach leaves → those are the divergent keys

Cost: O(log N) hash comparisons to locate each divergent range
Transfer: Only the divergent keys are synced — minimal bandwidth
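The recursive comparison can be sketched over a fixed number of leaf buckets (a common simplification: each leaf hashes a bucket of keys rather than one pair):

```python
# Merkle diff sketch: build a binary hash tree over leaf buckets, then
# descend only into subtrees whose hashes differ.
import hashlib

def leaf_hashes(kv: dict, buckets=8):
    parts = [b""] * buckets
    for k in sorted(kv):                       # deterministic bucket contents
        parts[hash(k) % buckets] += f"{k}={kv[k]};".encode()
    return [hashlib.sha256(p).digest() for p in parts]

def build(level):
    tree = [level]
    while len(level) > 1:                      # hash pairs up to the root
        level = [hashlib.sha256(level[i] + level[i + 1]).digest()
                 for i in range(0, len(level), 2)]
        tree.append(level)
    return tree                                # tree[-1][0] is the root hash

def diff(t1, t2, level=None, idx=0):
    """Return indices of leaf buckets where the two trees disagree."""
    if level is None:
        level = len(t1) - 1                    # start at the root
    if t1[level][idx] == t2[level][idx]:
        return []                              # identical subtree: prune
    if level == 0:
        return [idx]                           # divergent leaf bucket
    return (diff(t1, t2, level - 1, 2 * idx) +
            diff(t1, t2, level - 1, 2 * idx + 1))

a = {"k1": "v1", "k2": "v2", "k3": "v3"}
b = dict(a, k2="STALE")                        # one divergent key
ta, tb = build(leaf_hashes(a)), build(leaf_hashes(b))
print(diff(ta, tb))                            # the one bucket holding k2
```

Only the buckets returned by `diff` need their keys exchanged, which is the bandwidth win over a full key-by-key comparison.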

Implementation details:

Merkle tree per token range (not per entire node):
  - Allows comparing specific ranges between replicas
  - Tree updated incrementally on each write (update leaf, recompute path to root)
  - Rebuild tree periodically to handle deletes/compaction

Anti-entropy schedule:
  - Each node runs anti-entropy with one random replica every 10 minutes
  - Staggered to avoid thundering herd
  - Priority: recently-recovered nodes sync first

Hinted Handoff (complementary to anti-entropy):

When Node C is down and a write arrives for its token range:
  1. Coordinator writes to Node A (primary) and Node B (replica) — success
  2. For Node C: coordinator stores a "hint" locally:
     Hint: {target: C, key: "user:456", value: "...", vector_clock: {...}}
  3. When Node C comes back online (detected via gossip):
     Coordinator replays all hints to Node C
  4. Hints are deleted after successful replay (or after TTL if C never recovers)

Limitation: Hints are stored on the coordinator node. If THAT node also fails, hints are lost.
→ Anti-entropy (Merkle trees) is the ultimate safety net.
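The hint lifecycle can be sketched as follows; the liveness map and in-memory hint list are simplified stand-ins for gossip state and a durable hint store:

```python
# Hinted handoff sketch at the coordinator: writes to a dead replica are
# parked as hints and replayed when gossip reports the node alive again.

class Coordinator:
    def __init__(self):
        self.hints = []        # [(target, key, value)]
        self.alive = {}        # node -> bool, fed by gossip

    def write_replica(self, target, key, value):
        if self.alive.get(target, True):
            return True        # delivered (network call elided)
        self.hints.append((target, key, value))
        return False           # parked as a hint

    def on_node_recovered(self, target):
        # Replay (and drop) every hint destined for the recovered node.
        replay = [h for h in self.hints if h[0] == target]
        self.hints = [h for h in self.hints if h[0] != target]
        return replay

coord = Coordinator()
coord.alive["C"] = False
coord.write_replica("C", "user:456", "v1")   # C is down: hint stored
coord.alive["C"] = True
print(coord.on_node_recovered("C"))          # hint replayed to C
```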

Read Repair (opportunistic):

On every quorum read:
  1. Coordinator reads from R=2 nodes
  2. If versions differ:
     - Return latest to client
     - Asynchronously send latest version to the stale node
  3. Cost: essentially free (already doing the read)
  4. Benefit: hot keys get repaired quickly, cold keys wait for anti-entropy
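The repair decision on a quorum read can be sketched as follows (a simplification: it assumes the returned versions are causally ordered, so the conflict branch from the read path is elided):

```python
# Read-repair sketch: compare the vector clocks returned by R replicas and
# queue the winning version for any replica the winner strictly dominates.

def dominates(a: dict, b: dict) -> bool:
    return all(a.get(n, 0) >= c for n, c in b.items())

def quorum_read(responses):
    """responses: list of (replica_id, value, vector_clock)."""
    # Among causally ordered clocks, the dominant one has the largest total.
    winner = max(responses, key=lambda r: sum(r[2].values()))
    repairs = [(rid, winner[1], winner[2])
               for rid, _, clock in responses
               if clock != winner[2] and dominates(winner[2], clock)]
    return winner[1], repairs      # value for the client, async repair queue

value, repairs = quorum_read([
    ("node1", "new", {"A": 3}),
    ("node2", "old", {"A": 2}),    # stale replica
])
print(value, repairs)
```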

7. Extensions (2 min)

  • Cross-datacenter replication: Async replication across regions with conflict resolution. Each DC has a full replica of the dataset. Writes are local (low latency) and replicated asynchronously (eventual cross-DC consistency). Use vector clocks with DC-aware conflict resolution.
  • Range queries and secondary indexes: Support scan operations with prefix keys (e.g., all keys starting with “user:123:”). Implement local secondary indexes per node (each node indexes its own partition) or global secondary indexes (scatter-gather query across all nodes).
  • Adaptive consistency: Automatically switch between eventual and strong consistency based on context. For example, read-your-writes consistency by routing a user’s reads to the same node that handled their write (session affinity via sticky coordinator).
  • Hot key mitigation: Detect keys with disproportionate read traffic (e.g., celebrity profile). Add read replicas for hot keys only, or use a client-side cache with short TTL. Alternatively, add a random suffix to the key to spread reads across partitions (trade-off: scatter-gather on read).
  • TTL and compaction optimization: Native TTL support with efficient expiration. Tombstones cleaned during compaction. Time-window compaction strategy for TTL workloads: SSTables organized by time window, entire SSTables dropped when all keys have expired.