1. Requirements & Scope (5 min)
Functional Requirements
- put(key, value) — store a key-value pair (upsert semantics)
- get(key) — retrieve the value for a given key
- delete(key) — remove a key-value pair
- Support large value sizes (up to 1MB per value, keys up to 256 bytes)
- Automatic data partitioning across nodes with rebalancing on cluster resize
Non-Functional Requirements
- Availability: 99.99% — the store must remain writable even during node failures and network partitions (AP system by default, tunable toward CP)
- Latency: < 5ms p99 for both reads and writes (single datacenter)
- Consistency: Tunable — eventual consistency by default (R=1, W=1), strong consistency optional (R=W=quorum)
- Scale: 100TB+ total data, 500K+ operations/sec, hundreds of nodes, linear horizontal scaling
- Durability: No data loss for acknowledged writes. Replication factor of 3 across failure domains.
2. Estimation (3 min)
Traffic
- Write QPS: 200K ops/sec
- Read QPS: 300K ops/sec (1.5:1 read-to-write ratio)
- Average value size: 10KB, average key size: 100 bytes
- Write bandwidth: 200K x 10KB = 2 GB/sec
- Read bandwidth: 300K x 10KB = 3 GB/sec
Storage
- 100 billion key-value pairs in steady state
- Average size per pair: 10KB value + 100 bytes key + 200 bytes metadata = ~10.3KB
- Total data: 100B x 10.3KB = 1 PB before replication
- With RF=3: 3 PB total
- Per node (assuming 100 nodes): 30TB per node → 8 x 4TB SSDs each
Memory
- Each node owns ~1B keys in its primary ranges (and stores ~3B including replicas)
- In-memory index (key → file offset): 100 bytes key + 16 bytes offset = 116 bytes per key
- Index per node: 1B x 116 bytes = 116 GB even before replicas — too large for RAM
- Solution: Bloom filters + sparse index (see Deep Dive)
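The estimates above can be sanity-checked with a few lines of arithmetic. All inputs (100B pairs, 10KB values, 100-byte keys, 200-byte metadata, RF=3, 100 nodes) come straight from this section; the script just reproduces the math:

```python
# Back-of-envelope check of the estimation section's figures.
pairs = 100 * 10**9                        # 100 billion key-value pairs
pair_bytes = 10 * 1024 + 100 + 200         # value + key + metadata ≈ 10.3KB
rf = 3
nodes = 100

total_tb = pairs * pair_bytes / 10**12     # raw dataset: ~1 PB
replicated_tb = total_tb * rf              # ~3 PB with RF=3
per_node_tb = replicated_tb / nodes        # ~30 TB per node

keys_per_node = pairs // nodes             # ~1B keys per node (primary ranges)
index_gb = keys_per_node * (100 + 16) / 10**9   # naive full index: ~116 GB

print(f"{total_tb/1000:.2f} PB raw, {per_node_tb:.1f} TB/node, "
      f"index {index_gb:.0f} GB/node")
```

The ~116 GB index figure is what rules out a full in-memory index and motivates the bloom filter + sparse index approach in the Deep Dive.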
3. API Design (3 min)
// Write a key-value pair
PUT /kv/{key}
Headers: {
"X-Consistency": "quorum", // optional: one, quorum, all
"X-TTL": 86400, // optional: TTL in seconds
"If-Match": "<vector_clock>" // optional: conditional write (CAS)
}
Body: <raw bytes>
Response 200: {
"key": "user:123:profile",
"version": {"node1": 3, "node2": 1}, // vector clock
"timestamp": 1708632060000
}
// Read a key
GET /kv/{key}
Headers: {
"X-Consistency": "quorum" // optional: one, quorum, all
}
Response 200: {
"key": "user:123:profile",
"value": "<base64-encoded>",
"version": {"node1": 3, "node2": 1},
"timestamp": 1708632060000
}
// If conflicting versions exist (eventual consistency):
Response 200: {
"key": "user:123:profile",
"values": [
{"value": "...", "version": {"node1": 3}},
{"value": "...", "version": {"node2": 2}}
],
"conflict": true
}
// Delete a key
DELETE /kv/{key}
Response 200: { "deleted": true }
// Internally: tombstone with TTL (not immediate physical delete)
// Scan keys by prefix
GET /kv?prefix=user:123:&limit=100
Response 200: { "pairs": [...], "cursor": "..." }
Key Decisions
- Client receives vector clock on write, passes it back on subsequent writes for conflict detection
- Conflicts surfaced to client (Dynamo-style) rather than hidden behind last-write-wins
- Delete uses tombstones — physical deletion happens during compaction
4. Data Model (3 min)
On-Disk Storage (LSM-Tree per node)
SSTable (Sorted String Table):
┌─────────────────────────────────┐
│ Data Block 1 (4KB, sorted KVs) │
│ Data Block 2 │
│ ... │
│ Data Block N │
│ Index Block (key → block offset)│
│ Bloom Filter Block │
│ Footer (offsets to index/filter)│
└─────────────────────────────────┘
Each KV entry:
key | bytes (up to 256B)
value | bytes (up to 1MB)
timestamp | int64
vector_clock | map<node_id, counter>
tombstone | boolean
ttl_expiry | int64 (0 = no expiry)
In-Memory Structures (per node)
Memtable (Red-Black Tree or Skip List):
- All recent writes, sorted by key
- Flushed to SSTable when size exceeds 64MB
Write-Ahead Log (WAL):
- Sequential append of every write
- Replayed on crash recovery
- Truncated after memtable flush
Bloom Filters:
- One per SSTable, loaded in memory
- 10 bits per key → ~1% false positive rate
- Avoids reading SSTables that definitely don't contain the key
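A minimal bloom filter sketch makes the "definitely absent vs. maybe present" guarantee concrete. This is illustrative only, not the storage engine's actual code; with m/n = 10 bits per key and k = 7 hash functions, the theoretical false-positive rate (1 - e^(-kn/m))^k works out to roughly 1%:

```python
import hashlib

class BloomFilter:
    """Illustrative bloom filter: 10 bits/key, 7 hash positions per key."""

    def __init__(self, num_keys: int, bits_per_key: int = 10, num_hashes: int = 7):
        self.size = num_keys * bits_per_key
        self.num_hashes = num_hashes
        self.bits = bytearray(self.size // 8 + 1)

    def _positions(self, key: bytes):
        # Derive k bit positions from salted digests (double hashing
        # would be cheaper in a real engine).
        for i in range(self.num_hashes):
            h = hashlib.sha256(i.to_bytes(2, "big") + key).digest()
            yield int.from_bytes(h[:8], "big") % self.size

    def add(self, key: bytes):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key: bytes) -> bool:
        # False → key definitely absent (skip this SSTable);
        # True → key *may* be present (must read the index).
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(key))
```

On a read, the engine checks `might_contain` before touching an SSTable's index block, so only the rare false positive costs a wasted disk read.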
Why LSM-Tree (not B-Tree)
- Write-optimized: All writes are sequential appends (memtable → flush → SSTable). No random I/O on writes.
- Trade-off: Reads may check multiple SSTables (L0, L1, …, Ln). Mitigated by bloom filters, level compaction, and caching.
- Compaction: Background process merges SSTables, removes tombstones, reduces read amplification.
- Space amplification: Leveled compaction keeps it at ~1.1x; size-tiered compaction can be ~2x but has better write throughput.
5. High-Level Design (12 min)
Architecture
Client SDK (consistent hashing + preference list)
│
│ Route to appropriate node(s)
▼
┌──────────────────────────────────────────────┐
│ Distributed KV Cluster │
│ │
│ Node A (tokens: 0-90) │
│ ┌────────────────────────┐ │
│ │ Request Handler │ │
│ │ ├─ Coordinator logic │ │
│ │ ├─ Replication manager │ │
│ │ └─ Read repair │ │
│ │ │ │
│ │ Storage Engine │ │
│ │ ├─ Memtable (64MB) │ │
│ │ ├─ WAL │ │
│ │ ├─ SSTables (L0..Ln) │ │
│ │ └─ Bloom filters │ │
│ └────────────────────────┘ │
│ │
│ Node B (tokens: 91-180) ... Node F │
│ (similar structure) │
│ │
│ Cross-cutting: │
│ ├─ Gossip Protocol (membership, failure) │
│ ├─ Anti-Entropy (Merkle trees) │
│ └─ Hinted Handoff (temporary failed nodes) │
└──────────────────────────────────────────────┘
Write Path
Client
→ SDK hashes key → identifies coordinator node (owns token range)
→ Sends PUT to coordinator
→ Coordinator:
1. Determine preference list: [Node A (primary), Node B (replica), Node C (replica)]
2. Forward write to all 3 nodes in parallel
3. Each node:
a. Append to WAL (fsync for durability)
b. Insert into Memtable
c. Respond to coordinator
4. Coordinator waits for W responses (configurable: W=1, W=2 quorum, W=3 all)
5. Respond to client with vector clock
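Step 4 above (wait for W acks out of N replicas) can be sketched as follows. The `replicas` list here is a stand-in for RPC stubs to storage nodes — a hypothetical interface, since real nodes would append to their WAL and memtable before acking:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def quorum_write(replicas, key, value, w):
    """Fan the write out to all replicas in parallel; acknowledge the
    client once W replicas have succeeded. `replicas` is a list of
    callables standing in for per-node write RPCs."""
    acks = 0
    with ThreadPoolExecutor(max_workers=len(replicas)) as pool:
        futures = [pool.submit(r, key, value) for r in replicas]
        for f in as_completed(futures):
            try:
                f.result()
                acks += 1
                if acks >= w:
                    return True   # W acks reached: write is acknowledged
            except Exception:
                pass              # failed replica: hinted handoff takes over
    return False                  # fewer than W acks → write fails
```

With RF=3 and W=2, one replica can be down or slow without blocking the client — the write still succeeds on the other two, and the straggler is caught up later by hinted handoff or anti-entropy.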
Read Path
Client
→ SDK hashes key → identifies coordinator
→ Sends GET to coordinator
→ Coordinator:
1. Send read to R nodes from preference list (R=1, R=2 quorum, R=3 all)
2. Each node:
a. Check Memtable → if found, return
b. Check Bloom filters for each SSTable (L0 first, then L1, L2...)
c. If bloom filter says "maybe present" → read SSTable index → read data block
d. Return latest version (by vector clock)
3. Coordinator:
a. Compare versions from R responses
b. If all match → return to client
c. If conflict → return multiple values (client resolves) OR last-write-wins
d. Trigger read repair (send latest version to stale nodes)
Components
- Nodes (100+): Each node runs the storage engine (LSM-tree), handles coordinator logic, participates in gossip, and manages local replication.
- Consistent Hash Ring: 256 virtual nodes per physical node for balanced distribution. Token assignment stored in gossip state.
- Gossip Protocol: Every node pings 3 random nodes every second, exchanging membership state (heartbeats, token assignments, failure suspicions).
- Anti-Entropy Service: Background process comparing Merkle trees between replica nodes to detect and repair divergence.
- Hinted Handoff Store: When a target replica is down, the coordinator stores the write locally as a “hint” and replays it when the node recovers.
- Compaction Manager: Background threads running leveled or size-tiered compaction to merge SSTables and remove dead data.
6. Deep Dives (15 min)
Deep Dive 1: Consistent Hashing and Data Partitioning
The problem: How to distribute data across N nodes so that adding/removing a node only moves ~1/N of the data (not a full reshuffle)?
Basic consistent hashing:
Hash ring: 0 to 2^128 - 1
Each node assigned a position on the ring: hash(node_id)
Each key assigned to the first node clockwise from hash(key)
Adding Node D between A and B:
Before: keys in range (A, B] → Node B
After: keys in range (A, D] → Node D, keys in range (D, B] → Node B
Only keys in (A, D] move — approximately 1/N of total keys
Problem with basic approach: Non-uniform distribution (some nodes get more keys).
Virtual nodes (vnodes):
Each physical node gets V virtual positions on the ring (V = 256)
Node A → {hash("A-0"), hash("A-1"), ..., hash("A-255")}
Benefits:
1. Uniform distribution (law of large numbers)
2. Heterogeneous hardware: powerful node gets 512 vnodes, weak node gets 128
3. When a node fails, its load spreads across ALL other nodes (not just one neighbor)
4. Faster rebalancing: move vnodes one at a time
Cost: Larger routing table (N x V entries), but trivial at 100 nodes x 256 vnodes = 25,600 entries
Preference list (replication):
For key K with primary node A:
Preference list = [A, B, C] (next 3 distinct physical nodes clockwise)
- "Distinct physical" — skip vnodes belonging to same physical node
- Ensures replicas are on different machines (and ideally different racks/AZs)
Deep Dive 2: Conflict Resolution — Vector Clocks and Convergence
The problem: With eventual consistency, concurrent writes to the same key on different nodes create conflicting versions. How do we detect and resolve conflicts?
Vector clocks:
A vector clock is a map: {node_id: counter}
Example:
Client 1 writes key "X" to Node A:
Version: {A: 1}
Client 2 reads version {A: 1}, writes update to Node A:
Version: {A: 2}
Network partition: Client 3 writes key "X" to Node B:
Version: {A: 1, B: 1} (B saw {A:1} via replication before partition)
Partition heals. Node A has {A: 2}, Node B has {A: 1, B: 1}
Are these in causal order? NO.
{A: 2} is NOT >= {A: 1, B: 1} (missing B component)
{A: 1, B: 1} is NOT >= {A: 2} (A component is less)
→ CONFLICT. Both versions are returned to the client.
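The causal comparison in the example above reduces to a per-component check, which a coordinator might implement as:

```python
def compare(vc1: dict, vc2: dict) -> str:
    """Causally compare two vector clocks ({node_id: counter} maps).
    Returns 'before', 'after', 'equal', or 'concurrent' (= conflict)."""
    nodes = set(vc1) | set(vc2)
    le = all(vc1.get(n, 0) <= vc2.get(n, 0) for n in nodes)  # vc1 ≤ vc2?
    ge = all(vc1.get(n, 0) >= vc2.get(n, 0) for n in nodes)  # vc1 ≥ vc2?
    if le and ge:
        return "equal"
    if le:
        return "before"      # vc1 causally precedes vc2: vc2 supersedes it
    if ge:
        return "after"
    return "concurrent"      # neither dominates → surface both to the client
```

Running it on the example: `compare({"A": 2}, {"A": 1, "B": 1})` returns `"concurrent"`, so both versions go back to the client.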
Resolution strategies:
1. Client-side resolution (Amazon shopping cart):
- Return both versions to client
- Client merges (e.g., union of cart items)
- Client writes merged result with new vector clock
2. Last-Write-Wins (LWW):
- Each version has a wall-clock timestamp
- Highest timestamp wins
- Simple but can lose data (clock skew, concurrent writes)
- Acceptable for caches, session stores — NOT for shopping carts
3. CRDTs (Conflict-free Replicated Data Types):
- Data structures that mathematically converge (G-Counter, OR-Set)
- No conflicts by construction
- Limited to specific data types
Vector clock pruning:
Problem: Vector clocks grow unbounded (one entry per node that ever wrote)
Solution: Truncate entries older than a threshold (e.g., 7 days)
- Risk: loss of causal ordering for ancient writes → false conflicts
- In practice: acceptable because very old concurrent writes are rare
- Amazon Dynamo: truncate when clock exceeds 10 entries
Deep Dive 3: Anti-Entropy, Merkle Trees, and Repair
The problem: After a node was temporarily down (or a network partition healed), how do we efficiently find and repair all divergent keys between two replicas?
Naive approach: Compare all keys between two nodes — O(N) network transfer, unacceptable for billions of keys.
Merkle tree approach:
Each node maintains a Merkle tree per token range:
Level 0 (leaves): hash of each key-value pair
Level 1: hash(child_left + child_right)
...
Root: single hash
Comparison:
Node A and Node B exchange root hashes
If roots match → data is identical, done (O(1))
If roots differ → recurse:
Compare left/right children
Descend only into subtrees that differ
Eventually reach leaves → those are the divergent keys
Cost: O(log N) hash comparisons per divergent region (vs. O(N) for the naive full scan)
Transfer: Only the divergent keys are synced — minimal bandwidth
Implementation details:
Merkle tree per token range (not per entire node):
- Allows comparing specific ranges between replicas
- Tree updated incrementally on each write (update leaf, recompute path to root)
- Rebuild tree periodically to handle deletes/compaction
Anti-entropy schedule:
- Each node runs anti-entropy with one random replica every 10 minutes
- Staggered to avoid thundering herd
- Priority: recently-recovered nodes sync first
Hinted Handoff (complementary to anti-entropy):
When Node C is down and a write arrives for its token range:
1. Coordinator writes to Node A (primary) and Node B (replica) — success
2. For Node C: coordinator stores a "hint" locally:
Hint: {target: C, key: "user:456", value: "...", vector_clock: {...}}
3. When Node C comes back online (detected via gossip):
Coordinator replays all hints to Node C
4. Hints are deleted after successful replay (or after TTL if C never recovers)
Limitation: Hints are stored on the coordinator node. If THAT node also fails, hints are lost.
→ Anti-entropy (Merkle trees) is the ultimate safety net.
Read Repair (opportunistic):
On every quorum read:
1. Coordinator reads from R=2 nodes
2. If versions differ:
- Return latest to client
- Asynchronously send latest version to the stale node
3. Cost: essentially free (already doing the read)
4. Benefit: hot keys get repaired quickly, cold keys wait for anti-entropy
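The read-repair flow above can be sketched as follows. For brevity this uses a single monotonically comparable version number instead of a full vector clock, and `repair` stands in for the asynchronous write-back RPC (both simplifying assumptions):

```python
def quorum_read(replicas, key, repair):
    """Read from R replicas, return the latest version, and queue a
    repair for any replica that returned a stale version.
    `replicas` maps node id → read callable returning (value, version)."""
    responses = {node: read(key) for node, read in replicas.items()}
    latest_node = max(responses, key=lambda n: responses[n][1])
    latest_value, latest_version = responses[latest_node]
    for node, (_, version) in responses.items():
        if version < latest_version:
            # Fire-and-forget in a real system; synchronous here for clarity.
            repair(node, key, latest_value, latest_version)
    return latest_value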
7. Extensions (2 min)
- Cross-datacenter replication: Async replication across regions with conflict resolution. Each DC has a full replica of the dataset. Writes are local (low latency) and replicated asynchronously (eventual cross-DC consistency). Use vector clocks with DC-aware conflict resolution.
- Range queries and secondary indexes: Support scan operations with prefix keys (e.g., all keys starting with “user:123:”). Implement local secondary indexes per node (each node indexes its own partition) or global secondary indexes (scatter-gather query across all nodes).
- Adaptive consistency: Automatically switch between eventual and strong consistency based on context. For example, read-your-writes consistency by routing a user’s reads to the same node that handled their write (session affinity via sticky coordinator).
- Hot key mitigation: Detect keys with disproportionate read traffic (e.g., celebrity profile). Add read replicas for hot keys only, or use a client-side cache with short TTL. Alternatively, add a random suffix to the key to spread reads across partitions (trade-off: scatter-gather on read).
- TTL and compaction optimization: Native TTL support with efficient expiration. Tombstones cleaned during compaction. Time-window compaction strategy for TTL workloads: SSTables organized by time window, entire SSTables dropped when all keys have expired.