1. Requirements & Scope (5 min)

Functional Requirements

  1. Track the top-K most frequent items (heavy hitters) across a stream of events in real-time (e.g., top 100 trending hashtags, most searched queries, most purchased products)
  2. Support time-windowed queries — top-K in the last 1 minute, 1 hour, 1 day
  3. Provide approximate counts for each item in the top-K list, with bounded error guarantees
  4. Support multiple independent top-K lists (per category, per region, global)
  5. Allow querying both the current top-K snapshot and historical top-K at any past timestamp

Non-Functional Requirements

  • Availability: 99.99% — the system is used for real-time dashboards and recommendation feeds
  • Latency: Event processing < 10ms. Top-K query response < 50ms.
  • Consistency: Approximate counts are acceptable (within 0.1% of true count). The top-K list may briefly lag by a few seconds.
  • Scale: 1M events/sec ingestion, 100K unique items per time window, top-K where K ≤ 1000
  • Memory efficiency: Must not grow linearly with the number of unique items. A stream with 100M unique items should still use bounded memory.

2. Estimation (3 min)

Traffic

  • Event stream: 1M events/sec (e.g., search queries, clicks, purchases)
  • Each event: item_id (8 bytes) + timestamp (8 bytes) + metadata (32 bytes) = ~48 bytes
  • Ingestion bandwidth: 1M × 48B = 48 MB/sec
  • Query QPS: ~10K queries/sec for top-K reads (cached aggressively)

Memory for Exact Counting

  • 100M unique items per day × 8 bytes (item_id) + 8 bytes (count) = 1.6 GB — feasible for daily, but maintaining sliding windows of 1-minute granularity is harder
  • 100M items × 1440 minutes × 16 bytes = 2.3 TB — impossible for exact per-minute counts

Memory for Approximate Counting

  • Count-Min Sketch: 4 hash functions × 1M counters × 4 bytes = 16 MB per time window
  • Space-Saving (top-K tracker): K=1000 items × (8 bytes key + 8 bytes count + 8 bytes error) = 24 KB
  • Per-minute windows for 24 hours: 1440 windows × 16 MB = 23 GB — manageable
  • With exponential decay (approximate sliding window): Single sketch, no windowing needed = 16 MB total

Key Insight

The core challenge is the memory vs accuracy trade-off. Exact counting requires O(N) memory where N is unique items. Probabilistic data structures (Count-Min Sketch, Space-Saving) provide O(1/ε²) memory for ε-approximate answers. For top-K, we need both frequency estimation AND identification of the top items.


3. API Design (3 min)

Event Ingestion

POST /events
Content-Type: application/json

Body: {
  "events": [
    {
      "item_id": "hashtag_taylor_swift",
      "namespace": "trending_hashtags",
      "timestamp": 1708632000,
      "weight": 1                      // optional, default 1
    },
    ...
  ]
}
Response 204 No Content

Top-K Query

GET /top-k?namespace=trending_hashtags
  &k=100
  &window=1h                           // last 1 hour
  &timestamp=1708632000                // optional: historical query

Response 200: {
  "window": { "start": 1708628400, "end": 1708632000 },
  "items": [
    { "item_id": "hashtag_taylor_swift", "estimated_count": 284500, "rank": 1 },
    { "item_id": "hashtag_super_bowl", "estimated_count": 195200, "rank": 2 },
    ...
  ],
  "accuracy": {
    "type": "count_min_sketch",
    "epsilon": 0.001,                  // error bound: actual ≤ estimated ≤ actual + ε×N
    "confidence": 0.9999               // probability the bound holds
  }
}

Admin API

POST /namespaces
Body: {
  "name": "trending_hashtags",
  "windows": ["1m", "1h", "1d"],
  "k": 100,
  "sketch_width": 1000000,            // Count-Min Sketch width
  "sketch_depth": 4                    // number of hash functions
}

