1. Requirements & Scope (5 min)
Functional Requirements
- Track the top-K most frequent items (heavy hitters) across a stream of events in real-time (e.g., top 100 trending hashtags, most searched queries, most purchased products)
- Support time-windowed queries — top-K in the last 1 minute, 1 hour, 1 day
- Provide approximate counts for each item in the top-K list, with bounded error guarantees
- Support multiple independent top-K lists (per category, per region, global)
- Allow querying both the current top-K snapshot and historical top-K at any past timestamp
Non-Functional Requirements
- Availability: 99.99% — the system is used for real-time dashboards and recommendation feeds
- Latency: Event processing < 10ms. Top-K query response < 50ms.
- Consistency: Approximate counts are acceptable (within 0.1% of true count). The top-K list may briefly lag by a few seconds.
- Scale: 1M events/sec ingestion, 100K unique items per time window, top-K where K ≤ 1000
- Memory efficiency: Must not grow linearly with the number of unique items. A stream with 100M unique items should still use bounded memory.
2. Estimation (3 min)
Traffic
- Event stream: 1M events/sec (e.g., search queries, clicks, purchases)
- Each event: item_id (8 bytes) + timestamp (8 bytes) + metadata (32 bytes) = ~48 bytes
- Ingestion bandwidth: 1M × 48B = 48 MB/sec
- Query QPS: ~10K queries/sec for top-K reads (cached aggressively)
Memory for Exact Counting
- 100M unique items per day × (8 bytes item_id + 8 bytes count) = 1.6 GB — feasible for daily totals, but maintaining sliding windows at 1-minute granularity is harder
- 100M items × 1440 minutes × 16 bytes = 2.3 TB — impossible for exact per-minute counts
Memory for Approximate Counting
- Count-Min Sketch: 4 hash functions × 1M counters × 4 bytes = 16 MB per time window
- Space-Saving (top-K tracker): K=1000 items × (8 bytes key + 8 bytes count + 8 bytes error) = 24 KB
- Per-minute windows for 24 hours: 1440 windows × 16 MB = 23 GB — manageable
- With exponential decay (approximate sliding window): Single sketch, no windowing needed = 16 MB total
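The figures above can be sanity-checked with a few lines of Python (same sizes and assumptions as in the bullets):

```python
# Back-of-envelope memory estimates for exact vs. approximate counting.
MB, GB, TB = 1e6, 1e9, 1e12

# Exact counting: 100M unique items, 8-byte item_id + 8-byte count each.
exact_daily = 100e6 * 16               # 1.6 GB -- feasible for a daily total
exact_per_minute = 100e6 * 1440 * 16   # 2.3 TB -- infeasible per-minute

# Count-Min Sketch: depth x width 4-byte counters.
cms = 4 * 1_000_000 * 4                # 16 MB per time window
cms_day_of_minutes = 1440 * cms        # ~23 GB for a day of 1-minute windows

print(f"exact daily:        {exact_daily / GB:.1f} GB")
print(f"exact per-minute:   {exact_per_minute / TB:.1f} TB")
print(f"CMS per window:     {cms / MB:.0f} MB")
print(f"CMS x 1440 windows: {cms_day_of_minutes / GB:.1f} GB")
```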
Key Insight
The core challenge is the memory vs accuracy trade-off. Exact counting requires O(N) memory where N is unique items. Probabilistic data structures (Count-Min Sketch, Space-Saving) provide O(1/ε²) memory for ε-approximate answers. For top-K, we need both frequency estimation AND identification of the top items.
3. API Design (3 min)
Event Ingestion
POST /events
Content-Type: application/json
Body: {
"events": [
{
"item_id": "hashtag_taylor_swift",
"namespace": "trending_hashtags",
"timestamp": 1708632000,
"weight": 1 // optional, default 1
},
...
]
}
Response 204 No Content
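A client would assemble the batched request body as plain JSON. A minimal sketch, matching the POST /events schema above (`build_batch` is an illustrative helper name, not part of the API):

```python
import json
import time

# Build a batched ingestion payload matching the POST /events schema.
# Batch sizes of 100-1000 events are recommended for throughput.
def build_batch(items, namespace):
    now = int(time.time())
    return {
        "events": [
            {"item_id": item, "namespace": namespace, "timestamp": now, "weight": 1}
            for item in items
        ]
    }

batch = build_batch(["hashtag_taylor_swift", "hashtag_super_bowl"],
                    "trending_hashtags")
body = json.dumps(batch)  # this string would be sent as the POST /events body
```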
Top-K Query
GET /top-k?namespace=trending_hashtags
&k=100
&window=1h // last 1 hour
&timestamp=1708632000 // optional: historical query
Response 200: {
"window": { "start": 1708628400, "end": 1708632000 },
"items": [
{ "item_id": "hashtag_taylor_swift", "estimated_count": 284500, "rank": 1 },
{ "item_id": "hashtag_super_bowl", "estimated_count": 195200, "rank": 2 },
...
],
"accuracy": {
"type": "count_min_sketch",
"epsilon": 0.001, // error bound: actual ≤ estimated ≤ actual + ε×N
"confidence": 0.9999 // probability the bound holds
}
}
Admin API
POST /namespaces
Body: {
"name": "trending_hashtags",
"windows": ["1m", "1h", "1d"],
"k": 100,
"sketch_width": 1000000, // Count-Min Sketch width
"sketch_depth": 4 // number of hash functions
}
Key Decisions
- Batched ingestion — clients send events in batches of 100-1000 for throughput
- Namespace isolation — each top-K list is independent (different sketch, different K)
- Weight parameter — supports weighted counting (e.g., revenue per product instead of just purchase count)
- Historical queries — enabled by snapshotting sketches at regular intervals
4. Data Model (3 min)
In-Memory Structures (Redis or custom service)
Per namespace, per time window:
1. Count-Min Sketch (frequency estimation)
Key: cms:{namespace}:{window_id}
Value: 2D array of counters [depth × width]
Example: cms:trending_hashtags:2024022218
depth = 4 hash functions
width = 1,000,000 counters
Size: 4 × 1M × 4 bytes = 16 MB
2. Space-Saving / Min-Heap (top-K tracker)
Key: topk:{namespace}:{window_id}
Value: sorted list of (item_id, count, error_bound)
Example: topk:trending_hashtags:2024022218
K = 100 entries
Size: ~2.4 KB
Persistent Storage (for historical queries)
Table: topk_snapshots (ClickHouse or PostgreSQL)
namespace | string
window_start | datetime
window_duration | string -- '1m', '1h', '1d'
snapshot_time | datetime
items | JSON -- [ { "item_id": ..., "count": ..., "rank": ... } ]
sketch_blob | binary -- serialized Count-Min Sketch (for re-queries)
Namespace Configuration (PostgreSQL)
Table: namespaces
namespace_id | string (PK)
display_name | string
windows | array<string> -- ['1m', '1h', '1d']
k | int
sketch_width | int
sketch_depth | int
created_at | datetime
DB Choice
- In-memory (Redis or custom): All hot sketches and top-K heaps live in memory. Redis supports custom data structures via modules (e.g., RedisBloom for Count-Min Sketch).
- ClickHouse: Historical snapshots for time-series queries. “What were the top 100 hashtags at 3 PM yesterday?”
- PostgreSQL: Configuration and metadata only.
5. High-Level Design (12 min)
Architecture
Event Producers (web/mobile/services)
→ Load Balancer
→ Ingestion Service (stateless, many instances)
→ Kafka (topic per namespace, partitioned by item_id)
→ Top-K Aggregation Workers (stateful, one per namespace partition):
├── Update Count-Min Sketch (frequency estimation)
├── Update Space-Saving heap (top-K tracking)
└── Periodically flush to persistent storage
→ Query Service (stateless):
← Read from in-memory sketches for real-time
← Read from ClickHouse for historical
Detailed Event Flow
Event: { item: "hashtag_X", namespace: "trending", ts: T }
→ Kafka topic: "events-trending" (partition by hash(item_id))
→ Aggregation Worker:
1. Determine time windows this event falls into:
- 1-minute window: floor(T / 60)
- 1-hour window: floor(T / 3600)
- 1-day window: floor(T / 86400)
2. For each window:
a. Count-Min Sketch update:
for i in 0..depth:
index = hash_i(item_id) % width
sketch[i][index] += 1
b. Estimated frequency = min(sketch[i][hash_i(item_id) % width] for i in 0..depth)
c. Space-Saving update:
if item_id in top-K heap:
increment its count
elif heap.size < K:
insert item_id with count = estimated_frequency
elif estimated_frequency > heap.min().count:
evict minimum, insert item_id with count = estimated_frequency
3. Every 10 seconds: snapshot top-K list to ClickHouse
4. Every minute: rotate 1-minute window, archive old sketch
Components
- Ingestion Service: HTTP/gRPC endpoint. Validates events, enriches with server timestamp, publishes to Kafka. Stateless, horizontally scalable.
- Kafka: Event bus with one topic per namespace, partitioned by hash(item_id). All events for a given item land on the same partition, so each partition's sketch is internally consistent; per-partition sketches merge additively for namespace-wide queries.
- Aggregation Workers: One worker per namespace partition. Maintains in-memory Count-Min Sketch and Space-Saving data structure. This is the core stateful component.
- Snapshot Service: Periodically serializes sketches to ClickHouse. Enables historical queries and crash recovery.
- Query Service: For real-time: reads directly from Aggregation Worker memory (via gRPC or shared Redis). For historical: queries ClickHouse snapshots.
- Redis (optional): If Aggregation Workers are distributed, publish top-K updates to Redis for fast reads by Query Service. Alternatively, workers expose a read API directly.
Scaling Strategy
Namespace with 1M events/sec:
→ Partition by item_id hash across 10 sub-workers
→ Each worker maintains a local sketch and local top-K
→ Coordinator merges local top-K lists every second:
global_top_k = merge(local_top_k_1, ..., local_top_k_10)
(merge = union of candidate items, query the global sketch for their estimated counts)
→ Count-Min Sketches are mergeable: global_sketch = sum(local_sketches)
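The coordinator's merge step can be sketched as follows. Data shapes here are assumptions for illustration: each local sketch is a depth × width list of lists, each local top-K is a dict of item → local count, and `global_estimate` is a callable that queries the merged sketch:

```python
# Coordinator-side merge of partitioned sketches and top-K candidate lists.
def merge_sketches(sketches):
    """CMS is additive: the global sketch is the element-wise sum."""
    depth, width = len(sketches[0]), len(sketches[0][0])
    merged = [[0] * width for _ in range(depth)]
    for s in sketches:
        for i in range(depth):
            for j in range(width):
                merged[i][j] += s[i][j]
    return merged

def merge_top_k(local_top_ks, global_estimate, k):
    """Union the local candidates, then re-score each against the global sketch."""
    candidates = set().union(*(tk.keys() for tk in local_top_ks))
    return sorted(candidates, key=global_estimate, reverse=True)[:k]
```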
6. Deep Dives (15 min)
Deep Dive 1: Count-Min Sketch — How It Works and Tuning
The data structure:
Width (w) = ⌈e/ε⌉ where ε = desired error bound
Depth (d) = ⌈ln(1/δ)⌉ where δ = failure probability
Example: ε = 0.001 (0.1% error), δ = 0.0001 (99.99% confidence)
w = ⌈2.71828 / 0.001⌉ = 2,719
d = ⌈ln(10,000)⌉ = 10
Practical choice: w = 1,000,000, d = 4
ε = e / 1,000,000 ≈ 0.0000027
For N = 1B total events: max overcount = 0.0000027 × 1B = 2,718
Memory: 4 × 1M × 4 bytes = 16 MB
Update:
def update(item, count=1):
for i in range(depth):
j = hash_i(item) % width
sketch[i][j] += count
Time complexity: O(depth) = O(1) since depth is constant
Query:
def estimate(item):
return min(sketch[i][hash_i(item) % width] for i in range(depth))
# Returns: actual_count ≤ estimate ≤ actual_count + ε × N
# The estimate NEVER underestimates (only overcounts due to hash collisions)
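The update/query pseudocode above can be made runnable. A minimal sketch in Python, assuming blake2b salted with the row index as the family of hash functions (an implementation choice, not part of the design):

```python
import hashlib

class CountMinSketch:
    """Minimal Count-Min Sketch: depth rows of width counters."""

    def __init__(self, width=1000, depth=4):
        self.width, self.depth = width, depth
        self.table = [[0] * width for _ in range(depth)]

    def _index(self, item, row):
        # Row-specific hash: blake2b salted with the row index.
        h = hashlib.blake2b(item.encode(), salt=bytes([row])).digest()
        return int.from_bytes(h[:8], "big") % self.width

    def update(self, item, count=1):
        for i in range(self.depth):
            self.table[i][self._index(item, i)] += count

    def estimate(self, item):
        # Never underestimates: hash collisions only inflate counters.
        return min(self.table[i][self._index(item, i)]
                   for i in range(self.depth))
```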
Why CMS works for top-K:
- Heavy hitters (items with count > ε×N) are always identified correctly
- Items with low frequency may have inflated estimates, but they won’t be in the top-K because the threshold is high
- For K=100 and N=1B: in a typically skewed (Zipfian) stream, the true top-100 items have counts in the millions, while the CMS error is ≤ 2,718. The error is negligible compared to the count.
Conservative Update (reduces overestimation):
def update_conservative(item, count=1):
current_estimate = estimate(item)
for i in range(depth):
j = hash_i(item) % width
sketch[i][j] = max(sketch[i][j], current_estimate + count)
This dramatically reduces false inflation. In practice, conservative update reduces error by 5-10×.
Deep Dive 2: Space-Saving Algorithm for Top-K Tracking
The problem: Count-Min Sketch tells you the approximate frequency of a given item, but it doesn’t tell you which items are the most frequent. You need a separate data structure to track the top-K candidates.
Space-Saving algorithm:
Maintain a fixed-size summary of K items, each with:
- item_id
- count (estimated frequency)
- overestimation (error bound)
On new event for item X:
if X is in the summary:
summary[X].count += 1
elif summary.size < K:
summary.insert(X, count=1, overestimation=0)
else:
min_item = summary.min() // item with smallest count
evicted_count = min_item.count
summary.evict(min_item)
summary.insert(X, count=evicted_count + 1, overestimation=evicted_count)
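The update rule above translates directly to Python. A minimal sketch (a dict scan finds the minimum here; a real implementation would use the stream-summary linked structure or a min-heap for O(log K) updates):

```python
class SpaceSaving:
    """Minimal Space-Saving summary: fixed K entries with error bounds."""

    def __init__(self, k):
        self.k = k
        self.counts = {}   # item -> estimated count
        self.errors = {}   # item -> overestimation bound

    def offer(self, item):
        if item in self.counts:
            self.counts[item] += 1
        elif len(self.counts) < self.k:
            self.counts[item] = 1
            self.errors[item] = 0
        else:
            # Evict the minimum; the new item inherits its count.
            victim = min(self.counts, key=self.counts.get)
            evicted = self.counts.pop(victim)
            self.errors.pop(victim)
            self.counts[item] = evicted + 1
            self.errors[item] = evicted

    def top(self, n):
        return sorted(self.counts.items(), key=lambda kv: -kv[1])[:n]
```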
Key properties:
- Memory: O(K) — stores exactly K items
- Any item with true frequency > N/K is guaranteed to be in the summary
- The overestimation field bounds the error:
true_count ∈ [count - overestimation, count]
- In practice, if K = 10 × (desired top-K), the results are highly accurate
Example:
K = 1000 (track 1000 candidates to report top 100)
Stream: A A B A C B A D A B C A B ...
After many events:
A: count=15000, overestimation=0 (never evicted, so exact)
B: count=8200, overestimation=12 (true count ∈ [8188, 8200])
C: count=3100, overestimation=45 (true count ∈ [3055, 3100])
...
When an item falls out of the summary, the new item that replaces it inherits
the evicted count, which is what produces the overestimation in new entries
Combining CMS + Space-Saving (best approach):
On new event for item X:
1. Update Count-Min Sketch: cms.update(X)
2. Get CMS estimate: est = cms.estimate(X)
3. Update Space-Saving:
if X in heap: heap[X].count = est
elif heap.size < K: heap.insert(X, est)
elif est > heap.min().count: heap.replace_min(X, est)
Using CMS estimates instead of Space-Saving’s native counting gives more accurate counts (CMS error is bounded, Space-Saving accumulates error through evictions).
Deep Dive 3: Time-Windowed Heavy Hitters
The problem: “Top-K in the last hour” requires a sliding window, but maintaining a separate sketch per second (3600 sketches) is expensive.
Approach 1: Tumbling windows with merge
Maintain one CMS per minute (60 sketches for 1-hour window)
Memory: 60 × 16MB = 960MB per namespace
Hourly top-K query:
merged_sketch = sum(minute_sketches[now-60..now]) // CMS is additive
for each candidate in Space-Saving heap:
count = merged_sketch.estimate(candidate)
return top K by count
On minute boundary:
Discard oldest minute sketch
Create new empty sketch for current minute
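The rotation and merge in Approach 1 can be sketched with a ring buffer. Each minute sketch is simplified here to a single row of counters; a real Count-Min Sketch is depth × width, but merging is still element-wise:

```python
from collections import deque

WIDTH, WINDOW_MINUTES = 8, 60

# Ring buffer of per-minute sketches; deque evicts the oldest automatically.
minute_sketches = deque(maxlen=WINDOW_MINUTES)

def rotate():
    """Called on each minute boundary: start a fresh empty sketch."""
    minute_sketches.append([0] * WIDTH)

def merged(last_n):
    """Element-wise sum of the last n minute sketches (CMS is additive)."""
    out = [0] * WIDTH
    for s in list(minute_sketches)[-last_n:]:
        for j, v in enumerate(s):
            out[j] += v
    return out

rotate()
minute_sketches[-1][3] += 5   # simulate updates in one minute
rotate()
minute_sketches[-1][3] += 2   # updates in the next minute
print(merged(2))              # counter 3 sums to 5 + 2 = 7
```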
Approach 2: Exponential decay (sliding window approximation)
Single sketch, but counters decay over time:
effective_count(item, t) = Σ count_i × e^(-λ(t - t_i))
Implementation: multiply all counters by decay factor periodically
Every second: sketch *= e^(-λ) where λ = ln(2) / half_life (half_life in seconds)
Half-life = 30 minutes → recent events weighted much more than old events
Memory: single 16MB sketch (no per-window copies)
Trade-off: no sharp window boundary; uses exponential decay instead
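A quick numeric check of the decay scheme above (pure simulation, no sketch involved): after one half-life of per-second multiplications, a counter halves.

```python
import math

HALF_LIFE_SECONDS = 30 * 60                         # 30-minute half-life
DECAY = math.exp(-math.log(2) / HALF_LIFE_SECONDS)  # per-second decay factor

counter = 1000.0
for _ in range(HALF_LIFE_SECONDS):  # simulate 30 minutes of per-second decay
    counter *= DECAY
# counter is now ~500: one half-life halves the effective count
```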
Approach 3: Hierarchical windows (most practical)
Level 0: 60 per-second sketches (last minute at 1-second granularity)
Level 1: 60 per-minute sketches (last hour at 1-minute granularity)
Level 2: 24 per-hour sketches (last day at 1-hour granularity)
On minute boundary: merge the 60 second-sketches → 1 minute-sketch
On hour boundary: merge the 60 minute-sketches → 1 hour-sketch
Query "last 5 minutes":
merge(5 minute-sketches from Level 1)
Query "last 3 hours":
merge(3 hour-sketches from Level 2)
Memory: (60 + 60 + 24) × 16MB = 2.3GB per namespace
Recommendation: Hierarchical windows for production. Balances memory, accuracy, and query flexibility. Exponential decay when memory is very tight and approximate windows are acceptable.
7. Extensions (2 min)
- Distributed top-K across data centers: Each data center maintains local sketches. A global coordinator periodically merges sketches (CMS is additive) and computes global top-K. Cross-DC merge runs every 5-10 seconds.
- Top-K with exclusion lists: Filter out known items (e.g., exclude “the”, “a” from trending search terms). Apply bloom filter before updating sketch to skip excluded items.
- Real-time anomaly detection: Compare current top-K with the same time window last week. If an item’s rank jumps by more than 50 positions, flag it as trending/anomalous. Useful for breaking news detection.
- Per-user personalized top-K: Combine global top-K with user-specific interaction history. Weighted score = α × global_popularity + (1-α) × user_relevance. Requires a lightweight per-user signal (last 100 interactions).
- Accuracy monitoring: Periodically run exact counts on a sample of items and compare with sketch estimates. Track and alert on error distribution. If error exceeds SLA (e.g., > 1% for top-10 items), increase sketch dimensions dynamically.