1. Requirements & Scope (5 min)
Functional Requirements
- Monitor health of every node in a cluster (CPU, memory, disk, network, process status) via lightweight agents
- Detect node failures within seconds using heartbeat mechanism and classify failure type (node crash, network partition, disk failure, OOM)
- Define alerting rules (threshold-based and anomaly-based) and route alerts to on-call teams via multiple channels
- Provide a real-time dashboard showing cluster topology, node status, and aggregated metrics
- Support auto-remediation actions (restart service, drain node, scale up) triggered by specific failure patterns
Non-Functional Requirements
- Availability: 99.99% — the monitoring system must be more available than the systems it monitors. A monitoring outage is a blind spot.
- Latency: Failure detection within ~15 seconds (three missed 5-second heartbeats). Dashboard data staleness < 30 seconds. Alert delivery < 60 seconds from event.
- Consistency: Heartbeat status must be accurate (no false positives for node death). Metrics can tolerate eventual consistency (30s lag acceptable).
- Scale: 100,000 nodes, each emitting ~50 metrics every 10 seconds = 500K metric samples/sec.
- Durability: Metrics retained at full resolution for 7 days, downsampled (1-min aggregates) for 90 days, further downsampled (1-hour) for 2 years.
2. Estimation (3 min)
Traffic
- Nodes: 100,000
- Metrics per node: 50 metrics × 1 sample/10s = 5 samples/sec/node
- Total ingestion: 100K × 5 = 500,000 samples/sec
- Heartbeats: 100K nodes × 1 heartbeat/5s = 20,000 heartbeats/sec
- Dashboard queries: 500 concurrent operators × 1 query/5s = 100 QPS (but each query may scan millions of data points)
Storage
- Per sample: metric_name (8 bytes hashed) + value (8 bytes) + timestamp (8 bytes) + node_id (8 bytes) + tags (16 bytes) = ~48 bytes
- Raw data (7 days): 500K samples/sec × 86,400 sec/day × 7 days × 48 bytes = 14.5TB
- 1-min aggregates (90 days): 100K nodes × 50 metrics × 1440 min/day × 90 days × 48 bytes = 31TB
- 1-hour aggregates (2 years): 100K nodes × 50 metrics × 24 × 730 × 48 bytes = 4.2TB
- Total: ~50TB active storage
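The tiered retention math above can be sanity-checked in a few lines of Python (results agree with the estimates to within rounding):

```python
# Back-of-envelope storage check for the three retention tiers.
SAMPLE_BYTES = 48          # metric_name + value + timestamp + node_id + tags
NODES = 100_000
METRICS_PER_NODE = 50

raw_rate = NODES * METRICS_PER_NODE / 10                       # 500K samples/sec
raw_7d = raw_rate * 86_400 * 7 * SAMPLE_BYTES                  # full resolution, 7 days
agg_1m_90d = NODES * METRICS_PER_NODE * 1_440 * 90 * SAMPLE_BYTES   # 1-min buckets, 90 days
agg_1h_2y = NODES * METRICS_PER_NODE * 24 * 730 * SAMPLE_BYTES      # 1-hour buckets, 2 years

for label, size in [("raw 7d", raw_7d), ("1-min 90d", agg_1m_90d), ("1-hour 2y", agg_1h_2y)]:
    print(f"{label}: {size / 1e12:.1f} TB")
print(f"total: {(raw_7d + agg_1m_90d + agg_1h_2y) / 1e12:.1f} TB")
```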
Key Insight
This is a write-heavy time-series system with a 5000:1 write-to-read ratio. The critical challenge is ingesting 500K samples/sec reliably while simultaneously running failure detection with < 10s latency. Storage must be optimized for time-range queries on specific metrics.
3. API Design (3 min)
Agent → Collector (push-based telemetry)
POST /v1/metrics/ingest
Body (batch, protobuf-encoded for efficiency):
{
"node_id": "node-us-east-1a-42",
"timestamp": 1708632060,
"metrics": [
{ "name": "cpu.usage", "value": 78.5, "tags": { "core": "0" } },
{ "name": "mem.used_bytes", "value": 17179869184, "tags": {} },
{ "name": "disk.io.read_bytes", "value": 524288, "tags": { "device": "sda1" } }
]
}
POST /v1/heartbeat
Body: { "node_id": "node-42", "timestamp": 1708632060, "agent_version": "2.1.0" }
Response 200: { "ack": true }
Dashboard / Query API
GET /v1/query?metric=cpu.usage&node=node-42&from=1708600000&to=1708632000&step=60s
Response: { "metric": "cpu.usage", "values": [[1708600000, 45.2], [1708600060, 47.1], ...] }
GET /v1/cluster/status
Response: { "total_nodes": 100000, "healthy": 99847, "unhealthy": 153,
"by_status": { "healthy": 99847, "degraded": 98, "down": 43, "unreachable": 12 } }
Alerting Rules API
POST /v1/rules
Body: {
"name": "high_cpu_sustained",
"expr": "avg(cpu.usage{cluster='prod'}) by (node) > 90 for 5m",
"severity": "warning",
"notify": ["oncall-infra"],
"auto_remediate": { "action": "drain_node", "cooldown": "30m" }
}
Key Decisions
- Protobuf for metric ingestion (50% smaller than JSON, faster to parse at 500K samples/sec)
- Push-based model (agents push to collectors) rather than pull-based (Prometheus-style), because at 100K nodes a central scraper must discover, track, and schedule scrapes for every target, which strains service discovery and scrape scheduling; push also works through NAT/firewalls without inbound access to nodes
- Batch ingestion (agent buffers 10s of metrics, sends one request) to reduce connection overhead
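The batching decision can be sketched as a small agent-side buffer. `MetricBatcher` and its parameters are hypothetical names; a real agent would protobuf-encode each flushed batch before POSTing it to /v1/metrics/ingest:

```python
import time

class MetricBatcher:
    """Illustrative agent-side buffer: samples accumulate locally and are
    flushed as one batch, either every flush_interval_s or when full."""
    def __init__(self, flush_interval_s=10, max_batch=1000):
        self.flush_interval_s = flush_interval_s
        self.max_batch = max_batch
        self.buffer = []
        self.last_flush = time.monotonic()

    def add(self, name, value, tags=None):
        """Buffer one sample; return a batch when it is time to flush, else None."""
        self.buffer.append({"name": name, "value": value, "tags": tags or {}})
        if (len(self.buffer) >= self.max_batch
                or time.monotonic() - self.last_flush >= self.flush_interval_s):
            return self.flush()
        return None

    def flush(self):
        batch, self.buffer = self.buffer, []
        self.last_flush = time.monotonic()
        return batch  # real agent: encode and send in one request
```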
4. Data Model (3 min)
Time-Series Storage (TimescaleDB or ClickHouse)
Table: metrics (hypertable, partitioned by time)
time | timestamp -- partition key (daily chunks)
node_id | varchar(100) -- indexed
metric_name | varchar(100) -- indexed
value | double
tags | jsonb -- { "core": "0", "device": "sda1" }
-- Compression: columnar compression after 1 hour (10x reduction)
-- Retention policies: auto-drop raw data after 7 days
Aggregated Metrics (materialized views)
Table: metrics_1m (continuous aggregate)
time_bucket | timestamp -- 1-minute buckets
node_id | varchar(100)
metric_name | varchar(100)
avg_value | double
max_value | double
min_value | double
count | int
Table: metrics_1h (continuous aggregate from metrics_1m)
-- same schema, 1-hour buckets
Node Health State (PostgreSQL)
Table: nodes
node_id (PK) | varchar(100)
cluster_id (FK) | varchar(50)
status | enum('healthy', 'degraded', 'down', 'unreachable', 'maintenance')
last_heartbeat | timestamp
agent_version | varchar(20)
metadata | jsonb -- { "az": "us-east-1a", "instance_type": "m5.xlarge" }
Table: incidents
incident_id (PK) | uuid
node_id (FK) | varchar(100)
failure_type | enum('node_crash', 'network_partition', 'disk_failure', 'oom', 'cpu_throttle')
severity | enum('info', 'warning', 'critical')
started_at | timestamp
resolved_at | timestamp -- NULL if ongoing
auto_remediated | boolean
remediation_action | varchar(100)
Why TimescaleDB (or ClickHouse)?
- Time-series data has a natural time-ordered write pattern that benefits from time-partitioned storage
- Columnar compression gives 10-15x storage reduction for metric data (many repeated values)
- Built-in continuous aggregation for automatic downsampling
- ClickHouse alternative if query performance at scale matters more than PostgreSQL compatibility
5. High-Level Design (12 min)
Architecture
Cluster Nodes (100K)
[Agent] → batched metrics/heartbeats every 10s
→ Load Balancer (L4, sticky by node_id hash)
→ Collector Fleet (stateless, 50 instances)
→ Kafka (metrics-raw topic, 128 partitions)
→ Consumer Group 1: Time-Series Writer → TimescaleDB/ClickHouse
→ Consumer Group 2: Alerting Engine (evaluates rules against stream)
→ Consumer Group 3: Anomaly Detector (ML model for unusual patterns)
Heartbeat Path (separate, latency-critical):
[Agent] → Heartbeat Receiver (dedicated fleet, 20 instances)
→ Heartbeat Store (Redis, key: node_id, value: last_seen, TTL: 15s)
→ Failure Detector (checks for expired heartbeats every 2s)
→ If expired: classify failure → create incident → trigger alert
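An illustrative, in-memory version of the heartbeat store and the expiry scan (names assumed; the production path writes to Redis, and tracking last-seen timestamps directly as below is also how a real detector distinguishes "expired" from "never registered"):

```python
import time

class HeartbeatStore:
    """In-memory stand-in for the Redis heartbeat keys (TTL semantics only)."""
    def __init__(self, ttl_s=15, clock=time.monotonic):
        self.ttl_s = ttl_s
        self.clock = clock            # injectable for testing
        self.last_seen = {}           # node_id -> timestamp of last heartbeat

    def heartbeat(self, node_id):
        """Called by the Heartbeat Receiver on every agent heartbeat."""
        self.last_seen[node_id] = self.clock()

    def expired_nodes(self):
        """What the Failure Detector computes on each 2-second scan:
        every node whose last heartbeat is older than the TTL."""
        now = self.clock()
        return [n for n, ts in self.last_seen.items() if now - ts > self.ttl_s]
```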
Dashboard:
Operator Browser → API Gateway
→ Query Service → TimescaleDB (historical) + Redis (real-time status)
→ WebSocket Server → push live status updates to dashboard
Gossip Protocol for Peer Monitoring
Each node also monitors its k nearest peers (k=3):
Node A ←→ Node B (bidirectional health check)
Node A ←→ Node C
Node A ←→ Node D
If Node B stops responding to Node A, C, and D:
→ All three report B as unreachable
→ Failure Detector has corroborating evidence → high-confidence failure
If only Node A loses contact with Node B:
→ Likely network partition between A and B, not a B failure
→ Failure Detector classifies as "partial network partition"
Components
- Agent: Lightweight daemon (< 50MB RAM) on each node. Collects system metrics via /proc, disk I/O, network stats. Sends batched metrics + heartbeats.
- Collector Fleet: Stateless HTTP/gRPC servers that accept metric batches, validate, and publish to Kafka. Horizontally scalable.
- Kafka: Durability buffer between collection and processing. Decouples ingestion rate from processing rate. 128 partitions keyed by node_id for ordering.
- Time-Series Writer: Consumes from Kafka, bulk-inserts into TimescaleDB. Batch size: 10,000 rows per insert for throughput.
- Heartbeat Receiver: Dedicated path (not through Kafka) for low-latency heartbeat processing. Writes to Redis with TTL.
- Failure Detector: Polls Redis every 2 seconds for expired heartbeats. Uses gossip protocol corroboration to classify failures.
- Alerting Engine: Stream processor that evaluates alerting rules against incoming metrics. Supports threshold alerts and rate-of-change alerts.
- Query Service: Translates dashboard queries into optimized TimescaleDB queries. Caches hot queries in Redis.
- Auto-Remediation Controller: Receives classified incidents, matches against remediation playbooks, executes actions (via Kubernetes API, SSH, or cloud provider API).
6. Deep Dives (15 min)
Deep Dive 1: Failure Detection and Classification
The problem: Distinguishing between “node is down” vs. “network is partitioned” vs. “agent crashed but node is fine” vs. “node is overloaded and heartbeats are delayed.” Each requires a different response.
Multi-signal failure detection:
- Heartbeat timeout (primary signal)
- Agent sends heartbeat every 5 seconds
- Redis key TTL: 15 seconds (3 missed heartbeats)
- Failure Detector scans for expired keys every 2 seconds
- Time to detect: 15-17 seconds worst case
- Gossip protocol (corroboration)
- Each node pings 3 peers every 5 seconds (SWIM-style protocol)
- If a node fails to respond to peer pings, peers report it to the Failure Detector
- Quorum: if 2+ peers report a node as unreachable, confidence is high
- Classification logic:
if heartbeat_expired AND all_peers_report_unreachable:
→ NODE_CRASH (high confidence)
→ Action: alert + reschedule workloads
if heartbeat_expired AND some_peers_report_unreachable:
→ NETWORK_PARTITION (partial)
→ Action: alert + investigate network path
if heartbeat_expired AND no_peers_report_unreachable:
→ AGENT_CRASH (node is fine, agent died)
→ Action: attempt agent restart via SSH/out-of-band
if heartbeat_alive AND disk_metrics.errors > 0:
→ DISK_FAILURE (degraded)
→ Action: alert + drain node + replace disk
if heartbeat_alive AND mem.available < 100MB AND oom_killer_count > 0:
→ OOM (out of memory)
→ Action: alert + identify memory-hungry process + restart
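The classification rules above translate directly into a pure function (signature and the HEALTHY fallback are illustrative additions):

```python
def classify_failure(heartbeat_expired, peers_unreachable, peers_total,
                     disk_errors=0, mem_available_mb=None, oom_kills=0):
    """Combine heartbeat and gossip signals into a failure type,
    mirroring the classification rules above."""
    if heartbeat_expired:
        if peers_unreachable == peers_total:
            return "NODE_CRASH"          # all peers corroborate: high confidence
        if peers_unreachable > 0:
            return "NETWORK_PARTITION"   # partial visibility loss
        return "AGENT_CRASH"             # node reachable by peers, agent silent
    if disk_errors > 0:
        return "DISK_FAILURE"            # heartbeat alive but disk degraded
    if mem_available_mb is not None and mem_available_mb < 100 and oom_kills > 0:
        return "OOM"
    return "HEALTHY"
```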
- Preventing false positives:
- Never declare a node dead based on a single missed heartbeat
- Require 3 consecutive missed heartbeats (15 seconds)
- Cross-reference with gossip protocol
- During network events (e.g., switch maintenance), temporarily increase tolerance to 30 seconds
- Maintain a “suspected” state before “confirmed down” — gives 10 more seconds for recovery
Flapping prevention:
- If a node alternates between healthy and unhealthy more than 3 times in 5 minutes, mark as “flapping”
- Flapping nodes get a different alert (likely hardware issue or unstable network)
- Suppress individual up/down alerts during flapping, send a single “node is flapping” alert
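Flapping detection reduces to counting status transitions inside a sliding window; a minimal sketch (class name assumed):

```python
from collections import deque

class FlapDetector:
    """Marks a node as flapping after more than max_transitions health
    transitions within window_s seconds (default: >3 in 5 minutes)."""
    def __init__(self, max_transitions=3, window_s=300):
        self.max_transitions = max_transitions
        self.window_s = window_s
        self.transitions = deque()   # timestamps of status changes
        self.last_status = None

    def observe(self, status, now):
        """Record a status observation; return True if the node is flapping."""
        if self.last_status is not None and status != self.last_status:
            self.transitions.append(now)
        self.last_status = status
        # Drop transitions that fell out of the window.
        while self.transitions and now - self.transitions[0] > self.window_s:
            self.transitions.popleft()
        return len(self.transitions) > self.max_transitions
```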
Deep Dive 2: Alerting Rules Engine
The problem: Evaluating complex alerting rules against 500K metric samples/sec with low latency. Rules can reference windowed aggregates (“avg CPU > 90% for 5 minutes”), rate of change (“disk usage growing > 1GB/hour”), and cross-node conditions (“more than 10% of nodes in cluster X are unhealthy”).
Architecture:
- Rule types:
Threshold: avg(cpu.usage{cluster='prod'}) by (node) > 90 for 5m
Rate: rate(disk.used_bytes{node='X'}[1h]) > 1073741824
Absence: absent(heartbeat{node='X'}) for 30s
Composite: count(node_status == 'down') / count(node_status) > 0.1
- Evaluation engine:
- Rules are compiled into a DAG of operators (filter → aggregate → threshold → alert)
- Stream processor (Flink or custom) evaluates rules against incoming metric stream
- Windowed aggregates maintained in state stores (RocksDB-backed)
- Evaluation interval: every 10 seconds per rule
- State management for windowed rules:
Rule: avg(cpu.usage) > 90 for 5m
State per (rule, node):
window_values: circular buffer of last 30 values (10s intervals × 5min)
current_avg: running average
condition_met_since: timestamp when avg first exceeded 90
alert_fired: boolean
On new sample:
1. Push to circular buffer, update running average
2. If avg > 90 AND condition_met_since is NULL:
→ Set condition_met_since = now
3. If avg > 90 AND now - condition_met_since >= 5min AND NOT alert_fired:
→ FIRE ALERT, set alert_fired = true
4. If avg <= 90:
→ Reset condition_met_since; if alert_fired, send RESOLVE and set alert_fired = false
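The per-(rule, node) state machine above, as runnable Python (names are illustrative; a production engine would keep this state in the stream processor's RocksDB-backed store):

```python
from collections import deque

class ForRuleState:
    """State for one 'avg(metric) > threshold for duration' rule on one node."""
    def __init__(self, threshold=90.0, duration_s=300, window_len=30):
        self.threshold = threshold
        self.duration_s = duration_s
        self.window = deque(maxlen=window_len)  # 30 samples = 5 min at 10s
        self.condition_met_since = None
        self.alert_fired = False

    def on_sample(self, value, now):
        """Apply one sample; return 'FIRE', 'RESOLVE', or None."""
        self.window.append(value)
        avg = sum(self.window) / len(self.window)
        if avg > self.threshold:
            if self.condition_met_since is None:
                self.condition_met_since = now
            if now - self.condition_met_since >= self.duration_s and not self.alert_fired:
                self.alert_fired = True
                return "FIRE"
        else:
            was_firing = self.alert_fired
            self.condition_met_since = None
            self.alert_fired = False
            if was_firing:
                return "RESOLVE"
        return None
```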
- Alert deduplication and grouping:
- Same alert for same node → deduplicate (only fire once until resolved)
- If 50 nodes in the same cluster fire the same alert within 1 minute → group into a single “cluster-wide” alert
- Alert routing: severity-based. Critical → PagerDuty page. Warning → Slack notification. Info → dashboard only.
Deep Dive 3: Auto-Remediation
The problem: Many failures have well-known fixes. Waking up a human at 3 AM to restart a crashed service is wasteful. But auto-remediation must be safe — a buggy remediation loop can cause cascading failures.
Safety guardrails:
1. Rate limiting: max 5 remediations per cluster per 10 minutes
2. Blast radius: never remediate > 5% of nodes in a cluster simultaneously
3. Cooldown: after remediating node X, wait 30 minutes before remediating again
4. Dry-run mode: new rules run in dry-run for 7 days (log action, don't execute)
5. Kill switch: global disable for all auto-remediation (one API call)
6. Audit log: every remediation action logged with before/after state
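Guardrails 1-3 are mechanical checks that can sit in front of every remediation; a sketch with assumed names and a pluggable clock:

```python
import time
from collections import deque

class RemediationGuard:
    """Enforces guardrails 1-3: cluster rate limit, blast radius, per-node cooldown."""
    def __init__(self, cluster_size, max_per_window=5, window_s=600,
                 blast_radius=0.05, cooldown_s=1800, clock=time.monotonic):
        self.cluster_size = cluster_size
        self.max_per_window = max_per_window
        self.window_s = window_s
        self.blast_radius = blast_radius
        self.cooldown_s = cooldown_s
        self.clock = clock
        self.recent = deque()        # timestamps of recent remediations
        self.last_per_node = {}      # node_id -> last remediation time

    def allow(self, node_id, in_flight=0):
        """Return True and record the action if all guardrails pass."""
        now = self.clock()
        while self.recent and now - self.recent[0] > self.window_s:
            self.recent.popleft()
        if len(self.recent) >= self.max_per_window:
            return False                                        # guardrail 1: rate limit
        if (in_flight + 1) / self.cluster_size > self.blast_radius:
            return False                                        # guardrail 2: blast radius
        last = self.last_per_node.get(node_id)
        if last is not None and now - last < self.cooldown_s:
            return False                                        # guardrail 3: cooldown
        self.recent.append(now)
        self.last_per_node[node_id] = now
        return True
```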
Playbook example:
name: restart_crashed_service
trigger:
failure_type: AGENT_CRASH
confidence: HIGH
min_peers_reporting: 2
actions:
- type: ssh_command
command: "systemctl restart monitoring-agent"
timeout: 30s
- type: verify_heartbeat
wait: 20s
on_failure:
- type: drain_node
reason: "Agent restart failed, possible hardware issue"
- type: escalate
severity: critical
team: infra-oncall
Execution flow:
Incident detected → Match against playbooks → Safety check (rate limit, blast radius)
→ If safe: execute action → wait for verification → update incident
→ If unsafe: skip auto-remediation → escalate to human
→ If action fails: escalate to human with full context
7. Extensions (2 min)
- Anomaly detection with ML: Train per-node baseline models (e.g., seasonal decomposition) to detect abnormal metric patterns before they trigger static thresholds. Useful for catching slow memory leaks or gradual disk degradation.
- Distributed tracing correlation: When a node failure is detected, automatically query the distributed tracing system to identify which services and requests were affected. Attach impacted trace IDs to the incident.
- Capacity planning: Analyze metric trends over weeks/months to predict when nodes will run out of disk/memory. Generate proactive alerts like “Node X will run out of disk in 3 days at current growth rate.”
- Multi-cluster federation: Aggregate health data across multiple clusters (dev, staging, prod, multiple regions) into a single global dashboard. Each cluster runs its own monitoring stack; a federation layer merges views.
- Custom metric plugins: Allow teams to write custom agent plugins (in Python/Go) that collect application-specific metrics (e.g., queue depth, cache hit rate) and feed them into the same pipeline.