Key Decisions

  • Batched ingestion — clients send events in batches of 100-1000 for throughput
  • Namespace isolation — each top-K list is independent (different sketch, different K)
  • Weight parameter — supports weighted counting (e.g., revenue per product instead of just purchase count)
  • Historical queries — enabled by snapshotting sketches at regular intervals

4. Data Model (3 min)

In-Memory Structures (Redis or custom service)

Per namespace, per time window:

1. Count-Min Sketch (frequency estimation)
   Key: cms:{namespace}:{window_id}
   Value: 2D array of counters [depth × width]

   Example: cms:trending_hashtags:2024022218
   depth = 4 hash functions
   width = 1,000,000 counters
   Size: 4 × 1M × 4 bytes = 16 MB

2. Space-Saving / Min-Heap (top-K tracker)
   Key: topk:{namespace}:{window_id}
   Value: sorted list of (item_id, count, error_bound)

   Example: topk:trending_hashtags:2024022218
   K = 100 entries
   Size: ~2.4 KB

Persistent Storage (for historical queries)

Table: topk_snapshots (ClickHouse or PostgreSQL)
  namespace           | string
  window_start        | datetime
  window_duration     | string        -- '1m', '1h', '1d'
  snapshot_time       | datetime
  items               | JSON          -- [ { "item_id": ..., "count": ..., "rank": ... } ]
  sketch_blob         | binary        -- serialized Count-Min Sketch (for re-queries)

Namespace Configuration (PostgreSQL)

Table: namespaces
  namespace_id        | string (PK)
  display_name        | string
  windows             | array<string>  -- ['1m', '1h', '1d']
  k                   | int
  sketch_width        | int
  sketch_depth        | int
  created_at          | datetime

DB Choice

  • In-memory (Redis or custom): All hot sketches and top-K heaps live in memory. Redis supports custom data structures via modules (e.g., RedisBloom for Count-Min Sketch).
  • ClickHouse: Historical snapshots for time-series queries. “What were the top 100 hashtags at 3 PM yesterday?”
  • PostgreSQL: Configuration and metadata only.

5. High-Level Design (12 min)

Architecture

Event Producers (web/mobile/services)
  → Load Balancer
  → Ingestion Service (stateless, many instances)
  → Kafka (partitioned by namespace)
  → Top-K Aggregation Workers (stateful, one per namespace partition):
       ├── Update Count-Min Sketch (frequency estimation)
       ├── Update Space-Saving heap (top-K tracking)
       └── Periodically flush to persistent storage
  → Query Service (stateless):
       ← Read from in-memory sketches for real-time
       ← Read from ClickHouse for historical

Detailed Event Flow

Event: { item: "hashtag_X", namespace: "trending", ts: T }
  → Kafka topic: "events-trending" (partition by hash(item_id))
  → Aggregation Worker:
      1. Determine time windows this event falls into:
         - 1-minute window: floor(T / 60)
         - 1-hour window: floor(T / 3600)
         - 1-day window: floor(T / 86400)
      2. For each window:
         a. Count-Min Sketch update:
            for i in 0..depth:
              index = hash_i(item_id) % width
              sketch[i][index] += 1
         b. Estimated frequency = min(sketch[i][hash_i(item_id) % width] for i in 0..depth)
         c. Space-Saving update:
            if item_id in top-K heap:
              increment its count
            elif heap.size < K:
              insert item_id with count = estimated_frequency
            elif estimated_frequency > heap.min().count:
              evict minimum, insert item_id with count = estimated_frequency
      3. Every 10 seconds: snapshot top-K list to ClickHouse
      4. Every minute: rotate 1-minute window, archive old sketch

