1. Requirements & Scope (5 min)
Functional Requirements
- Collect performance metrics (CPU, memory, disk, network, custom application metrics) from thousands of servers and services
- Store time-series data with configurable retention (high-resolution for recent data, downsampled for historical)
- Support flexible querying: aggregate metrics across dimensions (host, service, region), with functions (avg, p99, sum, rate)
- Real-time alerting: trigger alerts when metrics cross thresholds (e.g., CPU > 90% for 5 minutes)
- Dashboard visualization: support building dashboards with graphs, heatmaps, and tables that auto-refresh
Non-Functional Requirements
- Availability: 99.95% — monitoring must be more reliable than what it monitors. A monitoring outage during a production incident is catastrophic.
- Latency: Metric ingestion: < 5 seconds end-to-end (from source to queryable). Dashboard queries: < 500ms for recent data (last 1 hour), < 2 seconds for historical data (last 30 days).
- Scale: 10,000 hosts, each emitting 500 metrics at 10-second intervals = 500K data points/sec ingestion. 10 million active time series. 10TB of metric data in storage.
- Durability: Metric data must survive single-node failures. 30-day raw retention, 1-year downsampled retention.
- Cardinality: Handle high-cardinality labels gracefully (up to 1M unique label combinations per metric name without degrading query performance).
2. Estimation (3 min)
Ingestion
- 10,000 hosts x 500 metrics x 1 sample/10 sec = 500K samples/sec
- Each sample: metric_name (hashed to 8 bytes) + labels (8 bytes hash) + timestamp (8 bytes) + value (8 bytes) = ~32 bytes
- Ingestion bandwidth: 500K x 32 bytes = 16 MB/sec — modest, not a bottleneck
Storage
- Samples per day: 500K/sec x 86,400 = 43.2 billion samples/day
- Raw storage per day: 43.2B x 16 bytes (timestamp + value, compressed) = 691 GB/day
- With compression (Gorilla encoding): ~1.37 bytes per sample → 59 GB/day
- 30-day raw retention: 59 x 30 = 1.8 TB
- 1-year downsampled (5-min resolution): 59 GB/day / 30 (10s → 5min) x 365 = 717 GB
- Total: ~2.5 TB — surprisingly small thanks to time-series compression
Queries
- Active dashboards: 1,000 dashboards, each with 10 panels, refreshing every 30 seconds
- Query QPS: 1,000 x 10 / 30 = ~333 queries/sec
- Each query scans: ~1 hour of data for 100 time series = 100 x 360 samples = 36K samples per query
- Query throughput: 333 x 36K = 12M samples scanned/sec — well within SSD IOPS capacity
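The estimates above reduce to plain arithmetic — a quick sanity-check sketch using the stated assumptions:

```python
# Sanity-check of the capacity estimates, using the assumptions above.
HOSTS = 10_000
METRICS_PER_HOST = 500
INTERVAL_S = 10
BYTES_RAW = 16            # timestamp + value, uncompressed
BYTES_COMPRESSED = 1.37   # Gorilla-encoded average

samples_per_sec = HOSTS * METRICS_PER_HOST // INTERVAL_S            # 500K/sec
samples_per_day = samples_per_sec * 86_400                          # 43.2B/day
raw_gb_per_day = samples_per_day * BYTES_RAW / 1e9                  # ~691 GB
compressed_gb_per_day = samples_per_day * BYTES_COMPRESSED / 1e9    # ~59 GB
raw_30d_tb = compressed_gb_per_day * 30 / 1000                      # ~1.8 TB
downsampled_1y_gb = compressed_gb_per_day / 30 * 365                # ~720 GB
query_qps = 1_000 * 10 / 30                                         # ~333/sec
```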
3. API Design (3 min)
Ingestion API
// Push metrics (agent → collector)
POST /api/v1/write
Content-Type: application/x-protobuf // or application/json
Body: {
"timeseries": [
{
"labels": {"__name__": "cpu_usage", "host": "web-01", "region": "us-east", "service": "api"},
"samples": [
{"timestamp": 1708632060, "value": 73.5},
{"timestamp": 1708632070, "value": 75.2}
]
},
{
"labels": {"__name__": "http_request_duration_seconds", "method": "GET", "path": "/api/users", "status": "200"},
"samples": [
{"timestamp": 1708632060, "value": 0.042}
]
}
]
}
Response 200: { "samples_written": 3 }
Query API
// PromQL-compatible query
GET /api/v1/query_range
Params:
query = avg(rate(http_request_duration_seconds{service="api"}[5m])) by (path)
start = 1708628460 // 1 hour ago
end = 1708632060 // now
step = 15 // 15-second resolution
Response 200: {
"result_type": "matrix",
"result": [
{
"labels": {"path": "/api/users"},
"values": [[1708628460, "0.035"], [1708628475, "0.038"], ...]
},
{
"labels": {"path": "/api/orders"},
"values": [[1708628460, "0.122"], [1708628475, "0.118"], ...]
}
]
}
Alerting API
// Create alert rule
POST /api/v1/alerts
Body: {
"name": "HighCPU",
"expr": "avg(cpu_usage{service='api'}) by (host) > 90",
"for": "5m", // must be true for 5 minutes
"severity": "critical",
"annotations": {
"summary": "Host {{ $labels.host }} CPU is {{ $value }}%",
"runbook": "https://wiki/runbooks/high-cpu"
},
"notify": ["pagerduty-oncall", "slack-infra"]
}
Key Decisions
- Push model vs pull model (Prometheus-style scraping): we use push — agents push to collectors. Better for ephemeral/auto-scaled instances that may disappear before the next scrape.
- PromQL-compatible query language — industry standard, rich aggregation functions
- Protobuf for ingestion wire format — 50% smaller than JSON, faster serialization
4. Data Model (3 min)
Time Series Identification
A time series is uniquely identified by its label set:
{__name__="http_requests_total", method="GET", path="/api/users", status="200", host="web-01"}
Internal representation:
series_id = hash(sorted_labels) → uint64
This is the primary key for all operations
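A minimal sketch of the hashing step — FNV-1a is an illustrative choice of 64-bit hash, not prescribed by the design; the essential point is sorting labels first so the ID is independent of the order the agent sent them in:

```python
def series_id(labels: dict[str, str]) -> int:
    """Hash a label set to a stable uint64 series ID (FNV-1a sketch).

    Labels are sorted so identical sets always hash identically,
    regardless of the order the agent serialized them in.
    """
    canonical = ",".join(f"{k}={v}" for k, v in sorted(labels.items()))
    h = 0xcbf29ce484222325                      # FNV-1a 64-bit offset basis
    for byte in canonical.encode():
        h = ((h ^ byte) * 0x100000001b3) & 0xFFFFFFFFFFFFFFFF
    return h
```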
Storage Format (custom time-series optimized)
On-disk structure (per series, per time block):
┌──────────────────────────────────┐
│ Block (2-hour time window) │
│ ┌──────────────────────────────┐ │
│ │ Series 1: [timestamps][values]│ │
│ │ Series 2: [timestamps][values]│ │
│ │ ... │ │
│ └──────────────────────────────┘ │
│ Index: series_id → offset │
│ Label index: label → series_ids │
│ Tombstones (deletions) │
└──────────────────────────────────┘
Compression (Gorilla encoding):
Timestamps: delta-of-delta encoding
- Most delta-of-deltas are 0 (fixed 10s interval) → 1 bit per timestamp
- Occasional jitter → few extra bits
- Average: 1-2 bits per timestamp
Values: XOR encoding
- Consecutive values of the same metric are similar
- XOR of consecutive values has many leading/trailing zeros
- Average: 1-2 bytes per value
Result: ~1.37 bytes per sample (vs 16 bytes uncompressed) → 12x compression
Label Index (inverted index)
Label to series mapping (for query filtering):
"service=api" → [series_1, series_2, series_5, series_99, ...]
"host=web-01" → [series_1, series_3, series_7, ...]
"method=GET" → [series_1, series_2, series_3, ...]
Query: http_requests{service="api", method="GET"}
→ Intersect posting lists: [series_1, series_2, series_5, ...] ∩ [series_1, series_2, series_3, ...]
→ Result: [series_1, series_2]
Stored as: sorted arrays with bitmap intersection (Roaring bitmaps for cardinality > 10K)
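For sorted-array posting lists, the intersection is a two-pointer merge — a small sketch using the example lists above:

```python
def intersect(a: list[int], b: list[int]) -> list[int]:
    """Intersect two sorted posting lists with a two-pointer merge, O(n+m)."""
    out, i, j = [], 0, 0
    while i < len(a) and j < len(b):
        if a[i] == b[j]:
            out.append(a[i]); i += 1; j += 1
        elif a[i] < b[j]:
            i += 1
        else:
            j += 1
    return out

# {service="api"} ∩ {method="GET"} from the example above:
service_api = [1, 2, 5, 99]
method_get = [1, 2, 3]
matching = intersect(service_api, method_get)   # [1, 2]
```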
Downsampled Data
Table: metrics_5min (5-minute aggregates)
series_id | uint64
timestamp | int64 (5-min aligned)
min | float64
max | float64
sum | float64
count | uint32
last | float64
Computed by background job: raw data → 5-min aggregates → 1-hour aggregates → 1-day aggregates
Each level retains min/max/sum/count so any aggregation can be reconstructed.
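Why these four statistics compose: two adjacent aggregates merge into one coarser aggregate without loss. A minimal sketch (the `Agg` dataclass is illustrative):

```python
from dataclasses import dataclass

@dataclass
class Agg:
    min: float
    max: float
    sum: float
    count: int

def merge(a: Agg, b: Agg) -> Agg:
    """Roll two finer-resolution aggregates up into a coarser one.

    min/max/sum/count all compose under merging, which is why each tier
    stores them: avg = sum/count is derivable at any level, and max
    survives re-aggregation (a plain avg would erase spikes)."""
    return Agg(min(a.min, b.min), max(a.max, b.max),
               a.sum + b.sum, a.count + b.count)
```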
Why This Design
- Custom time-series format (not SQL) — general-purpose relational databases handle time-series workloads poorly: per-row storage overhead, index maintenance on every insert, and the lack of columnar compression can cost orders of magnitude more storage and I/O.
- Gorilla compression — developed by Facebook for their monitoring system. 12x compression means 10TB logical data fits in ~800GB on disk.
- Inverted label index — enables fast multi-label query filtering without scanning all series. Critical for high-cardinality environments.
5. High-Level Design (12 min)
Architecture
Data Sources (10,000 hosts)
│
│ Each host runs a metrics agent (collectd/telegraf/custom)
│ Agent collects: CPU, memory, disk, network, app-specific metrics
│ Agent pushes every 10 seconds
│
▼
┌──────────────────────────────────────────────────┐
│ Ingestion Layer │
│ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Collector 1 │ │ Collector 2 │ │ Collector N │ │
│ │ (stateless) │ │ │ │ │ │
│ └──────┬─────┘ └──────┬─────┘ └──────┬─────┘ │
│ │ │ │ │
│ └───────────────┼───────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Kafka Cluster │ │
│ │ (metrics topic, │ │
│ │ 32 partitions) │ │
│ └────────┬────────┘ │
└───────────────────────┼────────────────────────────┘
│
┌──────────────┼──────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Storage Node │ │ Storage Node │ │ Storage Node │
│ (Ingester) │ │ (Ingester) │ │ (Ingester) │
│ │ │ │ │ │
│ In-memory │ │ In-memory │ │ In-memory │
│ head block │ │ head block │ │ head block │
│ (last 2 hrs) │ │ (last 2 hrs) │ │ (last 2 hrs) │
│ │ │ │ │ │ │ │ │
│ ▼ │ │ ▼ │ │ ▼ │
│ Persistent │ │ Persistent │ │ Persistent │
│ blocks (SSD) │ │ blocks (SSD) │ │ blocks (SSD) │
└──────────────┘ └──────────────┘ └──────────────┘
│ │ │
└──────────────┼──────────────┘
│
▼
┌─────────────────┐
│ Query Layer │
│ (Queriers, │
│ fan-out to │
│ storage nodes) │
└────────┬────────┘
│
┌────────┴────────┐
▼ ▼
┌──────────────┐ ┌──────────────┐
│ Alert Manager │ │ Dashboard │
│ (evaluates │ │ (Grafana, │
│ rules every │ │ custom UI) │
│ 15 seconds) │ │ │
└──────────────┘ └──────────────┘
Ingestion Flow
Agent (on host)
→ Collect metrics every 10 seconds
→ Batch into protobuf payload (~500 metrics = ~16KB)
→ Push to any Collector instance (round-robin via load balancer)
Collector (stateless)
→ Validate payload (label format, value range, cardinality check)
→ Route to correct Kafka partition based on hash(series_id) % 32
→ Respond 200 to agent
Kafka → Storage Node (Ingester)
→ Consume from assigned partitions
→ Append samples to in-memory "head block" (current 2-hour window)
→ WAL for crash recovery
→ Every 2 hours: flush head block to disk as compressed block
→ Replication: 2 ingesters consume same partition (RF=2)
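The collector's validate-and-route step can be sketched as below — `MAX_LABELS` and the use of Python's built-in `hash` are illustrative stand-ins for the real cardinality guardrails and the 64-bit series hash:

```python
from collections import defaultdict

NUM_PARTITIONS = 32
MAX_LABELS = 30  # hypothetical per-series label limit (cardinality guardrail)

def route_batch(timeseries: list[dict]) -> dict[int, list[dict]]:
    """Validate each series and bucket it by Kafka partition.

    Hashing on the series identity (not the host) keeps every sample of a
    given series on one partition, so exactly one ingester pair owns its
    head block.
    """
    by_partition: dict[int, list[dict]] = defaultdict(list)
    for ts in timeseries:
        labels = ts["labels"]
        if "__name__" not in labels or len(labels) > MAX_LABELS:
            continue  # reject malformed or over-cardinality series
        sid = hash(tuple(sorted(labels.items())))  # stand-in for real 64-bit hash
        by_partition[sid % NUM_PARTITIONS].append(ts)
    return by_partition
```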
Query Flow
Dashboard query: avg(cpu_usage{service="api"}) by (host) [last 1 hour]
Querier:
1. Parse PromQL expression
2. Label matching: which series match {service="api"}?
→ Query label index on storage nodes → get list of series_ids
3. Time range: last 1 hour → need head block (in-memory) + possibly 1 on-disk block
4. Fan-out: send sub-queries to storage nodes that hold these series
5. Each storage node:
→ Decompress relevant samples
→ Apply rate/avg functions locally
→ Return partial result
6. Querier merges partial results
7. Return to dashboard
Components
- Metrics Agents (10,000): Lightweight daemons on each host. Collect system metrics (CPU, memory, disk, network) and application-specific metrics (HTTP latency, queue depth). Push to collectors. Buffered locally for 1 hour if collectors are unreachable.
- Collectors (10, stateless): Receive pushed metrics, validate, route to Kafka. No state — purely a fan-in and routing layer. Horizontal scaling trivial.
- Kafka: Durability buffer between collectors and storage. Handles burst traffic, provides replay capability. 32 partitions, 3x replication, 24-hour retention.
- Storage Nodes / Ingesters (6-10): Each owns a subset of time series (by hash). In-memory head block for recent data (fast queries), compressed on-disk blocks for historical data. 2TB SSD per node.
- Queriers (stateless): Parse queries, fan-out to storage nodes, merge results. Auto-scaled based on dashboard load.
- Alert Manager: Evaluates alert rules every 15 seconds by running PromQL queries. Deduplication, grouping, silencing, routing to notification channels (PagerDuty, Slack, email).
- Downsampler (background job): Computes 5-min, 1-hour, and 1-day rollups. Runs continuously, processing blocks as they are flushed to disk.
6. Deep Dives (15 min)
Deep Dive 1: Time-Series Compression (Gorilla Encoding)
The problem: At 500K samples/sec, naive storage (16 bytes per sample: 8 bytes timestamp + 8 bytes float64 value) would require 691 GB/day. We need at least 10x compression to keep costs manageable.
Timestamp compression (delta-of-delta):
Observation: Metric samples arrive at near-fixed intervals (every 10 seconds)
Timestamps: 1000, 1010, 1020, 1030, 1041, 1051, ...
Deltas: 10, 10, 10, 11, 10, ...
Delta-of-deltas: 0, 0, 0, 1, -1, ...
Encoding (variable bit length):
- Delta-of-delta = 0: encode as '0' (1 bit) ← ~95% of samples
- |dod| ≤ 63: encode as '10' + 7-bit value (9 bits)
- |dod| ≤ 255: encode as '110' + 9-bit value (12 bits)
- |dod| ≤ 2047: encode as '1110' + 12-bit value (16 bits)
- Else: '1111' + full 32-bit value (36 bits)
Average: ~1.06 bits per timestamp (vs 64 bits uncompressed) → 60x compression
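A simplified encoder for the bucket scheme above. The header layout (raw first timestamp plus raw first delta) is an assumption for the sketch; real implementations vary:

```python
def dod_encode(timestamps: list[int]) -> str:
    """Delta-of-delta encode timestamps into a bit string (sketch).

    First timestamp stored raw (64 bits), first delta raw (32 bits);
    every subsequent timestamp costs 1 bit when the interval is steady.
    """
    bits = format(timestamps[0], "064b")
    prev_delta = timestamps[1] - timestamps[0]
    bits += format(prev_delta & 0xFFFFFFFF, "032b")
    for i in range(2, len(timestamps)):
        delta = timestamps[i] - timestamps[i - 1]
        dod = delta - prev_delta
        prev_delta = delta
        if dod == 0:
            bits += "0"                                    # ~95% of samples
        elif -63 <= dod <= 64:
            bits += "10" + format(dod + 63, "07b")         # 9 bits total
        elif -255 <= dod <= 256:
            bits += "110" + format(dod + 255, "09b")       # 12 bits total
        elif -2047 <= dod <= 2048:
            bits += "1110" + format(dod + 2047, "012b")    # 16 bits total
        else:
            bits += "1111" + format(dod & 0xFFFFFFFF, "032b")
    return bits
```

On the jittery example above (1000, 1010, 1020, 1030, 1041, 1051) the two stable samples cost 1 bit each and the two jittered ones 9 bits each, on top of the 96-bit header.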
Value compression (XOR encoding):
Observation: Consecutive values of the same metric are similar
CPU usage: 73.5, 73.8, 74.1, 73.9, 74.2, ...
XOR of consecutive float64 values:
73.5 XOR 73.8 = 0x00000000000C0000... (many leading and trailing zeros)
Encoding:
- XOR = 0 (same value): encode as '0' (1 bit)
- XOR has same leading/trailing zeros as previous XOR:
encode as '10' + meaningful_bits (2 bits + N bits)
- Otherwise:
encode as '11' + 5-bit leading_zeros + 6-bit meaningful_length + meaningful_bits
Average: ~8-12 bits per value (vs 64 bits) → 6-8x compression
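A simplified sketch of the XOR scheme — it always re-emits the leading-zeros/length header (the '11' case) and omits the '10' reuse optimization, so it slightly overstates the real cost:

```python
import struct

def float_bits(x: float) -> int:
    """Reinterpret a float64 as its raw 64-bit pattern."""
    return struct.unpack(">Q", struct.pack(">d", x))[0]

def xor_encode(values: list[float]) -> str:
    """XOR-encode float64 values into a bit string (simplified sketch).

    Identical consecutive values cost 1 bit; similar values cost only
    their 'meaningful' middle bits plus an 11-bit header.
    """
    bits = format(float_bits(values[0]), "064b")
    prev = float_bits(values[0])
    for v in values[1:]:
        cur = float_bits(v)
        xor = prev ^ cur
        prev = cur
        if xor == 0:
            bits += "0"                                # same value: 1 bit
        else:
            lead = min(64 - xor.bit_length(), 31)      # leading zeros (5-bit field)
            trail = (xor & -xor).bit_length() - 1      # trailing zeros
            meaningful = 64 - lead - trail
            bits += ("11" + format(lead, "05b") + format(meaningful, "06b")
                     + format(xor >> trail, f"0{meaningful}b"))
    return bits
```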
Combined result:
Per sample: ~1 bit (timestamp) + ~10 bits (value) + overhead = ~1.37 bytes
Compression ratio: 16 bytes / 1.37 bytes ≈ 12x
This is why 43.2 billion samples/day = only 59 GB/day on disk.
Facebook's paper reports 1.37 bytes/sample on real production data.
Block layout for query efficiency:
Each 2-hour block:
┌─────────────────────────────────────┐
│ Block metadata (time range, stats) │
│ ┌─────────────────────────────────┐ │
│ │ Chunk: series_1 timestamps │ │ ← columnar: all timestamps together
│ │ Chunk: series_1 values │ │ ← all values together
│ │ Chunk: series_2 timestamps │ │
│ │ Chunk: series_2 values │ │
│ │ ... │ │
│ └─────────────────────────────────┘ │
│ Index: series_id → chunk offsets │
│ Label index: label → series_ids │
└─────────────────────────────────────┘
Query for series_1 over 30 minutes:
1. Index lookup: series_1 → chunk offset (O(1))
2. Seek to offset, decompress timestamp chunk
3. Binary search for start/end of 30-min range
4. Decompress only the relevant value chunk portion
5. No full-block scan needed
Deep Dive 2: Downsampling and Retention Policies
The problem: raw data is kept at 10-second resolution for 30 days, but most historical queries don’t need 10-second granularity for data from 3 weeks ago. How do we reduce storage while preserving query accuracy?
Multi-resolution storage:
Tier 1 — Raw (10-second resolution):
Retention: 30 days
Storage: 59 GB/day x 30 = 1.8 TB
Use: Recent dashboards, incident investigation, alerting
Tier 2 — 5-minute aggregates:
Retention: 6 months
Storage: 59 GB/day / 30 x 180 = 354 GB
Use: Weekly trends, capacity planning
Tier 3 — 1-hour aggregates:
Retention: 2 years
Storage: 59 GB/day / 360 x 730 = 120 GB
Use: Long-term trends, year-over-year comparisons
Tier 4 — 1-day aggregates:
Retention: 5 years
Storage: 59 GB/day / 8640 x 1825 = 12.5 GB
Use: Historical baselines
Downsampling process:
Every 2-hour block, after flush to disk:
For each series in the block:
1. Group samples into 5-minute windows
2. For each window, compute: min, max, sum, count, last
3. Store as a new series in the 5min tier
Why min/max/sum/count (not just avg)?
- avg(cpu) over 5 min = sum/count → reconstructable
- max(cpu) over 5 min → cannot reconstruct from avg alone
- p99(latency) → percentiles cannot be averaged or re-aggregated; store histogram bucket counts instead (bucket counts sum correctly, and p99 is computed at query time)
Example:
Raw: [73.5, 74.1, 89.2, 73.8, 74.0, 73.9, ...] (30 samples in 5 min)
5min aggregate: {min: 73.5, max: 89.2, sum: 2247.6, count: 30}
→ avg = 74.92, range = 15.7
Without keeping min/max, the spike to 89.2 would be invisible in the 5-min aggregate.
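The windowing step can be sketched directly — align each timestamp down to a 5-minute boundary and fold the sample into that window's aggregate:

```python
def downsample_5min(samples: list[tuple[int, float]]) -> dict[int, dict]:
    """Group (timestamp, value) samples into 5-minute windows and compute
    the aggregates each tier stores: min, max, sum, count, last."""
    windows: dict[int, dict] = {}
    for ts, v in sorted(samples):
        w = ts - ts % 300                     # align to 5-min boundary
        agg = windows.setdefault(
            w, {"min": v, "max": v, "sum": 0.0, "count": 0, "last": v})
        agg["min"] = min(agg["min"], v)
        agg["max"] = max(agg["max"], v)
        agg["sum"] += v
        agg["count"] += 1
        agg["last"] = v
    return windows
```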
Query routing by time range:
Automatic tier selection:
- Query for last 1 hour → raw tier (10s resolution)
- Query for last 7 days → 5-min tier
- Query for last 90 days → 1-hour tier
- Query for last 2 years → 1-day tier
If the query spans multiple tiers:
last 45 days:
- Days 1-30: raw tier (if still available) or 5-min tier
- Days 31-45: 5-min tier
Querier stitches results from both tiers, aligning on step boundaries.
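Tier selection reduces to a span-based lookup; the exact cutoffs in this sketch are illustrative, chosen to match the routing table above:

```python
def pick_tier(span_seconds: int) -> str:
    """Pick the coarsest tier whose resolution still suits the query span.

    Wide time ranges don't need 10-second points — a 90-day graph at
    15-second step would return more points than any dashboard can draw.
    Thresholds here are illustrative, not prescribed by the design.
    """
    if span_seconds <= 6 * 3600:
        return "raw"      # 10s resolution
    if span_seconds <= 14 * 86_400:
        return "5min"
    if span_seconds <= 180 * 86_400:
        return "1h"
    return "1d"
```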
Deep Dive 3: Alerting Pipeline — Reliability and Deduplication
The problem: Alerting is the most critical feature. A missed alert during an outage is worse than having no monitoring at all. How do we make it reliable?
Alert evaluation pipeline:
Alert Rule: avg(cpu_usage{service="api"}) by (host) > 90 FOR 5m
Evaluation loop (every 15 seconds):
1. Execute PromQL query: avg(cpu_usage{service="api"}) by (host)
2. For each result:
If value > 90:
- First breach: start "pending" timer
- If pending for >= 5 minutes: transition to "firing"
- If already firing: continue (do not re-fire)
If value <= 90:
- If was firing: transition to "resolved"
- If was pending: reset timer
State machine:
inactive → pending (threshold breached) → firing (duration met) → resolved (recovered)
↑ │
└────────────────────────────────────┘ (if breached again)
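The state machine is small enough to sketch directly (in this simplified version, "resolved" is modeled as a return to inactive):

```python
import enum

class State(enum.Enum):
    INACTIVE = "inactive"
    PENDING = "pending"
    FIRING = "firing"

class AlertRule:
    """Threshold alert with a FOR duration, per the state machine above."""

    def __init__(self, threshold: float, for_seconds: float):
        self.threshold = threshold
        self.for_seconds = for_seconds
        self.state = State.INACTIVE
        self.pending_since: float | None = None

    def evaluate(self, value: float, now: float) -> State:
        if value > self.threshold:
            if self.state == State.INACTIVE:
                # first breach: start the pending timer
                self.state, self.pending_since = State.PENDING, now
            elif (self.state == State.PENDING
                  and now - self.pending_since >= self.for_seconds):
                self.state = State.FIRING   # breached continuously for FOR duration
        else:
            # recovered: resolve (already-firing) or reset the pending timer
            self.state, self.pending_since = State.INACTIVE, None
        return self.state
```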
Deduplication and grouping (Alert Manager):
Problem: 100 hosts all fire HighCPU simultaneously → 100 PagerDuty pages → alert fatigue
Grouping:
Group alerts by: {alertname, service}
100 individual HighCPU alerts → 1 grouped notification:
"HighCPU: 100 hosts in service 'api' have CPU > 90%
Hosts: web-01, web-02, ..., web-100"
Deduplication:
- Same alert firing for the same label set → only notify once
- Re-notify if: resolved then re-fired, or user-configured repeat interval (e.g., every 4 hours)
Inhibition:
- If "ClusterDown" is firing, suppress all individual "HostDown" alerts for that cluster
- Reduces noise during cascading failures
Silencing:
- During planned maintenance: silence alerts matching {host=~"web-0[1-5]"} for 2 hours
- Silences are time-bounded and auto-expire
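Grouping is a bucket-by-label-key operation — a minimal sketch producing one notification summary per {alertname, service} group, as in the 100-hosts-one-page example above:

```python
from collections import defaultdict

def group_alerts(alerts: list[dict],
                 group_by: tuple[str, ...] = ("alertname", "service")) -> dict:
    """Collapse per-host alerts into one notification summary per group key."""
    groups: dict[tuple, list[dict]] = defaultdict(list)
    for a in alerts:
        key = tuple(a["labels"].get(k) for k in group_by)
        groups[key].append(a)
    return {
        key: (f"{key[0]}: {len(members)} alerts in service '{key[1]}' "
              f"(hosts: {', '.join(sorted(m['labels'].get('host', '?') for m in members))})")
        for key, members in groups.items()
    }
```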
High availability for alerting:
Problem: If the Alert Manager crashes, alerts stop.
Solution: Run 3 Alert Manager instances as an HA cluster
1. All 3 evaluate alert rules independently
2. Gossip protocol replicates alert, silence, and notification state across instances
3. Notification deduplication prevents triple-paging: each instance waits a short staggered timeout and suppresses the page if a peer has already sent it
4. If an instance fails: a peer's timeout expires and it sends the notification within seconds
5. Pending/firing state replicated across all 3 instances, so no alert is lost on failover
Watchdog alert:
- A "DeadMansSwitch" alert that is ALWAYS firing
- If PagerDuty stops receiving the watchdog heartbeat → the monitoring system itself is down
- PagerDuty escalates the absence of the watchdog as a critical incident
7. Extensions (2 min)
- Anomaly detection: Instead of static thresholds (CPU > 90%), use ML models to detect anomalous patterns. Train per-metric models on historical data (seasonal decomposition, ARIMA, isolation forest). Alert when the metric deviates significantly from predicted values. Reduces false alarms for metrics with natural variability.
- Distributed tracing integration: Correlate metrics with traces. When a latency alert fires, automatically link to distributed traces showing slow requests during that window. Requires trace-id → metric label mapping and a trace storage backend (Jaeger/Zipkin).
- Multi-tenant metrics platform: Serve multiple teams from a single cluster. Per-tenant namespacing, quota enforcement (max cardinality, max ingestion rate), and chargeback reporting. Tenant isolation at the storage layer (separate blocks per tenant) to prevent noisy-neighbor issues.
- Exemplars: Attach trace IDs to metric samples at recording time. When a user clicks on a spike in a dashboard graph, jump directly to an example trace from that exact moment. Bridges the gap between aggregate metrics and individual request details.
- Long-term storage tiering to object storage: After local SSD retention expires, move blocks to S3/GCS. Querier transparently queries both local and remote storage. Blocks in S3 are indexed locally (label index cached) but data blocks fetched on-demand. Reduces per-node storage cost by 10x.