You have an OLTP MySQL database and you want analytics on it — fast, fresh, and without hammering the source. The naive answer is a scheduled job that polls WHERE updated_at > last_run. It breaks the moment you care about deletes (polling never sees them) or you don’t have a reliable updated_at column.
The robust answer is change data capture (CDC): read MySQL’s binary log, which already records every insert, update, and delete in order, and stream those changes into a columnar store. Here’s the whole pipeline.
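The difference between the two approaches can be made concrete with a small simulation. This is a minimal Python sketch, not part of the pipeline: `Row`, `poll_changes`, and `replay` are hypothetical names, and the in-memory list only stands in for the binlog.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Row:
    id: int
    value: float
    updated_at: int  # logical clock tick of the last write


def poll_changes(table: dict[int, Row], last_run: int) -> list[Row]:
    """Model of `SELECT ... WHERE updated_at > last_run` on the live table."""
    return [row for row in table.values() if row.updated_at > last_run]


def replay(log: list[tuple[str, int, float | None]]) -> dict[int, float]:
    """Rebuild the table by applying an ordered change log, binlog-style."""
    state: dict[int, float] = {}
    for op, row_id, value in log:
        if op == "d":
            state.pop(row_id, None)
        else:  # "c" (insert) and "u" (update) both set the latest value
            state[row_id] = value
    return state


# Source table, plus the change log the database would have written for it.
table = {1: Row(1, 10.0, updated_at=1), 2: Row(2, 20.0, updated_at=2)}
log = [("c", 1, 10.0), ("c", 2, 20.0)]

# Row 2 is deleted: it leaves the table, but the delete stays in the log.
del table[2]
log.append(("d", 2, None))

polled = poll_changes(table, last_run=2)
replayed = replay(log)

print(polled)    # [] : there is no row left for the poller to notice
print(replayed)  # {1: 10.0} : the replayed log reflects the delete
```

The poller returns nothing because the deleted row no longer exists to match the predicate, while the ordered log still carries the delete as an event.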
1. Capture changes with Debezium
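Debezium reads the binlog and emits one message per row change. Register the connector against Kafka Connect. The MySQL connector also needs a password and a Kafka topic for its schema history; the values shown for those two settings are placeholders. Turning off schemas.enable keeps each message a flat JSON object, which is what the JSONEachRow format in step 2 expects:
{
  "name": "mysql-epoch-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "<debezium-password>",
    "database.server.id": "184054",
    "topic.prefix": "thryve",
    "table.include.list": "health.epoch_data",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.thryve",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,ts_ms"
  }
}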
curl -X POST -H "Content-Type: application/json" \
--data @connector.json http://localhost:8083/connectors
The unwrap transform flattens Debezium’s verbose before/after envelope down to just the row, plus three metadata fields: __op (c/u/d), __ts_ms, and __deleted. That flattening is what keeps the ClickHouse side simple.
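To make the flattening concrete, here is a simplified Python model of what the transform does to one change event. It is a sketch under assumptions, not Debezium's implementation: the `unwrap` function and the sample events are hypothetical, the envelope is reduced to the four fields that matter here, and `__deleted` is modeled as the strings "true" and "false".

```python
from __future__ import annotations

from typing import Any


def unwrap(envelope: dict[str, Any]) -> dict[str, Any]:
    """Flatten a Debezium-style change envelope into a single row dict."""
    is_delete = envelope["op"] == "d"
    # A delete has no `after` image, so the last known row comes from `before`.
    row = dict(envelope["before"] if is_delete else envelope["after"])
    row["__deleted"] = "true" if is_delete else "false"  # assumed wire format
    row["__op"] = envelope["op"]
    row["__ts_ms"] = envelope["ts_ms"]
    return row


update_event = {
    "before": {"user_id": 7, "metric": "heart_rate", "value": 60.0},
    "after": {"user_id": 7, "metric": "heart_rate", "value": 62.0},
    "op": "u",
    "ts_ms": 1700000000123,
}
delete_event = {
    "before": {"user_id": 7, "metric": "heart_rate", "value": 62.0},
    "after": None,
    "op": "d",
    "ts_ms": 1700000000456,
}

flat_update = unwrap(update_event)
flat_delete = unwrap(delete_event)
print(flat_update)
print(flat_delete)
```

The delete keeps the row's key columns, which is what lets the downstream table match the tombstone to the rows it replaces.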
2. Consume the topic with a Kafka engine table
ClickHouse can read Kafka natively. This table is a moving cursor over the topic — not storage:
CREATE TABLE kafka_epoch_data (
    user_id UInt64,
    recorded_at DateTime64(3),
    metric LowCardinality(String),
    value Float64,
    __ts_ms UInt64,
    __deleted UInt8
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'thryve.health.epoch_data',
    kafka_group_name = 'ch_epoch',
    kafka_format = 'JSONEachRow';
3. Land it in a ReplacingMergeTree
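The destination table uses ReplacingMergeTree with a version column and an is_deleted column, the two-argument form that understands deletes. The ORDER BY key doubles as the deduplication key, so it should identify a row the same way the MySQL primary key does: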
CREATE TABLE epoch_data (
    user_id UInt64,
    recorded_at DateTime64(3),
    metric LowCardinality(String),
    value Float64,
    version UInt64,
    is_deleted UInt8
) ENGINE = ReplacingMergeTree(version, is_deleted)
ORDER BY (user_id, metric, recorded_at);
4. Wire them together with a materialized view
The materialized view is the pump: every batch the Kafka engine reads gets transformed and inserted into the target.
CREATE MATERIALIZED VIEW mv_epoch_data TO epoch_data AS
SELECT
    user_id,
    recorded_at,
    metric,
    value,
    __ts_ms AS version,
    __deleted AS is_deleted
FROM kafka_epoch_data;
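The view behaves like an insert trigger rather than a stored query. The Python sketch below is only a mental model with hypothetical names (`select_from_kafka`, `on_batch`): each batch is transformed once and appended, and earlier rows are never revisited.

```python
from __future__ import annotations

epoch_data: list[dict] = []  # stand-in for the append-only target table


def select_from_kafka(batch: list[dict]) -> list[dict]:
    """Model of the view's SELECT: keep data columns, rename the metadata."""
    return [
        {
            "user_id": msg["user_id"],
            "recorded_at": msg["recorded_at"],
            "metric": msg["metric"],
            "value": msg["value"],
            "version": msg["__ts_ms"],
            "is_deleted": msg["__deleted"],
        }
        for msg in batch
    ]


def on_batch(batch: list[dict]) -> None:
    """Runs once per block handed over by the Kafka engine table."""
    epoch_data.extend(select_from_kafka(batch))


base = {"user_id": 7, "recorded_at": "2024-01-01 00:00:00.000", "metric": "steps"}
on_batch([{**base, "value": 100.0, "__ts_ms": 1, "__deleted": 0}])
on_batch([
    {**base, "value": 120.0, "__ts_ms": 2, "__deleted": 0},
    {**base, "value": 120.0, "__ts_ms": 3, "__deleted": 1},
])

print(len(epoch_data))                       # 3
print([r["version"] for r in epoch_data])    # [1, 2, 3]
```

All three versions land in the target as separate rows; collapsing them is the table engine's job, not the view's.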
Why this stays correct
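Inserts into ClickHouse are append-only, so an UPDATE upstream arrives as a new row with a higher version. ReplacingMergeTree collapses rows that share the ORDER BY key during background merges, keeping the one with the highest version. If two rows tie on version, the most recently inserted one wins, which matters because __ts_ms has only millisecond resolution. A DELETE arrives as a row with is_deleted = 1. It supersedes the earlier versions like any other row, but by default an ordinary merge does not remove the tombstone itself; it stays in the table until an explicit cleanup.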
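The collapse rule is easy to model outside the database. The following Python sketch is an illustration of the semantics described above, not ClickHouse code: `Row`, `collapse`, and `visible` are hypothetical names, and the list order stands in for insertion order.

```python
from __future__ import annotations

from typing import NamedTuple


class Row(NamedTuple):
    user_id: int
    metric: str
    recorded_at: str
    value: float
    version: int
    is_deleted: int


def collapse(rows: list[Row]) -> list[Row]:
    """Keep one row per sorting key: highest version, last inserted on ties."""
    latest: dict[tuple, Row] = {}
    for row in rows:  # iterate in insertion order
        key = (row.user_id, row.metric, row.recorded_at)
        if key not in latest or row.version >= latest[key].version:
            latest[key] = row
    return list(latest.values())


def visible(rows: list[Row]) -> list[Row]:
    """Collapse first, then hide rows whose surviving version is a tombstone."""
    return [row for row in collapse(rows) if row.is_deleted == 0]


t0 = "2024-01-01 00:00:00.000"
rows = [
    Row(7, "heart_rate", t0, 60.0, 1, 0),  # insert
    Row(7, "heart_rate", t0, 62.0, 2, 0),  # update: same key, higher version
    Row(7, "steps", t0, 100.0, 1, 0),      # insert
    Row(7, "steps", t0, 100.0, 3, 1),      # delete: tombstone for that key
]

print(visible(rows))
# [Row(user_id=7, metric='heart_rate', recorded_at='2024-01-01 00:00:00.000',
#      value=62.0, version=2, is_deleted=0)]
```

Because the winner is chosen by version rather than arrival order, a late-arriving older event cannot overwrite a newer one.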
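Merges are asynchronous, so until one runs the table can hold several versions of the same row. For read-time correctness, query with FINAL, which applies the same collapse at query time. Filtering on is_deleted = 0 alone is not enough, because the older versions of a deleted row still carry is_deleted = 0. The filter is kept alongside FINAL here as an explicit safeguard: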
SELECT user_id, metric, avg(value)
FROM epoch_data FINAL
WHERE is_deleted = 0
GROUP BY user_id, metric;
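A small worked example shows why the query collapses versions before it filters. This is a Python sketch with made-up readings for a single user and metric; the `final` helper is a hypothetical stand-in for what FINAL does, not ClickHouse's implementation.

```python
from statistics import mean

# (recorded_at, value, version, is_deleted) for one user_id and metric
rows = [
    ("00:00", 60.0, 1, 0),   # insert
    ("00:00", 90.0, 2, 0),   # update of the same reading
    ("00:01", 70.0, 1, 0),   # insert
    ("00:02", 500.0, 1, 0),  # insert of a bad reading...
    ("00:02", 500.0, 2, 1),  # ...which is then deleted upstream
]


def final(rows):
    """Keep the highest-version row per key, as FINAL would at read time."""
    latest = {}
    for row in rows:
        key = row[0]
        if key not in latest or row[2] >= latest[key][2]:
            latest[key] = row
    return list(latest.values())


# Filter only: stale and deleted readings still count.
filter_only = mean(v for _, v, _, deleted in rows if deleted == 0)

# Collapse, then filter: one live value per reading.
with_final = mean(v for _, v, _, deleted in final(rows) if deleted == 0)

print(filter_only)  # 180.0
print(with_final)   # 80.0
```

The filter-only average is inflated by the superseded 60.0 and by the original insert of the deleted 500.0 reading, both of which still have is_deleted = 0.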
What to defer
You do not need Flink for this. Materialized views handle incremental aggregation on insert. Add a schema registry when schema evolution starts hurting, and a stream processor only when your transformations outgrow SQL. Start lean — this stack runs comfortably around $200/month and scales to a billion-plus rows.
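As a rough picture of what incremental aggregation on insert means, the Python sketch below keeps a running sum and count per key and updates them once per batch. The names `state`, `on_insert`, and `avg` are hypothetical, and the model assumes insert-only data: updates and deletes flowing through CDC would need extra handling that is not shown here.

```python
from collections import defaultdict

# (user_id, metric) -> [sum, count]: the partial state an aggregating view keeps
state = defaultdict(lambda: [0.0, 0])


def on_insert(batch):
    """Fold one inserted batch into the running aggregates."""
    for user_id, metric, value in batch:
        acc = state[(user_id, metric)]
        acc[0] += value
        acc[1] += 1


def avg(user_id, metric):
    """Finish the aggregate at read time from the stored partial state."""
    total, count = state[(user_id, metric)]
    return total / count


on_insert([(7, "heart_rate", 60.0), (7, "heart_rate", 70.0)])
on_insert([(7, "heart_rate", 80.0)])

print(avg(7, "heart_rate"))  # 70.0
```

Each batch costs work proportional to its own size, and reads never rescan the raw rows.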