Components

  1. Ingestion Service: HTTP/gRPC endpoint. Validates events, enriches with server timestamp, publishes to Kafka. Stateless, horizontally scalable.
  2. Kafka: Event bus partitioned by namespace. Ensures all events for one namespace go to the same partition (required for consistent sketch updates).
  3. Aggregation Workers: One worker per namespace partition. Maintains in-memory Count-Min Sketch and Space-Saving data structure. This is the core stateful component.
  4. Snapshot Service: Periodically serializes sketches to ClickHouse. Enables historical queries and crash recovery.
  5. Query Service: For real-time: reads directly from Aggregation Worker memory (via gRPC or shared Redis). For historical: queries ClickHouse snapshots.
  6. Redis (optional): If Aggregation Workers are distributed, publish top-K updates to Redis for fast reads by Query Service. Alternatively, workers expose a read API directly.

Scaling Strategy

Namespace with 1M events/sec:
  → Partition by item_id hash across 10 sub-workers
  → Each worker maintains a local sketch and local top-K
  → Coordinator merges local top-K lists every second:
       global_top_k = merge(local_top_k_1, ..., local_top_k_10)
       (merge = union of items, query global sketch for true counts)
  → Count-Min Sketches are mergeable: global_sketch = sum(local_sketches)

6. Deep Dives (15 min)

Deep Dive 1: Count-Min Sketch — How It Works and Tuning

The data structure:

Width (w) = ⌈e/ε⌉     where ε = desired error bound
Depth (d) = ⌈ln(1/δ)⌉  where δ = failure probability

Example: ε = 0.001 (0.1% error), δ = 0.0001 (99.99% confidence)
  w = ⌈2.718 / 0.001⌉ = 2,718
  d = ⌈ln(10,000)⌉ = 10

Practical choice: w = 1,000,000, d = 4
  ε = e / 1,000,000 ≈ 0.0000027
  For N = 1B total events: max overcount = 0.0000027 × 1B = 2,718
  Memory: 4 × 1M × 4 bytes = 16 MB

Update:

def update(item, count=1):
  for i in range(depth):
    j = hash_i(item) % width
    sketch[i][j] += count

Time complexity: O(depth) = O(1) since depth is constant

Query:

def estimate(item):
  return min(sketch[i][hash_i(item) % width] for i in range(depth))

# Returns: actual_count ≤ estimate ≤ actual_count + ε × N
# The estimate NEVER underestimates (only overcounts due to hash collisions)

Why CMS works for top-K:

  • Heavy hitters (items with count > ε×N) are always identified correctly
  • Items with low frequency may have inflated estimates, but they won’t be in the top-K because the threshold is high
  • For K=100 and N=1B: any item in the true top-100 has count > 10M. CMS error ≤ 2,718. Error is negligible compared to the count.

Conservative Update (reduces overestimation):

def update_conservative(item, count=1):
  current_estimate = estimate(item)
  for i in range(depth):
    j = hash_i(item) % width
    sketch[i][j] = max(sketch[i][j], current_estimate + count)

This dramatically reduces false inflation. In practice, conservative update reduces error by 5-10×.

Deep Dive 2: Space-Saving Algorithm for Top-K Tracking

The problem: Count-Min Sketch tells you the approximate frequency of a given item, but it doesn’t tell you which items are the most frequent. You need a separate data structure to track the top-K candidates.

Space-Saving algorithm:

Maintain a fixed-size summary of K items, each with:
  - item_id
  - count (estimated frequency)
  - overestimation (error bound)

On new event for item X:
  if X is in the summary:
    summary[X].count += 1
  elif summary.size < K:
    summary.insert(X, count=1, overestimation=0)
  else:
    min_item = summary.min()  // item with smallest count
    evicted_count = min_item.count
    summary.evict(min_item)
    summary.insert(X, count=evicted_count + 1, overestimation=evicted_count)

Key properties:

  • Memory: O(K) — stores exactly K items
  • Any item with true frequency > N/K is guaranteed to be in the summary
  • The overestimation field bounds the error: true_count ∈ [count - overestimation, count]
  • In practice, if K = 10 × (desired top-K), the results are highly accurate

Example:

