Ingesting JSON-formatted data into ClickHouse via Kafka Engine

Overview

ClickHouse can read messages directly from a Kafka topic using the Kafka table engine, coupled with a materialized view that fetches the messages and pushes them to a ClickHouse target table. Here we provide some examples of reading JSON-formatted data from Kafka into a ClickHouse MergeTree table.

Create a Kafka topic

kafka-topics --bootstrap-server kafka-broker-1.default.svc.cluster.local:9092 \
--topic clickhouseTestJson --create --partitions 6 --replication-factor 2
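To have something to consume, the topic needs test messages. The Python sketch below is a hypothetical helper (make_flat_message is our own name, not part of any Kafka tool) that serializes events in the flat shape used in the JSONEachRow section, one compact JSON object per line, so its output can be piped into a line-oriented console producer.

```python
import json
import time


def make_flat_message(event_id: int, message: str, measure_int: int, ts: int) -> str:
    """Serialize one event as a single-line JSON object (one Kafka message)."""
    return json.dumps(
        # "id" is sent as a string to match the sample message in this post.
        {"id": str(event_id), "message": message, "timestamp": ts, "measure_int": measure_int},
        separators=(",", ":"),  # compact: no spaces, no newlines
    )


if __name__ == "__main__":
    now = int(time.time())
    for i in range(3):
        # One message per line, as line-oriented console producers expect.
        print(make_flat_message(100 + i, "test", 987 + i, now + i))
```

For example, assuming the script is saved as make_messages.py and the Kafka CLI's kafka-console-producer is on the path:

python make_messages.py | kafka-console-producer --bootstrap-server kafka-broker-1.default.svc.cluster.local:9092 --topic clickhouseTestJson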

MergeTree Table

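Here we define a ReplicatedMergeTree table that stores the ingested data. The date and timestamp_1min columns have DEFAULT expressions, so they are computed from timestamp at insert time and do not need to be present in the Kafka messages.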

CREATE TABLE IF NOT EXISTS event
(
    date Date DEFAULT toDate(timestamp, 'UTC') Codec(ZSTD),
    id Int64 Codec(Gorilla, LZ4),
    timestamp_1min UInt64 DEFAULT (floor(timestamp/60) * 60) Codec(DoubleDelta, LZ4),
    message LowCardinality(String),
    timestamp UInt64 Codec(DoubleDelta, LZ4),
    measure_int UInt64 Codec(Gorilla, LZ4),
    measure_float Float64 Codec(Gorilla, LZ4),
    measure_string String Codec(ZSTD)
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/default/event',
    '{replica}'
)
PARTITION BY toStartOfMonth(date)
ORDER BY (
    id,
    timestamp_1min
);
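The DEFAULT and PARTITION BY expressions are easy to sanity-check by hand. The Python sketch below mirrors what we expect them to compute for a given timestamp, assuming timestamp holds whole seconds since the Unix epoch; derived_columns is a hypothetical helper for illustration, not ClickHouse code.

```python
from datetime import datetime, timezone


def derived_columns(timestamp: int) -> dict:
    """Python analogue of the DEFAULT / PARTITION BY expressions in `event`."""
    # DEFAULT toDate(timestamp, 'UTC'): calendar day in UTC
    day = datetime.fromtimestamp(timestamp, tz=timezone.utc).date()
    # DEFAULT floor(timestamp/60) * 60: start of the minute, in seconds
    ts_1min = (timestamp // 60) * 60
    # PARTITION BY toStartOfMonth(date): first day of that month
    partition = day.replace(day=1)
    return {"date": day, "timestamp_1min": ts_1min, "partition": partition}


if __name__ == "__main__":
    print(derived_columns(1234567))
```

For the sample timestamp 1234567 this gives date 1970-01-15, timestamp_1min 1234560 and partition 1970-01-01. Every event from the same minute shares one timestamp_1min value, which is the second ORDER BY key.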

Kafka engine with JSONEachRow

{
  "id": "123",
  "message": "test",
  "timestamp": 1234567,
  "measure_int": 987
}

If each message is a flat JSON object like the one above, we can define a Kafka engine table with the JSONEachRow format, which maps each top-level key to the column of the same name.

CREATE TABLE IF NOT EXISTS json_queue1 (
    id Int64,
    message LowCardinality(String),
    timestamp UInt64,
    measure_int UInt64,
    measure_float Float64,
    measure_string String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka-broker-1.default.svc.cluster.local:9092',
       kafka_topic_list = 'clickhouseTestJson',
       kafka_group_name = 'clickhouseTestJsonGroup1',
       kafka_format = 'JSONEachRow',
       kafka_skip_broken_messages = 10000,
       kafka_max_block_size = 1048576;

-- Materialized view that automatically moves data from the Kafka table to the target table.
CREATE MATERIALIZED VIEW json_mv1 TO event AS
SELECT id, timestamp, message, measure_int, measure_float, measure_string
FROM json_queue1;
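To make the JSONEachRow mapping concrete, the Python sketch below imitates, in simplified form, what the Kafka table does with each message: top-level keys are matched to column names, omitted keys (measure_float and measure_string in the sample) fall back to the type's default value, and up to kafka_skip_broken_messages unparseable messages per block are dropped before an error is raised. QUEUE_COLUMNS, parse_json_each_row and parse_block are hypothetical names. The sketch simply ignores keys that are not columns (in ClickHouse that behaviour is governed by input_format_skip_unknown_fields) and assumes the quoted "id" is accepted as a number.

```python
import json

# Columns of json_queue1 -> (conversion, default used when the key is omitted)
QUEUE_COLUMNS = {
    "id": (int, 0),
    "message": (str, ""),
    "timestamp": (int, 0),
    "measure_int": (int, 0),
    "measure_float": (float, 0.0),
    "measure_string": (str, ""),
}


def parse_json_each_row(line: str) -> dict:
    """Map one JSONEachRow message onto the queue columns."""
    obj = json.loads(line)
    if not isinstance(obj, dict):
        raise ValueError("each message must be a JSON object")
    # Keys that are not columns are simply ignored here.
    return {
        name: convert(obj[name]) if name in obj else default
        for name, (convert, default) in QUEUE_COLUMNS.items()
    }


def parse_block(lines, skip_broken: int) -> list:
    """Parse a block of messages, tolerating up to `skip_broken` bad ones."""
    rows, broken = [], 0
    for line in lines:
        try:
            rows.append(parse_json_each_row(line))
        except (ValueError, TypeError):
            broken += 1
            if broken > skip_broken:
                raise
    return rows


if __name__ == "__main__":
    block = [
        '{"id": "123", "message": "test", "timestamp": 1234567, "measure_int": 987}',
        "not json",
    ]
    print(parse_block(block, skip_broken=10000))
```

The materialized view then inserts these rows into event; date and timestamp_1min are not selected, so event fills them from their DEFAULT expressions.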

Kafka engine with JSONAsString

{
  "id": "123",
  "timestamp": 1234567,
  "payload": {
    "message": "test",
    "measure_string": "haha"
  }
}

If the input data contains nested objects, we can use the JSONAsString format instead. Each message is then delivered as a single String column, and the JSON is parsed and its values extracted in the materialized view.

CREATE TABLE IF NOT EXISTS json_queue2 (
  all String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka-broker-1.default.svc.cluster.local:9092',
       kafka_topic_list = 'clickhouseTestJson',
       kafka_group_name = 'clickhouseTestJsonGroup2',
       kafka_format = 'JSONAsString',
       kafka_skip_broken_messages = 10000,
       kafka_max_block_size = 1048576;

-- Materialized view that automatically moves data from the Kafka table to the target table.
-- More ways to extract values from JSON: https://clickhouse.com/docs/en/sql-reference/functions/json-functions/
CREATE MATERIALIZED VIEW json_mv2 TO event AS
SELECT
    JSONExtract(all, 'id', 'Int64') AS id,
    JSONExtract(all, 'timestamp', 'UInt64') AS timestamp,
    JSONExtractString(all, 'payload', 'message') AS message,
    JSONExtractString(all, 'payload', 'measure_string') AS measure_string
FROM json_queue2;
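The extraction in json_mv2 can be prototyped outside ClickHouse. The Python sketch below (extract_event is a hypothetical name) performs the same lookups on the raw string that lands in the all column: id and timestamp from the top level, message and measure_string from the nested payload object, falling back to 0 or an empty string when a key is missing, much as the JSONExtract functions return a default value in that case. The sketch converts the quoted id with int(); how JSONExtract treats a number stored as a JSON string is worth checking against real messages on your ClickHouse version.

```python
import json


def extract_event(raw: str) -> dict:
    """Pull the event columns out of one raw JSONAsString message."""
    doc = json.loads(raw)
    # Nested object; treat a missing payload as empty.
    payload = doc.get("payload") or {}
    return {
        # int() also accepts the quoted "123" from the sample message.
        "id": int(doc.get("id", 0)),
        "timestamp": int(doc.get("timestamp", 0)),
        "message": str(payload.get("message", "")),
        "measure_string": str(payload.get("measure_string", "")),
    }


if __name__ == "__main__":
    sample = '{"id": "123", "timestamp": 1234567, "payload": {"message": "test", "measure_string": "haha"}}'
    print(extract_event(sample))
    # prints {'id': 123, 'timestamp': 1234567, 'message': 'test', 'measure_string': 'haha'}
```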

Yuhui Lin
Software Engineer