1. Requirements & Scope (5 min)

Functional Requirements

  1. Monitor health of every node in a cluster (CPU, memory, disk, network, process status) via lightweight agents
  2. Detect node failures within seconds using heartbeat mechanism and classify failure type (node crash, network partition, disk failure, OOM)
  3. Define alerting rules (threshold-based and anomaly-based) and route alerts to on-call teams via multiple channels
  4. Provide a real-time dashboard showing cluster topology, node status, and aggregated metrics
  5. Support auto-remediation actions (restart service, drain node, scale up) triggered by specific failure patterns

Non-Functional Requirements

  • Availability: 99.99% — the monitoring system must be more available than the systems it monitors. A monitoring outage is a blind spot.
  • Latency: Failure detection within 10 seconds. Dashboard data staleness < 30 seconds. Alert delivery < 60 seconds from event.
  • Consistency: Heartbeat status must be accurate (no false positives for node death). Metrics can tolerate eventual consistency (30s lag acceptable).
  • Scale: 100,000 nodes, each emitting ~50 metrics every 10 seconds = 500K metric samples/sec.
  • Durability: Metrics retained at full resolution for 7 days, downsampled (1-min aggregates) for 90 days, further downsampled (1-hour) for 2 years.

2. Estimation (3 min)

Traffic

  • Nodes: 100,000
  • Metrics per node: 50 metrics × 1 sample/10s = 5 samples/sec/node
  • Total ingestion: 100K × 5 = 500,000 samples/sec
  • Heartbeats: 100K nodes × 1 heartbeat/5s = 20,000 heartbeats/sec
  • Dashboard queries: 500 concurrent operators × 1 query/5s = 100 QPS (but each query may scan millions of data points)

Storage

  • Per sample: metric_name (8 bytes hashed) + value (8 bytes) + timestamp (8 bytes) + node_id (8 bytes) + tags (16 bytes) = ~48 bytes
  • Raw data (7 days): 500K samples/sec × 86,400 sec/day × 7 days × 48 bytes = 14.5TB
  • 1-min aggregates (90 days): 100K nodes × 50 metrics × 1440 min/day × 90 days × 48 bytes = 31TB
  • 1-hour aggregates (2 years): 100K nodes × 50 metrics × 24 × 730 × 48 bytes = 4.2TB
  • Total: ~50TB active storage

Key Insight

This is a write-heavy time-series system with a 5000:1 write-to-read ratio. The critical challenge is ingesting 500K samples/sec reliably while simultaneously running failure detection with < 10s latency. Storage must be optimized for time-range queries on specific metrics.


3. API Design (3 min)

Agent → Collector (push-based telemetry)

POST /v1/metrics/ingest
  Body (batch, protobuf-encoded for efficiency):
  {
    "node_id": "node-us-east-1a-42",
    "timestamp": 1708632060,
    "metrics": [
      { "name": "cpu.usage", "value": 78.5, "tags": { "core": "0" } },
      { "name": "mem.used_bytes", "value": 17179869184, "tags": {} },
      { "name": "disk.io.read_bytes", "value": 524288, "tags": { "device": "sda1" } }
    ]
  }

POST /v1/heartbeat
  Body: { "node_id": "node-42", "timestamp": 1708632060, "agent_version": "2.1.0" }
  Response 200: { "ack": true }

Dashboard / Query API

GET /v1/query?metric=cpu.usage&node=node-42&from=1708600000&to=1708632000&step=60s
  Response: { "metric": "cpu.usage", "values": [[1708600000, 45.2], [1708600060, 47.1], ...] }

GET /v1/cluster/status
  Response: { "total_nodes": 100000, "healthy": 99847, "unhealthy": 153,
              "by_status": { "healthy": 99847, "degraded": 98, "down": 43, "unreachable": 12 } }

Alerting Rules API

POST /v1/rules
  Body: {
    "name": "high_cpu_sustained",
    "expr": "avg(cpu.usage{cluster='prod'}) by (node) > 90 for 5m",
    "severity": "warning",
    "notify": ["oncall-infra"],
    "auto_remediate": { "action": "drain_node", "cooldown": "30m" }
  }

