
Mirroring MySQL to ClickHouse in Near-Real-Time with Debezium + Kafka

A practical CDC pipeline: stream inserts, updates, and deletes from MySQL into ClickHouse in under five minutes — using Debezium, Kafka, and ReplacingMergeTree to keep an append-only columnar store correct.

You have an OLTP MySQL database and you want analytics on it — fast, fresh, and without hammering the source. The naive answer is a scheduled job that polls WHERE updated_at > last_run. It breaks the moment you care about deletes (polling never sees them) or you don’t have a reliable updated_at column.

The robust answer is change data capture (CDC): read MySQL’s binary log, which already records every insert, update, and delete in order, and stream those changes into a columnar store. Here’s the whole pipeline.
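The difference between the two approaches is easy to see in a toy Python model. Everything here is hypothetical: a dict stands in for the MySQL table, a list of tuples stands in for the binlog, and an integer clock stands in for updated_at. One poller tracks updated_at and one consumer replays the change log.

```python
from dataclasses import dataclass

@dataclass
class Row:
    value: float
    updated_at: int  # toy logical clock standing in for a timestamp

# Hypothetical binlog, in commit order: (op, primary key, new row or None).
binlog = [
    ("c", 1, Row(9.0, 1)),
    ("c", 2, Row(4.0, 2)),
    ("u", 1, Row(10.0, 3)),
    ("d", 2, None),
    ("c", 3, Row(7.5, 6)),
]

# The source table after all of that: row 2 is simply gone.
source = {1: Row(10.0, 3), 3: Row(7.5, 6)}

def poll(source, replica, last_run):
    """WHERE updated_at > last_run: sees new and changed rows, never deleted ones."""
    for key, row in source.items():
        if row.updated_at > last_run:
            replica[key] = row
    return replica

def replay(log, replica):
    """CDC: apply every change in order, deletes included."""
    for op, key, row in log:
        if op == "d":
            replica.pop(key, None)
        else:
            replica[key] = row
    return replica

# The poller last ran at t=2, when rows 1 and 2 both existed.
polled = poll(source, {1: Row(9.0, 1), 2: Row(4.0, 2)}, last_run=2)
replayed = replay(binlog, {})
print(sorted(polled))    # [1, 2, 3]: row 2 outlives its deletion
print(sorted(replayed))  # [1, 3]
```

The poller has nothing to select for row 2, so the replica keeps it forever; replaying the log applies the delete like any other change.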

1. Capture changes with Debezium

Debezium reads the binlog and emits one message per row change. Register the connector against Kafka Connect:

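{
  "name": "mysql-epoch-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "<debezium-password>",
    "database.server.id": "184054",
    "topic.prefix": "thryve",
    "table.include.list": "health.epoch_data",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-history.thryve",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,ts_ms"
  }
}

Save that as connector.json and POST it to the Kafka Connect REST API:

curl -X POST -H "Content-Type: application/json" \
  --data @connector.json http://localhost:8083/connectors

The password is a placeholder and the schema-history topic name is an arbitrary choice, but Debezium's MySQL connector expects both database.password and the two schema.history.internal.* settings before it will start. Setting value.converter.schemas.enable to false makes Kafka Connect's JsonConverter emit plain JSON objects instead of schema/payload pairs, which is the shape ClickHouse's JSONEachRow format expects.

The unwrap transform (ExtractNewRecordState) flattens Debezium's verbose before/after envelope down to just the row, plus three metadata fields: __op (c, u, d, or r for snapshot reads) and __ts_ms come from add.fields, and __deleted comes from the rewrite delete mode. __deleted is the string "true" or "false", and a delete arrives as the row's last known state with __deleted set to "true". That flattening is what keeps the ClickHouse side simple.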
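To make the flattening concrete, here is a simplified Python model of it. It is not Debezium's code: unwrap is a hypothetical helper that mimics ExtractNewRecordState with delete.handling.mode=rewrite and add.fields=op,ts_ms, and the event values are made up.

```python
import json

def unwrap(envelope: dict) -> dict:
    """Hypothetical model of ExtractNewRecordState (rewrite mode, add.fields=op,ts_ms)."""
    op = envelope["op"]
    # A delete has no "after" image, so rewrite mode emits the "before" image instead.
    row = dict(envelope["before"] if op == "d" else envelope["after"])
    row["__op"] = op                    # from add.fields=op
    row["__ts_ms"] = envelope["ts_ms"]  # from add.fields=ts_ms
    row["__deleted"] = "true" if op == "d" else "false"  # a string, not a boolean
    return row

# Made-up delete event for one row of health.epoch_data.
delete_event = {
    "before": {"user_id": 42, "recorded_at": 1700000000000, "metric": "steps", "value": 812.0},
    "after": None,
    "source": {"db": "health", "table": "epoch_data"},
    "op": "d",
    "ts_ms": 1700000005123,
}
print(json.dumps(unwrap(delete_event)))
```

The printed object has the row's columns at the top level and ends with "__op": "d" and "__deleted": "true"; the envelope keys (before, after, source) are gone.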

2. Consume the topic with a Kafka engine table

ClickHouse can read Kafka natively. This table is a moving cursor over the topic — not storage:

CREATE TABLE kafka_epoch_data (
  user_id     UInt64,
  recorded_at DateTime64(3),
  metric      LowCardinality(String),
  value       Float64,
  __ts_ms     UInt64,
  __deleted   String    -- Debezium's rewrite mode emits 'true' / 'false'
) ENGINE = Kafka
SETTINGS
  kafka_broker_list = 'kafka:9092',
  kafka_topic_list  = 'thryve.health.epoch_data',
  kafka_group_name  = 'ch_epoch',
  kafka_format      = 'JSONEachRow';

3. Land it in a ReplacingMergeTree

The destination table uses ReplacingMergeTree with a version column and an is_deleted column, the two-argument form that understands deletes:

CREATE TABLE epoch_data (
  user_id     UInt64,
  recorded_at DateTime64(3),
  metric      LowCardinality(String),
  value       Float64,
  version     UInt64,
  is_deleted  UInt8
) ENGINE = ReplacingMergeTree(version, is_deleted)
ORDER BY (user_id, metric, recorded_at);

4. Wire them together with a materialized view

The materialized view is the pump: every batch the Kafka engine reads gets transformed and inserted into the target. It is also where Debezium's string __deleted flag becomes the 0/1 value that ReplacingMergeTree expects.

CREATE MATERIALIZED VIEW mv_epoch_data TO epoch_data AS
SELECT
  user_id,
  recorded_at,
  metric,
  value,
  __ts_ms                     AS version,
  toUInt8(__deleted = 'true') AS is_deleted
FROM kafka_epoch_data;
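For intuition, here is roughly what the view computes for each message, modelled in Python. It is a sketch, not how ClickHouse executes the view: mv_row and pump are hypothetical helpers, the message values are made up, and the string-to-0/1 step assumes Debezium's "true"/"false" __deleted values.

```python
import json

def mv_row(message: str) -> dict:
    """One JSONEachRow message in, one epoch_data row out (hypothetical helper)."""
    m = json.loads(message)
    return {
        "user_id": m["user_id"],
        "recorded_at": m["recorded_at"],
        "metric": m["metric"],
        "value": m["value"],
        "version": m["__ts_ms"],                             # __ts_ms AS version
        "is_deleted": 1 if m["__deleted"] == "true" else 0,  # 'true'/'false' -> UInt8
    }

def pump(batch: list[str]) -> list[dict]:
    """Each batch the Kafka engine reads becomes one insert into the target table."""
    return [mv_row(msg) for msg in batch]

batch = [
    '{"user_id": 7, "recorded_at": 1714557600000, "metric": "hr", "value": 61.0, '
    '"__op": "c", "__ts_ms": 1714557600123, "__deleted": "false"}',
    '{"user_id": 7, "recorded_at": 1714557600000, "metric": "hr", "value": 61.0, '
    '"__op": "d", "__ts_ms": 1714557900456, "__deleted": "true"}',
]
rows = pump(batch)
print([(r["version"], r["is_deleted"]) for r in rows])
# [(1714557600123, 0), (1714557900456, 1)]
```

__op is read but dropped, mirroring the fact that neither ClickHouse table declares that column.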

Why this stays correct

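ClickHouse is append-only, so an UPDATE upstream arrives as a new row with a higher version. During background merges, ReplacingMergeTree collapses rows that share the ORDER BY key and keeps the one with the highest version (on a tie, the one inserted last). This relies on the ORDER BY key matching the MySQL primary key, here assumed to be (user_id, metric, recorded_at); if the two differ, an update that changes an ORDER BY column leaves the old row behind. A DELETE arrives as a row with is_deleted = 1 and a higher version. By default, merges keep that row as a tombstone instead of dropping it, so an older version that lands later cannot bring the row back; reads with FINAL skip it, and purging tombstones for good takes an explicit OPTIMIZE TABLE epoch_data FINAL CLEANUP, which sits behind the allow_experimental_replacing_merge_with_cleanup table setting.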
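A small Python model of those merge rules shows an update collapsing into its newer version and a delete surviving as a tombstone that only the FINAL view hides. It is a simplification for intuition, not ClickHouse's implementation; Row, merge and final are hypothetical names and the data is made up.

```python
from typing import NamedTuple

class Row(NamedTuple):
    user_id: int
    metric: str
    recorded_at: int
    value: float
    version: int
    is_deleted: int

def merge(rows):
    """Per ORDER BY key (user_id, metric, recorded_at), keep the highest version;
    on a tie, the row inserted last. Rows with is_deleted = 1 are kept as tombstones."""
    survivors = {}
    for r in rows:  # rows arrive in insertion order
        key = (r.user_id, r.metric, r.recorded_at)
        if key not in survivors or r.version >= survivors[key].version:
            survivors[key] = r
    return list(survivors.values())

def final(rows):
    """SELECT ... FINAL: merge, then hide keys whose survivor is a tombstone."""
    return [r for r in merge(rows) if r.is_deleted == 0]

# Hypothetical unmerged parts: insert + update for user 1, insert + delete for user 2.
parts = [
    Row(1, "steps", 100, 500.0, version=10, is_deleted=0),
    Row(1, "steps", 100, 650.0, version=20, is_deleted=0),
    Row(2, "steps", 100, 300.0, version=15, is_deleted=0),
    Row(2, "steps", 100, 300.0, version=30, is_deleted=1),
]
merged = merge(parts)
print(len(merged))                      # 2: user 1 at version 20, user 2's tombstone
print([r.value for r in final(parts)])  # [650.0]
```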

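Merges run in the background at unpredictable times, so older versions and tombstones can sit unmerged indefinitely. Query with FINAL, which applies the replacement logic at read time and also drops keys whose latest row has is_deleted = 1. Filtering on is_deleted = 0 without FINAL is not a substitute: an unmerged pre-update row is still counted, and a deleted row's earlier version still carries is_deleted = 0. Alongside FINAL, the explicit filter is redundant but harmless: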

SELECT user_id, metric, avg(value)
FROM epoch_data FINAL
WHERE is_deleted = 0
GROUP BY user_id, metric;
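A toy Python model shows why the FINAL matters. Averaging unmerged rows with only an is_deleted = 0 filter counts the pre-update value and the pre-delete row, while applying the replacement rule first gives the intended answer. The data is hypothetical and the merge rule is the same simplification as before.

```python
from collections import defaultdict
from statistics import mean

# (user_id, metric, recorded_at, value, version, is_deleted), in insertion order,
# not yet merged: one update and one delete.
rows = [
    (1, "hr", 0, 60.0, 1, 0),
    (1, "hr", 0, 80.0, 2, 0),  # update of the row above
    (1, "hr", 1, 70.0, 1, 0),
    (1, "hr", 1, 70.0, 3, 1),  # delete of the row above
]

def avg_by_key(rs):
    """avg(value) ... GROUP BY user_id, metric."""
    groups = defaultdict(list)
    for user_id, metric, _, value, _, _ in rs:
        groups[(user_id, metric)].append(value)
    return {k: mean(v) for k, v in groups.items()}

# WHERE is_deleted = 0 without FINAL: stale versions leak into the average.
filter_only = avg_by_key(r for r in rows if r[5] == 0)

# FINAL: keep the highest version per ORDER BY key, then drop deleted survivors.
latest = {}
for r in rows:
    key = r[:3]
    if key not in latest or r[4] >= latest[key][4]:
        latest[key] = r
with_final = avg_by_key(r for r in latest.values() if r[5] == 0)

print(filter_only)  # {(1, 'hr'): 70.0}
print(with_final)   # {(1, 'hr'): 80.0}
```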

What to defer

You do not need Flink for this. Materialized views handle incremental aggregation on insert. Add a schema registry when schema evolution starts hurting, and a stream processor only when your transformations outgrow SQL. Start lean — this stack runs comfortably around $200/month and scales to a billion-plus rows.