Design Walkthrough
Problem Statement
The Question: Design a distributed metrics and logging system that can ingest petabytes of data per day from thousands of sources, store it cost-effectively, and support real-time dashboards and alerting.
Metrics systems are essential for:
- Observability: Understanding system behavior through metrics, logs, and traces
- Alerting: Detecting anomalies and triggering notifications
- Debugging: Investigating incidents with historical data
- Capacity Planning: Analyzing trends for infrastructure decisions
- Business Intelligence: Tracking KPIs and user behavior
What to say first
Before I design, let me clarify the types of data we are handling - metrics vs logs vs traces have very different characteristics. I also want to understand the query patterns and retention requirements.
Hidden requirements interviewers are testing:
- Do you understand time-series data characteristics?
- Can you design for extreme write throughput?
- Do you know about cardinality and its impact?
- Can you balance storage cost vs query performance?
- Do you understand aggregation and rollup strategies?
Clarifying Questions
Ask these questions to shape your architecture. Each answer significantly impacts design decisions.
Question 1: Data Types
Are we handling metrics (numeric time-series), logs (text events), or both? What about traces?
Why this matters: Metrics are fixed-schema numeric data, logs are variable-schema text. Very different storage and query patterns.
Typical answer: Primarily metrics with some structured logs.
Architecture impact: Optimize for time-series storage, add log indexing as a secondary concern.
Question 2: Scale
How many metrics per second? How many unique time series (cardinality)? How much log data per day?
Why this matters: Cardinality (unique series count) is often the limiting factor, not raw throughput.
Typical answer: 10M metrics/sec, 100M unique series, 10TB logs/day.
Architecture impact: Need distributed storage, cardinality limits, log sampling.
Question 3: Query Patterns
What are the common query patterns? Real-time dashboards? Ad-hoc investigation? Alerting queries?
Why this matters: Determines indexing strategy and pre-aggregation needs.
Typical answer: 80% dashboard queries (known patterns), 20% ad-hoc.
Architecture impact: Pre-aggregate common queries, keep raw data for ad-hoc.
Question 4: Retention
How long do we keep high-resolution data? What about aggregated data? Compliance requirements?
Why this matters: Storage costs grow linearly with retention. Aggregation reduces long-term storage.
Typical answer: 7 days high-res, 90 days hourly, 2 years daily.
Architecture impact: Multi-tier storage with automatic rollup and downsampling.
Stating assumptions
I will assume: 10M metrics/sec, 100M unique time series, 10TB logs/day, 7-day high-resolution retention with rollups to 2 years, 80% dashboard queries with sub-5-second latency requirement.
The Hard Part
Say this out loud
The hard part here is cardinality explosion. When users add high-cardinality dimensions like user_id or request_id to metrics, the number of unique time series explodes, breaking storage and query performance.
Why cardinality is the real challenge:
Consider a simple metric: http_requests_total
With dimensions method, status, endpoint, host:
5 methods x 10 statuses x 100 endpoints x 1000 hosts = 5 million series
Now add user_id with 10 million users:
5 million x 10 million = 50 TRILLION series (impossible)
This is called cardinality explosion.
Low cardinality (1K series):
Storage: ~100 MB/day
Query: <100ms
Common mistake
Candidates focus only on write throughput. The real problem is managing cardinality while still allowing flexible querying. Unlimited dimensions make the system unusable.
Other hard problems:
1. Write Amplification: Each data point may be written multiple times (replication, indexing, aggregation)
2. Query Performance: Scanning billions of points for a dashboard query
3. Hot Spots: Popular metrics (like CPU usage) accessed constantly
4. Clock Skew: Data arriving out of order or with future timestamps
5. Backpressure: Handling ingestion spikes without dropping data
Scale and Access Patterns
Let me quantify the scale and understand access patterns.
| Dimension | Value | Impact |
|---|---|---|
| Metrics ingestion | 10M points/sec | Need distributed write path, batching |
| Unique time series | 100M series | Index size ~100GB, fits in memory cluster |
What to say
At 10M metrics per second, we need distributed ingestion. But the real constraint is 100M unique series - that determines our index size and query complexity. This is a write-heavy system with read amplification during queries.
Access Pattern Analysis:
Writes:
- Append-only (metrics never updated)
- Highly parallelizable by metric/host
- Bursty (deploy events, incidents)
- Must handle backpressure gracefully
Reads:
- Time-range queries (last 1h, 24h, 7d)
- Aggregation queries (avg, sum, percentiles)
- High-cardinality filters (specific host, service)
- Dashboard queries are repetitive (cacheable)
- Alert queries run constantly (every 30-60 sec)
Raw data (7 days high-res, 10-sec intervals):
10M points/sec x 50 bytes x 86,400 sec/day ≈ 43 TB/day
43 TB/day x 7 days ≈ 300 TB raw (before compression)
High-Level Architecture
Let me design the architecture in layers: collection, transport, storage, and query.
What to say
I will organize this into four layers: collection agents on each host, a transport layer for reliable delivery, a storage layer optimized for time-series, and a query layer for dashboards and alerts.
Metrics System Architecture
Layer Responsibilities:
1. Collection Layer (Agents)
- Runs on every host
- Collects metrics from applications and system
- Batches and compresses before sending
- Handles local buffering during network issues
2. Transport Layer (Kafka)
- Decouples collection from storage
- Handles backpressure via buffering
- Enables replay for recovery
- Partitions by metric name for parallelism (see the partitioning sketch after this list)
3. Processing Layer (Ingesters)
- Consumes from Kafka
- Validates and normalizes data
- Writes to hot storage
- Feeds real-time aggregation
4. Storage Layer (Tiered)
- Hot: Recent data, fast queries
- Warm: Aggregated data, medium latency
- Cold: Archived data, slow but cheap
5. Query Layer
- Parses and optimizes queries
- Routes to appropriate storage tier
- Caches frequent dashboard queries
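To make the transport-layer partitioning concrete, here is a minimal sketch of how an intake component might pick a Kafka partition; the key construction (metric name plus sorted labels) and the partition count are illustrative assumptions, not part of the stated design.

```python
import hashlib

NUM_PARTITIONS = 64  # assumed partition count for the metrics topic

def partition_for(metric_name: str, labels: dict) -> int:
    """Route every point of one time series to the same partition.

    Hashing the full series identity keeps per-series ordering within a
    partition while spreading different series across partitions; hashing
    only the metric name also works but can hot-spot popular metrics.
    """
    series_key = metric_name + "|" + ",".join(
        f"{k}={v}" for k, v in sorted(labels.items())
    )
    digest = hashlib.sha1(series_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS

# Every point of this series lands on the same partition.
print(partition_for("http_requests_total", {"service": "api", "method": "GET"}))
```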
Real-world reference
This architecture mirrors Datadog (agents -> intake -> storage -> query), Prometheus with Thanos (local -> object storage -> query), and internal systems at Google (Monarch) and Facebook (Gorilla + ODS).
Data Model and Storage
Time-series data has unique characteristics that require specialized storage.
What to say
Time-series data is append-only, accessed by time range, and highly compressible. I will use a columnar format optimized for these patterns, with separate indexes for metric names and labels.
Data Model:
A metric data point consists of:
- Metric name: e.g., http_requests_total
- Labels/Tags: key-value pairs for dimensions
- Timestamp: Unix timestamp (milliseconds)
- Value: 64-bit float or integer
{
  "metric": "http_requests_total",
  "labels": {"service": "api", "method": "GET", "status": "200"},
  "timestamp": 1705312800000,
  "value": 42
}
Storage Format - Time-Structured Merge Tree (TSMT):
Similar to LSM trees but optimized for time-series:
data/
  2024-01-15/
    00/   # Hour partition
Compression Techniques:
1. Delta-of-delta for timestamps: Consecutive timestamps are similar
   - Raw: [1000, 1010, 1020, 1030]
   - Delta: [1000, 10, 10, 10]
   - Delta-of-delta: [1000, 10, 0, 0] (highly compressible)
2. XOR compression for values (Gorilla): Consecutive values are similar
   - Only store XOR of consecutive values
   - Most bits are zero, compress well
3. Dictionary encoding for labels: Repeated strings stored once
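A minimal sketch of the delta-of-delta idea for timestamps; real encoders (e.g., Gorilla's) pack these residuals into variable-width bit fields rather than returning a Python list, so this only illustrates the arithmetic.

```python
def delta_of_delta_encode(timestamps: list) -> list:
    """Encode timestamps as [first, first delta, delta-of-deltas...]."""
    if not timestamps:
        return []
    out = [timestamps[0]]
    prev_delta = None
    for prev, cur in zip(timestamps, timestamps[1:]):
        delta = cur - prev
        if prev_delta is None:
            out.append(delta)               # first delta stored as-is
        else:
            out.append(delta - prev_delta)  # usually 0 for regular intervals
        prev_delta = delta
    return out

# [1000, 1010, 1020, 1030] -> [1000, 10, 0, 0], matching the example above
assert delta_of_delta_encode([1000, 1010, 1020, 1030]) == [1000, 10, 0, 0]
```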
Raw data point: 50 bytes
  timestamp: 8 bytes
  value: 8 bytes
Why not a regular database?
Traditional databases optimize for random access. Time-series workloads are sequential writes and range reads. Purpose-built storage achieves 10-100x better compression and query performance.
Index Structure:
We need to quickly find series matching label queries:
Label Index (inverted):
service=api-gateway -> [series_1, series_5, series_99, ...]
host=web-prod-001 -> [series_1, series_2, series_3, ...]
method=GET -> [series_1, series_7, series_42, ...]
Query: service=api-gateway AND method=GET
1. Lookup service=api-gateway: [1, 5, 99, ...]
2. Lookup method=GET: [1, 7, 42, ...]
3. Intersect: [1, ...]
4. Fetch data for matching series
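A minimal sketch of the lookup-and-intersect step, using Python sets as posting lists; production indexes typically store sorted series IDs or compressed bitmaps, which is an assumption beyond what the walkthrough above specifies.

```python
# Inverted index: label=value -> set of series IDs (posting list)
label_index = {
    "service=api-gateway": {1, 5, 99},
    "host=web-prod-001": {1, 2, 3},
    "method=GET": {1, 7, 42},
}

def series_matching(*selectors: str) -> set:
    """Intersect posting lists for an AND of label selectors."""
    postings = [label_index.get(sel, set()) for sel in selectors]
    if not postings:
        return set()
    # Start from the smallest posting list to keep the intersection cheap.
    postings.sort(key=len)
    result = set(postings[0])
    for p in postings[1:]:
        result &= p
    return result

# service=api-gateway AND method=GET -> {1}, as in the steps above
print(series_matching("service=api-gateway", "method=GET"))
```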
Aggregation and Rollups
Aggregation is critical for query performance and storage efficiency.
What to say
We pre-compute aggregations at multiple time resolutions. This trades storage for query performance - most dashboard queries hit pre-aggregated data, not raw points.
Aggregation Pipeline
Aggregation Functions:
For each rollup period, we pre-compute:
- avg: Average value (requires sum and count)
- min: Minimum value
- max: Maximum value
- sum: Sum of values
- count: Number of data points
- p50, p90, p99: Percentiles (using sketches)
class RollupAggregator:
    def __init__(self, input_resolution_sec, output_resolution_sec):
        self.input_res = input_resolution_sec
        self.output_res = output_resolution_sec
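A minimal sketch of the per-bucket computation such an aggregator might perform; the function name, the (timestamp, value) input layout, and the output shape are illustrative assumptions.

```python
def rollup_bucket(points: list) -> dict:
    """Fold raw points from one output interval into a single rollup row.

    `points` holds (timestamp, value) pairs that all fall inside one
    output-resolution bucket (e.g. one hour of 10-second samples).
    """
    values = [v for _, v in points]
    count = len(values)
    total = sum(values)
    return {
        "count": count,
        "sum": total,
        "min": min(values),
        "max": max(values),
        "avg": total / count,  # avg derived from sum and count, as listed above
    }

# Three 10-second samples rolled into one bucket:
print(rollup_bucket([(0, 1.0), (10, 3.0), (20, 2.0)]))
# {'count': 3, 'sum': 6.0, 'min': 1.0, 'max': 3.0, 'avg': 2.0}
```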
Percentile Aggregation Challenge:
Percentiles (p50, p99) cannot be computed from sub-aggregates. Options:
1. Store all values: Expensive, defeats purpose of rollup
2. T-Digest sketch: Approximate percentiles, mergeable
3. DDSketch: Better accuracy for high percentiles
4. Histogram buckets: Store counts per bucket, approximate
from tdigest import TDigest
class PercentileRollup:
    def __init__(self):
        self.digest = TDigest()  # one digest per series per rollup window
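For the histogram-bucket option (number 4 above), a minimal mergeable sketch; the bucket boundaries and the conservative upper-bound reporting are assumptions chosen for readability, not any specific library's behavior.

```python
import bisect

class HistogramRollup:
    """Mergeable approximate percentiles via fixed bucket boundaries."""

    def __init__(self, bounds: list):
        self.bounds = bounds                   # upper edges, e.g. latency ms
        self.counts = [0] * (len(bounds) + 1)  # last bucket = overflow

    def observe(self, value: float) -> None:
        self.counts[bisect.bisect_left(self.bounds, value)] += 1

    def merge(self, other: "HistogramRollup") -> None:
        # Bucket counts simply add up, which makes cross-shard rollups easy.
        self.counts = [a + b for a, b in zip(self.counts, other.counts)]

    def percentile(self, p: float) -> float:
        target = p / 100.0 * sum(self.counts)
        running = 0
        for i, c in enumerate(self.counts):
            running += c
            if running >= target:
                # Report the bucket's upper bound (conservative estimate).
                return self.bounds[i] if i < len(self.bounds) else float("inf")
        return float("inf")

# p99 of request latencies bucketed at 10/50/100/500 ms
h = HistogramRollup([10, 50, 100, 500])
for v in [5, 7, 40, 80, 120, 450]:
    h.observe(v)
print(h.percentile(99))  # 500 (upper bound of the bucket containing p99)
```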
Important detail
Aggregation must be idempotent for failure recovery. If an aggregator crashes and restarts, re-processing the same data should produce the same result. Use exactly-once semantics or design for at-least-once with deduplication.
Query Engine
The query engine must efficiently execute time-series queries across potentially billions of data points.
What to say
The query engine has three main jobs: parse the query, select the optimal data source (raw vs rollup), and execute with parallelism. We use a query language like PromQL or a custom SQL extension.
# Simple metric query
http_requests_total{service="api", status="200"}
Query Execution Flow:
Query Optimization Strategies:
1. Resolution Selection: Pick appropriate rollup based on time range
2. Predicate Pushdown: Filter at storage level, not query level
3. Parallel Execution: Scatter query to multiple shards
4. Result Caching: Cache frequent dashboard queries
5. Query Limits: Timeout and cardinality limits to protect system
class QueryPlanner:
    def plan(self, query: ParsedQuery) -> ExecutionPlan:
        time_range = query.end_time - query.start_time
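A minimal sketch of the resolution-selection step; the thresholds and tier names are assumptions loosely aligned with the 7-day/90-day/2-year retention tiers stated earlier.

```python
from datetime import timedelta

def choose_resolution(time_range: timedelta) -> str:
    """Pick the cheapest data source that still gives enough points per pixel.

    Thresholds are illustrative: short ranges hit raw 10-second data, medium
    ranges hit hourly rollups, long ranges hit daily rollups.
    """
    if time_range <= timedelta(days=2):
        return "raw_10s"    # hot tier, 7-day retention
    if time_range <= timedelta(days=60):
        return "rollup_1h"  # warm tier, 90-day retention
    return "rollup_1d"      # cold tier, 2-year retention

print(choose_resolution(timedelta(hours=6)))   # raw_10s
print(choose_resolution(timedelta(days=30)))   # rollup_1h
print(choose_resolution(timedelta(days=365)))  # rollup_1d
```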
Must protect the system
Queries must have cardinality limits and timeouts. A single bad query (SELECT * equivalent) can bring down the entire cluster. Implement query cost estimation and reject expensive queries.
Consistency and Invariants
System Invariants
Metrics must never be silently dropped. Data loss means missed alerts and incorrect dashboards. Better to slow down ingestion than lose data.
Consistency Model:
Metrics systems typically use eventual consistency with specific guarantees:
| Guarantee | Requirement | Implementation |
|---|---|---|
| Write durability | Data survives node failure | Replication factor 2-3, ack after persist |
| Read consistency | Queries see recent data | Bounded staleness (30s typical) |
| Ordering | Points for same series ordered | Per-series ordering, cross-series eventual |
| Aggregation accuracy | Rollups match raw data | Idempotent aggregation, checksums |
Business impact mapping
If a dashboard shows data 30 seconds stale, users will not notice. If an alert fires 30 seconds late, it is usually acceptable. If we lose 5 minutes of metrics during an incident, debugging becomes much harder.
Handling Late-Arriving Data:
Metrics often arrive out of order due to:
- Network delays
- Batch uploads
- Clock skew on source hosts
from datetime import timedelta

class IngestionHandler:
    def __init__(self):
        self.out_of_order_window = timedelta(minutes=5)
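A minimal sketch of how the out-of-order window might be applied per point; the function name, the future-timestamp guard, and the choice to reject rather than park late points are assumptions.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

OUT_OF_ORDER_WINDOW = timedelta(minutes=5)
FUTURE_TOLERANCE = timedelta(minutes=1)  # guard against badly skewed clocks

def accept_point(point_ts: datetime, series_head_ts: Optional[datetime]) -> bool:
    """Accept a point unless it is implausibly far in the future or falls
    too far behind the newest timestamp already seen for its series."""
    now = datetime.now(timezone.utc)
    if point_ts > now + FUTURE_TOLERANCE:
        return False  # future timestamp: clock skew, reject (or clamp)
    if series_head_ts is None:
        return True   # first point for this series
    return point_ts >= series_head_ts - OUT_OF_ORDER_WINDOW

# A point 3 minutes behind the series head is accepted; one 10 minutes
# behind falls outside the 5-minute out-of-order window and is rejected.
```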
Cardinality Enforcement:
To prevent cardinality explosion, enforce limits at multiple levels:
class CardinalityEnforcer:
    def __init__(self):
        self.global_series_limit = 100_000_000  # 100M series total
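A minimal sketch of how those limits might be checked at ingest time; the per-metric limit, the in-memory data structures, and the reject-on-new-series behavior are illustrative assumptions.

```python
class CardinalityLimiter:
    """Track known series and reject points that would create new series
    beyond the configured limits."""

    def __init__(self, global_limit: int = 100_000_000, per_metric_limit: int = 1_000_000):
        self.global_limit = global_limit
        self.per_metric_limit = per_metric_limit
        self.series_by_metric = {}  # metric name -> set of series keys
        self.total_series = 0

    def admit(self, metric: str, series_key: str) -> bool:
        known = self.series_by_metric.setdefault(metric, set())
        if series_key in known:
            return True   # existing series: always accepted
        if self.total_series >= self.global_limit:
            return False  # cluster-wide ceiling reached
        if len(known) >= self.per_metric_limit:
            return False  # this one metric is exploding
        known.add(series_key)
        self.total_series += 1
        return True

# series_key would be the metric name plus sorted labels, as in the index section.
limiter = CardinalityLimiter(per_metric_limit=2)
print(limiter.admit("http_requests_total", "method=GET"))   # True
print(limiter.admit("http_requests_total", "method=POST"))  # True
print(limiter.admit("http_requests_total", "method=PUT"))   # False (limit hit)
```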
Failure Modes and Resilience
Proactively discuss failures
Let me walk through failure modes. A metrics system failure during an incident is the worst time to lose observability.
| Failure | Impact | Mitigation | Why It Works |
|---|---|---|---|
| Ingester crash | Data in memory lost | WAL + Kafka replay | Kafka retains data, WAL enables recovery |
| Storage node down | Queries incomplete | Replication + query routing | Data exists on replicas, queries avoid failed node |
Write-Ahead Log (WAL):
Critical for durability without sacrificing write performance:
class WriteAheadLog:
    def __init__(self, path: str):
        self.path = path
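A minimal sketch of the append/replay cycle such a WAL might implement; the JSON-lines on-disk format, the per-record fsync, and the method names are assumptions (production WALs batch records and group-commit fsyncs).

```python
import json
import os

class SimpleWAL:
    """Append records durably before acknowledging the write."""

    def __init__(self, path: str):
        self.path = path
        self.file = open(path, "ab")

    def append(self, record: dict) -> None:
        line = json.dumps(record).encode("utf-8") + b"\n"
        self.file.write(line)
        self.file.flush()
        os.fsync(self.file.fileno())  # ack only after the bytes hit disk

    def replay(self):
        """Yield records after a crash so in-memory state can be rebuilt."""
        with open(self.path, "rb") as f:
            for line in f:
                if line.strip():
                    yield json.loads(line)

# wal = SimpleWAL("/var/lib/ingester/wal.log")  # path is illustrative
# wal.append({"metric": "http_requests_total", "ts": 1705312800000, "value": 42})
```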
Backpressure Handling:
When ingestion exceeds capacity, we need graceful degradation:
class BackpressureHandler:
    def __init__(self):
        self.queue_high_watermark = 100_000
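A minimal sketch of watermark-based degradation; the watermark values, the priority split, and the action names are assumptions, not part of the stated design.

```python
def ingestion_action(queue_depth: int, is_high_priority: bool,
                     high_watermark: int = 100_000,
                     low_watermark: int = 50_000) -> str:
    """Decide how to treat an incoming batch based on queue depth.

    Below the low watermark everything is accepted; between watermarks only
    low-priority data (e.g. verbose logs) is shed; above the high watermark
    we push back on senders instead of silently dropping metrics, since
    agents buffer locally and can retry.
    """
    if queue_depth < low_watermark:
        return "accept"
    if queue_depth < high_watermark:
        return "accept" if is_high_priority else "sample_or_drop"
    return "reject_with_backpressure"

print(ingestion_action(10_000, False))   # accept
print(ingestion_action(70_000, False))   # sample_or_drop
print(ingestion_action(150_000, True))   # reject_with_backpressure
```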
Real-world lesson
Every major metrics system has had incidents where a cardinality explosion or query of death took down the cluster. Build in protection from day one: cardinality limits, query timeouts, backpressure, and circuit breakers.
Evolution and Scaling
What to say
This design handles 10M metrics per second with 100M series. Let me discuss how it evolves for 10x scale and what changes at different growth stages.
Evolution Path:
Stage 1: Single Cluster (up to 1M series)
- Single Prometheus or InfluxDB instance
- Local storage, simple setup
- Works for many production systems
Stage 2: Federated (up to 100M series)
- Multiple clusters by team/service
- Central query layer federates
- Each team manages their own limits
Stage 3: Globally Distributed (up to 1B series)
- Regional ingestion clusters
- Central aggregated view
- Object storage for long-term
Multi-Region Architecture
Cost Optimization at Scale:
At PB scale, storage cost dominates:
| Strategy | Savings | Tradeoff |
|---|---|---|
| Aggressive downsampling | 80%+ storage reduction | Lose high-resolution historical data |
| Object storage for cold | 10x cheaper than SSD | Higher query latency for old data |
| Metric cardinality limits | Prevent runaway growth | May reject valid metrics |
| Sampling for logs | 90%+ reduction possible | May miss rare events |
| Compression tuning | 2-3x additional | Higher CPU usage |
Alternative approaches
If query latency requirements were stricter, I would use a columnar database like ClickHouse instead of custom storage. If cost were primary concern, I would use sampling more aggressively and store only aggregates after 24 hours.
What I would do differently for...
High-frequency trading metrics: In-memory only, no persistence, sub-millisecond queries. Use specialized time-series DB like QuestDB or kdb+.
IoT sensor data: Higher tolerance for data loss, aggressive sampling, edge aggregation before central ingest.
Compliance/audit logs: Different system entirely - immutable, strongly consistent, cryptographic verification. Not a metrics problem.
Business metrics: Lower volume, but need exact counts (not approximate). Use traditional OLAP database like ClickHouse or Druid.