1. Requirements & Scope (5 min)

Functional Requirements

  1. Collect performance metrics (CPU, memory, disk, network, custom application metrics) from thousands of servers and services
  2. Store time-series data with configurable retention (high-resolution for recent data, downsampled for historical)
  3. Support flexible querying: aggregate metrics across dimensions (host, service, region), with functions (avg, p99, sum, rate)
  4. Real-time alerting: trigger alerts when metrics cross thresholds (e.g., CPU > 90% for 5 minutes)
  5. Dashboard visualization: support building dashboards with graphs, heatmaps, and tables that auto-refresh

Non-Functional Requirements

  • Availability: 99.95% — monitoring must be more reliable than what it monitors. A monitoring outage during a production incident is catastrophic.
  • Latency: Metric ingestion: < 5 seconds end-to-end (from source to queryable). Dashboard queries: < 500ms for recent data (last 1 hour), < 2 seconds for historical data (last 30 days).
  • Scale: 10,000 hosts, each emitting 500 metrics at 10-second intervals = 500K data points/sec ingestion. 10 million active time series. 10TB of metric data in storage.
  • Durability: Metric data must survive single-node failures. 30-day raw retention, 1-year downsampled retention.
  • Cardinality: Handle high-cardinality labels gracefully (up to 1M unique label combinations per metric name without degrading query performance).

2. Estimation (3 min)

Ingestion

  • 10,000 hosts x 500 metrics x 1 sample/10 sec = 500K samples/sec
  • Each sample: metric_name (hashed to 8 bytes) + labels (8 bytes hash) + timestamp (8 bytes) + value (8 bytes) = ~32 bytes
  • Ingestion bandwidth: 500K x 32 bytes = 16 MB/sec — modest, not a bottleneck

Storage

  • Samples per day: 500K/sec x 86,400 = 43.2 billion samples/day
  • Raw storage per day: 43.2B x 16 bytes (8-byte timestamp + 8-byte value, uncompressed) = 691 GB/day
    • With compression (Gorilla encoding): ~1.37 bytes per sample → 59 GB/day
  • 30-day raw retention: 59 x 30 = 1.8 TB
  • 1-year downsampled (5-min resolution): 59 GB/day / 30 (10s → 5min) x 365 = 717 GB
  • Total: ~2.5 TB — surprisingly small thanks to time-series compression

Queries

  • Active dashboards: 1,000 dashboards, each with 10 panels, refreshing every 30 seconds
  • Query QPS: 1,000 x 10 / 30 = ~333 queries/sec
  • Each query scans: ~1 hour of data for 100 time series = 100 x 360 samples = 36K samples per query
  • Query throughput: 333 x 36K = 12M samples scanned/sec — well within SSD IOPS capacity
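The estimates above are easy to sanity-check in a few lines of arithmetic (all constants are taken from this section):

```python
# Sanity-check of the capacity estimates above (pure arithmetic).
hosts, metrics_per_host, interval_s = 10_000, 500, 10
wire_bytes, compressed_bytes = 32, 1.37  # per-sample wire size vs Gorilla-compressed

samples_per_sec = hosts * metrics_per_host // interval_s          # 500K/sec
ingest_mb_per_sec = samples_per_sec * wire_bytes / 1e6            # ~16 MB/sec
samples_per_day = samples_per_sec * 86_400                        # 43.2 billion
raw_gb_per_day = samples_per_day * 16 / 1e9                       # ~691 GB (ts + value)
compressed_gb_per_day = samples_per_day * compressed_bytes / 1e9  # ~59 GB
raw_30d_tb = compressed_gb_per_day * 30 / 1e3                     # ~1.8 TB
query_qps = 1_000 * 10 / 30                                       # ~333 queries/sec

print(samples_per_sec, round(compressed_gb_per_day), round(raw_30d_tb, 1))
```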

3. API Design (3 min)

Ingestion API

// Push metrics (agent → collector)
POST /api/v1/write
  Content-Type: application/x-protobuf    // or application/json
  Body: {
    "timeseries": [
      {
        "labels": {"__name__": "cpu_usage", "host": "web-01", "region": "us-east", "service": "api"},
        "samples": [
          {"timestamp": 1708632060, "value": 73.5},
          {"timestamp": 1708632070, "value": 75.2}
        ]
      },
      {
        "labels": {"__name__": "http_request_duration_seconds", "method": "GET", "path": "/api/users", "status": "200"},
        "samples": [
          {"timestamp": 1708632060, "value": 0.042}
        ]
      }
    ]
  }
  Response 200: { "samples_written": 3 }

Query API

// PromQL-compatible query
GET /api/v1/query_range
  Params:
    query = avg(rate(http_request_duration_seconds{service="api"}[5m])) by (path)
    start = 1708628460    // 1 hour ago
    end = 1708632060      // now
    step = 15             // 15-second resolution

  Response 200: {
    "result_type": "matrix",
    "result": [
      {
        "labels": {"path": "/api/users"},
        "values": [[1708628460, "0.035"], [1708628475, "0.038"], ...]
      },
      {
        "labels": {"path": "/api/orders"},
        "values": [[1708628460, "0.122"], [1708628475, "0.118"], ...]
      }
    ]
  }

Alerting API

// Create alert rule
POST /api/v1/alerts
  Body: {
    "name": "HighCPU",
    "expr": "avg(cpu_usage{service='api'}) by (host) > 90",
    "for": "5m",                    // must be true for 5 minutes
    "severity": "critical",
    "annotations": {
      "summary": "Host {{ $labels.host }} CPU is {{ $value }}%",
      "runbook": "https://wiki/runbooks/high-cpu"
    },
    "notify": ["pagerduty-oncall", "slack-infra"]
  }

Key Decisions

  • Push vs pull (Prometheus-style scraping): we use push — agents push to collectors. Better for ephemeral/auto-scaled instances that may disappear before they can be scraped.
  • PromQL-compatible query language — industry standard, rich aggregation functions
  • Protobuf for ingestion wire format — 50% smaller than JSON, faster serialization

4. Data Model (3 min)

Time Series Identification

A time series is uniquely identified by its label set:
  {__name__="http_requests_total", method="GET", path="/api/users", status="200", host="web-01"}

Internal representation:
  series_id  = hash(sorted_labels) → uint64
  This is the primary key for all operations
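A minimal sketch of that hashing step. SHA-256 truncated to 64 bits stands in here for the fast non-cryptographic hash (e.g. xxHash) a production system would actually use:

```python
import hashlib

def series_id(labels: dict) -> int:
    """Map a label set to a stable uint64 series ID.

    Labels are sorted first so that {a=1, b=2} and {b=2, a=1} produce
    the same ID. A real system must also detect/handle collisions;
    this sketch ignores them.
    """
    canonical = ",".join(f"{k}={labels[k]}" for k in sorted(labels))
    digest = hashlib.sha256(canonical.encode()).digest()
    return int.from_bytes(digest[:8], "big")  # truncate to uint64

a = series_id({"__name__": "cpu_usage", "host": "web-01"})
b = series_id({"host": "web-01", "__name__": "cpu_usage"})
assert a == b  # label order does not affect the ID
```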

Storage Format (custom time-series optimized)

On-disk structure (per series, per time block):
  ┌───────────────────────────────────┐
  │ Block (2-hour time window)        │
  │ ┌───────────────────────────────┐ │
  │ │ Series 1: [timestamps][values]│ │
  │ │ Series 2: [timestamps][values]│ │
  │ │ ...                           │ │
  │ └───────────────────────────────┘ │
  │ Index: series_id → offset         │
  │ Label index: label → series_ids   │
  │ Tombstones (deletions)            │
  └───────────────────────────────────┘

Compression (Gorilla encoding):
  Timestamps: delta-of-delta encoding
    - Most delta-of-deltas are 0 (fixed 10s interval) → 1 bit per timestamp
    - Occasional jitter → a few extra bits
    - Average: 1-2 bits per timestamp

  Values: XOR encoding
    - Consecutive values of the same metric are similar
    - XOR of consecutive values has many leading/trailing zeros
    - Average: 1-2 bytes per value

  Result: ~1.37 bytes per sample (vs 16 bytes uncompressed) → 12x compression

Label Index (inverted index)

Label to series mapping (for query filtering):
  "service=api"   → [series_1, series_2, series_5, series_99, ...]
  "host=web-01"   → [series_1, series_3, series_7, ...]
  "method=GET"    → [series_1, series_2, series_3, ...]

Query: http_requests{service="api", method="GET"}
  → Intersect posting lists: [series_1, series_2, series_5, ...] ∩ [series_1, series_2, series_3, ...]
  → Result: [series_1, series_2]

Stored as: sorted arrays with bitmap intersection (Roaring bitmaps for cardinality > 10K)
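The posting-list intersection above is a two-pointer merge over sorted arrays; a Roaring bitmap AND computes the same result more compactly at high cardinality. A sketch:

```python
def intersect(postings_a: list[int], postings_b: list[int]) -> list[int]:
    """Intersect two sorted posting lists of series IDs (two-pointer merge)."""
    out, i, j = [], 0, 0
    while i < len(postings_a) and j < len(postings_b):
        if postings_a[i] == postings_b[j]:
            out.append(postings_a[i])
            i += 1
            j += 1
        elif postings_a[i] < postings_b[j]:
            i += 1
        else:
            j += 1
    return out

# {service="api"} ∩ {method="GET"} from the example above
assert intersect([1, 2, 5, 99], [1, 2, 3]) == [1, 2]
```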

Downsampled Data

Table: metrics_5min (5-minute aggregates)
  series_id       | uint64
  timestamp       | int64 (5-min aligned)
  min             | float64
  max             | float64
  sum             | float64
  count           | uint32

Computed by background job: raw data → 5-min aggregates → 1-hour aggregates → 1-day aggregates
Each level retains min/max/sum/count so any aggregation can be reconstructed.
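Why keeping all four fields works: a coarser window is just a field-wise merge of finer ones, so avg/min/max at any resolution are reconstructed exactly. A sketch with illustrative numbers:

```python
def merge_aggregates(windows: list[dict]) -> dict:
    """Roll finer {min,max,sum,count} windows up into one coarser window.
    Nothing is lost for these aggregates: min of mins, max of maxes,
    sum of sums, sum of counts."""
    return {
        "min": min(w["min"] for w in windows),
        "max": max(w["max"] for w in windows),
        "sum": sum(w["sum"] for w in windows),
        "count": sum(w["count"] for w in windows),
    }

# Two 5-min windows merged into a (partial) hourly window:
five_min = [
    {"min": 70.0, "max": 89.2, "sum": 2247.6, "count": 30},
    {"min": 71.5, "max": 80.1, "sum": 2280.0, "count": 30},
]
hourly = merge_aggregates(five_min)
assert hourly["max"] == 89.2                          # spike preserved
assert round(hourly["sum"] / hourly["count"], 2) == 75.46  # exact average
```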

Why This Design

  • Custom time-series format (not SQL) — general-purpose relational databases fare far worse on time-series workloads (commonly 10-100x more storage and I/O) due to per-row overhead, index maintenance, and the lack of columnar compression.
  • Gorilla compression — developed by Facebook for their monitoring system. 12x compression means 10TB logical data fits in ~800GB on disk.
  • Inverted label index — enables fast multi-label query filtering without scanning all series. Critical for high-cardinality environments.

5. High-Level Design (12 min)

Architecture

Data Sources (10,000 hosts)
  │
  │  Each host runs a metrics agent (collectd/telegraf/custom)
  │  Agent collects: CPU, memory, disk, network, app-specific metrics
  │  Agent pushes every 10 seconds
  │
  ▼
┌──────────────────────────────────────────────────┐
│                 Ingestion Layer                  │
│                                                  │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│  │ Collector 1 │ │ Collector 2 │ │ Collector N │ │
│  │ (stateless) │ │             │ │             │ │
│  └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│         │               │               │        │
│         └───────────────┼───────────────┘        │
│                         │                        │
│                         ▼                        │
│               ┌──────────────────┐               │
│               │  Kafka Cluster   │               │
│               │  (metrics topic, │               │
│               │   32 partitions) │               │
│               └─────────┬────────┘               │
└─────────────────────────┼────────────────────────┘
                          │
        ┌─────────────────┼──────────────┐
        ▼                 ▼              ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Storage Node │ │ Storage Node │ │ Storage Node │
│ (Ingester)   │ │ (Ingester)   │ │ (Ingester)   │
│              │ │              │ │              │
│ In-memory    │ │ In-memory    │ │ In-memory    │
│ head block   │ │ head block   │ │ head block   │
│ (last 2 hrs) │ │ (last 2 hrs) │ │ (last 2 hrs) │
│      │       │ │      │       │ │      │       │
│      ▼       │ │      ▼       │ │      ▼       │
│ Persistent   │ │ Persistent   │ │ Persistent   │
│ blocks (SSD) │ │ blocks (SSD) │ │ blocks (SSD) │
└──────────────┘ └──────────────┘ └──────────────┘
          │               │               │
          └───────────────┼───────────────┘
                          │
                          ▼
                ┌──────────────────┐
                │   Query Layer    │
                │   (Queriers,     │
                │    fan-out to    │
                │    storage nodes)│
                └─────────┬────────┘
                          │
                ┌─────────┴────────┐
                ▼                  ▼
        ┌───────────────┐  ┌──────────────┐
        │ Alert Manager │  │  Dashboard   │
        │ (evaluates    │  │  (Grafana,   │
        │  rules every  │  │  custom UI)  │
        │  15 seconds)  │  │              │
        └───────────────┘  └──────────────┘

Ingestion Flow

Agent (on host)
  → Collect metrics every 10 seconds
  → Batch into protobuf payload (~500 metrics = ~16KB)
  → Push to any Collector instance (round-robin via load balancer)

Collector (stateless)
  → Validate payload (label format, value range, cardinality check)
  → Route to correct Kafka partition based on hash(series_id) % 32
  → Respond 200 to agent

Kafka → Storage Node (Ingester)
  → Consume from assigned partitions
  → Append samples to in-memory "head block" (current 2-hour window)
  → WAL for crash recovery
  → Every 2 hours: flush head block to disk as compressed block
  → Replication: 2 ingesters consume same partition (RF=2)
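The collector's validate-and-route step might look like this sketch. The label limit and value check are illustrative assumptions, not part of the design above; and since series_id is already a uniform hash, a plain modulo implements the hash(series_id) % 32 routing:

```python
NUM_PARTITIONS = 32
MAX_LABELS = 30  # illustrative cardinality guard, not specified above

def validate(series: dict) -> None:
    """Collector-side sanity checks before samples enter Kafka."""
    labels = series["labels"]
    if "__name__" not in labels:
        raise ValueError("missing metric name (__name__ label)")
    if len(labels) > MAX_LABELS:
        raise ValueError("too many labels (cardinality guard)")
    for sample in series["samples"]:
        if not isinstance(sample["value"], (int, float)):
            raise ValueError("non-numeric sample value")

def partition_for(series_id: int) -> int:
    # Every sample of a given series lands on the same partition,
    # so one ingester pair (RF=2) owns that series end-to-end.
    return series_id % NUM_PARTITIONS
```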

Query Flow

Dashboard query: avg(cpu_usage{service="api"}) by (host) [last 1 hour]

Querier:
  1. Parse PromQL expression
  2. Label matching: which series match {service="api"}?
     → Query label index on storage nodes → get list of series_ids
  3. Time range: last 1 hour → need head block (in-memory) + possibly 1 on-disk block
  4. Fan-out: send sub-queries to storage nodes that hold these series
  5. Each storage node:
     → Decompress relevant samples
     → Apply rate/avg functions locally
     → Return partial result
  6. Querier merges partial results
  7. Return to dashboard

Components

  1. Metrics Agents (10,000): Lightweight daemons on each host. Collect system metrics (CPU, memory, disk, network) and application-specific metrics (HTTP latency, queue depth). Push to collectors. Buffered locally for 1 hour if collectors are unreachable.
  2. Collectors (10, stateless): Receive pushed metrics, validate, route to Kafka. No state — purely a fan-in and routing layer. Horizontal scaling trivial.
  3. Kafka: Durability buffer between collectors and storage. Handles burst traffic, provides replay capability. 32 partitions, 3x replication, 24-hour retention.
  4. Storage Nodes / Ingesters (6-10): Each owns a subset of time series (by hash). In-memory head block for recent data (fast queries), compressed on-disk blocks for historical data. 2TB SSD per node.
  5. Queriers (stateless): Parse queries, fan-out to storage nodes, merge results. Auto-scaled based on dashboard load.
  6. Alert Manager: Evaluates alert rules every 15 seconds by running PromQL queries. Deduplication, grouping, silencing, routing to notification channels (PagerDuty, Slack, email).
  7. Downsampler (background job): Computes 5-min, 1-hour, and 1-day rollups. Runs continuously, processing blocks as they are flushed to disk.

6. Deep Dives (15 min)

Deep Dive 1: Time-Series Compression (Gorilla Encoding)

The problem: At 500K samples/sec, naive storage (16 bytes per sample: 8 bytes timestamp + 8 bytes float64 value) would require 691 GB/day. We need at least 10x compression to keep costs manageable.

Timestamp compression (delta-of-delta):

Observation: Metric samples arrive at near-fixed intervals (every 10 seconds)
  Timestamps: 1000, 1010, 1020, 1030, 1041, 1051, ...
  Deltas:     10, 10, 10, 11, 10, ...
  Delta-of-deltas: 0, 0, 0, 1, -1, ...

Encoding (variable bit length):
  - Delta-of-delta = 0: encode as '0' (1 bit)           ← ~95% of samples
  - |dod| ≤ 63: encode as '10' + 7-bit value (9 bits)
  - |dod| ≤ 255: encode as '110' + 9-bit value (12 bits)
  - |dod| ≤ 2047: encode as '1110' + 12-bit value (16 bits)
  - Else: '1111' + full 32-bit value (36 bits)

Average: ~1.06 bits per timestamp (vs 64 bits uncompressed) → 60x compression
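The bucket scheme above translates directly into a bit-cost calculator. This is a sketch: it counts encoded bits rather than packing a real bitstream, and for simplicity it runs the first delta through the same buckets instead of Gorilla's fixed-width first delta:

```python
def dod_bits(timestamps: list[int]) -> int:
    """Total bits needed to delta-of-delta encode a timestamp list."""
    bits = 64  # first timestamp stored raw
    prev_delta = None
    for prev, cur in zip(timestamps, timestamps[1:]):
        delta = cur - prev
        dod = delta if prev_delta is None else delta - prev_delta
        prev_delta = delta
        if dod == 0:
            bits += 1          # the ~95% common case
        elif abs(dod) <= 63:
            bits += 9
        elif abs(dod) <= 255:
            bits += 12
        elif abs(dod) <= 2047:
            bits += 16
        else:
            bits += 36
        # (bucket widths follow the encoding table above)
    return bits

# The example series above: one jittered interval costs a few extra bits.
ts = [1000, 1010, 1020, 1030, 1041, 1051]
print(dod_bits(ts))  # 93 bits for 6 timestamps, vs 6 * 64 = 384 raw
```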

Value compression (XOR encoding):

Observation: Consecutive values of the same metric are similar
  CPU usage: 73.5, 73.8, 74.1, 73.9, 74.2, ...

XOR of consecutive float64 values:
  73.5 XOR 73.8 = 0x0000133333333333  (19 leading zero bits: sign, exponent, and the high mantissa bits all match)

Encoding:
  - XOR = 0 (same value): encode as '0' (1 bit)
  - XOR has same leading/trailing zeros as previous XOR:
    encode as '10' + meaningful_bits (2 bits + N bits)
  - Otherwise:
    encode as '11' + 5-bit leading_zeros + 6-bit meaningful_length + meaningful_bits

Average: ~8-12 bits per value (vs 64 bits) → 6-8x compression
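The leading/trailing-zero structure the value encoder exploits can be inspected directly. A sketch that measures the XOR's shape rather than emitting a bitstream:

```python
import struct

def f64_bits(x: float) -> int:
    """Raw IEEE-754 bit pattern of a float64, as a uint64."""
    return struct.unpack(">Q", struct.pack(">d", x))[0]

def xor_shape(prev: float, cur: float) -> tuple[int, int, int]:
    """(leading_zeros, trailing_zeros, meaningful_bits) of prev XOR cur —
    the quantities a Gorilla-style value encoder stores."""
    x = f64_bits(prev) ^ f64_bits(cur)
    if x == 0:
        return 64, 0, 0  # identical value: the encoder emits a single '0' bit
    bits = f"{x:064b}"
    lead = len(bits) - len(bits.lstrip("0"))
    trail = len(bits) - len(bits.rstrip("0"))
    return lead, trail, 64 - lead - trail

# Nearby CPU readings share sign, exponent, and high mantissa bits,
# so most of the XOR is zero and only the middle bits need storing.
print(xor_shape(73.5, 73.8))
```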

Combined result:

Per sample: ~1 bit (timestamp) + ~10 bits (value) ≈ 11 bits ≈ 1.37 bytes
Compression ratio: 16 bytes / 1.37 bytes ≈ 12x

This is why 43.2 billion samples/day = only 59 GB/day on disk.
Facebook's paper reports 1.37 bytes/sample on real production data.

Block layout for query efficiency:

Each 2-hour block:
  ┌─────────────────────────────────────┐
  │ Block metadata (time range, stats)  │
  │ ┌─────────────────────────────────┐ │
  │ │ Chunk: series_1 timestamps      │ │  ← columnar: all timestamps together
  │ │ Chunk: series_1 values          │ │  ← all values together
  │ │ Chunk: series_2 timestamps      │ │
  │ │ Chunk: series_2 values          │ │
  │ │ ...                             │ │
  │ └─────────────────────────────────┘ │
  │ Index: series_id → chunk offsets    │
  │ Label index: label → series_ids     │
  └─────────────────────────────────────┘

Query for series_1 over 30 minutes:
  1. Index lookup: series_1 → chunk offset (O(1))
  2. Seek to offset, decompress timestamp chunk
  3. Binary search for start/end of 30-min range
  4. Decompress only the relevant value chunk portion
  5. No full-block scan needed

Deep Dive 2: Downsampling and Retention Policies

The problem: We retain 30 days of raw data at 10-second resolution, but most historical queries don’t need 10-second granularity for data from 3 weeks ago. How do we reduce storage while preserving query accuracy?

Multi-resolution storage:

Tier 1 — Raw (10-second resolution):
  Retention: 30 days
  Storage: 59 GB/day x 30 = 1.8 TB
  Use: Recent dashboards, incident investigation, alerting

Tier 2 — 5-minute aggregates:
  Retention: 6 months
  Storage: 59 GB/day / 30 x 180 = 354 GB
  Use: Weekly trends, capacity planning

Tier 3 — 1-hour aggregates:
  Retention: 2 years
  Storage: 59 GB/day / 360 x 730 = 120 GB
  Use: Long-term trends, year-over-year comparisons

Tier 4 — 1-day aggregates:
  Retention: 5 years
  Storage: 59 GB/day / 8640 x 1825 = 12.5 GB
  Use: Historical baselines

Downsampling process:

Every 2-hour block, after flush to disk:
  For each series in the block:
    1. Group samples into 5-minute windows
    2. For each window, compute: min, max, sum, count, last
    3. Store as a new series in the 5min tier

Why min/max/sum/count (not just avg)?
  - avg(cpu) over 5 min = sum/count → reconstructable
  - max(cpu) over 5 min → cannot reconstruct from avg alone
  - p99(latency) → cannot be reconstructed from averages; store histogram bucket counts instead (bucket counts are summable across windows, percentiles are not)

Example:
  Raw: [73.5, 74.1, 89.2, 73.8, 74.0, 73.9, ...]  (30 samples in 5 min)
  5min aggregate: {min: 73.5, max: 89.2, sum: 2247.6, count: 30}
  → avg = 74.92, range = 15.7

Without keeping min/max, the spike to 89.2 would be invisible in the 5-min aggregate.
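The per-window step of the downsampler, as a sketch. Samples are (timestamp, value) pairs, and "last" is included per the process description above:

```python
def window_aggregate(samples: list[tuple[int, float]]) -> dict:
    """Collapse one 5-minute window of raw samples into the record stored
    in the 5-min tier. Keeping min/max (not just the mean) is what
    preserves spikes like the 89.2 in the example above."""
    values = [value for _, value in samples]
    return {
        "min": min(values),
        "max": max(values),
        "sum": sum(values),
        "count": len(values),
        "last": values[-1],
    }

window = [(0, 73.5), (10, 74.1), (20, 89.2), (30, 73.8)]
agg = window_aggregate(window)
print(agg["max"], agg["sum"] / agg["count"])  # spike survives; avg is exact
```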

Query routing by time range:

Automatic tier selection:
  - Query for last 1 hour → raw tier (10s resolution)
  - Query for last 7 days → 5-min tier
  - Query for last 90 days → 1-hour tier
  - Query for last 2 years → 1-day tier

If the query spans multiple tiers:
  last 45 days:
    - Days 1-30: raw tier (if still available) or 5-min tier
    - Days 31-45: 5-min tier
  Querier stitches results from both tiers, aligning on step boundaries.
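Tier selection by query age can be sketched as a simple cutoff ladder. Cutoffs mirror the retention table above; the multi-tier stitching case is deliberately left out:

```python
DAY = 86_400

def pick_tier(start: int, end: int, now: int) -> str:
    """Choose the tier covering the oldest point the query touches
    (age-based heuristic; a real querier also stitches across tiers)."""
    age = now - start  # how far back the query reaches, in seconds
    if age <= 30 * DAY:
        return "raw"    # 10-second resolution
    if age <= 180 * DAY:
        return "5min"
    if age <= 730 * DAY:
        return "1hour"
    return "1day"
```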

Deep Dive 3: Alerting Pipeline — Reliability and Deduplication

The problem: Alerting is the most critical feature. A missed alert during an outage is worse than having no monitoring at all. How do we make it reliable?

Alert evaluation pipeline:

Alert Rule: avg(cpu_usage{service="api"}) by (host) > 90 FOR 5m

Evaluation loop (every 15 seconds):
  1. Execute PromQL query: avg(cpu_usage{service="api"}) by (host)
  2. For each result:
     If value > 90:
       - First breach: start "pending" timer
       - If pending for >= 5 minutes: transition to "firing"
       - If already firing: continue (do not re-fire)
     If value <= 90:
       - If was firing: transition to "resolved"
       - If was pending: reset timer

State machine:
  inactive → pending (threshold breached) → firing (duration met) → resolved (recovered)
                ↑                                                        │
                └────────────────────────────────────────────────────────┘ (if breached again)
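The evaluation loop and state machine above, sketched per series (timestamps are seconds; for_seconds is the FOR duration):

```python
class AlertRule:
    """Threshold alert state machine: inactive → pending → firing → resolved."""

    def __init__(self, threshold: float = 90.0, for_seconds: int = 300):
        self.threshold = threshold
        self.for_seconds = for_seconds
        self.state = "inactive"
        self.pending_since = None

    def evaluate(self, value: float, now: float) -> str:
        if value > self.threshold:
            if self.state in ("inactive", "resolved"):
                self.state, self.pending_since = "pending", now
            elif self.state == "pending" and now - self.pending_since >= self.for_seconds:
                self.state = "firing"
            # already firing: stay firing, never re-fire
        else:
            self.state = "resolved" if self.state == "firing" else "inactive"
            self.pending_since = None
        return self.state

rule = AlertRule(threshold=90, for_seconds=300)
assert rule.evaluate(95, 0) == "pending"     # first breach starts the timer
assert rule.evaluate(95, 300) == "firing"    # breached for >= 5 minutes
assert rule.evaluate(80, 315) == "resolved"  # recovered
```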

Deduplication and grouping (Alert Manager):

Problem: 100 hosts all fire HighCPU simultaneously → 100 PagerDuty pages → alert fatigue

Grouping:
  Group alerts by: {alertname, service}
  100 individual HighCPU alerts → 1 grouped notification:
    "HighCPU: 100 hosts in service 'api' have CPU > 90%
     Hosts: web-01, web-02, ..., web-100"

Deduplication:
  - Same alert firing for the same label set → only notify once
  - Re-notify if: resolved then re-fired, or user-configured repeat interval (e.g., every 4 hours)

Inhibition:
  - If "ClusterDown" is firing, suppress all individual "HostDown" alerts for that cluster
  - Reduces noise during cascading failures

Silencing:
  - During planned maintenance: silence alerts matching {host=~"web-0[1-5]"} for 2 hours
  - Silences are time-bounded and auto-expire
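The grouping step above can be sketched as bucketing alerts by their group key (grouping on {alertname, service}, as in the example):

```python
from collections import defaultdict

def group_alerts(alerts: list[dict], group_by=("alertname", "service")) -> dict:
    """Collapse many firing alerts into one bucket per group key,
    so 100 per-host alerts become a single notification."""
    groups = defaultdict(list)
    for alert in alerts:
        key = tuple(alert["labels"].get(k) for k in group_by)
        groups[key].append(alert)
    return groups

alerts = [
    {"labels": {"alertname": "HighCPU", "service": "api", "host": f"web-{i:02d}"}}
    for i in range(1, 101)
]
grouped = group_alerts(alerts)
assert len(grouped) == 1                        # one page, not 100
assert len(grouped[("HighCPU", "api")]) == 100  # all hosts in one notification
```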

High availability for alerting:

Problem: If the Alert Manager crashes, alerts stop.

Solution: Run 3 Alert Manager instances as an HA cluster
  1. All 3 evaluate alert rules independently
  2. Pending/firing state is replicated across instances via a gossip protocol
  3. Leader election (Raft) picks exactly one instance to send notifications (prevents triple-paging)
  4. If the leader fails: a new leader is elected in < 5 seconds

Watchdog alert:
  - A "DeadMansSwitch" alert that is ALWAYS firing
  - If PagerDuty stops receiving the watchdog heartbeat → the monitoring system itself is down
  - PagerDuty escalates the absence of the watchdog as a critical incident

7. Extensions (2 min)

  • Anomaly detection: Instead of static thresholds (CPU > 90%), use ML models to detect anomalous patterns. Train per-metric models on historical data (seasonal decomposition, ARIMA, isolation forest). Alert when the metric deviates significantly from predicted values. Reduces false alarms for metrics with natural variability.
  • Distributed tracing integration: Correlate metrics with traces. When a latency alert fires, automatically link to distributed traces showing slow requests during that window. Requires trace-id → metric label mapping and a trace storage backend (Jaeger/Zipkin).
  • Multi-tenant metrics platform: Serve multiple teams from a single cluster. Per-tenant namespacing, quota enforcement (max cardinality, max ingestion rate), and chargeback reporting. Tenant isolation at the storage layer (separate blocks per tenant) to prevent noisy-neighbor issues.
  • Exemplars: Attach trace IDs to metric samples at recording time. When a user clicks on a spike in a dashboard graph, jump directly to an example trace from that exact moment. Bridges the gap between aggregate metrics and individual request details.
  • Long-term storage tiering to object storage: After local SSD retention expires, move blocks to S3/GCS. Querier transparently queries both local and remote storage. Blocks in S3 are indexed locally (label index cached) but data blocks fetched on-demand. Reduces per-node storage cost by 10x.