1. Requirements & Scope (5 min)

Functional Requirements

  1. Producers publish messages to named topics with optional partitioning keys
  2. Consumers subscribe to topics and read messages in order within each partition
  3. Support consumer groups — each message is delivered to exactly one consumer within a group (load balancing), and every subscribed group receives every message (broadcast across groups)
  4. Messages are durably persisted for a configurable retention period (default 7 days)
  5. Support message replay — consumers can seek to any offset and re-consume

Non-Functional Requirements

  • Availability: 99.99% — the messaging backbone cannot go down without cascading failures across the entire platform
  • Latency: < 10ms end-to-end for p99 publish-to-consume (single datacenter). Throughput is prioritized over single-message latency.
  • Consistency: Messages within a partition are strictly ordered. Across partitions, no ordering guarantee. At-least-once delivery by default; exactly-once semantics available with idempotent producers + transactional consumers.
  • Scale: 1M+ messages/sec ingestion, 10TB+ daily throughput, 1000+ topics, 10,000+ partitions across the cluster
  • Durability: Zero message loss for acknowledged writes. Replication factor of 3 minimum for production topics.

2. Estimation (3 min)

Traffic

  • Write throughput: 1M messages/sec
  • Average message size: 1KB
  • Write bandwidth: 1M x 1KB = 1 GB/sec ingress
  • Replication factor 3: 2 follower copies per message → 2 GB/sec internal replication traffic (3 GB/sec total cluster write bandwidth)
  • Read throughput: Assume 5 consumer groups on average → 5 GB/sec egress
  • Total network: ~8 GB/sec across the cluster (1 ingress + 2 replication + 5 egress)

Storage

  • Daily ingestion: 1 GB/sec x 86,400 = 86.4 TB/day (before replication)
  • With replication factor 3: 259 TB/day
  • 7-day retention: 1.8 PB raw storage
  • Each broker: 12 x 4TB SSDs = 48TB per broker → need ~38 brokers for storage alone
  • In practice: 50-60 brokers (headroom for CPU, network, rebalancing)
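The storage arithmetic above is easy to sanity-check in a few lines. This is just the same back-of-envelope math restated, not a sizing tool; it uses the same assumptions as the text (decimal units, 1 KB = 1000 B, to keep the numbers round).

```python
import math

# Same assumptions as the estimates above.
MSG_PER_SEC = 1_000_000
MSG_SIZE_BYTES = 1_000             # 1 KB average (decimal units)
REPLICATION = 3
RETENTION_DAYS = 7
BROKER_DISK_TB = 12 * 4            # 12 x 4TB SSDs per broker

ingress_gb_s = MSG_PER_SEC * MSG_SIZE_BYTES / 1e9            # 1 GB/s
daily_tb = ingress_gb_s * 86_400 / 1_000                     # 86.4 TB/day
replicated_daily_tb = daily_tb * REPLICATION                 # ~259 TB/day
retained_pb = replicated_daily_tb * RETENTION_DAYS / 1_000   # ~1.8 PB

brokers_for_storage = math.ceil(retained_pb * 1_000 / BROKER_DISK_TB)
print(daily_tb, retained_pb, brokers_for_storage)
```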

Partitions

  • 1000 topics x average 10 partitions = 10,000 partitions
  • Each partition is an append-only log segment on disk
  • Partition leader handles all reads/writes → must balance leaders evenly across brokers

3. API Design (3 min)

Producer API

// Publish a message
POST /topics/{topic}/messages
  Body: {
    "key": "user_123",              // optional partition key
    "value": "<base64-encoded payload>",
    "headers": {"trace_id": "abc"}, // optional metadata
    "partition": null                // null = use key hash; int = explicit partition
  }
  Response 200: {
    "topic": "user-events",
    "partition": 7,
    "offset": 48291034,
    "timestamp": 1708632060000
  }

// Batch publish (preferred for throughput)
POST /topics/{topic}/messages/batch
  Body: { "messages": [ ... ] }
  Response 200: { "offsets": [ ... ] }

Consumer API

// Subscribe to topic(s) with a consumer group
POST /consumers/{group_id}/subscribe
  Body: { "topics": ["user-events", "page-views"] }

// Poll for messages
GET /consumers/{group_id}/poll?max_records=500&timeout_ms=1000
  Response 200: {
    "messages": [
      {"topic": "user-events", "partition": 7, "offset": 48291034, "key": "user_123", "value": "...", "timestamp": ...},
      ...
    ]
  }

// Commit offsets (acknowledge processing)
POST /consumers/{group_id}/offsets
  Body: { "offsets": {"user-events": {"7": 48291035}} }

// Seek to specific offset (replay)
POST /consumers/{group_id}/seek
  Body: { "topic": "user-events", "partition": 7, "offset": 48000000 }

Key Decisions

  • Batch publishing is the primary path — amortizes network round trips
  • Consumer pull model (not push) — consumers control their own pace, natural backpressure
  • Offsets are committed by consumers, not auto-acked — enables at-least-once and exactly-once semantics
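The pull-and-commit contract above can be sketched as a consumer loop. The `client` object and its methods below are hypothetical stand-ins mirroring the HTTP API, not a real client library; the point is the ordering of process vs. commit.

```python
# Sketch of the consumption contract the API implies: pull-based polling
# with manual offset commits yields at-least-once delivery.
def consume_loop(client, group_id, topics, process, max_polls=None):
    client.subscribe(group_id, topics)
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        batch = client.poll(group_id, max_records=500, timeout_ms=1000)
        if not batch:
            continue
        for msg in batch:
            process(msg)          # side effects happen BEFORE the commit...
        last = batch[-1]
        # ...so a crash between process() and commit() re-delivers the
        # batch on restart: at-least-once, never silent loss.
        client.commit(group_id, {last.topic: {last.partition: last.offset + 1}})
```

Committing `offset + 1` (the next offset to read) rather than the last processed offset matches Kafka's convention.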

4. Data Model (3 min)

Message (on-disk log format)

Record:
  offset          | int64       -- monotonically increasing per partition
  timestamp       | int64       -- producer-set or broker-set
  key             | bytes       -- nullable, used for partitioning
  value           | bytes       -- the payload
  headers         | map<str,str> -- metadata
  crc32           | int32       -- integrity check
  attributes      | int8        -- compression codec, timestamp type
  batch_offset    | int32       -- offset within producer batch (for idempotency)
  producer_id     | int64       -- for exactly-once (idempotent producer)
  producer_epoch  | int16       -- for exactly-once (fencing)
  sequence_num    | int32       -- for exactly-once (dedup)

Topic Metadata (ZooKeeper/KRaft)

Topic:
  topic_name      | string (PK)
  num_partitions  | int
  replication_factor | int
  config_overrides   | map       -- retention.ms, segment.bytes, etc.

Partition Assignment:
  topic_name      | string
  partition_id    | int
  leader_broker   | int
  isr             | list<int>   -- in-sync replicas
  replicas        | list<int>   -- all assigned replicas

Consumer Group Offsets

__consumer_offsets (internal compacted topic):
  Key: {group_id, topic, partition}
  Value: {committed_offset, metadata, timestamp}

Why This Storage Model

  • Append-only log on local disk — sequential writes are the fastest possible I/O pattern. HDDs do 200MB/s sequential, SSDs do 2GB/s+. No random seeks.
  • No traditional database — the log IS the database. No index overhead, no B-tree maintenance.
  • Consumer offsets stored as a compacted topic — self-hosted, replicated, no external DB dependency.
  • KRaft (replacing ZooKeeper) — metadata managed via Raft consensus among controller brokers, eliminating the ZK dependency.

5. High-Level Design (12 min)

Architecture

Producers (thousands)
  │
  ├─ Producer 1 ──┐
  ├─ Producer 2 ──┤    (messages partitioned by key hash)
  └─ Producer N ──┤
                  ▼
          ┌──────────────────────────────────────┐
          │            Kafka Cluster             │
          │                                      │
          │  Broker 1         Broker 2           │
          │  ┌────────────┐   ┌────────────┐     │
          │  │ Topic-A P0 │   │ Topic-A P1 │     │
          │  │  (Leader)  │   │  (Leader)  │     │
          │  │ Topic-A P1 │   │ Topic-A P0 │     │
          │  │  (Replica) │   │  (Replica) │     │
          │  └────────────┘   └────────────┘     │
          │                                      │
          │  Broker 3 (Controller)               │
          │  ┌────────────┐                      │
          │  │ Topic-A P0 │  KRaft Metadata      │
          │  │  (Replica) │  (Raft consensus)    │
          │  │ Topic-A P1 │                      │
          │  │  (Replica) │                      │
          │  └────────────┘                      │
          └──────────────────────────────────────┘
                  │
                  ▼
         Consumer Groups
         ┌─────────────────────┐
         │ Group "analytics"   │
         │  Consumer A → P0    │
         │  Consumer B → P1    │
         └─────────────────────┘
         ┌─────────────────────┐
         │ Group "search-index"│
         │  Consumer C → P0,P1 │
         └─────────────────────┘

Write Path (Producer → Broker)

Producer
  → Serialize message (Avro/Protobuf/JSON)
  → Compute partition: hash(key) % num_partitions (or round-robin if no key)
  → Batch messages destined for same partition (linger.ms = 5ms, batch.size = 16KB)
  → Compress batch (LZ4/Snappy/Zstd)
  → Send to partition leader broker
  → Leader:
    → Append to local log segment (sequential write)
    → Wait for ISR replicas to fetch and acknowledge
    → When acks=all: respond to producer only after all ISR replicas confirm
    → When acks=1: respond after local write (faster, risk of data loss on leader crash)
  → Producer receives offset confirmation
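The partition-routing step above can be sketched directly. Note an assumption: real Kafka clients hash keys with murmur2; a stable stdlib hash stands in here, since the property that matters is only that the hash is deterministic across producers.

```python
import hashlib

def choose_partition(key, num_partitions, rr_state):
    """Keyed messages hash deterministically (preserving per-key ordering);
    keyless messages round-robin via the mutable rr_state counter.
    Kafka uses murmur2; sha1 stands in for the sketch."""
    if key is not None:
        digest = hashlib.sha1(key).digest()
        return int.from_bytes(digest[:4], "big") % num_partitions
    rr_state[0] += 1
    return rr_state[0] % num_partitions
```

Because the same key always lands on the same partition, per-key ordering falls out of per-partition ordering for free.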

Read Path (Broker → Consumer)

Consumer
  → Poll leader broker for partition
  → Broker:
    → Read from page cache (hot data) or disk (cold data)
    → Use sendfile() / zero-copy: data goes directly from page cache → network socket
      (no kernel→user→kernel copy — saves 2 memory copies and 2 context switches)
    → Respond with batch of messages
  → Consumer deserializes and processes
  → Consumer commits offset (async or sync)

Components

  1. Brokers (50-60): Store partition log segments, serve produce/fetch requests, replicate data. Each broker is a partition leader for some partitions and a follower for others.
  2. Controller (KRaft): 3-5 controller nodes running Raft consensus. Manages cluster metadata: topic creation, partition assignment, leader election, ISR management.
  3. Producers: Client libraries that batch, compress, and route messages to the correct partition leader.
  4. Consumers: Client libraries that poll partition leaders, track offsets, and handle rebalancing within consumer groups.
  5. Schema Registry: External service storing Avro/Protobuf schemas. Producers register schemas, consumers validate. Prevents breaking changes.
  6. Monitoring (JMX/Prometheus): Under-replicated partitions, consumer lag, broker throughput, request latency percentiles.

6. Deep Dives (15 min)

Deep Dive 1: Replication and ISR (In-Sync Replicas)

The problem: How do we ensure durability without sacrificing throughput? A traditional majority quorum needs 2f+1 replicas to tolerate f failures: with RF=3, every write pays for three copies yet survives only one failure.

Kafka’s ISR approach:

Partition P0: Leader = Broker 1, Replicas = [Broker 1, Broker 2, Broker 3]
ISR = [Broker 1, Broker 2, Broker 3]  (initially all in sync)

Write flow:
1. Producer sends message to Leader (Broker 1)
2. Leader appends to local log, records LEO (Log End Offset) = 100
3. Followers (Broker 2, 3) fetch from leader (pull-based replication)
4. Each follower appends to local log, sends fetch response
5. Leader tracks each replica's LEO:
   - Broker 1 LEO: 100
   - Broker 2 LEO: 100
   - Broker 3 LEO: 98 (lagging)
6. High-Water Mark (HWM) = min(LEO of all ISR members) = 98
   - Only messages up to HWM are visible to consumers
   - This prevents consumers from reading data that might be lost on leader failure

ISR shrinking:

If Broker 3 falls behind by > replica.lag.time.max.ms (default 30s):
  ISR shrinks: [Broker 1, Broker 2, Broker 3] → [Broker 1, Broker 2]
  - Writes now only need ack from Broker 1 and Broker 2
  - min.insync.replicas = 2 → writes still succeed
  - If ISR drops below min.insync.replicas → producer gets NotEnoughReplicasException
  - This is the safety valve: we refuse writes rather than risk data loss
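The leader-side bookkeeping described above reduces to two small functions. This is a minimal sketch of the invariants, not broker code; names like `check_can_write` are invented for illustration.

```python
# High-water mark = min LEO across current ISR members; consumers may
# only read below this mark, so data that could be lost on leader
# failover is never exposed.
def high_water_mark(isr_leos):
    """isr_leos: {broker_id: log-end-offset} for ISR members only."""
    return min(isr_leos.values())

# Mirror of the min.insync.replicas safety valve: refuse writes rather
# than accept them with too little redundancy.
def check_can_write(isr_leos, min_insync_replicas):
    if len(isr_leos) < min_insync_replicas:
        raise RuntimeError("NotEnoughReplicas")
```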

Leader failure and election:

Broker 1 (leader) crashes:
  1. Controller detects failure (session timeout)
  2. Controller picks new leader from ISR: Broker 2
  3. Broker 2 becomes leader, truncates any messages beyond the last committed HWM
  4. Consumers and producers discover new leader via metadata refresh
  5. Total failover time: ~5-10 seconds

Unclean leader election (all ISR dead):
  - unclean.leader.election.enable = false (default): partition goes offline, no data loss
  - unclean.leader.election.enable = true: out-of-sync replica becomes leader, some messages lost
  - Trade-off: availability vs durability

Why ISR > Quorum:

  • Quorum (majority): tolerating f failures requires 2f+1 replicas, so RF=3 survives only 1 failure and RF=5 only 2. The extra replicas exist solely to form a majority.
  • ISR: tolerating f failures requires only f+1 replicas. With RF=3 and all 3 in the ISR, all 3 ack (stronger durability). If one falls behind, the ISR shrinks to 2 and writes continue with 2 acks. Same durability guarantee, but adaptive to replica health.

Deep Dive 2: Exactly-Once Semantics (EOS)

The problem: At-least-once delivery means duplicates when producers retry on timeout. At-most-once means data loss when producers don’t retry. We need exactly-once for financial transactions, deduplication pipelines, and stream processing.

Three layers to EOS:

Layer 1: Idempotent Producer (per-partition dedup)

Producer config: enable.idempotence = true

How it works:
1. Broker assigns each producer a unique ProducerID (PID)
2. Producer tracks a sequence number per partition, incrementing with each message
3. Broker maintains: Map<(PID, Partition) → last_sequence_number>
4. On receive:
   - If seq == expected: accept, increment expected
   - If seq < expected: duplicate, silently ack (idempotent)
   - If seq > expected: out-of-order, reject (producer bug or PID reuse)

This eliminates duplicates from network retries within a single producer session.
Limitation: PID changes on producer restart → cannot dedup across restarts.
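The broker-side sequence check above can be sketched in a few lines. This is an illustration of the dedup rule, not Kafka's implementation; the class name and return values are invented.

```python
# Per-(producer_id, partition) sequence tracking for the idempotent
# producer: accept in-order, silently ack duplicates, reject gaps.
class SequenceTracker:
    def __init__(self):
        self.expected = {}  # (pid, partition) -> next expected sequence

    def on_receive(self, pid, partition, seq):
        key = (pid, partition)
        expected = self.expected.get(key, 0)
        if seq == expected:
            self.expected[key] = expected + 1
            return "accept"
        if seq < expected:
            return "duplicate"  # already persisted: ack without re-appending
        # seq > expected: a gap implies a lost message or PID reuse
        raise ValueError("OutOfOrderSequence")
```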

Layer 2: Transactional Producer (cross-partition atomicity)

Producer config: transactional.id = "payment-processor-1"

How it works:
1. Producer registers transactional.id with Transaction Coordinator (a broker)
2. Coordinator assigns PID + epoch (epoch increments on restart → fences zombies)

Transaction flow:
  producer.beginTransaction()
  producer.send(topic="orders", key="order_1", value="...")
  producer.send(topic="inventory", key="item_42", value="...")
  producer.sendOffsetsToTransaction(consumer_offsets)  // for consume-transform-produce
  producer.commitTransaction()

Under the hood:
  a. Producer → Coordinator: begin transaction
  b. Producer → Partition leaders: send messages (marked as "transactional, uncommitted")
  c. Producer → Coordinator: commit transaction
  d. Coordinator → writes COMMIT marker to transaction log
  e. Coordinator → Partition leaders: write COMMIT marker to each partition
  f. Consumers with isolation.level=read_committed skip uncommitted messages

Layer 3: Consumer isolation

Consumer config: isolation.level = read_committed

Behavior:
- Consumer only sees messages up to the Last Stable Offset (LSO)
- LSO = min(HWM, earliest_open_transaction_offset)
- Uncommitted transactional messages are buffered but not delivered
- On COMMIT: messages become visible
- On ABORT: messages are skipped
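The visibility rule above is just the LSO formula applied as a filter. A minimal sketch (function names invented for illustration):

```python
# LSO = min(HWM, first offset of the earliest still-open transaction);
# a read_committed consumer sees only offsets strictly below the LSO.
def last_stable_offset(hwm, open_txn_first_offsets):
    if not open_txn_first_offsets:
        return hwm
    return min(hwm, min(open_txn_first_offsets))

def visible_to_read_committed(offsets, hwm, open_txn_first_offsets):
    lso = last_stable_offset(hwm, open_txn_first_offsets)
    return [o for o in offsets if o < lso]
```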

Deep Dive 3: Performance — Zero-Copy and Sequential I/O

Why Kafka is fast (not despite being on disk, but because of it):

1. Sequential I/O

Random disk write: ~100 IOPS (HDD), ~10,000 IOPS (SSD)
Sequential disk write: ~200 MB/s (HDD), ~2 GB/s (SSD)

Kafka only does sequential appends:
- Producer write → append to end of log segment file
- No random seeks, no in-place updates, no B-tree rebalancing
- OS page cache naturally handles hot data in RAM

2. Zero-Copy (sendfile syscall)

Traditional read + network send:
  1. Disk → Kernel page cache (DMA)
  2. Kernel page cache → User space buffer (CPU copy)
  3. User space buffer → Kernel socket buffer (CPU copy)
  4. Kernel socket buffer → NIC (DMA)
  = 4 copies, 4 context switches (two syscalls: read + send)

Zero-copy with sendfile():
  1. Disk → Kernel page cache (DMA)
  2. Kernel page cache → NIC (DMA, with scatter-gather)
  = 2 copies, 2 context switches (one syscall); data never enters user space

For Kafka consuming: data goes directly from page cache to network.
This is why Kafka consumer throughput can saturate the NIC (10 Gbps+).
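The same syscall is exposed in Python's stdlib as `os.sendfile`, which makes the zero-copy path easy to demonstrate. A minimal sketch, assuming Linux and a stream socket; not how a broker is actually structured:

```python
import os

def serve_segment(sock, segment_path, offset, count):
    """Ship `count` bytes of a log segment starting at `offset` down `sock`
    via os.sendfile: page cache -> socket, no user-space buffer (Linux)."""
    with open(segment_path, "rb") as f:
        sent = 0
        while sent < count:
            n = os.sendfile(sock.fileno(), f.fileno(), offset + sent, count - sent)
            if n == 0:
                break  # hit end of file before `count` bytes
            sent += n
    return sent
```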

3. Batching and Compression

Producer batching:
  - linger.ms = 5: wait up to 5ms to accumulate messages into a batch
  - batch.size = 16KB: send when batch reaches 16KB
  - One network round trip per batch, not per message
  - Compression (LZ4/Zstd) applied per batch → better compression ratio than per-message

Result: 1M messages/sec with ~2000 network requests/sec (500 messages per batch)

4. Log Compaction (for changelog topics)

Normal retention: delete segments older than retention.ms
Log compaction: keep only the latest value per key

Use case: KTable / changelog / state store
  Key: user_123, Value: {name: "Alice", city: "NYC"}     offset 10
  Key: user_123, Value: {name: "Alice", city: "London"}   offset 50
  Key: user_456, Value: {name: "Bob", city: "SF"}         offset 30

After compaction:
  Key: user_123, Value: {name: "Alice", city: "London"}   offset 50
  Key: user_456, Value: {name: "Bob", city: "SF"}         offset 30

Tombstone: Key: user_123, Value: null → deletes the key after delete.retention.ms
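The compaction rule above is simple enough to sketch. One simplification to note: the real broker retains tombstones for delete.retention.ms so lagging consumers can observe them; this sketch drops keys immediately.

```python
# Log compaction sketch: replay the log oldest-first, keeping only the
# latest record per key; a null-value tombstone removes the key.
def compact(records):
    """records: iterable of (offset, key, value) tuples, oldest first.
    Returns surviving records in offset order."""
    latest = {}
    for offset, key, value in records:
        if value is None:
            latest.pop(key, None)        # tombstone: drop the key
        else:
            latest[key] = (offset, value)
    return sorted((off, k, v) for k, (off, v) in latest.items())
```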

7. Extensions (2 min)

  • Kafka Streams / ksqlDB: Stream processing library that enables joins, aggregations, windowed computations, and stateful transformations directly on Kafka topics without a separate processing cluster (Spark/Flink).
  • Tiered Storage: Move old log segments from local broker disks to object storage (S3/GCS). Enables infinite retention without scaling broker storage. Hot segments stay on local SSDs, cold segments served from S3.
  • Multi-datacenter replication (MirrorMaker 2): Async replication across regions using source-topic → remote-topic mirroring. Handles offset translation so consumers can failover across DCs.
  • Rack-aware replica assignment: Ensure replicas for the same partition are placed on different racks/AZs. Survives rack-level failures without data loss.
  • Quotas and multi-tenancy: Per-client-id produce/consume byte-rate throttling. Prevents one noisy tenant from starving others on a shared cluster. Network and request quotas enforced at the broker level.