Key Decisions

  • Protobuf for metric ingestion (50% smaller than JSON, faster to parse at 500K samples/sec)
  • Push-based model (agents push to collectors) rather than pull-based (Prometheus-style) because at 100K nodes, pull creates thundering herd problems
  • Batch ingestion (agent buffers 10s of metrics, sends one request) to reduce connection overhead

4. Data Model (3 min)

Time-Series Storage (TimescaleDB or ClickHouse)

Table: metrics (hypertable, partitioned by time)
  time             | timestamp       -- partition key (daily chunks)
  node_id          | varchar(100)    -- indexed
  metric_name      | varchar(100)    -- indexed
  value            | double
  tags             | jsonb           -- { "core": "0", "device": "sda1" }

  -- Compression: columnar compression after 1 hour (10x reduction)
  -- Retention policies: auto-drop raw data after 7 days

Aggregated Metrics (materialized views)

Table: metrics_1m (continuous aggregate)
  time_bucket      | timestamp       -- 1-minute buckets
  node_id          | varchar(100)
  metric_name      | varchar(100)
  avg_value        | double
  max_value        | double
  min_value        | double
  count            | int

Table: metrics_1h (continuous aggregate from metrics_1m)
  -- same schema, 1-hour buckets

Node Health State (PostgreSQL)

Table: nodes
  node_id          (PK) | varchar(100)
  cluster_id       (FK) | varchar(50)
  status                | enum('healthy', 'degraded', 'down', 'unreachable', 'maintenance')
  last_heartbeat        | timestamp
  agent_version         | varchar(20)
  metadata              | jsonb         -- { "az": "us-east-1a", "instance_type": "m5.xlarge" }

Table: incidents
  incident_id      (PK) | uuid
  node_id          (FK) | varchar(100)
  failure_type          | enum('node_crash', 'network_partition', 'disk_failure', 'oom', 'cpu_throttle')
  severity              | enum('info', 'warning', 'critical')
  started_at            | timestamp
  resolved_at           | timestamp     -- NULL if ongoing
  auto_remediated       | boolean
  remediation_action    | varchar(100)

Why TimescaleDB (or ClickHouse)?

  • Time-series data has a natural time-ordered write pattern that benefits from time-partitioned storage
  • Columnar compression gives 10-15x storage reduction for metric data (many repeated values)
  • Built-in continuous aggregation for automatic downsampling
  • ClickHouse alternative if query performance at scale matters more than PostgreSQL compatibility

5. High-Level Design (12 min)

Architecture

Cluster Nodes (100K)
  [Agent] → batched metrics/heartbeats every 10s
    → Load Balancer (L4, sticky by node_id hash)
      → Collector Fleet (stateless, 50 instances)
        → Kafka (metrics-raw topic, 128 partitions)
          → Consumer Group 1: Time-Series Writer → TimescaleDB/ClickHouse
          → Consumer Group 2: Alerting Engine (evaluates rules against stream)
          → Consumer Group 3: Anomaly Detector (ML model for unusual patterns)

Heartbeat Path (separate, latency-critical):
  [Agent] → Heartbeat Receiver (dedicated fleet, 20 instances)
    → Heartbeat Store (Redis, key: node_id, value: last_seen, TTL: 30s)
    → Failure Detector (checks for expired heartbeats every 2s)
      → If expired: classify failure → create incident → trigger alert

Dashboard:
  Operator Browser → API Gateway
    → Query Service → TimescaleDB (historical) + Redis (real-time status)
    → WebSocket Server → push live status updates to dashboard

Gossip Protocol for Peer Monitoring

Each node also monitors its k nearest peers (k=3):
  Node A ←→ Node B (bidirectional health check)
  Node A ←→ Node C
  Node A ←→ Node D

If Node B stops responding to Node A, C, and D:
  → All three report B as unreachable
  → Failure Detector has corroborating evidence → high-confidence failure

