Ingesting JSON-formatted data into ClickHouse via the Kafka Engine
Overview
ClickHouse can read messages directly from a Kafka topic using the Kafka table engine, coupled with a materialized view that fetches the messages and pushes them to a ClickHouse target table. Here we provide some examples of reading JSON-formatted data from Kafka into a ClickHouse MergeTree table.
Create Kafka topic
kafka-topics --bootstrap-server kafka-broker-1.default.svc.cluster.local:9092 \
--topic clickhouseTestJson --create --partitions 6 --replication-factor 2
MergeTree Table
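Here we define a MergeTree table that stores the ingested data. The date and timestamp_1min columns do not come from the Kafka messages; their DEFAULT expressions derive them from timestamp when rows are inserted.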
CREATE TABLE IF NOT EXISTS event
(
    -- date and timestamp_1min are derived from timestamp at insert time
    date Date DEFAULT toDate(timestamp, 'UTC') Codec(ZSTD),
    -- Gorilla is designed for floating-point values; T64 is an integer-oriented codec
    id Int64 Codec(T64, LZ4),
    timestamp_1min UInt64 DEFAULT (floor(timestamp/60) * 60) Codec(DoubleDelta, LZ4),
    message LowCardinality(String),
    timestamp UInt64 Codec(DoubleDelta, LZ4),
    measure_int UInt64 Codec(T64, LZ4),
    measure_float Float64 Codec(Gorilla, LZ4),
    measure_string String Codec(ZSTD)
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/default/event',
    '{replica}'
)
PARTITION BY toStartOfMonth(date)
ORDER BY (
    id,
    timestamp_1min
);
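Before wiring up Kafka, the two DEFAULT expressions can be checked on their own. The query below is a minimal sketch that evaluates the same expressions for the sample timestamp 1234567 used in the JSON examples, without writing anything to event. timestamp_1min should come out as 1234560, the start of that minute; note that floor() yields a Float64 here, whereas in event the value is stored as UInt64.

```sql
-- Evaluate the DEFAULT expressions of event for a sample timestamp.
-- toUInt64 gives the literal the same type as the timestamp column.
WITH toUInt64(1234567) AS timestamp
SELECT
    toDate(timestamp, 'UTC') AS date,
    floor(timestamp / 60) * 60 AS timestamp_1min;
```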
Kafka engine with JSONEachRow
{
"id": "123",
"message": "test",
"timestamp": 1234567,
"measure_int": 987
}
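If each message is a flat JSON object like the one above, we can define a Kafka engine table with the JSONEachRow format. Fields missing from a message (here measure_float and measure_string) are filled with default values.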
CREATE TABLE IF NOT EXISTS json_queue1 (
id Int64,
message LowCardinality(String),
timestamp UInt64,
measure_int UInt64,
measure_float Float64,
measure_string String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka-broker-1.default.svc.cluster.local:9092',
kafka_topic_list = 'clickhouseTestJson',
kafka_group_name = 'clickhouseTestJsonGroup1',
kafka_format = 'JSONEachRow',
kafka_skip_broken_messages = 10000,
kafka_max_block_size = 1048576;
-- Materialized view that automatically moves data from the Kafka table into the target table.
CREATE MATERIALIZED VIEW json_mv1 TO event AS
SELECT id, timestamp, message, measure_int, measure_float, measure_string
FROM json_queue1;
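Once the materialized view exists, consumption starts automatically. The statements below are a sketch for checking progress and for pausing ingestion. The ClickHouse Kafka engine documentation describes detaching the materialized view as the way to stop reading a topic; after re-attaching, the consumer group clickhouseTestJsonGroup1 continues from its committed offsets.

```sql
-- Check that rows are arriving in the target table.
SELECT count() AS row_count, max(timestamp) AS latest_timestamp
FROM event;

-- Pause consumption: with no attached materialized view,
-- the Kafka table json_queue1 is not read.
DETACH TABLE json_mv1;

-- (change the target table or the conversion logic here)

-- Resume consumption from the group's committed offsets.
ATTACH TABLE json_mv1;
```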
Kafka engine with JSONAsString
{
"id": "123",
"timestamp": 1234567,
"payload" : {
"message": "test",
"measure_string": "haha"
}
}
If the input data contains a nested object, we can use the JSONAsString format instead: each message is read as a single String column, and the JSON object is parsed and its fields extracted in the materialized view.
CREATE TABLE IF NOT EXISTS json_queue2 (
all String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka-broker-1.default.svc.cluster.local:9092',
kafka_topic_list = 'clickhouseTestJson',
kafka_group_name = 'clickhouseTestJsonGroup2',
kafka_format = 'JSONAsString',
kafka_skip_broken_messages = 10000,
kafka_max_block_size = 1048576;
-- Materialized view that parses each raw JSON string and writes the extracted fields into the target table.
-- Find more ways to extract values from JSON: https://clickhouse.com/docs/en/sql-reference/functions/json-functions/
CREATE MATERIALIZED VIEW json_mv2 TO event AS
SELECT
    JSONExtract(all, 'id', 'Int64') AS id,
    JSONExtract(all, 'timestamp', 'UInt64') AS timestamp,
    JSONExtractString(all, 'payload', 'message') AS message,
    JSONExtractString(all, 'payload', 'measure_string') AS measure_string
FROM json_queue2;
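Extraction mistakes in json_mv2 only become visible once messages flow, so it can help to try the JSON functions on a literal message first. The query below is a minimal sketch that applies the same extraction expressions to the nested example above; a subquery with a hypothetical column raw stands in for the Kafka table json_queue2 and its all column. If the paths are right, message and measure_string come back as test and haha. It is also worth checking id, because the example encodes it as a JSON string ("123") while the target column is Int64.

```sql
-- Dry run of the json_mv2 extraction on one literal message.
-- raw is a hypothetical stand-in for the `all` column of json_queue2.
SELECT
    JSONExtract(raw, 'id', 'Int64') AS id,
    JSONExtract(raw, 'timestamp', 'UInt64') AS timestamp,
    JSONExtractString(raw, 'payload', 'message') AS message,
    JSONExtractString(raw, 'payload', 'measure_string') AS measure_string
FROM
(
    SELECT '{"id": "123", "timestamp": 1234567, "payload": {"message": "test", "measure_string": "haha"}}' AS raw
);
```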