K = 1000 (track 1000 candidates to report top 100)
Stream: A A B A C B A D A B C A B ...

After many events:
  A: count=15000, overestimation=0    (never evicted, so exact)
  B: count=8200,  overestimation=12   (true count ∈ [8188, 8200])
  C: count=3100,  overestimation=45   (true count ∈ [3055, 3100])
  ...

Items that were once in top-K but fell off: evicted with their count
  reassigned to new items, causing overestimation in new items

Combining CMS + Space-Saving (best approach):

On new event for item X:
  1. Update Count-Min Sketch: cms.update(X)
  2. Get CMS estimate: est = cms.estimate(X)
  3. Update Space-Saving:
     if X in heap: heap[X].count = est
     elif heap.size < K: heap.insert(X, est)
     elif est > heap.min().count: heap.replace_min(X, est)

Using CMS estimates instead of Space-Saving’s native counting gives more accurate counts (CMS error is bounded, Space-Saving accumulates error through evictions).

Deep Dive 3: Time-Windowed Heavy Hitters

The problem: “Top-K in the last hour” requires a sliding window, but maintaining a separate sketch per second (3600 sketches) is expensive.

Approach 1: Tumbling windows with merge

Maintain one CMS per minute (60 sketches for 1-hour window)
  Memory: 60 × 16MB = 960MB per namespace

Hourly top-K query:
  merged_sketch = sum(minute_sketches[now-60..now])  // CMS is additive
  for each candidate in Space-Saving heap:
    count = merged_sketch.estimate(candidate)
  return top K by count

On minute boundary:
  Discard oldest minute sketch
  Create new empty sketch for current minute

Approach 2: Exponential decay (sliding window approximation)

Single sketch, but counters decay over time:
  effective_count(item, t) = Σ count_i × e^(-λ(t - t_i))

Implementation: multiply all counters by decay factor periodically
  Every second: sketch *= e^(-λ)   where λ = ln(2) / half_life

Half-life = 30 minutes → recent events weighted much more than old events
Memory: single 16MB sketch (no per-window copies)
Trade-off: no sharp window boundary; uses exponential decay instead

Approach 3: Hierarchical windows (most practical)

Level 0: 60 per-second sketches    (last minute at 1-second granularity)
Level 1: 60 per-minute sketches    (last hour at 1-minute granularity)
Level 2: 24 per-hour sketches      (last day at 1-hour granularity)

On second boundary: merge 60 second-sketches → 1 minute-sketch
On minute boundary: merge 60 minute-sketches → 1 hour-sketch

Query "last 5 minutes":
  merge(5 minute-sketches from Level 1)

Query "last 3 hours":
  merge(3 hour-sketches from Level 2)

Memory: (60 + 60 + 24) × 16MB = 2.3GB per namespace

Recommendation: Hierarchical windows for production. Balances memory, accuracy, and query flexibility. Exponential decay when memory is very tight and approximate windows are acceptable.


7. Extensions (2 min)

  • Distributed top-K across data centers: Each data center maintains local sketches. A global coordinator periodically merges sketches (CMS is additive) and computes global top-K. Cross-DC merge runs every 5-10 seconds.
  • Top-K with exclusion lists: Filter out known items (e.g., exclude “the”, “a” from trending search terms). Apply bloom filter before updating sketch to skip excluded items.
  • Real-time anomaly detection: Compare current top-K with the same time window last week. If an item’s rank jumps by more than 50 positions, flag it as trending/anomalous. Useful for breaking news detection.
  • Per-user personalized top-K: Combine global top-K with user-specific interaction history. Weighted score = α × global_popularity + (1-α) × user_relevance. Requires a lightweight per-user signal (last 100 interactions).
  • Accuracy monitoring: Periodically run exact counts on a sample of items and compare with sketch estimates. Track and alert on error distribution. If error exceeds SLA (e.g., > 1% for top-10 items), increase sketch dimensions dynamically.