If only Node A loses contact with Node B:
  → Likely network partition between A and B, not a B failure
  → Failure Detector classifies as "partial network partition"

Components

  1. Agent: Lightweight daemon (< 50MB RAM) on each node. Collects system metrics via /proc, disk I/O, network stats. Sends batched metrics + heartbeats.
  2. Collector Fleet: Stateless HTTP/gRPC servers that accept metric batches, validate, and publish to Kafka. Horizontally scalable.
  3. Kafka: Durability buffer between collection and processing. Decouples ingestion rate from processing rate. 128 partitions keyed by node_id for ordering.
  4. Time-Series Writer: Consumes from Kafka, bulk-inserts into TimescaleDB. Batch size: 10,000 rows per insert for throughput.
  5. Heartbeat Receiver: Dedicated path (not through Kafka) for low-latency heartbeat processing. Writes to Redis with TTL.
  6. Failure Detector: Polls Redis every 2 seconds for expired heartbeats. Uses gossip protocol corroboration to classify failures.
  7. Alerting Engine: Stream processor that evaluates alerting rules against incoming metrics. Supports threshold alerts and rate-of-change alerts.
  8. Query Service: Translates dashboard queries into optimized TimescaleDB queries. Caches hot queries in Redis.
  9. Auto-Remediation Controller: Receives classified incidents, matches against remediation playbooks, executes actions (via Kubernetes API, SSH, or cloud provider API).

6. Deep Dives (15 min)

Deep Dive 1: Failure Detection and Classification

The problem: Distinguishing between “node is down” vs. “network is partitioned” vs. “agent crashed but node is fine” vs. “node is overloaded and heartbeats are delayed.” Each requires a different response.

Multi-signal failure detection:

  1. Heartbeat timeout (primary signal)

    • Agent sends heartbeat every 5 seconds
    • Redis key TTL: 15 seconds (3 missed heartbeats)
    • Failure Detector scans for expired keys every 2 seconds
    • Time to detect: 15-17 seconds worst case
  2. Gossip protocol (corroboration)

    • Each node pings 3 peers every 5 seconds (SWIM-style protocol)
    • If a node fails to respond to peer pings, peers report it to the Failure Detector
    • Quorum: if 2+ peers report a node as unreachable, confidence is high
  3. Classification logic:

if heartbeat_expired AND all_peers_report_unreachable:
    → NODE_CRASH (high confidence)
    → Action: alert + reschedule workloads

if heartbeat_expired AND some_peers_report_unreachable:
    → NETWORK_PARTITION (partial)
    → Action: alert + investigate network path

if heartbeat_expired AND no_peers_report_unreachable:
    → AGENT_CRASH (node is fine, agent died)
    → Action: attempt agent restart via SSH/out-of-band

if heartbeat_alive AND disk_metrics.errors > 0:
    → DISK_FAILURE (degraded)
    → Action: alert + drain node + replace disk

if heartbeat_alive AND mem.available < 100MB AND oom_killer_count > 0:
    → OOM (out of memory)
    → Action: alert + identify memory-hungry process + restart
  1. Preventing false positives:
    • Never declare a node dead based on a single missed heartbeat
    • Require 3 consecutive missed heartbeats (15 seconds)
    • Cross-reference with gossip protocol
    • During network events (e.g., switch maintenance), temporarily increase tolerance to 30 seconds
    • Maintain a “suspected” state before “confirmed down” — gives 10 more seconds for recovery

Flapping prevention:

  • If a node alternates between healthy and unhealthy more than 3 times in 5 minutes, mark as “flapping”
  • Flapping nodes get a different alert (likely hardware issue or unstable network)
  • Suppress individual up/down alerts during flapping, send a single “node is flapping” alert

Deep Dive 2: Alerting Rules Engine

The problem: Evaluating complex alerting rules against 500K metric samples/sec with low latency. Rules can reference windowed aggregates (“avg CPU > 90% for 5 minutes”), rate of change (“disk usage growing > 1GB/hour”), and cross-node conditions (“more than 10% of nodes in cluster X are unhealthy”).

