DEV Community

Gowtham Potureddi


ClickHouse for Real-Time Analytics: MergeTree, Materialized Views & Sharding

ClickHouse is the answer almost every senior data engineering interview eventually circles back to when the question becomes "how do we serve a dashboard that scans billions of rows in under a second?" Row-oriented databases such as Postgres and MySQL hit a wall once interactive latency budgets drop below five seconds, and batch-oriented cloud warehouses such as Snowflake are tuned for seconds-to-minutes queries rather than sub-second ones. That is the gap a column-store engine built for vectorised aggregation was designed to close.

This guide walks through the four mental models that ClickHouse data engineering interviews keep probing: the columnar storage and vectorised execution model that makes sub-second queries possible; the MergeTree family of table engines and why one of its six variants is almost always the right answer; the insert-time aggregation pattern built on ClickHouse materialized views, which turns one logical pipeline into 1-minute, 1-hour and 1-day pre-aggregations; and the sharding plus replication grid that lets a cluster scale horizontally without losing any per-node speed. Each section pairs a teaching block with a Solution-Tail interview answer: code, a step-by-step trace, an output table, then a concept-by-concept breakdown of why it works.


When you want hands-on reps immediately after reading, drill the real-time analytics practice library, rehearse on aggregation problems, and build time-series muscle with the time-series practice drills.




1. Why ClickHouse for sub-second analytics

Columnar storage and vectorised execution are the two ideas that make a billion-row aggregation feel instant

The one-sentence invariant: ClickHouse stores every column of a table as an independently compressed file and processes those files in CPU-cache-friendly batches of 65,536 values at a time — so a SELECT sum(amount) FROM events reads only the amount bytes, not the whole row, and crunches them with SIMD instead of one tuple at a time. Once you internalise "columns, not rows; batches, not tuples," every other ClickHouse design choice — MergeTree parts, sort-key skipping, materialized views — falls out as an obvious consequence.

The three places columnar wins.

  • Aggregations over a single column. A SUM, AVG, MAX, quantile, or uniq on one column reads exactly that column's bytes from disk — typically 5–20x less I/O than a row-store equivalent on the same table.
  • High-cardinality group-by. A GROUP BY user_id, event_type over a billion-row table is bottlenecked by hash-table memory and CPU, not I/O. Vectorised execution gives ClickHouse a 10–100x edge over Postgres on the same hardware.
  • Time-range scans. With PARTITION BY toYYYYMM(ts) and ORDER BY (ts, user_id), ClickHouse prunes whole partitions and then uses the primary-key sparse index to skip granules inside the remaining parts, turning a 90-day query against a 5-year table into a read of three or four monthly partitions out of roughly sixty.

Three-line latency budget.

Real-time analytics is usually defined as interactive (a human waits for the answer), so the contract is a P95 below 1–2 seconds. Streaming, in contrast, is measured by end-to-end latency from event creation to query visibility. ClickHouse is built to win the interactive contract: it does not by itself make a Kafka event queryable within milliseconds, but it does serve a 50ms SELECT over the data once it has landed.

What interviewers listen for.

  • Do you say "columnar layout means we read only the columns we project" when asked why ClickHouse is fast? — senior signal.
  • Do you mention vectorised execution as a complementary speedup to columnar I/O? — required answer.
  • Do you call out append-heavy as the write pattern ClickHouse is optimised for? — required answer.
  • Do you flag heavy updates / deletes as the workload to avoid? — senior signal.

The 2026 reality.

  • ClickHouse Cloud and self-hosted ClickHouse share the same core engine; Cloud adds separated compute and object storage (its SharedMergeTree engine) and a managed Keeper.
  • Cloudflare, Uber, ByteDance and Yandex all run ClickHouse at multi-petabyte scale, often as the serving layer behind log analytics and ad-tech dashboards.
  • Druid and Pinot occupy the same niche, but ClickHouse has won a large share of net-new deployments in recent years, largely because its SQL surface is wider and its operational model simpler.
  • Snowflake / BigQuery still dominate batch analytics; ClickHouse complements rather than replaces them, and running both side by side (the lambda pattern) is a common deployment.

Worked example — measuring the columnar speed-up on a single aggregate

Detailed explanation. A team migrates an events table from Postgres to ClickHouse. The headline query is SELECT toStartOfHour(ts) AS hour, count(), uniq(user_id) FROM events WHERE ts >= now() - INTERVAL 24 HOUR GROUP BY hour ORDER BY hour. On Postgres every matching row is fetched whole, all 50 columns; on ClickHouse the query touches only the ts and user_id columns, and only the last day's partitions.

Question. Given a 5-billion-row events table with 50 columns, estimate how much data ClickHouse reads vs Postgres for the hourly count + unique-user query above. Show the math, then write the canonical ClickHouse table definition that enables the optimisation.

Input.

Column | Rows | Bytes/row (uncompressed) | Total bytes
All 50 cols | 5,000,000,000 | 250 | 1.25 TB
ts only (DateTime) | 5,000,000,000 | 4 | 20 GB
user_id only (UInt64) | 5,000,000,000 | 8 | 40 GB
last-24h ts + user_id | 50,000,000 | 12 | 600 MB

Code.

CREATE TABLE events
(
    ts          DateTime,
    user_id     UInt64,
    event_type  LowCardinality(String),
    value       Float64,
    properties  String,
    -- ... 45 more columns ...
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(ts)
ORDER BY (ts, user_id);

-- The query interviewers ask about
SELECT
    toStartOfHour(ts) AS hour,
    count()           AS events,
    uniq(user_id)     AS unique_users
FROM events
WHERE ts >= now() - INTERVAL 24 HOUR
GROUP BY hour
ORDER BY hour;

Step-by-step explanation.

  1. Postgres reads every row in the time range: even with a B-tree index on ts, the heap fetch pulls all 50 columns. For one day of data that is roughly 12.5 GB of heap reads (250 bytes/row × 50M rows).
  2. ClickHouse with PARTITION BY toYYYYMMDD(ts) prunes every partition outside the last 24 hours, so the planner only touches one or two partition directories.
  3. Inside the touched partitions, ClickHouse reads only the ts and user_id column files: roughly 600 MB uncompressed for 50M rows (4 bytes for DateTime plus 8 bytes for UInt64). At an illustrative 4x compression ratio on disk (LZ4 by default), that drops to ~150 MB of actual disk I/O.
  4. The ORDER BY (ts, user_id) sort key lets the primary-key sparse index skip granules whose ts falls outside the WHERE, so the engine reads only the relevant granules, not the whole column file.
  5. Vectorised aggregation processes up to 65,536 rows per call: count() is a tight counting loop, and uniq() maintains an approximate, bounded-memory distinct-count sketch.

Output.

Engine | Data read | Wall time (typical)
Postgres (B-tree on ts) | ~12.5 GB | 30–90s
ClickHouse (MergeTree, partitioned) | ~150 MB (illustrative) | 80–400ms

Rule of thumb. When the interactive latency budget is under a second on a billion-row table, the question is not "which row store can we tune?" — it is "which column store fits the shape?" ClickHouse is the default answer when the workload is append-heavy and aggregation-dominant.
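The arithmetic behind the Input and Output tables can be checked in a few lines of Python. This is a back-of-envelope sketch, not a ClickHouse measurement: it assumes DateTime is stored in 4 bytes and UInt64 in 8, takes the 250-byte average row from the table, and uses a 4x compression ratio purely as an illustrative assumption.

```python
# Back-of-envelope I/O model for the hourly query (a sketch, not a benchmark).
ROWS_PER_DAY = 50_000_000
ROW_BYTES = 250                          # all 50 columns, uncompressed
PROJECTED = {"ts": 4, "user_id": 8}      # DateTime = 4 bytes, UInt64 = 8 bytes
COMPRESSION_RATIO = 4.0                  # illustrative assumption, not a measured LZ4 ratio


def row_store_bytes(rows: int) -> int:
    """A row store fetches whole rows, so every column's bytes are read."""
    return rows * ROW_BYTES


def column_store_bytes(rows: int, ratio: float = COMPRESSION_RATIO) -> float:
    """A column store reads only the projected columns, compressed on disk."""
    return rows * sum(PROJECTED.values()) / ratio


pg = row_store_bytes(ROWS_PER_DAY)
ch = column_store_bytes(ROWS_PER_DAY)
print(f"row store:    {pg / 1e9:.1f} GB")    # row store:    12.5 GB
print(f"column store: {ch / 1e6:.0f} MB")    # column store: 150 MB
print(f"reduction:    {pg / ch:.0f}x")       # reduction:    83x
```

Most of the gap comes from projection (12 of 250 bytes per row); compression multiplies it, and partition pruning is what keeps the row count at one day rather than five years.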

Worked example — the workloads ClickHouse does NOT love

Detailed explanation. Senior interviewers love the negation question: "When is ClickHouse the wrong tool?" The answer is anywhere the workload demands frequent point updates, multi-statement transactions, or complex many-to-many joins between large tables. ClickHouse can do all three, but each fights the engine's design rather than leaning on it.

Question. Given a workload mix, classify each as "ClickHouse-native," "possible but painful," or "wrong tool." Justify each verdict in one sentence.

Input.

Workload | Read pattern | Write pattern | Concurrency
Real-time analytics dashboard | aggregate over 100M rows | bulk insert from Kafka | 100 QPS
OLTP order entry | single-row lookup by PK | single-row insert + update | 1000 TPS
Ad-tech event log | time-series aggregate over 50B rows | bulk insert from S3 | 10 QPS
Audit log | row-level fetch by ID | append-only, then GDPR delete | 1 QPS read, 0.01 QPS delete
Star-schema BI fan-out | big fact joined to 6 dim tables | nightly batch | 5 QPS

Code.

-- ClickHouse-native: real-time aggregation
SELECT toStartOfMinute(ts) AS minute, count()
FROM events
WHERE ts >= now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;

-- Possible but painful: row-level GDPR delete
ALTER TABLE audit DELETE WHERE user_id = 12345;
-- ^ mutation: rewrites entire affected parts in the background.
--   Fine at low volume (occasional GDPR); fatal at high update volume.

-- Wrong tool: many-to-many join with no shard alignment
SELECT a.id, b.id
FROM big_fact_a a
JOIN big_fact_b b ON a.user_id = b.user_id;
-- ^ the default hash join builds the right-hand table in memory, so unless
--   one side fits in memory or both share a shard key, this either exhausts
--   memory or ships a whole table across shards (e.g. GLOBAL JOIN).

Step-by-step explanation.

  1. The real-time dashboard is the canonical ClickHouse use case — append-only ingest, aggregate-heavy reads, small projection set.
  2. OLTP order entry is the canonical wrong-tool: ClickHouse has no real row-level update, no MVCC, no per-row transactions. Use Postgres.
  3. Ad-tech event log at 50B rows is the canonical scale story — Cloudflare runs this exact shape.
  4. Audit log with occasional GDPR delete is the "possible but painful" middle ground — mutations work, but they rewrite entire parts in the background, so they are batch-friendly and human-frequency-friendly, not event-frequency-friendly.
  5. Star-schema fan-out is doable in ClickHouse via Dictionary tables for small dimensions or careful shard-key co-location for large ones — but a senior interviewer expects you to call out the friction.
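A toy model makes the "rewrites entire affected parts" point from step 4 concrete. It is a Python sketch, not ClickHouse's mutation code: the Part class and mutate_delete function are hypothetical, and the only behaviour modelled is that a part containing any matching row is rewritten in full while other parts are reused.

```python
from dataclasses import dataclass, field


@dataclass
class Part:
    """An immutable data part: a name plus rows of (user_id, payload)."""
    name: str
    rows: list = field(default_factory=list)


def mutate_delete(parts, user_id):
    """Toy model of ALTER TABLE ... DELETE WHERE user_id = X.

    Any part holding a matching row is rewritten as a new part without it;
    the cost is every surviving row of that part, not just the deleted ones.
    """
    rewritten_rows = 0
    new_parts = []
    for part in parts:
        if any(uid == user_id for uid, _ in part.rows):
            kept = [row for row in part.rows if row[0] != user_id]
            rewritten_rows += len(kept)
            new_parts.append(Part(part.name + "_mutated", kept))
        else:
            new_parts.append(part)  # untouched part is reused as-is
    return new_parts, rewritten_rows


parts = [
    Part("part_a", [(uid, "payload") for uid in range(12_000, 13_000)]),  # holds 12345
    Part("part_b", [(uid, "payload") for uid in range(13_000, 14_000)]),  # does not
]
parts, cost = mutate_delete(parts, 12345)
print(cost)  # 999 rows rewritten to delete 1 row
```

At GDPR frequency that write amplification is tolerable; at event frequency it never catches up, which is the "possible but painful" verdict in numbers.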

Output.

Workload | Verdict | Reason
Real-time analytics dashboard | ClickHouse-native | aggregation over append-only data
OLTP order entry | Wrong tool | no row updates, no transactions
Ad-tech event log | ClickHouse-native | aggregation at petabyte scale
Audit log + occasional delete | Possible but painful | mutations are batch-scale
Star-schema BI fan-out | Possible with care | joins need dictionary or shard co-location

Rule of thumb. Pick ClickHouse when the read pattern is "aggregate over a column" and the write pattern is "append from a stream or a bulk file." Reach for Postgres / a row store the moment the contract is "update this row, transact across rows, or look up one row by primary key 10,000 times a second."

Worked example — vectorised execution by hand

Detailed explanation. Vectorised execution is the often-missed second half of "why ClickHouse is fast." Even with columnar I/O, a row-by-row interpreter would burn cycles on per-tuple function dispatch. ClickHouse processes data in fixed-size column blocks (default 65,536 rows) and dispatches one function call per block — so the inner loop is a tight SIMD-friendly arithmetic kernel.

Question. Walk through how ClickHouse evaluates SELECT sum(value * 1.1) FROM events WHERE event_type = 'click' against a 1-billion-row table. Compare the cost model to a row-at-a-time interpreter.

Input (conceptual block).

block_row | event_type | value
0 | click | 10.0
1 | view | 0.0
2 | click | 5.0
... | ... | ...
65535 | click | 8.0

Code.

SELECT sum(value * 1.1) AS total
FROM events
WHERE event_type = 'click';

Step-by-step explanation.

  1. ClickHouse reads one block (default 65,536 rows) of the event_type and value columns at a time — two separate column files, each compressed with LZ4.
  2. The WHERE event_type = 'click' filter is evaluated as a vectorised equality kernel that produces a filter mask with one byte per row (65,536 entries per block).
  3. The value * 1.1 projection runs as a vectorised float multiplication: one AVX2 instruction multiplies 4 doubles at once, and one AVX-512 instruction multiplies 8.
  4. The sum(...) aggregate folds the filtered block into a single double, then adds it to the running total. One function call processes the whole block.
  5. A row-at-a-time interpreter would dispatch one function call per row for the filter, one for the projection, and one for the aggregate: three calls per row, each with its own branching and cache overhead, multiplied by 1B rows.
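The dispatch counts in step 5 and in the Output table can be reproduced with a small Python model. It only counts operator calls: Python still does per-row work inside the list comprehensions, so this illustrates the call structure of block-at-a-time execution, not its speed. The 65,536-row block size follows the text; the data is made up.

```python
import math

BLOCK = 65_536  # rows per block, as in the text


def row_at_a_time(event_type, value):
    """Interpreter model: filter, projection and aggregate dispatched per row."""
    calls, total = 0, 0.0
    for et, v in zip(event_type, value):
        calls += 1
        keep = et == "click"         # filter
        calls += 1
        projected = v * 1.1          # projection
        calls += 1
        if keep:                     # aggregate
            total += projected
    return total, calls


def vectorised(event_type, value, block=BLOCK):
    """Block model: each operator is dispatched once per block of rows."""
    calls, total = 0, 0.0
    for start in range(0, len(value), block):
        et_blk = event_type[start:start + block]
        v_blk = value[start:start + block]
        calls += 1
        mask = [et == "click" for et in et_blk]                          # filter kernel
        calls += 1
        projected = [v * 1.1 for v, keep in zip(v_blk, mask) if keep]   # projection kernel
        calls += 1
        total += sum(projected)                                          # aggregate kernel
    return total, calls


def blocks_needed(rows, block=BLOCK):
    return math.ceil(rows / block)


n = 200_000
event_type = ["click" if i % 2 == 0 else "view" for i in range(n)]
value = [float(i % 10) for i in range(n)]

row_total, row_calls = row_at_a_time(event_type, value)
vec_total, vec_calls = vectorised(event_type, value)
print(row_calls, vec_calls)               # 600000 12
print(3 * blocks_needed(1_000_000_000))   # 45777
```

Both paths compute the same sum; only the number of dispatches differs, which is the overhead a vectorised engine amortises across each block.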

Output (numbers are illustrative).

Engine | Function dispatches | Wall time
Row-at-a-time interpreter | 3 × 1,000,000,000 = 3B | ~30 minutes
ClickHouse vectorised | 3 × ~15,260 = ~46K | ~1.5 seconds

Rule of thumb. When the latency budget is under a second on a billion-row column, you need both column pruning and vectorisation. Row-at-a-time engines, even ones that JIT-compile the per-row loop (Spark's whole-stage codegen, Postgres's expression JIT), get at most the column-pruning half and often top out around 10x slower than a vectorised engine on the same hardware.

Senior interview question on the ClickHouse latency model

A senior interviewer often opens with: "Explain in 90 seconds why ClickHouse can serve a SELECT count(DISTINCT user_id) GROUP BY day over 30 billion rows in under a second when Postgres on the same hardware would take 20 minutes." This blends columnar storage, partition pruning, the sparse index, and vectorised execution into one answer.

Solution Using the four-layer latency model

-- The reference table that supports the sub-second query
CREATE TABLE events
(
    ts         DateTime,
    user_id    UInt64,
    event_type LowCardinality(String),
    value      Float64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(ts)
ORDER BY (toStartOfDay(ts), user_id, ts)
SETTINGS index_granularity = 8192;

-- The interactive query
SELECT
    toStartOfDay(ts) AS day,
    uniq(user_id)    AS dau
FROM events
WHERE ts >= now() - INTERVAL 30 DAY
GROUP BY day
ORDER BY day;

Step-by-step trace.

Layer | Mechanism | What it skips | Time saved
Partition pruning | PARTITION BY toYYYYMM(ts) | ~96% of partitions (only the last 2 months are touched) | minutes → seconds
Sparse index skip | ORDER BY (toStartOfDay(ts), user_id, ts) | granules outside the WHERE window | seconds → 500ms
Columnar I/O | reads ts and user_id only, not all columns | every other column file (~90% of bytes on a wide production table) | 500ms → 200ms
Vectorised execution + approximate uniq | one dispatch per block; bounded-memory uniq() sketch | per-tuple dispatch + exact distinct state | 200ms → 50ms

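After the trace, the team can answer the next interview question in the same breath: "If you needed exact distincts, you'd use uniqExact() and pay the memory cost. For dashboards, uniq() is the right default." (In ClickHouse, count(DISTINCT x) itself maps to uniqExact by default, via the count_distinct_implementation setting.)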
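To see why uniq() can keep bounded memory per group, here is a simplified adaptive-sampling distinct counter in Python, in the spirit of what the ClickHouse documentation describes for uniq(). The hash function, the max_size of 1,024 and the class name are illustrative assumptions, not ClickHouse internals (which sample up to 65,536 hashes).

```python
import hashlib


def hash64(value) -> int:
    """Deterministic 64-bit hash; a stand-in, not ClickHouse's internal hash."""
    digest = hashlib.blake2b(str(value).encode(), digest_size=8).digest()
    return int.from_bytes(digest, "little")


class AdaptiveSamplingCounter:
    """Approximate distinct count with bounded memory.

    Keep only hashes whose lowest `level` bits are zero (a 1 / 2**level sample).
    When the sample exceeds max_size, raise the level and evict hashes that no
    longer qualify. Estimate = sample size * 2**level.
    """

    def __init__(self, max_size: int = 1024):
        self.max_size = max_size
        self.level = 0
        self.sample = set()

    def add(self, value) -> None:
        h = hash64(value)
        if h & ((1 << self.level) - 1) == 0:
            self.sample.add(h)
            while len(self.sample) > self.max_size:
                self.level += 1
                mask = (1 << self.level) - 1
                self.sample = {x for x in self.sample if x & mask == 0}

    def estimate(self) -> int:
        return len(self.sample) << self.level


small, large = AdaptiveSamplingCounter(), AdaptiveSamplingCounter()
for uid in range(500):
    small.add(uid)
    small.add(uid)            # duplicates do not change the state
for uid in range(200_000):
    large.add(uid)
print(small.estimate())       # 500: below max_size the count is exact
print(large.estimate(), large.level, len(large.sample))
```

The state never holds more than max_size hashes however many users arrive, which is why per-group memory stays flat; uniqExact has no such cap because it must remember every value.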

Output:

day | dau
... | ...
2026-06-12 | 1,201,140
2026-06-13 | 1,189,420
2026-06-14 | 1,243,800

Why this works — concept by concept:

  • Partition pruning: PARTITION BY toYYYYMM(ts) splits the on-disk layout by month. The planner checks the WHERE predicate against each partition's ts range and skips entire partition directories, turning a 30-billion-row scan into roughly a 1-billion-row one.
  • Sparse primary-key index: the ORDER BY columns define the on-disk sort order. ClickHouse keeps one index entry per index_granularity rows (default 8192), so the index is small (about 1.2M entries, roughly 20 MB with this 16-byte key, for a 10B-row table) yet still lets the engine skip entire granules whose ts falls outside the WHERE.
  • Columnar I/O: only the ts and user_id column files are read. Each is LZ4-compressed on disk and decompressed in cache-friendly blocks, so the bytes read relative to a row store are roughly bytes(projected columns) / bytes(all columns).
  • Vectorised execution + approximate uniq: the aggregate runs in blocks of up to 65,536 rows with one function dispatch per block, and uniq() uses an adaptive-sampling sketch (at most 65,536 hash values per state), so the distinct-count state per group stays bounded regardless of cardinality. uniqHLL12 and uniqCombined are the HyperLogLog-based alternatives.
  • Cost: O(filtered_rows) reads, O(blocks) function dispatches, O(groups × sketch_state) memory. The dominant term is I/O on the projected columns within the partitions touched.
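The sparse-index bullet can be modelled with a sorted list and a binary search. This Python sketch simplifies the key to a single timestamp column and treats each mark as the first key of its granule; build_sparse_index and granules_to_read are hypothetical helpers, not ClickHouse APIs.

```python
import bisect

GRANULARITY = 8192  # matches SETTINGS index_granularity = 8192 above


def build_sparse_index(sorted_keys, granularity=GRANULARITY):
    """One mark per granule: the first sort-key value of every `granularity` rows."""
    return [sorted_keys[i] for i in range(0, len(sorted_keys), granularity)]


def granules_to_read(marks, lower_bound):
    """Granules that may hold keys >= lower_bound.

    The last mark <= lower_bound starts a granule that can straddle the bound,
    so it is kept; every earlier granule is skipped without touching its data.
    """
    first = max(bisect.bisect_right(marks, lower_bound) - 1, 0)
    return range(first, len(marks))


# 100,000 rows whose sort key is a timestamp in seconds (illustrative data).
keys = list(range(100_000))
marks = build_sparse_index(keys)
to_read = granules_to_read(marks, 90_000)
print(f"{len(marks)} marks, read {len(to_read)} granules")  # 13 marks, read 3 granules
```

The index holds one entry per 8,192 rows, which is why it stays small enough to keep in memory while still letting the engine jump straight to the relevant granules.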



2. ClickHouse's role in the modern stack

ClickHouse sits between the stream and the dashboard — the sub-second serving tier that a batch warehouse cannot reach

The mental model in one line: the modern real-time stack is sources → CDC → Kafka → ClickHouse → dashboards, with an optional parallel batch lane to a warehouse — and ClickHouse is the only component on the read path that satisfies an interactive (sub-second) latency budget. Once you can draw that pipeline, every "where does ClickHouse fit?" interview question collapses to "which arrow are you talking about?"

Figure: the real-time pipeline. Sources (Postgres, MySQL, app events) feed Debezium CDC and Kafka into ClickHouse, which serves BI dashboards (Grafana, Superset) and an API; a parallel batch lane to a warehouse shows the lambda pattern.

The five-zone reference architecture.

  • Zone 1 — sources. Postgres / MySQL OLTP, app event firehoses, third-party webhooks. The data is row-oriented and transactional.
  • Zone 2 — CDC + stream. Debezium tails each source's change log (the MySQL binlog, or the Postgres write-ahead log via logical decoding) and produces a Kafka topic per table. Application events land in Kafka directly. Kafka is the durable buffer.
  • Zone 3 — ClickHouse. The Kafka table engine subscribes to a topic; a materialized view fans every insert into a downstream MergeTree table that owns the actual storage. The MV is the bridge between the stream and the column store.
  • Zone 4 — serve. Grafana / Superset query ClickHouse directly. Custom APIs query ClickHouse via the HTTP interface. Internal tools query through the Native protocol.
  • Zone 5 — batch lane (optional). A parallel Source → DataLake → dbt → Snowflake / BigQuery lane backs the long-tail analytics and finance reports. This is the lambda-style two-engine deployment.

Two architecture patterns side by side.

  • Lambda. Sources fan out to both a batch lake and ClickHouse. The batch lane handles correctness (re-processable, idempotent) and long retention. ClickHouse handles latency (sub-second) and the last 30–90 days. The dashboard joins the two only when explicitly needed.
  • Kappa. All ingest goes through Kafka. ClickHouse, via the Kafka table engine, is the only consumer of record. Replays come from Kafka's own retention, usually extended with tiered (S3-backed) storage; log compaction alone is not enough, because a compacted topic keeps only the latest record per key. There is no batch warehouse for analytics, only ClickHouse and (optionally) a cold S3 archive.

Where ClickHouse fits vs the alternatives.

Engine | Latency contract | Write pattern | Replaces
ClickHouse | sub-second on aggregates | bulk insert from Kafka / S3 | Druid, Pinot, Vertica
Druid | sub-second on time-series | streaming ingest | ClickHouse on time-series
Pinot | sub-second on user-facing analytics | streaming ingest | ClickHouse on per-user views
Snowflake / BigQuery | seconds to minutes | bulk insert + dbt | Redshift, batch Hive
Postgres | milliseconds for OLTP, slow on aggregates | row-level transactions | OLTP MySQL

Multi-tenant patterns.

  • One table per customer. Heavy schema overhead, but isolation is perfect — drop a table to offboard a customer.
  • One table partitioned by customer_id. Single table, single MV, but every query needs WHERE customer_id = X to prune.
  • One table sharded by customer_id. Cluster-level isolation; large customers can be moved to dedicated shards.
  • Per-tenant materialized view fan-out. Source table is shared; pre-aggregated views are per-customer with TTL.

Where the data engineer sits in this stack.

  • Owns the Kafka → ClickHouse contract — topic format, MV mapping, schema evolution.
  • Owns the MergeTree schema: ORDER BY, PARTITION BY, TTL, codec choices.
  • Owns the materialized-view roll-up tree — 1-minute, 1-hour, 1-day aggregates feed the dashboard.
  • Owns the sharding key — once chosen, it is expensive to change.

Worked example — the canonical Kafka → ClickHouse → dashboard pipeline

Detailed explanation. A team ships a real-time funnel dashboard. App events flow through Kafka. The dashboard queries hourly counts and unique users by event_type. The team writes three objects in ClickHouse: a Kafka engine table (the consumer), a MergeTree table (the storage), and a materialized view that bridges them.

Question. Build the three-object pipeline that takes JSON events from a Kafka topic events and lands them in a MergeTree table events_local such that an hourly dashboard query is fast. Show the Kafka table, the target table, and the materialized view.

Input — Kafka topic schema (JSON).

Field | Type | Example
ts | DateTime | 2026-06-15 09:12:30
user_id | UInt64 | 1029384
event_type | String | click
value | Float64 | 1.0

Code.

-- 1) The Kafka source table — a consumer, not storage
CREATE TABLE events_queue
(
    ts         DateTime,
    user_id    UInt64,
    event_type LowCardinality(String),
    value      Float64
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list   = 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
    kafka_topic_list    = 'events',
    kafka_group_name    = 'clickhouse-ingest',
    kafka_format        = 'JSONEachRow',
    kafka_num_consumers = 3;

-- 2) The MergeTree storage table the dashboard queries
CREATE TABLE events_local
(
    ts         DateTime,
    user_id    UInt64,
    event_type LowCardinality(String),
    value      Float64
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(ts)
ORDER BY (event_type, ts, user_id)
TTL ts + INTERVAL 90 DAY;

-- 3) The materialized view that copies every Kafka insert into storage
CREATE MATERIALIZED VIEW events_mv TO events_local AS
SELECT
    ts,
    user_id,
    event_type,
    value
FROM events_queue;

Step-by-step explanation.

  1. The Kafka engine table is not a storage table. It is a consumer that pulls messages from a Kafka topic. Every SELECT from it consumes new messages.
  2. The materialized view fires on every batch the Kafka consumer reads. The MV's SELECT FROM events_queue is the read that advances the Kafka offset.
  3. The MV writes into events_local via the TO events_local clause — the target table is the on-disk MergeTree.
  4. events_local is the table dashboards query. Its PARTITION BY (day) lets queries prune by ts; its ORDER BY (event_type, ts, user_id) lets queries filtered by event type skip whole granules.
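  5. The TTL clause expires data older than 90 days automatically; ClickHouse removes the expired data in the background during merges. Cold archival to S3 is configured separately, through a storage policy with an S3 disk plus a TTL ... TO DISK / TO VOLUME rule, or manually with ALTER TABLE ... MOVE PARTITION.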
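The cursor-versus-storage split in steps 1 to 3 can be sketched as a small Python simulation. KafkaEngineTable, MergeTreeTable and run_bridge_mv are hypothetical stand-ins: the model assumes the offset is committed only after the target insert succeeds, which is how the at-least-once behaviour of the Kafka engine is usually described.

```python
from dataclasses import dataclass, field


@dataclass
class KafkaEngineTable:
    """Toy stand-in for a Kafka engine table: a cursor over a topic, not storage."""
    topic: list        # the append-only log of messages
    offset: int = 0    # committed consumer-group offset

    def poll(self, max_batch: int) -> list:
        return self.topic[self.offset:self.offset + max_batch]

    def commit(self, n: int) -> None:
        self.offset += n


@dataclass
class MergeTreeTable:
    """Toy stand-in for the MergeTree target: each insert becomes one new part."""
    parts: list = field(default_factory=list)

    def insert(self, rows) -> None:
        self.parts.append(list(rows))


def run_bridge_mv(source, target, transform, max_batch=3):
    """Materialized-view bridge: poll a batch, transform it, insert it into the
    target, then commit. Committing only after the insert means a crash between
    the two replays the batch (at-least-once) rather than losing it."""
    while batch := source.poll(max_batch):
        target.insert(transform(row) for row in batch)
        source.commit(len(batch))


queue = KafkaEngineTable(topic=[{"user_id": i, "event_type": "click"} for i in range(7)])
storage = MergeTreeTable()
run_bridge_mv(queue, storage, transform=lambda m: (m["user_id"], m["event_type"]))
print(queue.offset, [len(p) for p in storage.parts])  # 7 [3, 3, 1]
```

Once the offset has moved past a message, reading the cursor again returns nothing; the data only survives in the target table, which is why dashboards must query events_local.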

Output (after ingest is running for a few minutes).

Step | Effect
Kafka producer writes 100K msgs/s | events_queue advances offsets continuously
MV fires every batch | rows land in events_local
Dashboard runs GROUP BY toStartOfHour(ts) | scans events_local, not events_queue
90-day TTL | older partitions drop automatically

Rule of thumb. Never query a Kafka engine table from a dashboard. Always land the data in a MergeTree via a materialized view first. The Kafka table is a moving cursor, not a queryable surface.

Worked example — choosing between lambda and kappa

Detailed explanation. Senior interviewers love the "do you need a batch lake?" follow-up. The honest answer is "it depends," but the framing the candidate should bring is: lambda buys correctness, kappa buys simplicity. The right answer is whichever operational burden the team can realistically carry when something breaks.

Question. Given the requirements list below, decide whether to deploy lambda (ClickHouse + warehouse) or kappa (ClickHouse-only). Justify in one paragraph.

Input.

Requirement | Value
Interactive dashboard latency | < 1s P95
Long-tail analytics retention | 5 years
Re-processable on schema change | yes (compliance)
Daily event volume | 10B events/day
Team size | 4 data engineers

Code (the two architectures as YAML).

# Lambda — two engines
sources:
  - postgres-cdc: debezium
  - app-events: kafka

batch_lane:
  ingest: s3 (parquet)
  transform: dbt-snowflake
  retention: 5 years
  serves: finance, ml, ad-hoc

speed_lane:
  ingest: kafka -> clickhouse Kafka engine
  storage: events_local (90d TTL)
  serves: real-time dashboards

---
# Kappa — one engine
sources:
  - postgres-cdc: debezium
  - app-events: kafka

speed_lane:
  ingest: kafka -> clickhouse Kafka engine
  storage:
    - events_local: 90d hot
    - events_cold (s3 disk): 5y warm via storage policy
  serves: dashboards, finance, ad-hoc
  replays: from kafka tiered storage

Step-by-step explanation.

  1. The interactive contract (< 1s P95) forces ClickHouse into the speed lane regardless of architecture choice.
  2. The 5-year retention contract favours lambda if the warehouse is already running, kappa if ClickHouse's S3-tiered storage is acceptable for cold data.
  3. The "re-processable on schema change" requirement favours lambda — the immutable parquet lake is the canonical replay source. Kappa can do it via Kafka tiered storage but with more operational overhead.
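  4. 10B events/day averages about 116K events/sec (10,000,000,000 / 86,400), which is well within ClickHouse's single-cluster comfort zone; capacity should still be planned for the peak, not the average.
  5. A 4-engineer team usually benefits from kappa's "one fewer engine to operate": every extra engine brings its own upgrades, on-call load, cost tracking and schema-evolution work, and a small team feels that overhead most.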
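Step 4's rate arithmetic is worth doing explicitly, because capacity planning is driven by the peak rather than the daily average. The 3x peak-to-average factor below is an illustrative assumption, not a figure from the requirements.

```python
SECONDS_PER_DAY = 86_400


def ingest_rates(events_per_day: int, peak_factor: float = 3.0):
    """Average and assumed-peak events/sec for a daily volume.

    peak_factor is an illustrative assumption, not a measured ratio.
    """
    avg = events_per_day / SECONDS_PER_DAY
    return avg, avg * peak_factor


avg, peak = ingest_rates(10_000_000_000)
print(f"average: {avg:,.0f} events/s")   # average: 115,741 events/s
print(f"peak:    {peak:,.0f} events/s")  # peak:    347,222 events/s
```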

Output.

Architecture | Pros | Cons | Verdict for this team
Lambda | clean re-processing, mature dbt tooling, finance team familiar with Snowflake | two engines, two costs, two pipelines to schema-evolve | strong choice if Snowflake already exists
Kappa | one engine, one schema-evolution surface, simpler to operate | replay requires Kafka tiered storage, dbt-on-ClickHouse is newer | strong choice for greenfield

Rule of thumb. Start kappa if the team is greenfield and small; layer lambda on top only when an explicit batch use case (finance, ML training data) cannot be served by ClickHouse. The "one engine" argument compounds against complexity over years.

Worked example — multi-tenant table layout

Detailed explanation. Multi-tenant ClickHouse usually starts as "one shared table with customer_id in the sort key" and only graduates to per-customer tables or sharding once one customer's volume dominates the rest. The transition is operationally expensive, so the choice of sort key has to anticipate the future.

Question. Given a SaaS analytics product with 200 customers ranging from 1M events/day to 1B events/day, design the ClickHouse table layout that supports per-customer dashboards in sub-second.

Input.

Customer count | Per-customer event volume
195 | < 50M events/day
4 | 50M – 500M events/day
1 | > 1B events/day

Code.

-- Shared table for the small/medium customers
CREATE TABLE events_shared
(
    customer_id UInt32,
    ts          DateTime,
    user_id     UInt64,
    event_type  LowCardinality(String),
    value       Float64
)
ENGINE = MergeTree
PARTITION BY (customer_id, toYYYYMM(ts))
ORDER BY (customer_id, toStartOfHour(ts), user_id);

-- Dedicated table for the giant customer
CREATE TABLE events_customer_999
(
    ts         DateTime,
    user_id    UInt64,
    event_type LowCardinality(String),
    value      Float64
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(ts)
ORDER BY (toStartOfHour(ts), user_id);

-- Query layer routes by customer_id
-- (application-level routing, NOT a UNION ALL)

Step-by-step explanation.

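  1. PARTITION BY (customer_id, toYYYYMM(ts)) means each customer's data lives in its own physical directory per month, so queries filtered by customer_id prune to that customer's data immediately. The price is partition count: 200 customers produce about 2,400 partitions a year, and by default ClickHouse rejects a single INSERT block that touches more than 100 partitions (max_partitions_per_insert_block), so a mixed-tenant Kafka batch can fail. Keeping customer_id at the front of ORDER BY and partitioning by month alone avoids both problems.
  2. The ORDER BY starts with customer_id, so the sparse index narrows any single-customer query to that customer's contiguous range of granules.
  3. The giant customer (>1B/day) gets its own table because at that volume its inserts and background merges would dominate the shared table. Since customer_id already leads the sort key, the read-side penalty for other tenants is small; the split is mainly about isolating writes, merges and per-tenant tuning (such as the daily partitioning above).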
  4. Routing logic lives in the application — a small lookup table maps customer_id → table_name. The query layer dispatches accordingly.
  5. Sharding (next section) is the further evolution when one customer outgrows a single node.
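Step 4's application-level routing can be as small as a dictionary lookup. In this Python sketch, DEDICATED_TABLES, route and hourly_query are hypothetical names; the point it illustrates is that queries against events_shared always carry a customer_id predicate so partitions and granules prune, while the dedicated table needs none.

```python
# Hypothetical routing map: only customers promoted to a dedicated table appear here.
DEDICATED_TABLES = {999: "events_customer_999"}
SHARED_TABLE = "events_shared"


def route(customer_id: int) -> tuple:
    """Return (table, tenant predicate) for a per-customer dashboard query."""
    table = DEDICATED_TABLES.get(customer_id, SHARED_TABLE)
    # The shared table must always be filtered by tenant so pruning kicks in;
    # a dedicated table holds a single tenant, so no predicate is needed.
    predicate = f"customer_id = {int(customer_id)}" if table == SHARED_TABLE else "1"
    return table, predicate


def hourly_query(customer_id: int) -> str:
    table, predicate = route(customer_id)
    return (
        "SELECT toStartOfHour(ts) AS hour, count() AS events "
        f"FROM {table} WHERE {predicate} AND ts >= now() - INTERVAL 1 DAY "
        "GROUP BY hour ORDER BY hour"
    )


print(hourly_query(7))
print(hourly_query(999))
```

Promoting a tenant later is then a data migration plus one new entry in the map; no query template has to change.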

Output (latency contract).

Customer size | Table | Per-customer dashboard latency
Small (1M/day) | events_shared | 30–80ms
Medium (50M/day) | events_shared | 100–300ms
Large (1B/day) | events_customer_999 | 200–500ms

Rule of thumb. Make customer_id the first column of ORDER BY (or PARTITION BY) on day one. The next decision — dedicated table or dedicated shard — is operationally cheap if the sort key already isolates the tenant. Retrofitting tenant isolation onto a non-tenant-keyed table is painful enough to be the most common reason for a v2 rewrite.

Senior interview question on real-time stack design

A senior interviewer often opens with: "Design the data pipeline for a real-time analytics product that ingests 100K events/sec from Kafka and serves a sub-second dashboard. Where does ClickHouse sit, what does the Kafka contract look like, and how do you handle a downstream schema change?"

Solution Using a four-component pipeline

-- 1) The Kafka source table (consumer)
CREATE TABLE kafka_events
(
    ts          DateTime,  -- no CODEC here: a Kafka engine table stores nothing
    user_id     UInt64,
    event_type  LowCardinality(String),
    properties  String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '...',
         kafka_topic_list  = 'events',
         kafka_group_name  = 'ch-prod',
         kafka_format      = 'JSONEachRow';

-- 2) The MergeTree storage table
CREATE TABLE events
(
    ts          DateTime CODEC(DoubleDelta, LZ4),
    user_id     UInt64,
    event_type  LowCardinality(String),
    properties  String
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(ts)
ORDER BY (event_type, toStartOfHour(ts), user_id)
TTL ts + INTERVAL 90 DAY;

-- 3) The bridge MV
CREATE MATERIALIZED VIEW events_mv TO events AS
SELECT ts, user_id, event_type, properties
FROM kafka_events;

-- 4) The roll-up MV for the dashboard
CREATE TABLE events_hourly
(
    hour       DateTime,
    event_type LowCardinality(String),
    events     AggregateFunction(count),
    users      AggregateFunction(uniq, UInt64)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(hour)
ORDER BY (event_type, hour);

CREATE MATERIALIZED VIEW events_hourly_mv TO events_hourly AS
SELECT
    toStartOfHour(ts) AS hour,
    event_type,
    countState()      AS events,
    uniqState(user_id) AS users
FROM events
GROUP BY hour, event_type;

Step-by-step trace.

Component | Role | Reads from (or is read by) | Writes to
kafka_events | Kafka consumer | Kafka topic events | nothing (cursor only)
events_mv | bridge | kafka_events | events (raw storage)
events | raw storage | read by ad-hoc dashboard queries | nothing
events_hourly_mv | roll-up | events, on every insert | events_hourly
events_hourly | dashboard surface | read by the dashboard | nothing

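When a schema change comes in (e.g. adding a column region), the team adds it to the storage table with ALTER TABLE events ADD COLUMN region String DEFAULT '', recreates kafka_events with the new column (Kafka engine tables are generally dropped and recreated rather than altered in place, with the bridge MV detached meanwhile), and then updates the MV bodies. ALTER TABLE events_mv MODIFY QUERY ... evolves an MV's SELECT without dropping its target table.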

Output:

Dashboard query | Latency
Hourly event count by type (last 30 days) | 40–120ms
Unique users per hour by type | 60–200ms
Top 10 event types over last day | 30–80ms

Why this works — concept by concept:

  • Separation of concerns — the Kafka engine is the cursor, the MergeTree is the storage, and the AggregatingMergeTree is the dashboard surface. Each component owns one job, so a failure in one does not corrupt the others.
  • Bridge MV patternCREATE MATERIALIZED VIEW ... TO target AS SELECT ... FROM kafka_events is the canonical bridge. It fires on every insert into the source and lands the transformed rows in the target.
  • Roll-up MV with -State functionscountState() and uniqState() produce partial aggregate states that are stored in AggregatingMergeTree. Background merges roll them up further; queries finalize them with countMerge / uniqMerge.
  • Schema-evolution safety: the Kafka engine table, the storage table, and the MV all need the column added together. ALTER TABLE <mv> MODIFY QUERY evolves an MV body in place; for MVs created with TO, the target table is altered separately.
  • Cost: O(batch_size) per Kafka batch; O(events × MV_count) for materialized-view fanout; O(unique_groups × state_size) for the aggregating target table. Dashboard cost is O(touched_partitions) on the much smaller roll-up.



3. The MergeTree family — the heart of ClickHouse

MergeTree is one engine, six personalities — the variant you pick is the variant your write pattern needs

The mental model in one line: MergeTree is a columnar table engine that writes immutable on-disk "parts" and merges them in the background according to the ORDER BY key. The family variants (ReplacingMergeTree, SummingMergeTree, AggregatingMergeTree, CollapsingMergeTree, VersionedCollapsingMergeTree) layer additional semantics onto the merge step, and the Replicated* prefix adds replication to any of them. Once you say "the merge is when the variant's magic happens," the entire family becomes a memorisation exercise.

Family-tree diagram of MergeTree engine variants — a root MergeTree card branching into ReplacingMergeTree (dedup glyph), SummingMergeTree (Σ glyph), AggregatingMergeTree (state glyph), CollapsingMergeTree (sign +/- glyph) and ReplicatedMergeTree (replica icon), each with a small caption pill describing its use case, on a light PipeCode card.

The family in one table.

| Engine | Merge-step semantic | Common use |
|---|---|---|
| MergeTree | none (just sort and merge parts) | base columnar table |
| ReplacingMergeTree | dedupe by sort key, keep latest version | CDC upsert sink |
| SummingMergeTree | sum numeric columns by sort key | pre-aggregated counters |
| AggregatingMergeTree | merge -State aggregate columns by sort key | materialized-view roll-ups |
| CollapsingMergeTree | collapse sign = -1 rows against sign = +1 rows | row-level updates via tombstones |
| VersionedCollapsingMergeTree | same as Collapsing, but with a version column | concurrent CDC streams |
| ReplicatedMergeTree (and variants) | adds Keeper-coordinated replication on top | every production cluster |

PARTITION BY vs ORDER BY — two different concepts.

  • PARTITION BY defines how parts are grouped: every part belongs to exactly one partition, and parts from different partitions are never merged together. The planner prunes whole partitions before reading anything. Use coarse expressions like toYYYYMM(ts); fine-grained partitions (e.g. per-hour) create thousands of tiny parts and crater performance.
  • ORDER BY defines the sort order within a part, and by default it is also the primary key on which the sparse index is built (one mark per granule). The planner uses that index to skip granules (up to 8192 rows each by default). Put the columns you filter on most often here, generally from lowest to highest cardinality, as in (event_type, ts, user_id).
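To make granule skipping concrete, here is a toy Python model of a sparse index, assuming a single sorted part and one mark per granule that stores the granule's first ORDER BY key. The granule size of 4 is a stand-in for 8192 so the numbers stay readable, and the function names are hypothetical; it sketches the idea, not ClickHouse's actual index analysis.

```python
from bisect import bisect_left, bisect_right

GRANULE = 4  # toy stand-in for the default index_granularity of 8192 rows


def build_marks(sorted_keys, granule=GRANULE):
    """Sparse index: one mark per granule, holding that granule's first sort key."""
    return [sorted_keys[i] for i in range(0, len(sorted_keys), granule)]


def granules_for_range(marks, lo, hi):
    """Indices of granules that may hold keys in [lo, hi]; all others are skipped."""
    # Start one mark before the first mark >= lo: runs of equal keys can spill
    # across a granule boundary, so the previous granule may still contain lo.
    first = max(bisect_left(marks, lo) - 1, 0)
    # Stop at the last granule whose first key is <= hi.
    last = bisect_right(marks, hi) - 1
    return list(range(first, last + 1))  # empty when no granule can match


if __name__ == "__main__":
    marks = build_marks(list(range(32)))      # 32 sorted rows -> 8 granules
    print(marks)                              # [0, 4, 8, 12, 16, 20, 24, 28]
    print(granules_for_range(marks, 10, 13))  # [2, 3]: only 2 of 8 granules read
```

The index stores one key per granule rather than per row, which is why it stays small enough to keep in memory even for billions of rows.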

Parts and merges in plain words.

  • Every INSERT writes new immutable parts: at least one per partition its rows touch (a large insert may be split into several blocks, each its own part).
  • Background merges combine small parts into larger ones, applying the variant-specific semantic during the merge.
  • A part is immutable — to "update" a row, you write a new part with the new value and let the variant-specific merge resolve.
  • OPTIMIZE TABLE ... FINAL forces a merge of all parts in a partition. Useful for testing, dangerous in production at scale.
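The part lifecycle above can be sketched as a toy Python class, assuming rows are tuples and the ORDER BY key is a Python function. The `collapse` hook is a hypothetical name standing in for the variant-specific merge semantic; real merges are chosen per partition by a background scheduler, which this sketch ignores.

```python
import heapq
from itertools import groupby


class ToyMergeTree:
    """Toy model: inserts create immutable sorted parts; merges combine them."""

    def __init__(self, sort_key, collapse=None):
        self.sort_key = sort_key  # row -> ORDER BY key
        self.collapse = collapse  # optional: list of same-key rows -> rows to keep
        self.parts = []           # each part is an immutable, sorted tuple

    def insert(self, rows):
        # Every INSERT writes a brand-new part; existing parts are never modified.
        self.parts.append(tuple(sorted(rows, key=self.sort_key)))

    def merge(self):
        # k-way merge of already-sorted parts; ties keep the older part first.
        merged = heapq.merge(*self.parts, key=self.sort_key)
        if self.collapse is None:
            rows = list(merged)   # plain MergeTree: just sort and concatenate
        else:
            rows = []
            for _, group in groupby(merged, key=self.sort_key):
                rows.extend(self.collapse(list(group)))  # variant-specific step
        self.parts = [tuple(rows)]


if __name__ == "__main__":
    t = ToyMergeTree(sort_key=lambda row: row[0])
    t.insert([(3, "c"), (1, "a")])
    t.insert([(2, "b")])
    print(t.parts)  # [((1, 'a'), (3, 'c')), ((2, 'b'),)]
    t.merge()
    print(t.parts)  # [((1, 'a'), (2, 'b'), (3, 'c'))]
```

Because parts are already sorted, a merge is a streaming k-way merge rather than a full re-sort, which is what makes background merging cheap enough to run continuously.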

Common interview probes.

  • "What does MergeTree actually merge?" — parts. Small parts created by inserts are merged into larger parts in the background to keep the part count low.
  • "What is the difference between ReplacingMergeTree and CollapsingMergeTree?": Replacing keeps one row per sort key (the one with the highest version column, or the last inserted if no version is set); Collapsing requires the writer to emit a +1 row for "current" and a -1 row for "old", and the two cancel during merge.
  • "What is LowCardinality?": a data-type wrapper (not a compression codec) that dictionary-encodes the column. Small distinct sets (event_type, status, region) are stored as compact integer indexes (1–2 bytes for small dictionaries) both on disk and in memory.
  • "What is index_granularity?": the sparse-index granule size (default 8192). Each index mark covers up to 8192 rows; adaptive granularity (index_granularity_bytes) can make granules smaller for wide rows.
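A minimal Python sketch of the idea behind LowCardinality, assuming the column fits in memory: store each distinct string once and keep only a small integer per row. The function names are hypothetical, and the width rule (1-byte indexes up to 256 distinct values, 2-byte up to 65,536) is a simplification of what ClickHouse does internally.

```python
from array import array


def low_cardinality_encode(values):
    """Dictionary-encode a string column: distinct values once, plus a small int per row."""
    dictionary = list(dict.fromkeys(values))  # distinct values, first-seen order
    position = {value: i for i, value in enumerate(dictionary)}
    # Narrowest unsigned index width that can address the whole dictionary.
    if len(dictionary) <= 2 ** 8:
        typecode = "B"  # 1 byte per row
    elif len(dictionary) <= 2 ** 16:
        typecode = "H"  # 2 bytes per row
    else:
        typecode = "L"  # at least 4 bytes per row
    return dictionary, array(typecode, (position[v] for v in values))


def low_cardinality_decode(dictionary, indexes):
    """Rebuild the original column from the dictionary and the per-row indexes."""
    return [dictionary[i] for i in indexes]


if __name__ == "__main__":
    column = ["click", "view", "click", "purchase", "click"]
    dictionary, indexes = low_cardinality_encode(column)
    print(dictionary)                          # ['click', 'view', 'purchase']
    print(list(indexes), indexes.itemsize)     # [0, 1, 0, 2, 0] 1
```

Filters and GROUP BY on such a column can also operate on the small integer indexes instead of comparing strings, which is where much of the speedup comes from.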

Worked example — choosing MergeTree vs ReplacingMergeTree for a CDC sink

Detailed explanation. A team lands Postgres CDC events into ClickHouse. Each event is a full row image with a primary key. The team wants to query "the current state of every order", but ClickHouse has no cheap in-place row update (ALTER TABLE ... UPDATE runs as an asynchronous mutation that rewrites whole parts). The fix is ReplacingMergeTree: every insert is an append, but the merge step deduplicates by sort key, keeping the latest version.

Question. Build the CDC sink table. The source emits one row per change with order_id, status, amount, and an updated_at timestamp. Show the table definition and the query that returns the "current state" of orders.

Input (rows arriving over time).

| insert # | order_id | status | amount | updated_at |
|---|---|---|---|---|
| 1 | 1 | placed | 100 | 2026-06-15 09:00 |
| 2 | 1 | shipped | 100 | 2026-06-15 10:00 |
| 3 | 2 | placed | 50 | 2026-06-15 11:00 |
| 4 | 1 | delivered | 100 | 2026-06-15 12:00 |

Code.

CREATE TABLE orders_cdc
(
    order_id   UInt64,
    status     LowCardinality(String),
    amount     Decimal(12, 2),
    updated_at DateTime
)
ENGINE = ReplacingMergeTree(updated_at)
ORDER BY order_id;

-- Query for current state — use FINAL or argMax
SELECT order_id, status, amount, updated_at
FROM orders_cdc
FINAL;

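-- Or, without FINAL (often cheaper on large tables, more typing)
SELECT
    order_id,
    argMax(status, updated_at)     AS status,
    argMax(amount, updated_at)     AS amount,
    max(updated_at)                AS updated_at
FROM orders_cdc
GROUP BY order_id
-- the alias updated_at shadows the column; make argMax read the column
SETTINGS prefer_column_name_to_alias = 1;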

Step-by-step explanation.

  1. ReplacingMergeTree(updated_at) says "during merges, when two rows share the same ORDER BY key (order_id), keep the one with the greater updated_at."
  2. Until a merge covers them, every version of every row is still on disk, so a query without FINAL can see all four rows.
  3. FINAL forces the engine to apply the dedup semantic at query time — at the cost of extra read amplification. Fine for dashboards on small tables; expensive on billion-row tables.
  4. The argMax(col, updated_at) GROUP BY order_id pattern is the cheap alternative: it computes the same answer without FINAL, at the cost of a GROUP BY scan.
  5. For the highest-traffic queries, build a downstream materialized view that pre-aggregates the current state into a smaller table.
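The two read paths can be compared with a toy Python model, assuming rows are (order_id, status, amount, updated_at) tuples in insertion order and that the timestamps compare correctly as strings. `replacing_merge` mimics what a merge (or FINAL) does for ReplacingMergeTree(updated_at); `arg_max_current_state` mimics the argMax ... GROUP BY rewrite over unmerged rows. The tie rule (later insert wins) reflects our reading of the ClickHouse docs.

```python
# (order_id, status, amount, updated_at), in the order the inserts arrived
ROWS = [
    (1, "placed", 100, "2026-06-15 09:00"),
    (1, "shipped", 100, "2026-06-15 10:00"),
    (2, "placed", 50, "2026-06-15 11:00"),
    (1, "delivered", 100, "2026-06-15 12:00"),
]


def replacing_merge(rows):
    """What a merge (or FINAL) does for ReplacingMergeTree(updated_at) ORDER BY order_id."""
    latest = {}
    for row in rows:
        order_id, updated_at = row[0], row[3]
        # Highest version wins; on a tie the later-inserted row wins, hence '>='.
        if order_id not in latest or updated_at >= latest[order_id][3]:
            latest[order_id] = row
    return sorted(latest.values())


def arg_max_current_state(rows):
    """argMax(col, updated_at) ... GROUP BY order_id, computed over unmerged rows."""
    groups = {}
    for row in rows:
        groups.setdefault(row[0], []).append(row)
    return sorted(max(group, key=lambda r: r[3]) for group in groups.values())


if __name__ == "__main__":
    print(replacing_merge(ROWS))
    print(arg_max_current_state(ROWS) == replacing_merge(ROWS))  # True
```

Both paths return the same two rows; the difference in ClickHouse is only where the work happens (merging parts versus an ordinary aggregation).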

Output (current state of orders).

| order_id | status | amount | updated_at |
|---|---|---|---|
| 1 | delivered | 100 | 2026-06-15 12:00 |
| 2 | placed | 50 | 2026-06-15 11:00 |

Rule of thumb. Use ReplacingMergeTree for CDC sinks where you only ever care about the latest version. Pair it with argMax(...) GROUP BY pk for hot queries; reserve FINAL for low-QPS dashboards and ad-hoc sanity checks.

Worked example — SummingMergeTree for a pre-aggregated counter table

Detailed explanation. When the read pattern is "give me the running total per key," and the write pattern is "many small increments," SummingMergeTree collapses the per-key rows during merge — the on-disk size shrinks and reads scan fewer rows.

Question. Build a per-day click counter table where each insert is (day, page_id, +1) and the dashboard reads "clicks per page per day." Show the table and the query.

Input.

| day | page_id | clicks |
|---|---|---|
| 2026-06-15 | A | 1 |
| 2026-06-15 | A | 1 |
| 2026-06-15 | B | 1 |
| 2026-06-15 | A | 1 |

Code.

CREATE TABLE clicks_daily
(
    day     Date,
    page_id String,
    clicks  UInt64
)
ENGINE = SummingMergeTree(clicks)
PARTITION BY toYYYYMM(day)
ORDER BY (day, page_id);

INSERT INTO clicks_daily VALUES
    ('2026-06-15', 'A', 1),
    ('2026-06-15', 'A', 1),
    ('2026-06-15', 'B', 1),
    ('2026-06-15', 'A', 1);

-- Read pattern
SELECT day, page_id, sum(clicks) AS clicks
FROM clicks_daily
WHERE day = '2026-06-15'
GROUP BY day, page_id
ORDER BY clicks DESC;

Step-by-step explanation.

  1. SummingMergeTree(clicks) declares that during a merge, rows sharing the same ORDER BY key (day, page_id) collapse into one row whose clicks column is the sum.
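  2. Before merge: 4 rows. After merge: 2 rows (A with 3, B with 1). Because this example sends all four rows in one INSERT, the default optimize_on_insert = 1 already collapses them when the part is written; four separate inserts would leave 4 rows in 4 parts until a background merge.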
  3. The read pattern still uses sum(clicks) GROUP BY ... — this is required because between merges, there may still be multiple rows per key. Always GROUP BY + sum, never trust the row count.
  4. The on-disk size approaches the cardinality of (day, page_id) after enough merges — perfect for high-volume counters.
  5. For multi-column counters (e.g. clicks + impressions + revenue), list them all in the engine: SummingMergeTree((clicks, impressions, revenue)).
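A toy Python model of the SummingMergeTree merge, assuming each inner list is one part (four separate inserts) and rows are (day, page_id, clicks) tuples; the function name is hypothetical. It also shows why partial merges are safe: merging some parts first and the rest later yields the same totals.

```python
def summing_merge(parts):
    """Toy SummingMergeTree(clicks) merge on ORDER BY (day, page_id).

    Rows sharing the sort key collapse into one row with clicks summed; a row
    whose summed columns are all zero is dropped, as SummingMergeTree does.
    """
    totals = {}
    for part in parts:
        for day, page_id, clicks in part:
            totals[(day, page_id)] = totals.get((day, page_id), 0) + clicks
    return [(day, page_id, clicks)
            for (day, page_id), clicks in sorted(totals.items()) if clicks != 0]


if __name__ == "__main__":
    four_parts = [[("2026-06-15", "A", 1)], [("2026-06-15", "A", 1)],
                  [("2026-06-15", "B", 1)], [("2026-06-15", "A", 1)]]
    print(summing_merge(four_parts))  # [('2026-06-15', 'A', 3), ('2026-06-15', 'B', 1)]
```

Because addition is associative, a merge can combine any subset of parts at any time, and the read query's sum(clicks) GROUP BY finishes whatever merging has not happened yet.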

Output.

| day | page_id | clicks |
|---|---|---|
| 2026-06-15 | A | 3 |
| 2026-06-15 | B | 1 |

Rule of thumb. Use SummingMergeTree when every increment is a row and the dashboard wants the sum per key. Switch to AggregatingMergeTree (covered in the materialized-views section) when you also need distinct counts, quantiles, or anything beyond a plain sum.

Worked example — CollapsingMergeTree for row-level updates

Detailed explanation. CollapsingMergeTree is the "I really do need row updates" answer. The writer emits two rows for every logical update: a sign = -1 "cancel" row that repeats the old state exactly, and a sign = +1 "state" row for the new one. During merge, the cancel row and the old +1 row annihilate each other, leaving only the latest version.

Question. Track an order's current status using CollapsingMergeTree. Show the insert sequence for a "placed → shipped" transition and the dashboard read.

Input (rows emitted by the application).

| order_id | status | sign |
|---|---|---|
| 1 | placed | +1 |
| 1 | placed | -1 |
| 1 | shipped | +1 |

Code.

CREATE TABLE orders_collapsing
(
    order_id UInt64,
    status   LowCardinality(String),
    sign     Int8
)
ENGINE = CollapsingMergeTree(sign)
ORDER BY order_id;

-- The writer must emit pair-rows for every update
INSERT INTO orders_collapsing VALUES
    (1, 'placed', +1);
-- ... later, when order ships:
INSERT INTO orders_collapsing VALUES
    (1, 'placed', -1),
    (1, 'shipped', +1);

-- Read pattern: summing sign per (order_id, status) cancels each +1/-1 pair
SELECT
    order_id,
    status
FROM orders_collapsing
GROUP BY order_id, status
HAVING sum(sign) > 0;

Step-by-step explanation.

  1. Three rows enter the table: (1, placed, +1), (1, placed, -1), (1, shipped, +1).
  2. During merge, the engine looks only at the sort key (order_id) and the signs, never at the other columns: order_id = 1 has two +1 rows and one -1 row, so it keeps the last +1 row, (1, shipped, +1). The engine cannot check that a cancel row matches the state it cancels, so the writer must repeat the old values exactly.
  3. Between merges, all three rows are still on disk. The read pattern sums sign per (order_id, status) and keeps only combinations that have not been cancelled: placed nets to 0 and drops out, shipped nets to +1 and survives.
  4. The application must do extra work: read the previous state, emit a cancel row, emit a new row. Often the OLTP source does not know the previous state, which is why Replacing is more common.
  5. Use Collapsing when the application does know the previous state (e.g. the OLTP writes events as (before, after) pairs), or when you need to delete individual rows without rewriting parts.
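A toy Python model of the collapse step for a single sort-key group, following our simplified reading of the collapsing rules in the ClickHouse documentation, plus a read that sums sign per (order_id, status) so it gives the right answer before any merge has run. Rows are (order_id, status, sign) tuples in insertion order; the function names are hypothetical.

```python
def collapse_group(rows):
    """Toy merge of ONE sort-key group for CollapsingMergeTree(sign).

    Only the sign counts and the row order matter; other columns are never compared.
    """
    states = [r for r in rows if r[-1] == 1]
    cancels = [r for r in rows if r[-1] == -1]
    if len(states) > len(cancels):
        return [states[-1]]              # keep the last "state" row
    if len(cancels) > len(states):
        return [cancels[0]]              # keep the first "cancel" row
    if rows and rows[-1][-1] == 1:
        return [cancels[0], states[-1]]  # balanced, ends with a state row
    return []                            # balanced: everything cancels out


def read_current(rows):
    """Read without waiting for merges: sum(sign) per (order_id, status), keep > 0."""
    totals = {}
    for order_id, status, sign in rows:
        totals[(order_id, status)] = totals.get((order_id, status), 0) + sign
    return sorted(key for key, total in totals.items() if total > 0)


if __name__ == "__main__":
    rows = [(1, "placed", 1), (1, "placed", -1), (1, "shipped", 1)]
    print(collapse_group(rows))  # [(1, 'shipped', 1)]
    print(read_current(rows))    # [(1, 'shipped')]
```

The merge and the grouped read agree only if every cancel row carries exactly the same non-key values as the row it cancels, which is the contract the application has to uphold.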

Output.

order_id status
1 shipped

Rule of thumb. Reach for CollapsingMergeTree only when the application has a clean "before / after" event source. For the more common "I just have the latest version" pattern, ReplacingMergeTree is simpler — the application emits one row, ClickHouse handles the dedup.

Worked example — ReplicatedMergeTree for production HA

Detailed explanation. Every production ClickHouse cluster uses a Replicated*MergeTree engine variant. The replication is coordinated by ZooKeeper or, increasingly, ClickHouse Keeper. Replicas are eventually consistent — a write commits locally, then propagates to peers within milliseconds to seconds.

Question. Convert the single-node events table into a replicated one. Show the engine signature, the Keeper path convention, and how a query reads from any replica.

Input. Single-node table:

CREATE TABLE events (...) ENGINE = MergeTree
ORDER BY (event_type, ts);

Code.

-- Replicated version (per-shard, per-replica)
CREATE TABLE events ON CLUSTER prod
(
    ts         DateTime,
    user_id    UInt64,
    event_type LowCardinality(String),
    value      Float64
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/events',  -- Keeper path: shared per shard
    '{replica}'                            -- replica name: unique per node
)
PARTITION BY toYYYYMMDD(ts)
ORDER BY (event_type, ts, user_id);

Step-by-step explanation.

  1. ReplicatedMergeTree('/clickhouse/tables/{shard}/events', '{replica}') declares that this table participates in replication. The first argument is the Keeper path shared across all replicas of the same shard. The second is the replica name, unique per node.
  2. {shard} and {replica} are macros defined in each node's config.xml. On node-1a they might resolve to shard=01, replica=node-1a.
  3. ON CLUSTER prod runs the DDL on every node in the named cluster — both shards and replicas. Without it, you have to run the CREATE on each node manually.
  4. After creation, every write to one replica is committed locally, then asynchronously replicated to peers via Keeper-tracked log entries.
  5. Reads can hit any replica. The load balancer (or the ClickHouse Distributed engine on top) picks one per query.
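ClickHouse performs the macro substitution itself; this Python sketch only mimics it to show why the first engine argument must resolve to one path per shard and the second to a unique name per node. The node names and macro values are hypothetical and match the example topology in the output table; the function names are made up for illustration.

```python
# Hypothetical <macros> values for the four example nodes.
NODE_MACROS = {
    "node-1a": {"shard": "01", "replica": "node-1a"},
    "node-1b": {"shard": "01", "replica": "node-1b"},
    "node-2a": {"shard": "02", "replica": "node-2a"},
    "node-2b": {"shard": "02", "replica": "node-2b"},
}

KEEPER_PATH = "/clickhouse/tables/{shard}/events"  # first engine argument
REPLICA_NAME = "{replica}"                          # second engine argument


def expand_macros(macros):
    """Mimic the per-node substitution of {shard} and {replica}."""
    return KEEPER_PATH.format_map(macros), REPLICA_NAME.format_map(macros)


def build_topology(node_macros):
    """Expand every node and reject two nodes claiming the same replica slot."""
    topology = {node: expand_macros(m) for node, m in node_macros.items()}
    slots = list(topology.values())
    if len(set(slots)) != len(slots):
        raise ValueError("two nodes resolve to the same (Keeper path, replica) pair")
    return topology


if __name__ == "__main__":
    for node, (path, replica) in build_topology(NODE_MACROS).items():
        print(node, path, replica)
```

Replicas that share a path form one shard's replication group; a node whose macros collide with another's would try to register as the same replica, which is the misconfiguration the check rejects.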

Output (cluster topology).

| Shard | Replica | Keeper path | Role |
|---|---|---|---|
| 01 | node-1a | /clickhouse/tables/01/events | accept writes, serve reads |
| 01 | node-1b | /clickhouse/tables/01/events | accept writes, serve reads |
| 02 | node-2a | /clickhouse/tables/02/events | accept writes, serve reads |
| 02 | node-2b | /clickhouse/tables/02/events | accept writes, serve reads |

Rule of thumb. Use Replicated*MergeTree for every production table without exception. Single-node MergeTree is for development and ETL scratch space only. The cost of switching from single-node to replicated after a year of writes is rebuilding the table.

Senior interview question on choosing the right MergeTree variant

A senior interviewer often opens with: "We are landing Postgres CDC into ClickHouse and want to (a) get the current state of every row, (b) keep a 30-day audit trail, and (c) survive a node failure. Which MergeTree variants do you use and how do you assemble them?"

Solution Using ReplicatedMergeTree for the audit trail plus ReplicatedReplacingMergeTree for current state

-- 1) Audit trail: replicated, append-only, every version kept for 30 days
CREATE TABLE orders_audit ON CLUSTER prod
(
    order_id   UInt64,
    status     LowCardinality(String),
    amount     Decimal(12, 2),
    updated_at DateTime
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/orders_audit',
    '{replica}'
)
PARTITION BY toYYYYMM(updated_at)
ORDER BY (order_id, updated_at)
TTL updated_at + INTERVAL 30 DAY;     -- 30-day audit retention

-- 2) Current state: replicated, dedup-on-merge. No PARTITION BY (dedup only
--    happens within a partition) and no TTL (it would delete orders that
--    have not changed for 30 days).
CREATE TABLE orders_cdc ON CLUSTER prod
(
    order_id   UInt64,
    status     LowCardinality(String),
    amount     Decimal(12, 2),
    updated_at DateTime
)
ENGINE = ReplicatedReplacingMergeTree(
    '/clickhouse/tables/{shard}/orders_cdc',
    '{replica}',
    updated_at                        -- version column for Replacing
)
ORDER BY order_id;

-- 3) Copy every CDC row landing in the audit table into the current-state table
CREATE MATERIALIZED VIEW orders_cdc_mv ON CLUSTER prod TO orders_cdc AS
SELECT order_id, status, amount, updated_at
FROM orders_audit;

-- 4) Current-state view (logical; uses argMax)
CREATE VIEW orders_current AS
SELECT
    order_id,
    argMax(status, updated_at) AS status,
    argMax(amount, updated_at) AS amount,
    max(updated_at)            AS updated_at
FROM orders_cdc
GROUP BY order_id
SETTINGS prefer_column_name_to_alias = 1;  -- alias updated_at shadows the column

Step-by-step trace.

| Requirement | Engine choice | Why |
|---|---|---|
| (a) Current state of every row | ReplicatedReplacingMergeTree(updated_at) on orders_cdc | merges dedupe by order_id and keep the latest updated_at |
| (b) 30-day audit trail | append-only ReplicatedMergeTree with TTL updated_at + INTERVAL 30 DAY on orders_audit | every version is kept; expired rows are removed automatically |
| (c) Survive node failure | Replicated... prefix + Keeper on both tables | every write replicated; any replica can serve reads |
| Hot read of current state | argMax(...) GROUP BY order_id | avoids FINAL cost on dashboards |

After the dedup merges run, orders_cdc holds one row per order_id, while orders_audit keeps every version until the TTL removes it. The split matters: a single Replacing table cannot double as the audit trail, because its merges discard exactly the old versions an audit needs, and a TTL on it would eventually delete the current state of any order untouched for 30 days. Replication keeps both tables symmetrical across replicas.

Output (the orders_current view).

| order_id | status | amount | updated_at |
|---|---|---|---|
| 1 | delivered | 100.00 | 2026-06-15 12:00 |
| 2 | placed | 50.00 | 2026-06-15 11:00 |
| 3 | shipped | 200.00 | 2026-06-15 10:30 |

Why this works — concept by concept:

  • ReplicatedReplacingMergeTree: combines the replication contract (every write reaches every replica of the shard via Keeper-tracked log entries) with the Replacing semantic (dedup by sort key during merge). One engine, two layered concerns.
  • Version column: ReplicatedReplacingMergeTree(..., updated_at) tells the engine which duplicate to keep: the row with the greatest updated_at. Without it, the engine keeps the last inserted row, which is wrong whenever CDC events arrive out of order.
  • Separate audit table with TTL: orders_audit is append-only, so every version survives, and TTL updated_at + INTERVAL 30 DAY removes expired rows during background merges (or drops whole parts once all their rows have expired, with ttl_only_drop_parts = 1). No cron job, no DELETE statement.
  • argMax pattern instead of FINAL: argMax(col, version) GROUP BY pk produces the same answer as SELECT ... FINAL, but as an ordinary aggregation rather than a merge of overlapping parts at query time.
  • Cost: O(parts) for the dedup merge (background); O(unique_keys) for the argMax aggregation; O(replicas) network amplification per write, paid for both tables because every CDC row is stored in each. Reads on either table scale with the touched-partition byte count, not the row count.



4. Materialized views — incremental aggregation engine

ClickHouse materialized views are insert-time triggers, not refresh-on-schedule — the difference is the whole story

The mental model in one line: a ClickHouse materialized view is an INSERT INTO target SELECT ... FROM source that fires every time the source table receives a batch — there is no schedule, no refresh, no cron. Once you internalise "MVs are triggers, not refreshes," the entire materialized-view interview surface (POPULATE, -State functions, cascading MVs) becomes a sequence of obvious follow-ons.

Visual flow showing inserts arriving in a raw events table triggering an MV that fans out into 1-minute, 1-hour and 1-day AggregatingMergeTree target tables; small -State / -Merge function badges show how aggregates compose; a cascade arrow shows the 1-min table feeding into a downstream MV, on a light PipeCode card.

The insert-time MV contract.

  • The MV is a stored SELECT query plus a target table.
  • When a batch of N rows arrives in the source, the MV's SELECT runs over that batch only (not the whole source table) and inserts the result into the target.
  • The MV does not maintain incremental state — it sees one batch, produces one output, and forgets.
  • "Refreshable" MVs (a 2024+ feature) are a separate construct on a schedule; they are not what an interview means by "materialized view."

The two MV idioms.

  • TO target_table: the recommended pattern. The MV writes into an existing table you defined separately. Backfill is straightforward (INSERT INTO target SELECT ... FROM source).
  • POPULATE at create time: runs the SELECT once over the existing source data, then enables trigger mode. Convenient for one-shot setup, but risky on large live tables: rows inserted into the source while POPULATE runs are not captured by the view.

-State / -Merge / -MergeState — the aggregate function trinity.

  • countState(), uniqState(col), sumState(col), quantileState(col) — return a partial-aggregate state object, not a final value. Suitable for storage in an AggregatingMergeTree.
  • countMerge(state), uniqMerge(state) — finalize a state into a number, typically at query time.
  • countMergeState(state), uniqMergeState(state) — combine multiple states into one new state. Used in cascading MVs.
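The -State / -Merge / -MergeState contract can be sketched in a few lines of Python, with hypothetical function names that mirror the ClickHouse combinators. The uniq state here is an exact set, a deliberate simplification: real uniq states are bounded hash samples, but they are likewise combined by merging states rather than by adding finalized numbers.

```python
def count_state(batch):
    """countState(): the partial state is just a running count."""
    return len(batch)


def count_merge_state(states):
    """countMergeState(): combine several count states into a new state."""
    return sum(states)


def count_merge(states):
    """countMerge(): finalize; for count, the combined state already is the number."""
    return count_merge_state(states)


def uniq_state(batch):
    """uniqState() stand-in: an exact set (real uniq keeps a bounded hash sample)."""
    return frozenset(batch)


def uniq_merge_state(states):
    """uniqMergeState(): union the states into one new state."""
    return frozenset().union(*states)


def uniq_merge(states):
    """uniqMerge(): finalize the combined state into a number."""
    return len(uniq_merge_state(states))


if __name__ == "__main__":
    batches = [[1, 2, 2], [2, 3], [4]]  # user_ids in three insert batches
    u = [uniq_state(b) for b in batches]
    print(uniq_merge(u))                # 4 distinct users
    print(sum(len(s) for s in u))       # 5: adding finalized values overcounts
```

Merging states is associative, so they can be combined per batch, per merge, or per cascade level in any grouping and still finalize to the same answer.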

Cascading MVs in plain words.

  • The 1-minute roll-up MV reads from raw_events on insert and writes to agg_1m.
  • A second MV reads from agg_1m on insert and writes to agg_1h — combining 60 minute-states into one hour-state via *MergeState.
  • A third MV reads from agg_1h and writes to agg_1d.
  • Each MV fires only on its source's inserts, so the cascade is incremental from end to end.

Common interview probes.

  • "Are ClickHouse MVs refreshed on a schedule?" — no. They fire on every source insert.
  • "What is the difference between -State and -Merge?" — -State produces a partial state for storage; -Merge finalizes a state into a number for reading.
  • "How do you backfill a new MV with historical data?" — either use POPULATE at create time, or create the MV first (which captures new inserts) then run INSERT INTO target SELECT ... FROM source WHERE ts < cutoff.
  • "What happens if the source schema changes?": the MV's SELECT must still be valid against the new schema; otherwise the trigger, and with it the insert into the source, fails. Evolve the MV body with ALTER TABLE <mv> MODIFY QUERY.

Worked example — a 1-minute pre-aggregation MV

Detailed explanation. A dashboard needs "events per minute, per event_type" with sub-second latency over the last 24 hours. At 100K events/sec, a raw events table would force the dashboard to scan 8.6B rows (100,000 × 86,400 seconds). An AggregatingMergeTree table fed by an MV reduces that to one row per (minute, event_type) after merges: 1,440 rows per event type per day.

Question. Build the target events_1m table and the MV that maintains it. Show how the dashboard query reads from it.

Input (raw events arriving at high rate).

| ts | user_id | event_type | value |
|---|---|---|---|
| 2026-06-15 09:00:00.123 | 1 | click | 1.0 |
| 2026-06-15 09:00:00.456 | 2 | click | 1.0 |
| 2026-06-15 09:00:05.789 | 1 | view | 1.0 |

Code.

-- Target table for the 1-minute roll-up
CREATE TABLE events_1m
(
    minute     DateTime,
    event_type LowCardinality(String),
    events     AggregateFunction(count),
    users      AggregateFunction(uniq, UInt64),
    value_sum  AggregateFunction(sum, Float64)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMMDD(minute)
ORDER BY (event_type, minute);

-- The MV that fires on every batch in `events`
CREATE MATERIALIZED VIEW events_1m_mv TO events_1m AS
SELECT
    toStartOfMinute(ts) AS minute,
    event_type,
    countState()        AS events,
    uniqState(user_id)  AS users,
    sumState(value)     AS value_sum
FROM events
GROUP BY minute, event_type;

-- Dashboard query
SELECT
    minute,
    event_type,
    countMerge(events)    AS events,
    uniqMerge(users)      AS users,
    sumMerge(value_sum)   AS value_sum
FROM events_1m
WHERE minute >= now() - INTERVAL 24 HOUR
GROUP BY minute, event_type
ORDER BY minute, event_type;

Step-by-step explanation.

  1. The AggregatingMergeTree target stores partial aggregate states, not finalized numbers. Each column is typed as AggregateFunction(name, arg_types).
  2. The MV's SELECT runs over each insert batch into events. The GROUP BY minute, event_type collapses the batch into one row per (minute, event_type) combination.
  3. countState() produces a tiny state (an integer); uniqState(user_id) produces an adaptive-sampling sketch of hashed values (ClickHouse's uniq keeps at most 65,536 hashes, and far fewer for small groups); sumState(value) is a single float.
  4. Background merges in AggregatingMergeTree combine states for the same ORDER BY key — turning many small states into one big state per (minute, event_type).
  5. The dashboard reads with *Merge functions, which finalize the states into numbers. The GROUP BY minute, event_type in the read is required because between merges multiple rows per key may still exist.

Output (one row per (minute, event_type) after merges).

| minute | event_type | events | users | value_sum |
|---|---|---|---|---|
| 2026-06-15 09:00 | click | 1240 | 980 | 1240.0 |
| 2026-06-15 09:00 | view | 800 | 720 | 800.0 |
| 2026-06-15 09:01 | click | 1310 | 1010 | 1310.0 |

Rule of thumb. Always use *State in the MV body and *Merge in the read query. Mixing them ("can I just store count() instead of countState()?") breaks as soon as the target table holds more than one row per key, which happens with every insert batch until a background merge combines them: a merge keeps one arbitrary value of a plain column instead of combining the values.
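A toy Python model of the trigger-plus-roll-up flow, assuming string timestamps and using an exact set as a stand-in for the uniq state; the function names are hypothetical. Two insert batches leave two target rows for the same key, and only combining the states gives the right distinct count; adding per-batch distinct counts (which is what storing finalized values amounts to) would report 3 users instead of 2.

```python
from collections import defaultdict


def to_start_of_minute(ts):
    """'2026-06-15 09:00:05' -> '2026-06-15 09:00' (string stand-in for toStartOfMinute)."""
    return ts[:16]


def mv_body(batch):
    """The MV SELECT: sees ONE insert batch, emits one row per (minute, event_type).

    Each output row carries a count state (an int) and a toy uniq state (a set).
    """
    groups = defaultdict(list)
    for ts, user_id, event_type in batch:
        groups[(to_start_of_minute(ts), event_type)].append(user_id)
    return [(key, len(ids), frozenset(ids)) for key, ids in groups.items()]


def dashboard_read(target_rows):
    """countMerge / uniqMerge ... GROUP BY minute, event_type over all rows on disk."""
    merged = defaultdict(lambda: [0, frozenset()])
    for key, count_state, uniq_state in target_rows:
        merged[key][0] += count_state
        merged[key][1] = merged[key][1] | uniq_state
    return {key: (count, len(users)) for key, (count, users) in merged.items()}


if __name__ == "__main__":
    batch1 = [("2026-06-15 09:00:00", 1, "click"), ("2026-06-15 09:00:01", 2, "click")]
    batch2 = [("2026-06-15 09:00:30", 1, "click")]
    target = mv_body(batch1) + mv_body(batch2)  # two rows for one key until a merge
    print(dashboard_read(target))               # {('2026-06-15 09:00', 'click'): (3, 2)}
```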

Worked example — cascading MVs (1-minute → 1-hour → 1-day)

Detailed explanation. When the dashboard has three zoom levels (last hour at minute resolution, last day at hour resolution, last month at day resolution), the cleanest architecture is a cascade: the 1-minute MV feeds the 1-hour MV, which feeds the 1-day MV. Each MV fires only on its direct source's inserts.

Question. Extend the 1-minute roll-up with 1-hour and 1-day cascade MVs. Show the engine and the chained MV definitions.

Input. The events_1m table from the previous example.

Code.

-- 1-hour target
CREATE TABLE events_1h
(
    hour       DateTime,
    event_type LowCardinality(String),
    events     AggregateFunction(count),
    users      AggregateFunction(uniq, UInt64)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(hour)
ORDER BY (event_type, hour);

-- Cascade MV: events_1m -> events_1h
CREATE MATERIALIZED VIEW events_1h_mv TO events_1h AS
SELECT
    toStartOfHour(minute)        AS hour,
    event_type,
    countMergeState(events)      AS events,
    uniqMergeState(users)        AS users
FROM events_1m
GROUP BY hour, event_type;

-- 1-day target
CREATE TABLE events_1d
(
    day        Date,
    event_type LowCardinality(String),
    events     AggregateFunction(count),
    users      AggregateFunction(uniq, UInt64)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(day)
ORDER BY (event_type, day);

-- Cascade MV: events_1h -> events_1d
CREATE MATERIALIZED VIEW events_1d_mv TO events_1d AS
SELECT
    toDate(hour)              AS day,
    event_type,
    countMergeState(events)   AS events,
    uniqMergeState(users)     AS users
FROM events_1h
GROUP BY day, event_type;

Step-by-step explanation.

  1. events_1m_mv fires on every insert into events and writes the per-minute aggregate states into events_1m.
  2. events_1h_mv fires on every insert into events_1m and rolls 60 minute-states into one hour-state via countMergeState and uniqMergeState.
  3. events_1d_mv does the same one level up — 24 hour-states into one day-state.
  4. The MergeState variants take existing states and combine them, producing a new state (not a finalised number). This is what makes the cascade incremental.
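  5. Each cascade level holds far fewer rows than the one below: 1-hour rows are ~60x fewer than 1-minute rows and 1-day rows ~24x fewer than 1-hour rows, so after merges events_1d is roughly events_1m / 1440.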
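A toy Python check of the cascade, assuming string timestamps and exact sets in place of uniq states (the function names are hypothetical). Each level only re-buckets the time key of the level below and combines states, which is the *MergeState step, and the day-level result equals a roll-up computed straight from the raw events.

```python
from collections import defaultdict


def roll_up(rows, bucket):
    """One cascade level: re-bucket each key's time and combine states (*MergeState).

    rows are ((time_key, event_type), count_state, uniq_state); uniq_state is a toy
    exact set standing in for ClickHouse's uniq sketch.
    """
    out = defaultdict(lambda: [0, frozenset()])
    for (time_key, event_type), count_state, uniq_state in rows:
        key = (bucket(time_key), event_type)
        out[key][0] += count_state
        out[key][1] = out[key][1] | uniq_state
    return [(key, count, users) for key, (count, users) in sorted(out.items())]


def to_minute(ts):
    return ts[:16]  # '2026-06-15 09:00:05' -> '2026-06-15 09:00'


def to_hour(ts):
    return ts[:13]  # -> '2026-06-15 09'


def to_day(ts):
    return ts[:10]  # -> '2026-06-15'


if __name__ == "__main__":
    raw = [("2026-06-15 09:00:05", 1, "click"), ("2026-06-15 09:01:10", 2, "click"),
           ("2026-06-15 10:30:00", 1, "click"), ("2026-06-15 10:30:00", 3, "view")]
    # Each raw event becomes a one-row state: count 1, uniq {user_id}.
    as_states = [((ts, et), 1, frozenset({uid})) for ts, uid, et in raw]
    m = roll_up(as_states, to_minute)  # events_1m
    h = roll_up(m, to_hour)            # events_1h, fed only by events_1m
    d = roll_up(h, to_day)             # events_1d, fed only by events_1h
    print(d == roll_up(as_states, to_day))  # True
```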

Output (sketch of disk sizes).

| Table | Rows per day | Storage |
|---|---|---|
| events | 8.6B | ~100 GB |
| events_1m | 1440 × distinct event_types | ~50 MB |
| events_1h | 24 × distinct event_types | ~2 MB |
| events_1d | 1 × distinct event_types | ~100 KB |

Rule of thumb. Cascade MVs trade disk for read latency at each level. Three levels (1m / 1h / 1d) is the sweet spot for most dashboards. Beyond that, the operational complexity of maintaining the cascade outweighs the marginal scan reduction.

Worked example — backfilling a new MV without POPULATE

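Detailed explanation. POPULATE is convenient but risky on a live table: rows inserted into the source while it runs are not captured by the view, and the backfill runs as one large statement that cannot be batched or resumed. A cleaner two-step pattern is to (1) create the MV first so it captures new inserts, then (2) backfill historical data with INSERT INTO target SELECT ... FROM source WHERE ts < cutoff.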

Question. A new events_1h MV needs to be backfilled with 90 days of history without blocking the live ingest. Show the two-step backfill.

Input. events has 90 days of history. events_1h_mv is the MV. events_1h is the target.

Code.

-- Step 0: pick ONE fixed cutoff that is still in the future when the MV is
--    created (the literal below is a placeholder, e.g. the next full hour).
--    The MV handles every row with ts >= cutoff; the backfill handles ts < cutoff.

-- Step 1: create the MV with the cutoff as a lower bound
CREATE MATERIALIZED VIEW events_1h_mv TO events_1h AS
SELECT
    toStartOfHour(ts)   AS hour,
    event_type,
    countState()        AS events,
    uniqState(user_id)  AS users
FROM events
WHERE ts >= toDateTime('2026-06-15 10:00:00')   -- cutoff (placeholder)
GROUP BY hour, event_type;

-- Step 2: once now() has passed the cutoff, backfill everything below it
--    (for large tables, one partition at a time)
INSERT INTO events_1h
SELECT
    toStartOfHour(ts)   AS hour,
    event_type,
    countState()        AS events,
    uniqState(user_id)  AS users
FROM events
WHERE ts < toDateTime('2026-06-15 10:00:00')    -- same cutoff: no gap, no overlap
GROUP BY hour, event_type;

Step-by-step explanation.

  1. Step 1: create the MV. From this moment, every new insert into events fires it, but the WHERE clause makes it ignore rows older than the cutoff.
  2. Step 2: once the cutoff has passed, backfill with INSERT INTO events_1h SELECT ... FROM events WHERE ts < cutoff. Because both statements use the same cutoff, every row is aggregated exactly once. Rows with ts < cutoff that arrive after Step 2 finishes are picked up by neither, so run the backfill after late data has settled.
  3. The cutoff must be identical on both sides. AggregatingMergeTree does merge states that share an (hour, event_type) key, but that only hides overlap for uniqState (merging the same hashes twice is harmless); countState adds both copies, so a row aggregated twice is counted twice. A trailing bound such as now() - INTERVAL 24 HOUR on an unfiltered MV also leaves a gap: rows inserted before the MV existed but newer than the bound are aggregated by nobody.
  4. For very large backfills, run Step 2 in batched ranges (e.g. per partition) so each batch can be retried on its own; re-running a batch that already landed would double its counts.
  5. The alternative, CREATE MATERIALIZED VIEW ... POPULATE AS ..., does both steps in one statement, but rows inserted into events while it runs are not captured by the view, and it cannot be batched or resumed. Avoid it on a live table.

Output.

| Phase | Source rows covered | Notes |
|---|---|---|
| Step 1: MV created | ts >= cutoff | every new row at or after the cutoff triggers it |
| Step 2: INSERT SELECT | ts < cutoff | one-shot backfill, run in batches |
| Net coverage | full range, each row exactly once | no gap and no overlap at the boundary |

Rule of thumb. Never use POPULATE on a live table. The two-step "create MV, then INSERT SELECT" pattern is safer, batchable, and survives the operator pressing Ctrl-C halfway through. Pay the 30 seconds of extra typing.
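A toy Python check of cutoff-based backfills, with integer timestamps and a made-up set of rows, assuming the backfill runs after all the rows shown have landed. It counts how many writers aggregate each row: with one cutoff shared by the MV filter and the backfill, every row is counted once, whereas an unfiltered MV plus a trailing "now minus 24" backfill bound misses a row inserted just before the MV existed and double-counts a late row with an old timestamp.

```python
def times_counted(row, mv_filter, backfill_filter):
    """How many writers aggregate one row into the target.

    row = (ts, inserted_after_mv). The MV only sees rows inserted after it was
    created; the backfill, run last, sees every row already present in `events`.
    """
    ts, inserted_after_mv = row
    return int(inserted_after_mv and mv_filter(ts)) + int(backfill_filter(ts))


CUTOFF = 100  # hypothetical cutoff, later than any ts present when the MV is created
# (ts, inserted_after_mv); (60, True) is a late arrival with an old timestamp
ROWS = [(50, False), (99, False), (100, True), (150, True), (60, True)]

# One shared cutoff: the MV filters ts >= CUTOFF, the backfill takes ts < CUTOFF.
shared = [times_counted(r, lambda ts: ts >= CUTOFF, lambda ts: ts < CUTOFF) for r in ROWS]

# Unfiltered MV created at t = 100, backfill bounded by a trailing "now - 24" = 76.
trailing = [times_counted(r, lambda ts: True, lambda ts: ts < 76) for r in ROWS]

print(shared)    # [1, 1, 1, 1, 1]
print(trailing)  # [1, 0, 1, 1, 2]
```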

Worked example — the MV-source-join pitfall

Detailed explanation. A common bug: the MV body joins the source table to another table. The MV only fires on inserts into the source, and the right-hand table of the join is read as it exists at that moment. If a fact-table insert arrives before its dim-table row, the join misses and the MV writes a row with default dim values (empty strings, or NULL when join_use_nulls = 1) that never updates.

Question. Diagnose the pitfall in the MV below and propose two fixes.

Input. An events table and a users dim table. The MV joins them.

Code (the broken MV).

CREATE MATERIALIZED VIEW events_enriched_mv TO events_enriched AS
SELECT
    e.ts,
    e.user_id,
    e.event_type,
    u.country,
    u.tier
FROM events e
LEFT JOIN users u ON u.user_id = e.user_id;
-- ^ bug: the join reads `users` at the time the events batch arrives.
--   If the user row is added later, the joined columns stay empty (or NULL) forever.

Step-by-step explanation.

  1. The MV fires on inserts into events. The LEFT JOIN users is evaluated at that moment.
  2. If events has a row for user_id = 12345 but users does not yet, the LEFT JOIN fills country and tier with defaults (empty strings, or NULL when join_use_nulls = 1). The MV writes those values into events_enriched.
  3. Later, when users gets the row for 12345, the existing events_enriched row does not automatically update — there is no recalculation. The data is permanently stale.
  4. Fix 1: Dictionary. Define users as a Dictionary in ClickHouse and look it up with dictGet(...) when querying events_enriched, so the enrichment reflects the dictionary's latest reload (controlled by LIFETIME) on every read.
  5. Fix 2 — defer the enrichment. Drop the join from the MV; do the enrichment at the dashboard query layer with JOIN users or dictGet(...).
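A toy Python illustration of the pitfall, assuming a plain dict stands in for the users table (user_id to country) and that 12345 and "DE" are made-up values. Both functions do the same lookup; what differs is when it runs. In real ClickHouse the dictionary side is only as fresh as its last reload (LIFETIME), which this sketch ignores.

```python
def mv_enrich_at_insert(batch, users_now):
    """MV body with LEFT JOIN users: the dim lookup happens once, when the batch lands.

    Unmatched rows get the String default '' (ClickHouse's behaviour unless
    join_use_nulls = 1), and the stored row never changes afterwards.
    """
    return [(ts, uid, et, users_now.get(uid, "")) for ts, uid, et in batch]


def enrich_at_query_time(raw_events, users_now):
    """dictGet-style enrichment: the dim lookup happens on every read."""
    return [(ts, uid, et, users_now.get(uid, "")) for ts, uid, et in raw_events]


if __name__ == "__main__":
    users = {}                                     # user 12345 not loaded yet
    raw = [("2026-06-15 09:00:00", 12345, "click")]
    stored = mv_enrich_at_insert(raw, users)       # fact row arrives first
    users[12345] = "DE"                            # dim row arrives later
    print(stored[0][3] == "")                      # True: frozen at insert time
    print(enrich_at_query_time(raw, users)[0][3])  # DE: fresh at read time
```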

Output (the fix using a Dictionary).

CREATE DICTIONARY users_dict
(
    user_id UInt64,
    country String,
    tier    String
)
PRIMARY KEY user_id
SOURCE(CLICKHOUSE(TABLE 'users'))
LIFETIME(MIN 300 MAX 600)
LAYOUT(HASHED());

-- The MV now stores raw events, no join
CREATE MATERIALIZED VIEW events_enriched_mv TO events_enriched AS
SELECT
    ts, user_id, event_type
FROM events;

-- Enrichment happens at query time, against the live dictionary
SELECT
    event_type,
    dictGet('users_dict', 'country', user_id) AS country,
    count()
FROM events_enriched
WHERE ts >= now() - INTERVAL 1 HOUR
GROUP BY event_type, country;

Rule of thumb. Never join in an MV body if the right side of the join can update independently. Use Dictionaries for small dimension tables (reloaded on their LIFETIME interval) and defer enrichment to the read query when the dim is large or changes frequently.

Senior interview question on materialized-view design

A senior interviewer often frames this as: "Design the materialized-view tree for a real-time analytics product that needs to serve hourly DAU (distinct user count) over the last 30 days with sub-100ms latency. Walk through the engine choices, the -State / -Merge functions, and the backfill plan."

Solution Using an AggregatingMergeTree roll-up with a uniq aggregate state

-- 1) Raw events table
CREATE TABLE events
(
    ts         DateTime,
    user_id    UInt64,
    event_type LowCardinality(String)
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(ts)
ORDER BY (event_type, ts, user_id)
TTL ts + INTERVAL 90 DAY;

-- 2) Hourly DAU roll-up target
CREATE TABLE dau_hourly
(
    hour       DateTime,
    event_type LowCardinality(String),
    users      AggregateFunction(uniq, UInt64)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(hour)
ORDER BY (event_type, hour);

-- 3) The MV that fires on every batch of `events`
CREATE MATERIALIZED VIEW dau_hourly_mv TO dau_hourly AS
SELECT
    toStartOfHour(ts) AS hour,
    event_type,
    uniqState(user_id) AS users
FROM events
GROUP BY hour, event_type;

-- 4) Dashboard query
SELECT
    hour,
    event_type,
    uniqMerge(users) AS dau
FROM dau_hourly
WHERE hour >= now() - INTERVAL 30 DAY
GROUP BY hour, event_type
ORDER BY hour;

Step-by-step trace.

| Layer | What it does | Latency contribution |
|---|---|---|
| events (raw) | append-only, partitioned, sorted | not on read path |
| dau_hourly_mv (trigger) | fires on each insert batch into events | adds ~1–3% write overhead |
| dau_hourly (target) | AggregatingMergeTree storing uniq states | one row per (hour, event_type) after merges |
| Dashboard read | uniqMerge on ~24×30×N rows | sub-100ms |

After 30 days, and once merges settle, dau_hourly holds roughly 720 hours × (number of event types) rows. With a dozen event types, that is about 8.6K rows (720 × 12). A scan of that size fits easily inside the sub-100ms budget.

Output:

| hour | event_type | dau |
|---|---|---|
| 2026-06-15 08:00 | click | 124,300 |
| 2026-06-15 08:00 | view | 198,420 |
| 2026-06-15 09:00 | click | 143,100 |
| 2026-06-15 09:00 | view | 211,820 |

Why this works — concept by concept:

  • Insert-time MV trigger — the MV fires on every batch inserted into events, not on a schedule. The roll-up table therefore trails the source only by ingestion latency (with a Kafka consumer in front, typically 1–10 seconds).
  • AggregatingMergeTree with uniqState: uniqState(user_id) stores a partial distinct-count state, not a number. ClickHouse's uniq keeps an adaptive sample of at most 65,536 hash values; uniqCombined and uniqHLL12 are the HyperLogLog-based variants. Each state therefore has a bounded size, and the roll-up table grows with groups × state size, not with the input row count.
  • -State at write, -Merge at read: -State produces partial states for storage, and -Merge combines and finalizes them at query time. Storing the finished uniq(...) count instead would block any further roll-up, because distinct counts are not additive: a user active in two hours would be counted twice.
  • Schema-evolution path — add a column with ALTER TABLE events ADD COLUMN ..., then ALTER TABLE dau_hourly ADD COLUMN ..., then ALTER TABLE dau_hourly_mv MODIFY QUERY .... The MV query change is the step that needs care; storage adds are cheap.
  • Cost — each insert batch triggers one extra, much smaller write into the target. The roll-up shrinks row count by roughly (input rows / group cardinality): a billion-row day folded into ~24 × 10 groups is about a 4M× reduction (10^9 / 240 ≈ 4.2M). Read cost on the dashboard is O(touched_rows_in_target) × O(uniqMerge cost), measured in milliseconds for 30-day windows.
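The following Python sketch shows why the -State / -Merge split matters. It uses an exact Python set as the stand-in for an aggregate state; ClickHouse's uniq state is an approximate, bounded sketch, but the merge-then-finalize logic is the same. The sample rows are hypothetical. The sketch also reproduces the row-reduction arithmetic from the Cost bullet.

```python
# Toy stand-in for -State / -Merge: a Python set plays the role of an aggregate
# state. ClickHouse's uniqState is approximate and bounded; the set is exact,
# which keeps the arithmetic easy to check.

events = [  # (hour, user_id): hypothetical sample rows
    (8, 1), (8, 2), (8, 3),
    (9, 2), (9, 3), (9, 4),
]

# Insert-time roll-up: one state per hour (what dau_hourly stores).
states = {}
for hour, user in events:
    states.setdefault(hour, set()).add(user)

# Wrong: finalize per hour, then try to combine the finished numbers.
hourly_counts = {h: len(s) for h, s in states.items()}
summed = sum(hourly_counts.values())   # 3 + 3 = 6: users 2 and 3 are counted twice

# Right: merge the states first, then finalize once (what uniqMerge does).
merged = set().union(*states.values())
distinct = len(merged)                 # {1, 2, 3, 4} -> 4

print(hourly_counts, summed, distinct)

# Row-reduction arithmetic from the Cost bullet: 1e9 rows into 24 x 10 groups.
reduction = 1_000_000_000 / (24 * 10)
print(f"{reduction:,.0f}x")            # 4,166,667x
```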



5. Sharding and replication at scale

Distributed table + Replicated*MergeTree is the production pattern — shards scale write throughput, replicas scale HA and read concurrency

The mental model in one line: a ClickHouse cluster is a grid of shards × replicas, where each shard owns a disjoint slice of the data (by sharding key) and each replica within a shard is a fully synchronized copy — and a Distributed table on top fans queries out across all shards in parallel. Once you can draw the 3×2 grid (3 shards, 2 replicas each), the entire scaling story collapses to "more shards = more write throughput, more replicas = more read HA."

Grid diagram showing a 3-shard × 2-replica ClickHouse cluster — six node cards arranged in a 3x2 grid with a Distributed table sitting above and a ZooKeeper / Keeper coordination block on the right; sharding key arrow on the left labelled cityHash64(user_id) and replication arrows between paired replicas, on a light PipeCode card.

The four building blocks.

  • Local table — a Replicated*MergeTree on each node. Replicas of the same shard share the same Keeper path; replicas of different shards have different paths.
  • Distributed table — a thin "fan-out" engine that lives on every node and routes reads and writes across all shards. It owns no data of its own.
  • Sharding key — a deterministic function (typically cityHash64(user_id)) that maps each row to a shard. Picked once; expensive to change.
  • ClickHouse Keeper (or ZooKeeper) — the coordination service that sequences replication log entries and DDL.

Sharding key choice.

  • Hash sharding: cityHash64(user_id). Even distribution across shards; the same user always lands on the same shard (useful for per-user joins).
  • Random sharding: rand(). Statistically even, but a user's events scatter across shards, so per-user joins must hit every shard via GLOBAL JOIN.
  • Custom sharding — a domain-specific function (e.g. customer_id % 4). Useful for multi-tenant where you want a specific customer on a specific shard.
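The co-location difference between hash and random sharding can be checked with a short Python simulation. It assumes hypothetical traffic of ten events per user. It uses blake2b from the standard library as a stand-in for cityHash64; the real function differs, but any well-mixed 64-bit hash behaves the same way here.

```python
import hashlib
import random
from collections import Counter

N_SHARDS = 3

def shard_of(user_id: int) -> int:
    # Stand-in for cityHash64(user_id) % N_SHARDS. blake2b is used only because
    # it ships with Python; it is NOT ClickHouse's cityHash64.
    digest = hashlib.blake2b(str(user_id).encode(), digest_size=8).digest()
    return int.from_bytes(digest, "little") % N_SHARDS

rng = random.Random(42)  # stands in for rand(), seeded for repeatability
events = [u for u in range(1000) for _ in range(10)]  # 10 events per user

hash_placement = {}  # user_id -> set of shards holding that user's events
rand_placement = {}
for user in events:
    hash_placement.setdefault(user, set()).add(shard_of(user))
    rand_placement.setdefault(user, set()).add(rng.randrange(N_SHARDS))

co_located = sum(len(s) == 1 for s in hash_placement.values())  # hash: all users
scattered = sum(len(s) > 1 for s in rand_placement.values())    # rand(): nearly all
users_per_shard = Counter(shard_of(u) for u in range(1000))     # roughly 333 each

print(co_located, scattered, dict(users_per_shard))
```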

Distributed reads.

  • A SELECT on the Distributed table fans out to every shard. Each shard runs the same query locally on one replica (the load balancer picks).
  • Partial results stream back to the coordinator, which aggregates and returns.
  • For joins that need cross-shard data, GLOBAL JOIN sends the right-side table contents to every shard.
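Here is a minimal Python sketch of that scatter-gather read path. It assumes a count() GROUP BY event_type dashboard query and hypothetical per-shard rows. Each shard returns a partial aggregate and the coordinator merges them. It models the idea only, not ClickHouse's actual streaming protocol.

```python
from collections import Counter

# Hypothetical data: each shard holds a disjoint slice of the events.
shards = [
    ["click", "view", "view"],
    ["click", "click"],
    ["view"],
]

def local_query(rows):
    """What each shard runs: SELECT event_type, count() ... GROUP BY event_type."""
    return Counter(rows)

def coordinator_merge(partials):
    """What the initiating node does: merge the partial aggregates into one result."""
    total = Counter()
    for part in partials:
        total.update(part)  # Counter.update adds counts key by key
    return dict(total)

partials = [local_query(rows) for rows in shards]
print(coordinator_merge(partials))  # {'click': 3, 'view': 3}
```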

Distributed writes.

  • A write to the Distributed table routes the row to its target shard based on the sharding key.
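  • By default, inserts into a Distributed table are asynchronous. The receiving node queues the rows on local disk and forwards them to the target shards in the background, polling every distributed_directory_monitor_sleep_time_ms. The distributed_directory_monitor_batch_inserts setting lets it batch those forwarded sends.
  • For high throughput, applications often write directly to the local Replicated*MergeTree on a chosen shard, skipping the Distributed table's overhead.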

Replication contract.

  • A write on one replica is committed locally, then asynchronously propagated to peers via Keeper-tracked log entries.
  • Replication is eventually consistent — readers may see fresher data on one replica than another for a few seconds during catch-up.
  • system.replicas and system.replication_queue tables show per-shard replication health.

Common interview probes.

  • "What is the difference between a Distributed table and a Replicated table?" — Distributed fans queries across shards (horizontal scaling); Replicated synchronizes a single shard's data across replicas (HA).
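  • "Why does ClickHouse use ClickHouse Keeper instead of ZooKeeper?" — ClickHouse supports both. Keeper speaks the ZooKeeper client protocol, so it is a drop-in replacement. It differs in three ways: it uses Raft (ZooKeeper uses ZAB), it is written in C++, and it ships with ClickHouse, so there is no separate JVM service to operate.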
  • "Can you change the sharding key online?" — not easily. The common pattern is to write to a new cluster with the new sharding key and dual-write during migration.
  • "What is GLOBAL IN and when do you need it?" — when a subquery in a Distributed query needs to be evaluated once on the coordinator and broadcast to all shards (rather than executed independently per shard).

Worked example — a 3-shard × 2-replica reference cluster

Detailed explanation. A canonical small-but-real ClickHouse cluster is 3 shards × 2 replicas, 6 nodes total. It is compact enough to draw on a whiteboard yet still demonstrates both write fan-out (sharding) and read HA (replication). Most production clusters scale by adding shards (for write throughput) or replicas (for read concurrency).

Question. Define the events table on a 3-shard × 2-replica cluster. Show the per-node local table, the Distributed table on top, and the cluster configuration sketch.

Input (cluster XML config sketch).

| Shard | Replicas |
|---|---|
| 01 | node-1a, node-1b |
| 02 | node-2a, node-2b |
| 03 | node-3a, node-3b |

Code.

-- 1) Local table on every node (created ON CLUSTER)
CREATE TABLE events_local ON CLUSTER prod
(
    ts         DateTime,
    user_id    UInt64,
    event_type LowCardinality(String),
    value      Float64
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/events_local',
    '{replica}'
)
PARTITION BY toYYYYMMDD(ts)
ORDER BY (event_type, ts, user_id)
TTL ts + INTERVAL 90 DAY;

-- 2) Distributed table on top (also ON CLUSTER)
CREATE TABLE events ON CLUSTER prod AS events_local
ENGINE = Distributed(
    prod,                  -- cluster name from config.xml
    default,               -- database
    events_local,          -- local table
    cityHash64(user_id)    -- sharding key
);

Step-by-step explanation.

  1. events_local is the storage table. Each node has its own copy keyed by {shard} and {replica} macros. Replicas of the same shard share a Keeper path; replicas of different shards do not.
  2. events is the router table. It owns no data; it routes reads and writes across events_local on every shard.
  3. The sharding key cityHash64(user_id) deterministically maps each user_id to a shard. All events for a given user land on the same shard, which makes per-user joins cheap.
  4. ON CLUSTER prod runs the DDL on every node listed under the cluster prod in config.xml. Without it, you'd run the CREATE on each node manually.
  5. Applications can write to events (the Distributed table) for convenience or directly to events_local on a chosen shard for max throughput.
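The macro mechanics can be sketched in Python. The sketch expands the two ReplicatedMergeTree arguments from the DDL above for each node, then groups nodes by the resulting Keeper path. The macro values are hypothetical and mirror the 3 × 2 grid. In a real deployment they come from each server's <macros> configuration section.

```python
from collections import defaultdict

# Hypothetical per-node macro values for the 3 x 2 grid.
MACROS = {
    "node-1a": {"shard": "01", "replica": "r1"},
    "node-1b": {"shard": "01", "replica": "r2"},
    "node-2a": {"shard": "02", "replica": "r1"},
    "node-2b": {"shard": "02", "replica": "r2"},
    "node-3a": {"shard": "03", "replica": "r1"},
    "node-3b": {"shard": "03", "replica": "r2"},
}

# The two ReplicatedMergeTree arguments from the events_local DDL.
PATH_TEMPLATE = "/clickhouse/tables/{shard}/events_local"
REPLICA_TEMPLATE = "{replica}"

def expand(node):
    m = MACROS[node]
    # str.format ignores unused keyword arguments, so the whole dict can be passed.
    return PATH_TEMPLATE.format(**m), REPLICA_TEMPLATE.format(**m)

replica_sets = defaultdict(list)  # Keeper path -> replica names registered there
for node in MACROS:
    path, replica = expand(node)
    replica_sets[path].append(replica)

for path, replicas in sorted(replica_sets.items()):
    print(path, replicas)  # one path per shard, two replicas under each
```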

Output (topology).

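| Node | Shard | Replica | Owns |
|---|---|---|---|
| node-1a | 01 | r1 | shard 01 data |
| node-1b | 01 | r2 | shard 01 data |
| node-2a | 02 | r1 | shard 02 data |
| node-2b | 02 | r2 | shard 02 data |
| node-3a | 03 | r1 | shard 03 data |
| node-3b | 03 | r2 | shard 03 data |

Replication is multi-master: each replica holds a full copy of its shard and can accept writes, so neither node of a pair is the "master".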

Rule of thumb. A 2-shard × 2-replica cluster (4 nodes) is a reasonable production starting point. Scale by adding shards when write throughput is the bottleneck, and add replicas when read concurrency is. The 3-shard × 2-replica grid is simply the easiest layout to draw in interviews.

Worked example — choosing the sharding key

Detailed explanation. The sharding key choice is one of the most consequential decisions in a ClickHouse cluster. A bad choice causes hot shards (skewed write traffic) or cross-shard joins (slow). The default answer is cityHash64(user_id) for user-facing analytics — even distribution and per-user co-location in one expression.

Question. For each workload below, pick the sharding key and explain in one sentence.

Input.

| Workload | Per-row identity | Common query pattern |
|---|---|---|
| User-event log | user_id | per-user funnel |
| Time-series metrics | (metric_name, ts) | metric over time |
| Ad impressions | (campaign_id, user_id) | campaign-level aggregate |
| Multi-tenant SaaS | (customer_id, ...) | per-customer dashboard |

Code.

-- 1) User-event log: hash on user_id for even distribution + per-user co-location
ENGINE = Distributed(prod, default, events_local, cityHash64(user_id));

-- 2) Time-series metrics: hash on metric_name to keep each metric on one shard
ENGINE = Distributed(prod, default, metrics_local, cityHash64(metric_name));

-- 3) Ad impressions: hash on campaign_id, since campaign-level aggregates dominate
ENGINE = Distributed(prod, default, impressions_local, cityHash64(campaign_id));

-- 4) Multi-tenant SaaS: hash on customer_id so each tenant lives on one shard
ENGINE = Distributed(prod, default, events_local, cityHash64(customer_id));

Step-by-step explanation.

  1. A user_id hash gives even distribution as long as no single user dominates traffic; the hash itself evens out sequential IDs. It also co-locates a user's events on one shard, so per-user joins become per-shard joins.
  2. metric_name hash keeps each time-series on one shard. Time-range scans become per-shard scans rather than cross-shard. Watch for a "celebrity metric" — one metric with disproportionate traffic — which would create a hot shard.
  3. campaign_id hash is right when campaigns dominate the read pattern. If a single mega-campaign skews traffic, fall back to (campaign_id, user_id) hash to spread.
  4. customer_id hash gives tenant isolation at the shard level. Large customers can be moved to dedicated shards via cluster reshape; small customers share shards.
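The "celebrity metric" risk is easy to quantify with a Python simulation. The assumptions are stated in the code comments: blake2b stands in for cityHash64, and one hypothetical metric carries 70% of all rows. The spread-out variant hashes the metric together with a hypothetical secondary key, the same trick as the (campaign_id, user_id) fallback. That variant gives up per-metric co-location in exchange for even load.

```python
import hashlib
from collections import Counter

N_SHARDS = 3

def h64(*parts) -> int:
    # Stand-in for cityHash64(...) using a stdlib hash (not ClickHouse's function).
    key = "\x1f".join(map(str, parts)).encode()
    return int.from_bytes(hashlib.blake2b(key, digest_size=8).digest(), "little")

# Hypothetical traffic: (metric_name, secondary_key). One metric gets 70% of rows.
rows = [("cpu.celebrity", i) for i in range(7000)]
rows += [(f"metric_{m}", i) for m in range(30) for i in range(100)]

def hottest_share(key_fn):
    """Fraction of all rows that land on the busiest shard."""
    load = Counter(key_fn(r) % N_SHARDS for r in rows)
    return max(load.values()) / len(rows)

by_metric = hottest_share(lambda r: h64(r[0]))              # one shard takes >= 70%
by_metric_and_key = hottest_share(lambda r: h64(r[0], r[1]))  # close to 1/3
print(round(by_metric, 2), round(by_metric_and_key, 2))
```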

Output (trade-off summary).

| Sharding key | Even distribution? | Per-key co-location? | Cross-shard joins? |
|---|---|---|---|
| cityHash64(user_id) | yes (unless a few users dominate traffic) | yes, per user | only for cross-user aggregates |
| cityHash64(metric_name) | yes (if many metrics) | yes, per metric | only for cross-metric aggregates |
| cityHash64(campaign_id) | yes (if many campaigns) | yes, per campaign | for cross-campaign cohorts |
| cityHash64(customer_id) | depends on customer mix | yes, per customer | rarely needed |
| rand() | statistically even | no | always |

Rule of thumb. The sharding key is "what does my dashboard query group by most often?" If the answer is user_id, hash by user. If the answer is customer_id, hash by customer. Co-location at write time pays off at read time.

Worked example — GLOBAL IN for cross-shard subqueries

Detailed explanation. In a query on a Distributed table, a plain (non-GLOBAL) IN subquery is shipped to every shard along with the outer query, so each shard evaluates it independently. When the subquery reads the shard-local table, each shard sees only its own slice of the data. When the subquery should produce the same result on every shard (e.g. "the top 100 users globally"), use GLOBAL IN: the coordinator runs the subquery once and broadcasts the result to every shard.

Question. Find every event by the top 100 users (by total event count) globally. Show the naive query and the GLOBAL IN fix.

Input. events is a Distributed table over events_local on 3 shards, sharded by cityHash64(user_id).

Code.

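-- BROKEN: the subquery reads the shard-local table, so each shard computes its own "top 100 by local count".
-- (If it named the Distributed table instead, the default distributed_product_mode = 'deny' rejects the query.)
SELECT *
FROM events
WHERE user_id IN (
    SELECT user_id
    FROM events_local
    GROUP BY user_id
    ORDER BY count() DESC
    LIMIT 100
);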

-- FIX: GLOBAL IN — coordinator runs the subquery once, broadcasts result
SELECT *
FROM events
WHERE user_id GLOBAL IN (
    SELECT user_id
    FROM events
    GROUP BY user_id
    ORDER BY count() DESC
    LIMIT 100
);

Step-by-step explanation.

  1. The broken query fans out the outer SELECT to every shard. Each shard then independently runs the inner subquery against its own data — producing 3 different "top 100 by local count" lists.
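  2. The outer WHERE on each shard checks user_id IN (local_top_100), and the coordinator concatenates the per-shard results. The output covers up to 300 users drawn from three different lists instead of the global top 100. If users were not co-located by the sharding key, the local counts would themselves be partial.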
  3. The GLOBAL IN fix changes the execution: the coordinator runs the inner subquery once (which itself fans out to every shard for the aggregation), collects the top 100 globally, then broadcasts that list to every shard for the outer WHERE.
  4. Now every shard filters by the same global top-100 list. The result is what the user actually wanted.
  5. GLOBAL JOIN is the analogous fix for joins where the right side needs to be computed once and broadcast.
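The difference between the two queries can be modelled in Python with three shards and users co-located by sharding key. The data is hypothetical, and K = 2 stands in for LIMIT 100. The sketch models the semantics described in the steps, not the engine's execution.

```python
from collections import Counter

# Hypothetical per-shard event counts per user; each user lives on one shard.
shards = [
    Counter({"a": 50, "b": 40, "c": 5}),
    Counter({"d": 30, "e": 2}),
    Counter({"f": 20, "g": 1}),
]
K = 2  # stands in for LIMIT 100

def top_k(counter, k):
    return {user for user, _ in counter.most_common(k)}

# Plain IN: every shard evaluates the subquery against its own data and filters
# by its own list; the coordinator concatenates the per-shard results.
local_in = set().union(*(top_k(s, K) for s in shards))

# GLOBAL IN: the coordinator computes the list once over all shards...
global_list = top_k(sum(shards, Counter()), K)
# ...and every shard filters by that single broadcast list.
global_in = set().union(*({u for u in s if u in global_list} for s in shards))

print(sorted(local_in))   # ['a', 'b', 'd', 'e', 'f', 'g']: three local lists
print(sorted(global_in))  # ['a', 'b']: the true global top K
```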

Output.

| Query | Result |
|---|---|
| Naive IN | each shard's local top 100, no global consistency |
| GLOBAL IN | events by the true global top 100 users |

Rule of thumb. Whenever a subquery on a Distributed table should produce a single result for the whole cluster (not per-shard), use GLOBAL IN or GLOBAL JOIN. Without GLOBAL, every shard re-runs the subquery against its own partial data.

Worked example — measuring replication lag

Detailed explanation. Replication in ClickHouse is asynchronous — writes commit locally then propagate. Under normal load, lag is sub-second; under heavy bulk inserts, it can climb to seconds or tens of seconds. Monitoring the gap is the first step toward debugging "the dashboard shows stale data on one replica" tickets.

Question. Write a system-table query that reports per-shard replication lag in seconds. Explain the columns it reads.

Input. A running cluster with events_local on every node.

Code.

SELECT
    database,
    table,
    replica_name,
    is_leader,
    absolute_delay,
    queue_size,
    log_max_index,
    log_pointer
FROM system.replicas
WHERE table = 'events_local'
ORDER BY absolute_delay DESC;

Step-by-step explanation.

  1. system.replicas is the live view of replication health. It exposes one row per replicated table per node.
  2. absolute_delay reports how far behind this replica is, in seconds. Anything > 30 is worth investigating.
  3. queue_size is the number of entries (fetches, merges, mutations) waiting in this replica's execution queue. A queue that keeps growing means the replica applies entries more slowly than they arrive.
  4. log_max_index is the highest entry number in the shard's shared replication log; log_pointer is one past the last entry this replica has copied into its own queue. A large gap between them means the replica is not even pulling new entries.
  5. is_leader marks replicas that may schedule background merges for the shard. In current releases several replicas can be leaders at once. Any replica can accept writes and serve reads, regardless of leadership.

Output.

| database | table | replica_name | is_leader | absolute_delay | queue_size |
|---|---|---|---|---|---|
| default | events_local | node-1a | 1 | 0 | 0 |
| default | events_local | node-1b | 0 | 2 | 4 |
| default | events_local | node-2a | 1 | 0 | 0 |
| default | events_local | node-2b | 0 | 15 | 87 |

Rule of thumb. Alert on absolute_delay > 30 seconds per replicated table. Alert on queue_size growing for more than 60 seconds. Both indicate that the replica is not keeping up with writes — either because of network, disk, or merge backlog.
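The two alert rules can be expressed as a small Python check over consecutive snapshots of system.replicas. The first snapshot copies the sample output above; the second, taken 90 seconds later, is hypothetical. Comparing only two snapshots simplifies "growing for more than 60 seconds"; a real monitor would inspect the whole series.

```python
# replica_name -> (absolute_delay, queue_size)
snap_t0 = {"node-1a": (0, 0), "node-1b": (2, 4), "node-2a": (0, 0), "node-2b": (15, 87)}
# Hypothetical snapshot taken 90 seconds later.
snap_t90 = {"node-1a": (0, 0), "node-1b": (0, 0), "node-2a": (0, 0), "node-2b": (41, 160)}

DELAY_LIMIT_S = 30    # alert when absolute_delay > 30 seconds
QUEUE_WINDOW_S = 60   # alert when the queue grew over a window of at least 60 s

def alerts(before, after, elapsed_s):
    out = []
    for replica, (delay, queue) in after.items():
        _, queue_before = before[replica]
        if delay > DELAY_LIMIT_S:
            out.append((replica, "absolute_delay"))
        if elapsed_s >= QUEUE_WINDOW_S and queue > queue_before:
            out.append((replica, "queue_growing"))
    return out

print(alerts(snap_t0, snap_t90, elapsed_s=90))
# [('node-2b', 'absolute_delay'), ('node-2b', 'queue_growing')]
```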

Senior interview question on cluster scaling

A senior interviewer often asks: "Your ClickHouse cluster is 3 shards × 2 replicas. Write QPS is doubling every quarter and a single shard is now hitting CPU saturation. Walk me through how you scale, what breaks, and how you keep the dashboard online."

Solution Using a four-step horizontal scale-out plan

-- Step 1: create a fresh local table on every node of prod_v2, a new cluster
-- definition with 5 shards (the existing shards 1-3 plus new shards 4 and 5).
-- A new table name avoids clashing with events_local on the reused nodes.
CREATE TABLE events_local_v2 ON CLUSTER prod_v2
(
    ts         DateTime,
    user_id    UInt64,
    event_type LowCardinality(String),
    value      Float64
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/v2/{shard}/events_local_v2',
    '{replica}'
)
PARTITION BY toYYYYMMDD(ts)
ORDER BY (event_type, ts, user_id)
TTL ts + INTERVAL 90 DAY;

-- Step 2: new Distributed table over the 5-shard cluster
CREATE TABLE events_v2 ON CLUSTER prod_v2 AS events_local_v2
ENGINE = Distributed(
    prod_v2,
    default,
    events_local_v2,
    cityHash64(user_id)
);

-- Step 3: backfill (or dual-write from the Kafka MV bridge)
-- Option A: backfill from the old cluster. cluster('prod', ...) reads one replica
-- of each old shard; inserting through events_v2 re-shards every row.
INSERT INTO events_v2
SELECT * FROM cluster('prod', default.events_local)
WHERE ts >= '2026-01-01';

-- Option B: dual-write at the MV bridge layer (Kafka -> both clusters)
-- by adding a second MV that targets the new cluster.

-- Step 4: cut the dashboard over to events_v2 and decommission v1

Step-by-step trace.

| Step | Action | Risk |
|---|---|---|
| 1 | Stand up 2 new shards with their replicas | new nodes; no data yet |
| 2 | Define events_v2 as Distributed over 5-shard cluster | router only; no traffic yet |
| 3 | Backfill or dual-write to populate the new shards | I/O-heavy; do in batches |
| 4 | Cut dashboards to events_v2, decommission events | requires app-config push |

The migration is online: the old cluster keeps serving until the dashboard is cut over. The hard part is Step 3 — the backfill must respect the new sharding function so that cityHash64(user_id) % 5 routes rows to the right new shard.
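The backfill has to go through the new router because modulo placement relocates most keys when the shard count changes. The Python sketch below measures this for a move from 3 to 5 shards. It assumes blake2b as a stand-in for cityHash64 and equal shard weights, so placement is hash % number_of_shards.

```python
import hashlib

def h64(user_id: int) -> int:
    # Stand-in for cityHash64(user_id): a stdlib hash, not ClickHouse's function.
    digest = hashlib.blake2b(str(user_id).encode(), digest_size=8).digest()
    return int.from_bytes(digest, "little")

users = range(30_000)
# A user keeps its shard only if (hash % 3) on the old cluster equals (hash % 5) on the new one.
stay = sum(h64(u) % 3 == h64(u) % 5 for u in users)
fraction_moved = 1 - stay / len(users)
print(f"{fraction_moved:.1%} of users change shard")  # expected value: 80%
```

A key keeps its shard only when h mod 3 equals h mod 5. That holds for 3 of the 15 residues of h mod 15 (0, 1 and 2), so about 80% of rows move. This is why the backfill re-routes every row rather than copying whole shards.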

Output (after migration):

| Cluster | Shards | Replicas | Write throughput | Dashboard latency |
|---|---|---|---|---|
| prod (old) | 3 | 2 | 150K events/sec | 200–500ms |
| prod_v2 | 5 | 2 | 250K events/sec | 100–300ms |

Why this works — concept by concept:

  • Add shards to scale write throughput — each new shard owns a slice of the hash space. Write throughput scales roughly linearly because each shard handles the inserts and merges for its own slice.
  • Add replicas to scale read concurrency and HA — each replica can independently serve reads. Two replicas tolerate one node failure; three tolerate two.
  • Distributed table is a thin router — it owns no data, so reshaping the cluster (adding shards) does not lose any of the cluster's data when done correctly. The migration risk is in the backfill, not in the router.
  • Dual-write at the MV bridge — if the Kafka → ClickHouse MV is the only writer, adding a second MV that targets the new cluster gives you dual-write for free during migration. Cut the dashboard, then drop the old MV.
  • Cost — O(rows × N_replicas) write amplification per shard; O(touched_partitions / shards) read latency reduction per added shard. The migration itself is O(historical_rows / network_throughput).



Cheat sheet — ClickHouse recipes

  • Default time-series schema. ENGINE = MergeTree PARTITION BY toYYYYMMDD(ts) ORDER BY (entity_id, toStartOfHour(ts), ts) — coarse partition, sort by the most-filtered column then time.
  • Real-time roll-up. AggregatingMergeTree target + materialized view with -State aggregate functions. Read with *Merge and GROUP BY at query time.
  • Dedup on CDC. ReplacingMergeTree(version_col) with argMax(col, version_col) GROUP BY pk for hot queries; reserve FINAL for low-QPS dashboards.
  • Distributed table. ENGINE = Distributed(cluster, db, local_table, cityHash64(shard_key)) — co-locate the most-grouped column on one shard.
  • Backfill a new MV. Two-step: (1) create the MV (captures new inserts via trigger), (2) INSERT INTO target SELECT ... FROM source WHERE ts < cutoff for history. Avoid POPULATE on live tables.
  • Schema evolve an MV. ALTER TABLE mv MODIFY QUERY ... after adding the column to source and target with ADD COLUMN.
  • Test cardinality before partitioning. SELECT uniq(col) FROM table — if a candidate partition column has > 1000 distinct values, it is too fine-grained for PARTITION BY.
  • Compress for time-series. CODEC(DoubleDelta, LZ4) on monotonic timestamps; CODEC(T64, LZ4) on bounded integers; CODEC(LZ4HC(9)) for cold data.
  • Replication health. SELECT replica_name, absolute_delay, queue_size FROM system.replicas WHERE absolute_delay > 30.
  • Inspect parts. SELECT partition, count() FROM system.parts WHERE active GROUP BY partition — too many parts per partition (>50) indicates merge pressure.
  • Force a merge (test only). OPTIMIZE TABLE x PARTITION 'YYYYMMDD' FINAL — never run unconditionally in production at scale.
  • Insert via Kafka engine. Kafka table (ENGINE = Kafka) + MV (TO target) + MergeTree target — the canonical three-object pipeline.
  • GLOBAL IN for cross-shard subqueries. Whenever the subquery should yield one global result, write WHERE col GLOBAL IN (subquery).
  • Dictionaries for joins. Define small dimension tables as Dictionary with LAYOUT(HASHED()) + LIFETIME(MIN 300 MAX 600); read with dictGet('dict', 'col', key) for sub-millisecond lookups.

Frequently asked questions

What is ClickHouse used for?

ClickHouse is an open-source columnar OLAP database designed for sub-second interactive analytics over billions of rows. It is the default answer for real-time dashboards, log analytics, event-stream aggregation, ad-tech metrics, and any workload where the read pattern is "aggregate over a column" and the write pattern is "append from a stream or a bulk file." Major deployments at Cloudflare, Uber, ByteDance, and Yandex run ClickHouse at the multi-petabyte scale. It is not a replacement for Postgres / MySQL (no row-level transactions, no point updates). Nor does it replace Snowflake, since ClickHouse is weaker at heavy multi-join batch workloads. It sits between the stream and the dashboard as the sub-second serving tier.

What is the difference between MergeTree and ReplacingMergeTree?

MergeTree is the base columnar engine: it writes immutable on-disk parts and merges them in the background according to the ORDER BY key. ReplacingMergeTree adds a dedup rule to the merge. When two rows in the same partition share the same ORDER BY key, the merge keeps only one of them. It keeps the row with the greatest value in the optional version column, or the most recently inserted row if there is no version column. Use MergeTree for append-only event streams where every row is unique. Use ReplacingMergeTree for CDC sinks where you want "the latest version of every row by primary key." Between merges, both versions may exist on disk, so production queries pair ReplacingMergeTree with argMax(col, version) GROUP BY pk or with SELECT ... FINAL to dedup at read time.
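A Python toy model of that read-time dedup, assuming three hypothetical unmerged parts of a ReplacingMergeTree(version) table keyed by pk. Before merges, a plain scan sees every version. Keeping the max version per key gives the same answer as argMax(col, version) ... GROUP BY pk.

```python
# Toy model (not ClickHouse code): rows from three unmerged parts of a
# hypothetical ReplacingMergeTree(version) table keyed by pk.
parts = [
    [{"pk": 1, "status": "created", "version": 1}],
    [{"pk": 1, "status": "paid", "version": 3},
     {"pk": 2, "status": "created", "version": 1}],
    [{"pk": 1, "status": "shipped", "version": 2}],  # arrives late with an older version
]
rows = [row for part in parts for row in part]  # what a plain SELECT sees pre-merge

visible_versions_of_pk1 = sum(row["pk"] == 1 for row in rows)  # 3 duplicates

def latest_by_pk(rows, col):
    """Same answer as argMax(col, version) ... GROUP BY pk: max version wins."""
    best = {}
    for row in rows:
        current = best.get(row["pk"])
        if current is None or row["version"] > current["version"]:
            best[row["pk"]] = row
    return {pk: row[col] for pk, row in best.items()}

print(visible_versions_of_pk1, latest_by_pk(rows, "status"))
# 3 {1: 'paid', 2: 'created'}
```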

How do materialized views work in ClickHouse?

ClickHouse materialized views are insert-time triggers, not refresh-on-schedule snapshots. You create one with CREATE MATERIALIZED VIEW mv TO target AS SELECT ... FROM source. From then on, the engine runs the SELECT over each insert batch into the source table and writes the result into the target table. There is no schedule, no cron, no full-table refresh. For real-time roll-ups, the target is typically an AggregatingMergeTree that stores partial aggregate states (countState, uniqState, sumState), and the dashboard reads with the matching *Merge functions to finalize the states. The newer "Refreshable Materialized View" feature is a separate construct that does run on a schedule, but in interviews "materialized view" almost always refers to the insert-time trigger variant.
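A toy Python model of the insert-time trigger, assuming an MV whose SELECT is count() ... GROUP BY event_type. The SELECT only sees the rows of the current insert, so the target receives one partial row per batch. Readers must therefore re-aggregate.

```python
from collections import Counter

target = []  # rows the MV writes into its target table

def mv_on_insert(batch):
    """Runs the MV's SELECT (count() GROUP BY event_type) over ONLY this batch."""
    target.extend(Counter(batch).items())

mv_on_insert(["click", "view", "click"])  # first INSERT into the source table
mv_on_insert(["click"])                   # second INSERT

print(target)  # [('click', 2), ('view', 1), ('click', 1)]: one partial row per batch

# Merges in a SummingMergeTree/AggregatingMergeTree target may fold these rows
# later, but queries should not rely on that; they re-aggregate at read time.
totals = Counter()
for event_type, n in target:
    totals[event_type] += n
print(dict(totals))  # {'click': 3, 'view': 1}
```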

How does ClickHouse handle updates and deletes?

ClickHouse does not have OLTP-style row updates. The closest equivalents are (1) ALTER TABLE ... UPDATE / DELETE mutations, which rewrite entire affected on-disk parts in the background — fine at low volume but unsuitable for high-frequency point updates; (2) ReplacingMergeTree with a version column, which lets the writer emit a new row per version and the merge dedupes at the sort-key level; (3) CollapsingMergeTree, which collapses paired +1 / -1 rows during merge; and (4) ALTER TABLE ... DROP PARTITION, which is the cheapest way to delete a coarse range (e.g. GDPR-driven cohort deletion at month granularity). If the workload demands frequent point updates, you are using the wrong tool — Postgres or a key-value store is the right answer, and ClickHouse becomes the analytical mirror downstream via CDC.
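A minimal Python sketch of the CollapsingMergeTree read pattern. It follows the sum(x * sign) ... GROUP BY key HAVING sum(sign) > 0 query shape used with that engine. The rows are hypothetical. The point is that the answer is correct whether or not a merge has already collapsed the +1 / -1 pairs.

```python
# Toy model (not ClickHouse code): (user_id, page_views, sign) rows.
rows = [
    (7, 5, +1),  # original state
    (7, 5, -1),  # cancel the old state...
    (7, 6, +1),  # ...and write the new one
    (8, 3, +1),
]

def collapsed_sum(rows):
    """Mirrors: SELECT user_id, sum(page_views * sign) ... GROUP BY user_id
    HAVING sum(sign) > 0"""
    views, signs = {}, {}
    for uid, pv, sign in rows:
        views[uid] = views.get(uid, 0) + pv * sign
        signs[uid] = signs.get(uid, 0) + sign
    return {uid: views[uid] for uid in views if signs[uid] > 0}

print(collapsed_sum(rows))  # {7: 6, 8: 3}
```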

ClickHouse vs Snowflake — which one for real-time analytics?

For interactive sub-second dashboards over append-heavy data, ClickHouse is the strong default — its columnar storage, vectorised execution, and materialized-view roll-ups land query latencies in the 50–500ms range that Snowflake's compute-warehouse model cannot match without aggressive caching. For batch ELT, long-tail analytics, complex multi-join workloads, and ad-hoc SQL across many domains, Snowflake is the strong default — its separation of storage and compute, mature dbt integration, and seconds-to-minutes latency budget fit the batch pattern. The most common production deployment is both: ClickHouse for the real-time speed lane (kept hot for 30–90 days), Snowflake for the batch warehouse (kept for 5+ years). Pick the one that matches the latency contract, not the cost contract.

Do I need ZooKeeper to run ClickHouse?

For single-node ClickHouse (development, ETL scratch space, small dashboards) — no. For any production cluster with Replicated*MergeTree tables — yes, you need either ZooKeeper or ClickHouse Keeper as the coordination service. ClickHouse Keeper is a Raft-based, C++-implemented drop-in replacement that ships with ClickHouse and is the recommended choice for new clusters since 2023; it can be deployed standalone or co-located on ClickHouse nodes. ZooKeeper remains supported and is the right choice if your organisation already operates a ZooKeeper ensemble. Either way, the coordination service sequences replication log entries, DDL queries, and distributed leadership — without it, replication, ON CLUSTER DDL, and distributed mutations cannot function.

Practice on PipeCode

Pipecode.ai is Leetcode for Data Engineering — every ClickHouse recipe above ships with hands-on practice rooms where you write the MergeTree table definition, the AggregatingMergeTree roll-up MV, and the Distributed-table sharding key against graded inputs that mirror real production schemas. PipeCode pairs every reading with 450+ DE-focused problems and a real-time scoring engine, so you never have to wonder whether your `cityHash64(user_id)` choice actually balances the shards or whether your `uniqState` / `uniqMerge` pairing returns the correct DAU.

Practice real-time analytics now →
Time-series drills →
