System Design Masterclass
Storage · metrics · logging · time-series · aggregation · observability · advanced

Design Distributed Metrics Logging and Aggregation

Design a system for logging and aggregating metrics at PB scale

PBs of logs/day, millions of metrics/sec | Similar to Datadog, Splunk, Elastic, InfluxData, Prometheus | 45 min read

Summary

Distributed metrics systems collect, transport, store, and query billions of data points per second from thousands of sources. The core challenge is balancing write throughput with query performance while managing storage costs across hot, warm, and cold tiers. This pattern powers Datadog, Prometheus, InfluxDB, and internal systems at Google, Facebook, and Netflix.

Key Takeaways

Core Problem

This is fundamentally a time-series data problem with extreme write-heavy workload. We must optimize for append-only writes while supporting flexible time-range queries.

The Hard Part

Cardinality explosion - when metric dimensions (tags) create millions of unique time series, storage and query costs grow combinatorially with every added dimension.

Scaling Axis

Scale by time (partition by hour/day) and by metric name hash. Time-based partitioning enables efficient range queries and data lifecycle management.

Critical Invariant

Metrics must never be silently dropped during ingestion spikes. Backpressure or buffering required - data loss breaks alerting and dashboards.

Performance Requirement

Ingestion latency under 100ms p99. Query latency under 5 seconds for 24-hour dashboards, under 30 seconds for 30-day queries.

Key Tradeoff

We trade query flexibility for write performance by pre-aggregating common queries. Raw data kept in cold storage for ad-hoc analysis.

Design Walkthrough

Problem Statement

The Question: Design a distributed metrics and logging system that can ingest petabytes of data per day from thousands of sources, store it cost-effectively, and support real-time dashboards and alerting.

Metrics systems are essential for:

  - Observability: Understanding system behavior through metrics, logs, and traces
  - Alerting: Detecting anomalies and triggering notifications
  - Debugging: Investigating incidents with historical data
  - Capacity Planning: Analyzing trends for infrastructure decisions
  - Business Intelligence: Tracking KPIs and user behavior

What to say first

Before I design, let me clarify the types of data we are handling - metrics vs logs vs traces have very different characteristics. I also want to understand the query patterns and retention requirements.

Hidden requirements interviewers are testing:

  - Do you understand time-series data characteristics?
  - Can you design for extreme write throughput?
  - Do you know about cardinality and its impact?
  - Can you balance storage cost vs query performance?
  - Do you understand aggregation and rollup strategies?

Clarifying Questions

Ask these questions to shape your architecture. Each answer significantly impacts design decisions.

Question 1: Data Types

Are we handling metrics (numeric time-series), logs (text events), or both? What about traces?

Why this matters: Metrics are fixed-schema numeric data, logs are variable-schema text. Very different storage and query patterns.
Typical answer: Primarily metrics with some structured logs.
Architecture impact: Optimize for time-series storage, add log indexing as secondary concern.

Question 2: Scale

How many metrics per second? How many unique time series (cardinality)? How much log data per day?

Why this matters: Cardinality (unique series count) is often the limiting factor, not raw throughput.
Typical answer: 10M metrics/sec, 100M unique series, 10TB logs/day.
Architecture impact: Need distributed storage, cardinality limits, log sampling.

Question 3: Query Patterns

What are the common query patterns? Real-time dashboards? Ad-hoc investigation? Alerting queries?

Why this matters: Determines indexing strategy and pre-aggregation needs.
Typical answer: 80% dashboard queries (known patterns), 20% ad-hoc.
Architecture impact: Pre-aggregate common queries, keep raw data for ad-hoc.

Question 4: Retention

How long do we keep high-resolution data? What about aggregated data? Compliance requirements?

Why this matters: Storage costs grow linearly with retention. Aggregation reduces long-term storage.
Typical answer: 7 days high-res, 90 days hourly, 2 years daily.
Architecture impact: Multi-tier storage with automatic rollup and downsampling.

Stating assumptions

I will assume: 10M metrics/sec, 100M unique time series, 10TB logs/day, 7-day high-resolution retention with rollups to 2 years, 80% dashboard queries with sub-5-second latency requirement.

The Hard Part

Say this out loud

The hard part here is cardinality explosion. When users add high-cardinality dimensions like user_id or request_id to metrics, the number of unique time series explodes, breaking storage and query performance.

Why cardinality is the real challenge:

Consider a simple metric: http_requests_total

With dimensions method, status, endpoint, host:
  5 methods x 10 statuses x 100 endpoints x 1000 hosts = 5 million series

Now add user_id with 10 million users:
  5 million x 10 million = 50 TRILLION series (impossible)

This is called cardinality explosion.

Low cardinality (1K series):
  Storage: ~100 MB/day
  Query: <100ms
+ 11 more lines...
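To make the arithmetic concrete, here is a minimal back-of-envelope sketch that treats the worst-case series count as the product of each label's distinct values; the label names and counts mirror the example above.

from math import prod

def estimate_series(label_cardinalities: dict[str, int]) -> int:
    # Worst case: every combination of label values is observed at least once.
    return prod(label_cardinalities.values())

base = {"method": 5, "status": 10, "endpoint": 100, "host": 1_000}
print(estimate_series(base))                              # 5,000,000 series
print(estimate_series({**base, "user_id": 10_000_000}))   # 5e13 series - unusable

In practice the observed series count is lower than the full product, but a high-cardinality label like user_id still dominates it.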

Common mistake

Candidates focus only on write throughput. The real problem is managing cardinality while still allowing flexible querying. Unlimited dimensions make the system unusable.

Other hard problems:

  1. Write Amplification: Each data point may be written multiple times (replication, indexing, aggregation)
  2. Query Performance: Scanning billions of points for a dashboard query
  3. Hot Spots: Popular metrics (like CPU usage) accessed constantly
  4. Clock Skew: Data arriving out of order or with future timestamps
  5. Backpressure: Handling ingestion spikes without dropping data

Scale and Access Patterns

Let me quantify the scale and understand access patterns.

Dimension | Value | Impact
Metrics ingestion | 10M points/sec | Need distributed write path, batching
Unique time series | 100M series | Index size ~100GB, fits in memory cluster
+ 5 more rows...

What to say

At 10M metrics per second, we need distributed ingestion. But the real constraint is 100M unique series - that determines our index size and query complexity. This is a write-heavy system with read amplification during queries.

Access Pattern Analysis:

Writes:
  - Append-only (metrics never updated)
  - Highly parallelizable by metric/host
  - Bursty (deploy events, incidents)
  - Must handle backpressure gracefully

Reads:
  - Time-range queries (last 1h, 24h, 7d)
  - Aggregation queries (avg, sum, percentiles)
  - High-cardinality filters (specific host, service)
  - Dashboard queries are repetitive (cacheable)
  - Alert queries run constantly (every 30-60 sec)

Raw data (7 days high-res, 10-sec intervals):
  10M points/sec x 50 bytes x 86400 sec x 7 days
  = ~300 TB raw (before compression)
+ 8 more lines...
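As a quick sanity check on these numbers, a back-of-envelope sketch under the stated assumptions (the ~1 KB of index metadata per series is an assumption, not a measurement):

POINTS_PER_SEC = 10_000_000
BYTES_PER_POINT = 50               # timestamp + value + series reference
UNIQUE_SERIES = 100_000_000
INDEX_BYTES_PER_SERIES = 1_000     # assumed label metadata + postings overhead

raw_per_day = POINTS_PER_SEC * BYTES_PER_POINT * 86_400
print(f"raw ingest: {raw_per_day / 1e12:.1f} TB/day, "
      f"{raw_per_day * 7 / 1e12:.0f} TB for 7-day retention")                  # ~43 TB/day, ~302 TB
print(f"series index: {UNIQUE_SERIES * INDEX_BYTES_PER_SERIES / 1e9:.0f} GB")  # ~100 GB

Gorilla-style compression typically reduces the raw footprint by roughly an order of magnitude, which is what makes 7-day high-resolution retention affordable.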

High-Level Architecture

Let me design the architecture in layers: collection, transport, storage, and query.

What to say

I will organize this into four layers: collection agents on each host, a transport layer for reliable delivery, a storage layer optimized for time-series, and a query layer for dashboards and alerts.

Metrics System Architecture

Layer Responsibilities:

1. Collection Layer (Agents)
  - Runs on every host
  - Collects metrics from applications and system
  - Batches and compresses before sending
  - Handles local buffering during network issues

2. Transport Layer (Kafka)
  - Decouples collection from storage
  - Handles backpressure via buffering
  - Enables replay for recovery
  - Partitions by metric name for parallelism

3. Processing Layer (Ingesters)
  - Consumes from Kafka
  - Validates and normalizes data
  - Writes to hot storage
  - Feeds real-time aggregation

4. Storage Layer (Tiered)
  - Hot: Recent data, fast queries
  - Warm: Aggregated data, medium latency
  - Cold: Archived data, slow but cheap

5. Query Layer
  - Parses and optimizes queries
  - Routes to appropriate storage tier
  - Caches frequent dashboard queries
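A minimal sketch of the collection-layer behavior described above - batching, periodic flushes, and spilling to a bounded local buffer when the transport is unavailable. The MetricsAgent name and the transport's send() interface are illustrative assumptions, not a specific agent's API.

import collections
import json
import time

class MetricsAgent:
    """Host-local agent sketch: batch points, spill to a bounded buffer on send failure."""

    def __init__(self, transport, batch_size=5_000, flush_interval_sec=10, buffer_limit=10_000):
        self.transport = transport                                  # assumed to expose send(bytes)
        self.batch = []
        self.local_buffer = collections.deque(maxlen=buffer_limit)  # oldest batches drop first
        self.batch_size = batch_size
        self.flush_interval = flush_interval_sec
        self.last_flush = time.monotonic()

    def record(self, metric, labels, value, ts=None):
        self.batch.append({"metric": metric, "labels": labels,
                           "value": value, "ts": ts or time.time()})
        if len(self.batch) >= self.batch_size or \
                time.monotonic() - self.last_flush >= self.flush_interval:
            self.flush()

    def flush(self):
        payload, self.batch = json.dumps(self.batch).encode(), []
        self.last_flush = time.monotonic()
        try:
            while self.local_buffer:                 # drain anything buffered during an outage
                self.transport.send(self.local_buffer[0])
                self.local_buffer.popleft()
            self.transport.send(payload)             # a real agent would also compress here
        except ConnectionError:
            self.local_buffer.append(payload)

Agent-side buffering absorbs short network blips; sustained outages are handled by the Kafka layer's retention and replay.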

Real-world reference

This architecture mirrors Datadog (agents -> intake -> storage -> query), Prometheus with Thanos (local -> object storage -> query), and internal systems at Google (Monarch) and Facebook (Gorilla + ODS).

Data Model and Storage

Time-series data has unique characteristics that require specialized storage.

What to say

Time-series data is append-only, accessed by time range, and highly compressible. I will use a columnar format optimized for these patterns, with separate indexes for metric names and labels.

Data Model:

A metric data point consists of:

  - Metric name: e.g., http_requests_total
  - Labels/Tags: key-value pairs for dimensions
  - Timestamp: Unix timestamp (milliseconds)
  - Value: 64-bit float or integer

{
  "metric": "http_requests_total",
  "labels": {
+ 8 more lines...
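The same data model expressed as a Python structure, as a sketch; the sha1-prefix series ID is an illustrative choice. The key idea is that the metric name plus the sorted label set identifies exactly one time series.

import hashlib
from dataclasses import dataclass

@dataclass(frozen=True)
class DataPoint:
    metric: str                             # e.g. "http_requests_total"
    labels: tuple[tuple[str, str], ...]     # sorted (key, value) pairs
    timestamp_ms: int                       # Unix epoch milliseconds
    value: float                            # 64-bit float

    def series_id(self) -> str:
        """All points with the same metric + labels belong to the same series."""
        key = self.metric + "|" + "|".join(f"{k}={v}" for k, v in self.labels)
        return hashlib.sha1(key.encode()).hexdigest()[:16]

point = DataPoint("http_requests_total",
                  (("method", "GET"), ("service", "api"), ("status", "200")),
                  1705312800000, 1.0)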

Storage Format - Time-Structured Merge Tree (TSMT):

Similar to LSM trees but optimized for time-series:

data/
  2024-01-15/
    00/                          # Hour partition
+ 18 more lines...
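A minimal sketch of how a point might be routed into this layout, combining an hour partition derived from the timestamp with a metric-name hash shard; the shard count and directory naming are illustrative.

import hashlib
from datetime import datetime, timezone

def partition_path(metric: str, timestamp_ms: int, num_shards: int = 64) -> str:
    """Hour partition from the timestamp, shard from a hash of the metric name."""
    dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    shard = int(hashlib.md5(metric.encode()).hexdigest(), 16) % num_shards
    return f"data/{dt:%Y-%m-%d}/{dt:%H}/shard_{shard:03d}"

print(partition_path("http_requests_total", 1705312800000))
# e.g. data/2024-01-15/10/shard_NNN  (the shard number depends on the hash)

Hashing by metric name keeps each series' data together for range queries; it is also the source of the hot-spot problem mentioned earlier for very popular metrics.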

Compression Techniques:

  1. Delta-of-Delta for timestamps: Consecutive timestamps are similar
     - Raw: [1000, 1010, 1020, 1030]
     - Delta: [1000, 10, 10, 10]
     - Delta-of-delta: [1000, 10, 0, 0] (highly compressible)
  2. XOR compression for values (Gorilla): Consecutive values are similar
     - Only store XOR of consecutive values
     - Most bits are zero, compress well
  3. Dictionary encoding for labels: Repeated strings stored once

Raw data point: 50 bytes
  timestamp: 8 bytes
  value: 8 bytes
+ 9 more lines...
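A minimal sketch of the delta-of-delta idea for timestamps; real encoders such as Gorilla additionally bit-pack the small residuals, which this sketch omits.

def delta_of_delta(timestamps: list[int]) -> list[int]:
    """[first value, first delta, then differences between consecutive deltas]."""
    out = [timestamps[0]]
    prev, prev_delta = timestamps[0], None
    for ts in timestamps[1:]:
        delta = ts - prev
        out.append(delta if prev_delta is None else delta - prev_delta)
        prev, prev_delta = ts, delta
    return out

print(delta_of_delta([1000, 1010, 1020, 1030]))   # [1000, 10, 0, 0]

For regularly scraped series the trailing values are almost all zero, which is why bit-packing them compresses so well.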

Why not a regular database?

Traditional databases optimize for random access. Time-series workloads are sequential writes and range reads. Purpose-built storage achieves 10-100x better compression and query performance.

Index Structure:

We need to quickly find series matching label queries:

Label Index (inverted):
  service=api-gateway -> [series_1, series_5, series_99, ...]
  host=web-prod-001   -> [series_1, series_2, series_3, ...]
  method=GET          -> [series_1, series_7, series_42, ...]

Query: service=api-gateway AND method=GET
  1. Lookup service=api-gateway: [1, 5, 99, ...]
  2. Lookup method=GET: [1, 7, 42, ...]
  3. Intersect: [1, ...]
  4. Fetch data for matching series
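A minimal in-memory sketch of that inverted index and the intersection step; a production index would keep sorted postings lists (or compressed bitmaps) on disk rather than Python sets.

from collections import defaultdict

class LabelIndex:
    """Inverted index: 'key=value' -> set of matching series IDs."""

    def __init__(self):
        self.postings = defaultdict(set)

    def add(self, series_id: str, labels: dict):
        for key, value in labels.items():
            self.postings[f"{key}={value}"].add(series_id)

    def lookup(self, **matchers) -> set:
        """AND-match all label selectors by intersecting their postings."""
        lists = sorted((self.postings.get(f"{k}={v}", set()) for k, v in matchers.items()),
                       key=len)                       # start from the smallest postings list
        return set.intersection(*lists) if lists else set()

idx = LabelIndex()
idx.add("series_1", {"service": "api-gateway", "method": "GET"})
idx.add("series_5", {"service": "api-gateway", "method": "POST"})
idx.add("series_7", {"service": "billing", "method": "GET"})
print(idx.lookup(service="api-gateway", method="GET"))   # {'series_1'}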

Aggregation and Rollups

Aggregation is critical for query performance and storage efficiency.

What to say

We pre-compute aggregations at multiple time resolutions. This trades storage for query performance - most dashboard queries hit pre-aggregated data, not raw points.

Aggregation Pipeline

Aggregation Functions:

For each rollup period, we pre-compute:

  - avg: Average value (requires sum and count)
  - min: Minimum value
  - max: Maximum value
  - sum: Sum of values
  - count: Number of data points
  - p50, p90, p99: Percentiles (using sketches)

class RollupAggregator:
    def __init__(self, input_resolution_sec, output_resolution_sec):
        self.input_res = input_resolution_sec
+ 28 more lines...

Percentile Aggregation Challenge:

Percentiles (p50, p99) cannot be computed from sub-aggregates. Options:

  1. Store all values: Expensive, defeats purpose of rollup
  2. T-Digest sketch: Approximate percentiles, mergeable
  3. DDSketch: Better accuracy for high percentiles
  4. Histogram buckets: Store counts per bucket, approximate

from tdigest import TDigest

class PercentileRollup:
+ 17 more lines...

Important detail

Aggregation must be idempotent for failure recovery. If an aggregator crashes and restarts, re-processing the same data should produce the same result. Use exactly-once semantics or design for at-least-once with deduplication.
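One way to get that idempotence under at-least-once delivery is to key every rollup output deterministically by (series, window) and upsert, so a replay overwrites a previous result instead of double-counting. A minimal sketch; the store interface is an assumption.

def rollup_key(series_id: str, timestamp_sec: int, window_sec: int = 3600) -> str:
    """Deterministic bucket key: the same input always maps to the same key."""
    window_start = (timestamp_sec // window_sec) * window_sec
    return f"{series_id}:{window_sec}:{window_start}"

class IdempotentRollupSink:
    """Upserting by bucket key makes re-processing after a crash harmless."""

    def __init__(self, store: dict):
        self.store = store     # stand-in for a KV/columnar store with upsert semantics

    def write(self, series_id: str, timestamp_sec: int, aggregate: dict):
        self.store[rollup_key(series_id, timestamp_sec)] = aggregate   # overwrite, never +=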

Query Engine

The query engine must efficiently execute time-series queries across potentially billions of data points.

What to say

The query engine has three main jobs: parse the query, select the optimal data source (raw vs rollup), and execute with parallelism. We use a query language like PromQL or a custom SQL extension.

# Simple metric query
http_requests_total{service="api", status="200"}
+ 12 more lines...

Query Execution Flow:

Query Execution

Query Optimization Strategies:

  1. Resolution Selection: Pick appropriate rollup based on time range
  2. Predicate Pushdown: Filter at storage level, not query level
  3. Parallel Execution: Scatter query to multiple shards
  4. Result Caching: Cache frequent dashboard queries
  5. Query Limits: Timeout and cardinality limits to protect system

class QueryPlanner:
    def plan(self, query: ParsedQuery) -> ExecutionPlan:
        time_range = query.end_time - query.start_time
+ 26 more lines...

Must protect the system

Queries must have cardinality limits and timeouts. A single bad query (SELECT * equivalent) can bring down the entire cluster. Implement query cost estimation and reject expensive queries.
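A minimal sketch of that protection as pre-execution admission control: estimate how many series and points a query would touch and reject it above configured limits. The limits and the index's estimate_matching_series() method are illustrative assumptions.

class QueryGuard:
    """Reject queries whose estimated cost could destabilize the cluster."""

    MAX_SERIES = 50_000           # per-query series limit (illustrative)
    MAX_POINTS = 50_000_000       # per-query datapoint limit (illustrative)
    TIMEOUT_SEC = 30

    def __init__(self, index):
        self.index = index        # assumed to expose estimate_matching_series(matchers)

    def admit(self, matchers: dict, range_sec: int, step_sec: int) -> dict:
        series = self.index.estimate_matching_series(matchers)
        points = series * (range_sec // max(step_sec, 1))
        if series > self.MAX_SERIES:
            raise ValueError(f"query matches ~{series} series (limit {self.MAX_SERIES})")
        if points > self.MAX_POINTS:
            raise ValueError(f"query would scan ~{points} points (limit {self.MAX_POINTS})")
        return {"deadline_sec": self.TIMEOUT_SEC}    # the executor enforces the timeout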

Consistency and Invariants

System Invariants

Metrics must never be silently dropped. Data loss means missed alerts and incorrect dashboards. Better to slow down ingestion than lose data.

Consistency Model:

Metrics systems typically use eventual consistency with specific guarantees:

Guarantee | Requirement | Implementation
Write durability | Data survives node failure | Replication factor 2-3, ack after persist
Read consistency | Queries see recent data | Bounded staleness (30s typical)
Ordering | Points for same series ordered | Per-series ordering, cross-series eventual
Aggregation accuracy | Rollups match raw data | Idempotent aggregation, checksums

Business impact mapping

If a dashboard shows data 30 seconds stale, users will not notice. If an alert fires 30 seconds late, it is usually acceptable. If we lose 5 minutes of metrics during an incident, debugging becomes much harder.

Handling Late-Arriving Data:

Metrics often arrive out of order due to:

  - Network delays
  - Batch uploads
  - Clock skew on source hosts

class IngestionHandler:
    def __init__(self):
        self.out_of_order_window = timedelta(minutes=5)
+ 20 more lines...

Cardinality Enforcement:

To prevent cardinality explosion, enforce limits at multiple levels:

class CardinalityEnforcer:
    def __init__(self):
        self.global_series_limit = 100_000_000      # 100M series total
+ 25 more lines...

Failure Modes and Resilience

Proactively discuss failures

Let me walk through failure modes. A metrics system failure during an incident is the worst time to lose observability.

Failure | Impact | Mitigation | Why It Works
Ingester crash | Data in memory lost | WAL + Kafka replay | Kafka retains data, WAL enables recovery
Storage node down | Queries incomplete | Replication + query routing | Data exists on replicas, queries avoid failed node
+ 4 more rows...

Write-Ahead Log (WAL):

Critical for durability without sacrificing write performance:

class WriteAheadLog:
    def __init__(self, path: str):
        self.path = path
+ 25 more lines...

Backpressure Handling:

When ingestion exceeds capacity, we need graceful degradation:

class BackpressureHandler:
    def __init__(self):
        self.queue_high_watermark = 100_000
+ 31 more lines...

Real-world lesson

Every major metrics system has had incidents where a cardinality explosion or query of death took down the cluster. Build in protection from day one: cardinality limits, query timeouts, backpressure, and circuit breakers.

Evolution and Scaling

What to say

This design handles 10M metrics per second with 100M series. Let me discuss how it evolves for 10x scale and what changes at different growth stages.

Evolution Path:

Stage 1: Single Cluster (up to 1M series)
  - Single Prometheus or InfluxDB instance
  - Local storage, simple setup
  - Works for many production systems

Stage 2: Federated (up to 100M series)
  - Multiple clusters by team/service
  - Central query layer federates
  - Each team manages their own limits

Stage 3: Globally Distributed (up to 1B series)
  - Regional ingestion clusters
  - Central aggregated view
  - Object storage for long-term

Multi-Region Architecture

Cost Optimization at Scale:

At PB scale, storage cost dominates:

Strategy | Savings | Tradeoff
Aggressive downsampling | 80%+ storage reduction | Lose high-resolution historical data
Object storage for cold | 10x cheaper than SSD | Higher query latency for old data
Metric cardinality limits | Prevent runaway growth | May reject valid metrics
Sampling for logs | 90%+ reduction possible | May miss rare events
Compression tuning | 2-3x additional | Higher CPU usage
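A sketch of how the retention and downsampling tiers from the stated assumptions could be expressed as policy; the tier names, stores, and helper function are illustrative.

# Illustrative policy matching the assumptions: 7d high-res, 90d hourly, 2y daily.
RETENTION_POLICY = [
    {"tier": "hot",  "resolution": "10s", "keep_days": 7,   "store": "local SSD"},
    {"tier": "warm", "resolution": "1h",  "keep_days": 90,  "store": "replicated disk"},
    {"tier": "cold", "resolution": "1d",  "keep_days": 730, "store": "object storage"},
]

def tier_for(age_days: float) -> dict:
    """Highest-resolution tier that still retains data of this age."""
    for tier in RETENTION_POLICY:
        if age_days <= tier["keep_days"]:
            return tier
    raise ValueError("data older than the retention window")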

Alternative approaches

If query latency requirements were stricter, I would use a columnar database like ClickHouse instead of custom storage. If cost were the primary concern, I would use sampling more aggressively and store only aggregates after 24 hours.

What I would do differently for...

High-frequency trading metrics: In-memory only, no persistence, sub-millisecond queries. Use specialized time-series DB like QuestDB or kdb+.

IoT sensor data: Higher tolerance for data loss, aggressive sampling, edge aggregation before central ingest.

Compliance/audit logs: Different system entirely - immutable, strongly consistent, cryptographic verification. Not a metrics problem.

Business metrics: Lower volume, but need exact counts (not approximate). Use traditional OLAP database like ClickHouse or Druid.

Design Trade-offs

Advantages

  • +Optimized for time-series
  • +Excellent compression
  • +Fast range queries

Disadvantages

  • -Complex implementation
  • -Write amplification on compaction
  • -Not general purpose

When to use

Purpose-built metrics systems like Prometheus and InfluxDB: write-heavy, append-only time-series workloads queried by time range.