Design Walkthrough
Problem Statement
The Question: Design a system to monitor the health of 100,000+ nodes in a distributed cluster, detecting failures within 60 seconds and providing dashboards for debugging.
Cluster health monitoring must:
- Detect failures quickly: Node crashes, process deaths, resource exhaustion
- Collect metrics: CPU, memory, disk, network, application-specific
- Alert on anomalies: Not just thresholds, but unusual patterns
- Provide visibility: Dashboards for debugging and capacity planning
What to say first
Before designing, I want to clarify: What types of health checks are needed? What is the detection SLA? And importantly, how should the monitoring system itself be monitored?
Hidden requirements interviewers test:
- Do you understand the irony that the monitoring system must be more reliable than what it monitors?
- Can you handle the scale of metrics (millions per second)?
- Do you know the difference between push and pull models?
- Can you reason about distributed failure detection?
Clarifying Questions
Ask these questions to demonstrate senior thinking. Each shapes your architecture.
Question 1: Scale
How many nodes are we monitoring? How many metrics per node? What is the scrape/collection interval?
Why this matters: Determines storage and ingestion architecture.
Typical answer: 100K nodes, 500 metrics per node, 15-second intervals
Architecture impact: 100K x 500 / 15 = 3.3 million metrics per second ingestion
Question 2: Detection SLA
How quickly must we detect a node failure? Is 60 seconds acceptable, or do we need sub-10-second detection?
Why this matters: Sub-10-second requires different architecture (streaming vs batch).
Typical answer: 60 seconds for most failures, faster for critical services
Architecture impact: Can use polling with reasonable intervals, no need for streaming for basic health
Question 3: Push vs Pull
Should nodes push metrics to collectors, or should collectors pull from nodes?
Why this matters: Fundamental architectural decision.
Typical answer: Pull is preferred for control, but push for ephemeral nodes
Architecture impact: Pull requires service discovery; push requires backpressure handling
Question 4: Retention
How long do we need to retain metrics? Full resolution vs downsampled?
Why this matters: Storage costs grow linearly with retention.
Typical answer: 15 days full resolution, 1 year downsampled
Architecture impact: Need tiered storage with automatic downsampling
Stating assumptions
I will assume: 100K nodes, 500 metrics each, 15-second intervals (3.3M metrics/sec), 60-second detection SLA, pull-based with push for ephemeral, 15-day full retention.
The Hard Part
Say this out loud
The hard part here is distinguishing between a node being actually down versus the network path to the monitoring system being broken. We cannot have a single point of observation.
Why this is genuinely hard:
1. Observer Problem: If monitor A cannot reach node X, is X down, or is the network between A and X broken?
2. Scale of Ingestion: 3.3 million metrics per second is non-trivial. Each metric needs to be parsed, validated, stored, and indexed.
3. Alert Fatigue: Too many alerts and operators ignore them. Too few and real issues are missed. Finding the balance is hard.
4. Monitoring the Monitor: The monitoring system must be more reliable than what it monitors. How do you monitor the monitoring system?
Common mistake
Candidates design a single monitoring server that polls all nodes. This creates a single point of failure - if that server goes down, all monitoring is blind.
The fundamental insight:
You need multiple independent observers to distinguish node failure from network partition:
- If Observer A and Observer B both cannot reach Node X -> Node X is likely down
- If only Observer A cannot reach Node X -> Network issue between A and X
This is why production systems use multiple monitoring paths.
Scale and Access Patterns
Let me estimate the scale and understand access patterns.
| Dimension | Value | Impact |
|---|---|---|
| Nodes monitored | 100,000 | Need distributed collectors |
| Metrics per node | 500 | 50 million unique time series |
| Scrape interval | 15 seconds | ~3.3 million samples/sec ingestion |
| Full-resolution retention | 15 days | ~14 TB/day, needs compression and tiered storage |
What to say
At 3.3 million metrics per second, we need a distributed ingestion layer. The storage is 14 TB per day at full resolution, which requires time-series optimized storage with compression.
Access Pattern Analysis:
- Write-heavy: 3.3M writes/sec vs maybe 1,000 queries/sec (roughly 3,300:1 ratio)
- Time-series data: Always queried by time range + metric name + labels
- Recent bias: 90% of queries are for the last 1 hour
- High cardinality: 50 million unique time series (node x metric combinations)
- Bursty reads: During incidents, query load spikes 100x
Ingestion rate:
100K nodes x 500 metrics / 15 sec = 3,333,333 samples/sec
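A quick back-of-envelope check of these numbers as a sketch; the 48 bytes per uncompressed sample is an assumption, and real TSDB compression brings the daily footprint well below this:

```python
nodes = 100_000
metrics_per_node = 500
scrape_interval_s = 15

samples_per_sec = nodes * metrics_per_node / scrape_interval_s
print(f"{samples_per_sec:,.0f} samples/sec")  # 3,333,333

# Assumed ~48 bytes per uncompressed sample (timestamp, value, series overhead)
bytes_per_sample = 48
tb_per_day = samples_per_sec * 86_400 * bytes_per_sample / 1e12
print(f"~{tb_per_day:.0f} TB/day at full resolution")  # ~14 TB
```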
High-Level Architecture
Let me start with the core components and then detail each.
What to say
I will design a pull-based system with distributed collectors. We scale by sharding nodes across collectors based on consistent hashing. Each collector is responsible for a subset of targets.
Cluster Health Monitoring Architecture
Component Responsibilities:
1. Service Discovery: Knows all nodes that should be monitored. Collectors query this to get their targets.
2. Collectors: Pull metrics from assigned nodes. Stateless, horizontally scalable. Use consistent hashing to determine ownership (see the sketch after this list).
3. Ingestion Queue (Kafka): Buffers metrics between collection and storage. Handles backpressure during spikes.
4. Time-Series Database: Stores metrics with timestamps. Optimized for time-range queries. Sharded by metric name hash.
5. Query Engine: Executes PromQL/similar queries. Aggregates across shards. Caches frequent queries.
6. Alert Rules Engine: Evaluates alert conditions continuously. Fires alerts when thresholds are breached.
7. Alert Manager: Deduplicates, groups, and routes alerts. Handles silencing and escalation.
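As a sketch of how a collector might decide which targets it owns (item 2 above), here is a minimal consistent-hash ring; the collector names, virtual-node count, and hash choice are illustrative assumptions, not a prescribed implementation:

```python
import hashlib
from bisect import bisect_right

class CollectorRing:
    """Minimal consistent-hash ring mapping monitored nodes to collectors."""
    def __init__(self, collectors: list[str], vnodes: int = 100):
        # Each collector gets `vnodes` positions on the ring for smoother balance
        self.ring = sorted(
            (self._hash(f"{c}#{i}"), c) for c in collectors for i in range(vnodes)
        )
        self.keys = [h for h, _ in self.ring]

    @staticmethod
    def _hash(key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def owner(self, node_id: str) -> str:
        """Return the collector responsible for scraping this node."""
        idx = bisect_right(self.keys, self._hash(node_id)) % len(self.ring)
        return self.ring[idx][1]

# When a collector dies, only its share of targets moves to the survivors,
# so scrape gaps during failover stay small.
ring = CollectorRing(["collector-1", "collector-2", "collector-3"])
print(ring.owner("node-042"))
```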
Real-world reference
This is similar to how Prometheus + Thanos works. Prometheus collectors scrape targets, Thanos provides long-term storage and global querying across multiple Prometheus instances.
Data Model and Storage
Time-series data has unique characteristics that require specialized storage.
What to say
The data model is metric name plus labels plus timestamp plus value. We store in a time-series database optimized for append-only writes and time-range queries.
Metric format (OpenMetrics/Prometheus style):
```
metric_name{label1="value1", label2="value2"} value timestamp
```

Storage Schema Design:
```sql
-- Time-series table (conceptual - actual TSDBs use columnar storage)
CREATE TABLE metrics (
    metric_id BIGINT,            -- Hash of metric name + labels
    ts        TIMESTAMP,         -- Sample timestamp
    value     DOUBLE PRECISION,  -- Sample value
    PRIMARY KEY (metric_id, ts)
);
```

Why Time-Series Databases (not regular DBs):
1. Compression: Delta-of-delta encoding for timestamps, XOR for values. 10-20x compression (see the sketch after this list).
2. Write optimization: Append-only, no updates. Batch writes to chunks.
3. Time partitioning: Auto-drop old partitions instead of DELETE.
4. Downsampling: Built-in aggregation for long-term storage.
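To show why compression works so well here (item 1 above), a minimal sketch of delta-of-delta encoding for timestamps; real engines additionally bit-pack the result, which is omitted here:

```python
def delta_of_delta_encode(timestamps: list[int]) -> list[int]:
    """Encode as [first_timestamp, first_delta, dod, dod, ...]."""
    if not timestamps:
        return []
    encoded = [timestamps[0]]
    prev_ts, prev_delta = timestamps[0], None
    for ts in timestamps[1:]:
        delta = ts - prev_ts
        # The first delta is stored as-is; afterwards only the change in delta
        encoded.append(delta if prev_delta is None else delta - prev_delta)
        prev_ts, prev_delta = ts, delta
    return encoded

# Regular 15-second scrapes produce almost all zeros, which pack into a few bits each
print(delta_of_delta_encode([1000, 1015, 1030, 1045, 1061]))
# [1000, 15, 0, 0, 1]
```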
Tiered storage:

Tier 1: Hot (0-2 hours)
- In-memory + WAL
- Full resolution (15-second)

Tier 2: Warm (2 hours to 15 days)
- Compressed on-disk chunks
- Full resolution (15-second)

Tier 3: Cold (15 days to 1 year)
- Object storage (S3)
- Downsampled aggregates

Important detail
High cardinality labels (like request_id or user_id) can explode storage. Always validate labels and reject unbounded cardinality at ingestion.
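A minimal sketch of that guard at the ingestion layer; the blocked-label set and the per-metric series cap are illustrative assumptions:

```python
# Labels that produce unbounded cardinality are rejected outright
BLOCKED_LABELS = {"request_id", "user_id", "session_id"}
MAX_SERIES_PER_METRIC = 10_000  # assumed cap per metric name

seen_series: dict[str, set[frozenset]] = {}

def accept_sample(metric: str, labels: dict[str, str]) -> bool:
    """Reject samples whose labels would explode the number of time series."""
    if BLOCKED_LABELS & labels.keys():
        return False  # unbounded-cardinality label present
    series_key = frozenset(labels.items())
    series = seen_series.setdefault(metric, set())
    if series_key not in series and len(series) >= MAX_SERIES_PER_METRIC:
        return False  # this metric already has too many distinct label sets
    series.add(series_key)
    return True
```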
Health Check Deep Dive
There are multiple types of health checks, each with different characteristics.
| Check Type | What It Tests | Frequency | Failure Response |
|---|---|---|---|
| Liveness | Is the process running? | 10-15 sec | Restart container |
| Readiness | Can it serve traffic? | 5-10 sec | Remove from load balancer |
| Startup | Has it finished initializing? | Once | Wait or fail deployment |
| Deep health | Are dependencies healthy? | 30-60 sec | Alert, do not auto-restart |
| Synthetic | End-to-end user journey | 1-5 min | Page on-call |
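To make the liveness and readiness rows concrete, here is a minimal sketch of the two endpoints a monitored node might expose; the port and the dependency probes are illustrative assumptions:

```python
from http.server import BaseHTTPRequestHandler, HTTPServer

def check_database() -> bool:   # assumed dependency probe
    return True

def check_cache() -> bool:      # assumed dependency probe
    return True

class HealthHandler(BaseHTTPRequestHandler):
    """Liveness answers if the process can respond at all; readiness also checks dependencies."""
    def do_GET(self):
        if self.path == "/healthz":      # liveness
            self.send_response(200)
        elif self.path == "/readyz":     # readiness
            ready = check_database() and check_cache()
            self.send_response(200 if ready else 503)
        else:
            self.send_response(404)
        self.end_headers()

if __name__ == "__main__":
    HTTPServer(("0.0.0.0", 9100), HealthHandler).serve_forever()
```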
Pull-based Health Check Flow:
Health Check State Machine
```python
class NodeHealthChecker:
    def __init__(self, node_id: str):
        self.node_id = node_id
```

Why consecutive failures?
A single failed health check could be network blip. Requiring N consecutive failures (typically 2-3) reduces false positives while keeping detection time reasonable.
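A more complete, self-contained sketch of the checker above; the threshold of 3 consecutive failures and the state names are assumptions consistent with the surrounding text:

```python
class NodeHealthChecker:
    def __init__(self, node_id: str, failure_threshold: int = 3):
        self.node_id = node_id
        self.failure_threshold = failure_threshold  # 2-3 per the discussion above
        self.consecutive_failures = 0
        self.state = "HEALTHY"

    def record_check(self, success: bool) -> str:
        """Update state after one health check and return the new state."""
        if success:
            self.consecutive_failures = 0
            self.state = "HEALTHY"
            return self.state
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.failure_threshold:
            self.state = "UNHEALTHY"  # N consecutive failures -> declare the node down
        else:
            self.state = "SUSPECT"    # could be a network blip; wait for more evidence
        return self.state
```

At a 15-second check interval, a threshold of 3 puts detection at roughly 30-45 seconds, comfortably inside the 60-second SLA.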
Distributed Failure Detection:
To handle the observer problem, use multiple independent checkers:
```python
from typing import List

def determine_node_status(node_id: str, observers: List[str]) -> str:
    """
    Query multiple observers to determine true node status.
    """
    # check_from() is an assumed helper: True if the given observer can
    # currently reach the node
    reachable = [obs for obs in observers if check_from(obs, node_id)]
    if not reachable:
        return "DOWN"     # every observer agrees the node is unreachable
    if len(reachable) < len(observers):
        return "SUSPECT"  # partial reachability -> likely a network issue
    return "UP"
```

Consistency and Invariants
System Invariants
Must detect actual node failures within 60 seconds. False negative rate (missing real failures) must be below 0.1%. False positive rate should be below 1% to avoid alert fatigue.
Why these specific numbers?
- 60-second detection: Balances quick detection with avoiding false positives. 4 check cycles at 15-second intervals.
- 0.1% false negative: Missing 1 in 1000 real failures is too many for production. Operators must trust the system.
- 1% false positive: More tolerance here because humans can dismiss false alerts, but cannot resurrect services they did not know were down.
| Scenario | What Happens | Acceptable? |
|---|---|---|
| Node down, detected in 45s | Alert fires, on-call responds | Yes - within SLA |
| Node down, detected in 90s | Alert fires late | No - SLA violation |
| Node healthy, alert fires | False positive, wasted investigation | Tolerable if rare |
| Node down, no alert | False negative, extended outage | Never acceptable |
| Network blip, brief alert | Flapping alert | Bad UX, need hysteresis |
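The last row above notes that flapping alerts need hysteresis. A minimal sketch of one way to do that, where an alert fires only after the condition has held for a while and clears only after a sustained recovery; the 120-second and 300-second windows are illustrative assumptions:

```python
import time
from typing import Optional

class HysteresisAlert:
    def __init__(self, fire_after_s: int = 120, clear_after_s: int = 300):
        self.fire_after_s = fire_after_s    # condition must hold this long to fire
        self.clear_after_s = clear_after_s  # must stay healthy this long to clear
        self.breach_since: Optional[float] = None
        self.ok_since: Optional[float] = None
        self.firing = False

    def evaluate(self, breached: bool, now: Optional[float] = None) -> bool:
        """Feed one rule-evaluation result; returns whether the alert is firing."""
        now = time.time() if now is None else now
        if breached:
            self.ok_since = None
            self.breach_since = self.breach_since or now
            if not self.firing and now - self.breach_since >= self.fire_after_s:
                self.firing = True
        else:
            self.breach_since = None
            self.ok_since = self.ok_since or now
            if self.firing and now - self.ok_since >= self.clear_after_s:
                self.firing = False
        return self.firing
```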
Business impact mapping
A missed failure (false negative) means extended customer impact. A false alert means waking someone up unnecessarily. Both are bad, but missing real failures is worse for the business.
Consistency Model for Metrics:
Metrics storage uses eventual consistency:
- Writes may take 1-5 seconds to be queryable
- Queries across shards may see slightly different timestamps
- This is acceptable because dashboards refresh every 15-30 seconds anyway
Alerting uses stronger consistency:
- Alert rules evaluated on single shard (no cross-shard race conditions)
- Alert state machine is consistent within single alert manager
- Multi-observer consensus for critical alerts
Failure Modes and Resilience
Proactively discuss failures
The monitoring system failing is catastrophic - you are blind to all other failures. Let me discuss how we handle monitoring system failures.
| Failure | Impact | Mitigation | Why It Works |
|---|---|---|---|
| Collector down | Gap in metrics for its targets | Consistent hashing with failover | Other collectors take over orphaned targets |
| TSDB shard down | Cannot write/query that shard | Replication + write-ahead log | Replica serves reads, WAL recovers writes |
Monitoring the Monitoring System:
The hardest problem - how do you know if your monitoring is broken?
1. Synthetic Canaries
- Deploy fake nodes that always respond healthy
- If monitoring says a canary is unhealthy -> monitoring is broken

2. Deadman Switch (described below)
- The monitoring system must actively prove it is alive to an external service

What to say
I use a deadman switch pattern: the monitoring system must actively send heartbeats to an external service. If heartbeats stop, alert via a completely separate path like SMS gateway.
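A minimal sketch of the heartbeat side of that deadman switch; the endpoint URL and 60-second interval are hypothetical, and the external service is assumed to page on-call through an independent path (such as an SMS gateway) when pings stop:

```python
import time
import urllib.request

# Hypothetical endpoint of an external deadman-switch service
HEARTBEAT_URL = "https://deadman.example.com/ping/monitoring-prod"

def send_heartbeats(interval_s: int = 60) -> None:
    """Ping the external service forever; it alerts on-call if pings stop."""
    while True:
        try:
            urllib.request.urlopen(HEARTBEAT_URL, timeout=5)
        except Exception:
            # Failing to send is deliberately ignored: the external side alerts
            # on silence, so we never depend on our own alerting path.
            pass
        time.sleep(interval_s)
```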
Meta-Monitoring Architecture
Evolution and Scaling
What to say
This design works well for 100K nodes in a single region. Let me discuss how it evolves for multi-region and 10x scale.
Evolution Path:
Stage 1: Single Prometheus (up to 10K nodes)
- Simple, well-understood
- Single point of failure
- Query everything directly

Stage 2: Sharded Prometheus + Thanos (up to 500K nodes)
- Multiple Prometheus instances, each with a subset of targets
- Thanos for global querying and long-term storage
- This is our current design

Stage 3: Streaming Architecture (millions of nodes)
- Replace pull with push (agents push to Kafka)
- Stream processing for real-time alerting (Flink/Spark)
- Separate hot path (alerting) from cold path (dashboards)
Multi-Region Architecture
Multi-Region Considerations:
1. Data Locality: Keep metrics in the region where nodes live. Cross-region queries are slow.
2. Global Aggregates: Some queries need a global view (total CPU across all regions). Use federated queries or replicate to a global store (see the sketch after this list).
3. Alert Routing: Route alerts to on-call in the affected region first.
4. Failover: If a regional monitoring system fails, can another region take over temporarily?
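A minimal sketch of the federated-query option from item 2 above; the region list is illustrative and query_region() stands in for an HTTP call to each region's query engine:

```python
from concurrent.futures import ThreadPoolExecutor

# Illustrative region list; in practice discovered from configuration
REGIONS = ["us-east", "eu-west", "ap-south"]

def query_region(region: str, expr: str) -> float:
    """Stand-in for an HTTP call to that region's query engine (assumed API)."""
    return 0.0  # placeholder result

def global_aggregate(expr: str) -> float:
    """Fan the query out to every region in parallel and sum the results,
    so raw samples never have to leave their home region."""
    with ThreadPoolExecutor(max_workers=len(REGIONS)) as pool:
        return sum(pool.map(lambda r: query_region(r, expr), REGIONS))

total_cpu = global_aggregate("sum(node_cpu_usage_percent)")
```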
Alternative approach
If I needed sub-second alerting, I would add a streaming layer. Agents push to Kafka, Flink evaluates alert rules on the stream, alerts fire without waiting for storage. Dashboards still query storage.
| Scale | Architecture | Tradeoff |
|---|---|---|
| 10K nodes | Single Prometheus | Simple but single point of failure |
| 100K nodes | Sharded Prometheus + Thanos | Complex but scalable |
| 1M nodes | Push + Streaming + TSDB | Very complex, sub-second alerting |
| 10M nodes | Hierarchical aggregation | Sampling, approximate queries |
What I would do differently for...
Sub-second alerting: Add stream processing (Kafka + Flink) for hot path alerts. Storage query is too slow.
Cost optimization: Aggressive downsampling, move old data to S3, query S3 directly for historical.
Multi-tenant: Shard by tenant, enforce quotas, prevent noisy neighbor (one tenant overwhelming system).