Architecture:

  1. Rule types:
Threshold:     avg(cpu.usage{cluster='prod'}) by (node) > 90 for 5m
Rate:          rate(disk.used_bytes{node='X'}[1h]) > 1073741824
Absence:       absent(heartbeat{node='X'}) for 30s
Composite:     count(node_status == 'down') / count(node_status) > 0.1
  1. Evaluation engine:

    • Rules are compiled into a DAG of operators (filter → aggregate → threshold → alert)
    • Stream processor (Flink or custom) evaluates rules against incoming metric stream
    • Windowed aggregates maintained in state stores (RocksDB-backed)
    • Evaluation interval: every 10 seconds per rule
  2. State management for windowed rules:

Rule: avg(cpu.usage) > 90 for 5m

State per (rule, node):
  window_values: circular buffer of last 30 values (10s intervals × 5min)
  current_avg: running average
  condition_met_since: timestamp when avg first exceeded 90
  alert_fired: boolean

On new sample:
  1. Push to circular buffer, update running average
  2. If avg > 90 AND condition_met_since is NULL:
       → Set condition_met_since = now
  3. If avg > 90 AND now - condition_met_since >= 5min AND NOT alert_fired:
       → FIRE ALERT, set alert_fired = true
  4. If avg <= 90:
       → Reset condition_met_since, set alert_fired = false → RESOLVE
  1. Alert deduplication and grouping:
    • Same alert for same node → deduplicate (only fire once until resolved)
    • If 50 nodes in the same cluster fire the same alert within 1 minute → group into a single “cluster-wide” alert
    • Alert routing: severity-based. Critical → PagerDuty page. Warning → Slack notification. Info → dashboard only.

Deep Dive 3: Auto-Remediation

The problem: Many failures have well-known fixes. Waking up a human at 3 AM to restart a crashed service is wasteful. But auto-remediation must be safe — a buggy remediation loop can cause cascading failures.

Safety guardrails:

1. Rate limiting: max 5 remediations per cluster per 10 minutes
2. Blast radius: never remediate > 5% of nodes in a cluster simultaneously
3. Cooldown: after remediating node X, wait 30 minutes before remediating again
4. Dry-run mode: new rules run in dry-run for 7 days (log action, don't execute)
5. Kill switch: global disable for all auto-remediation (one API call)
6. Audit log: every remediation action logged with before/after state

Playbook example:

name: restart_crashed_service
trigger:
  failure_type: AGENT_CRASH
  confidence: HIGH
  min_peers_reporting: 2
actions:
  - type: ssh_command
    command: "systemctl restart monitoring-agent"
    timeout: 30s
  - type: verify_heartbeat
    wait: 20s
    on_failure:
      - type: drain_node
        reason: "Agent restart failed, possible hardware issue"
      - type: escalate
        severity: critical
        team: infra-oncall

Execution flow:

Incident detected → Match against playbooks → Safety check (rate limit, blast radius)
  → If safe: execute action → wait for verification → update incident
  → If unsafe: skip auto-remediation → escalate to human
  → If action fails: escalate to human with full context

7. Extensions (2 min)

  • Anomaly detection with ML: Train per-node baseline models (e.g., seasonal decomposition) to detect abnormal metric patterns before they trigger static thresholds. Useful for catching slow memory leaks or gradual disk degradation.
  • Distributed tracing correlation: When a node failure is detected, automatically query the distributed tracing system to identify which services and requests were affected. Attach impacted trace IDs to the incident.
  • Capacity planning: Analyze metric trends over weeks/months to predict when nodes will run out of disk/memory. Generate proactive alerts like “Node X will run out of disk in 3 days at current growth rate.”
  • Multi-cluster federation: Aggregate health data across multiple clusters (dev, staging, prod, multiple regions) into a single global dashboard. Each cluster runs its own monitoring stack; a federation layer merges views.
  • Custom metric plugins: Allow teams to write custom agent plugins (in Python/Go) that collect application-specific metrics (e.g., queue depth, cache hit rate) and feed them into the same pipeline.