
Using the Kafka table engine

The Kafka table engine can be used to read data from and write data to Apache Kafka and other Kafka API-compatible brokers (e.g., Redpanda, Amazon MSK). This engine is bundled with open-source ClickHouse and is available across all deployment types.

Kafka to ClickHouse

Tip

If you're on ClickHouse Cloud, we recommend using ClickPipes instead. ClickPipes natively supports private network connections, scaling ingestion and cluster resources independently, and comprehensive monitoring for streaming Kafka data into ClickHouse.

You can use the Kafka table engine to ingest data from Kafka topics into ClickHouse. The engine is designed to continuously consume and stream messages to attached materialized views, which then insert data into target tables for persistent storage. For this reason, you should be broadly familiar with materialized views and Merge Tree family tables when using the Kafka table engine to read data from Apache Kafka and other Kafka API-compatible brokers.

The engine ensures reliable processing through at-least-once semantics: consumer offsets are only committed to Kafka after all attached materialized views successfully process each batch of messages. If there is an error in any materialized view attached to the engine, the consumer offsets will not be committed, and the same messages will be retried until all materialized views succeed. This means that it is possible to get duplicates in failure scenarios.
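This commit-after-success loop can be sketched in Python (illustrative names only; the real logic lives in ClickHouse's C++ Kafka storage). Note how a failure in one view leaves the offset uncommitted, so views that already succeeded see the batch again on retry:

```python
def consume_loop(batches, views):
    """Deliver each batch to every attached view; commit only after all succeed.

    Returns (deliveries, committed_offsets). A failure in any view leaves the
    offset uncommitted, so the whole batch is redelivered -- hence possible
    duplicates in views that had already processed it.
    """
    deliveries, committed = [], []
    for offset, batch in enumerate(batches):
        done = False
        while not done:
            try:
                for name, view in views:
                    view(batch)                # may raise on failure
                    deliveries.append((offset, name))
                committed.append(offset)       # commit only after all views succeed
                done = True
            except RuntimeError:
                pass                           # retry the whole batch
    return deliveries, committed
```

If the second view fails once, the first view receives the batch twice, which is exactly the duplicate scenario described above.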

Quickstart

To get started ingesting data from Kafka into ClickHouse, follow the steps below. If you already have an existing topic you'd like to consume data from, skip to Step 3.

1. Prepare a sample dataset

To create a new topic with a sample dataset for testing, you can use this GitHub dataset. To download the dataset, run:

curl -O https://datasets-documentation.s3.eu-west-3.amazonaws.com/kafka/github_all_columns.ndjson

This dataset is a subset of the GitHub archive dataset, modified to include only GitHub events for the ClickHouse repository. Most queries published with the dataset can be used with this modified version to explore the data in ClickHouse, once ingested.
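Each line of the file is one JSON-encoded GitHub event. A quick way to sanity-check the record shape before ingesting (the line below is hand-written in the same format, not taken from the file; field names match the table schema used later in this guide):

```python
import json

# A hand-written line in the same NDJSON shape as github_all_columns.ndjson.
line = (
    '{"file_time": "2022-01-01 00:00:00", "event_type": "WatchEvent", '
    '"actor_login": "alice", "repo_name": "ClickHouse/ClickHouse", '
    '"created_at": "2022-01-01 00:00:00", "labels": []}'
)

event = json.loads(line)
print(event["event_type"], event["repo_name"])
```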

2. Create and populate the topic

Next, create a new topic in your target broker. For example, if you're running Kafka locally, you can use the built-in Kafka CLI tools:

bin/kafka-topics.sh --create --bootstrap-server <host>:<port> --topic github --partitions 3

If you're using a hosted service like Confluent Cloud, you can use the Cloud Console or a client like the Confluent CLI:

confluent kafka topic create --if-not-exists github --partitions 3

To load the sample dataset into the topic, you can then use a command-line tool like kcat. For example, if you're running Kafka locally with authentication disabled:

cat github_all_columns.ndjson |
kcat -P \
  -b <host>:<port> \
  -t github

Or if you're using a hosted service like Confluent Cloud with SASL authentication:

cat github_all_columns.ndjson |
kcat -P \
  -b <host>:<port> \
  -t github \
  -X security.protocol=sasl_ssl \
  -X sasl.mechanisms=PLAIN \
  -X sasl.username=<username>  \
  -X sasl.password=<password>

The dataset contains 200,000 rows, so it should be available in the specified topic in a few seconds. If you want to work with a larger dataset, take a look at the large datasets section of the ClickHouse/kafka-samples GitHub repository.

3. Configure data ingestion

Before ClickHouse can ingest data from a Kafka topic, you must first provide details on how to connect to and authenticate with your Kafka broker, as well as how to interpret the data. In this example, the Kafka broker uses simple authentication (SASL), the source data is JSON-encoded and no schema registry is used. For a complete overview of all the supported formats, features, and configuration options, see the reference documentation.

In ClickHouse Cloud, you can provide inline credentials in the Kafka table engine CREATE TABLE statement using the SETTINGS clause. See the reference documentation for supported setting configurations.

Note

It is not possible to connect to brokers using TLS/SSL from ClickHouse Cloud, since there is no mechanism to upload and rotate certificates yet — only SASL is supported. If this is a requirement for your use case, we recommend using ClickPipes or the Kafka Connect Sink instead.

CREATE TABLE github_queue
(
    file_time DateTime,
    event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4, 'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
    actor_login LowCardinality(String),
    repo_name LowCardinality(String),
    created_at DateTime,
    updated_at DateTime,
    action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
    comment_id UInt64,
    path String,
    ref LowCardinality(String),
    ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4),
    creator_user_login LowCardinality(String),
    number UInt32,
    title String,
    labels Array(LowCardinality(String)),
    state Enum('none' = 0, 'open' = 1, 'closed' = 2),
    assignee LowCardinality(String),
    assignees Array(LowCardinality(String)),
    closed_at DateTime,
    merged_at DateTime,
    merge_commit_sha String,
    requested_reviewers Array(LowCardinality(String)),
    merged_by LowCardinality(String),
    review_comments UInt32,
    member_login LowCardinality(String)
)
ENGINE = Kafka()
SETTINGS kafka_broker_list = '<host>:<port>',
         kafka_topic_list = 'github',
         kafka_group_name = 'clickhouse',
         kafka_format = 'JSONEachRow',
         kafka_thread_per_consumer = 0,
         kafka_num_consumers = 1,
         -- Connecting to a Confluent Cloud broker using
         -- simple username/password authentication
         kafka_security_protocol = 'sasl_ssl',
         kafka_sasl_mechanism = 'PLAIN',
         kafka_sasl_username = '<username>',
         kafka_sasl_password = '<password>';

It's important to note that creating a Kafka table engine table does not start data ingestion — it simply configures a consumer. After this step, you must create a target table and a materialized view to start data ingestion from the specified topic.

Tip

The Kafka table engine performs one-time retrieval: once a message is consumed, it will not be delivered again. Never select data from a Kafka table directly; instead, attach a materialized view and query its associated target table.

4. Create a target table

Once you define the schema of your Kafka table engine table, you must create a target table that will persist the data in ClickHouse. If the schema of your target table is the same as the schema you defined for the ingestion table (github_queue), you can use the CREATE TABLE AS syntax to copy that schema over.

CREATE TABLE github AS github_queue
ENGINE = MergeTree()
ORDER BY (event_type, repo_name, created_at);

This table must use an engine of the Merge Tree family. For simplicity, this example uses the MergeTree() engine, but you should evaluate the best fit for your use case.

5. Create a materialized view

To ingest data, the Kafka table engine must be attached to a materialized view. As new messages are detected in the upstream Kafka broker, the materialized view is triggered to insert data into persistent storage (i.e., the target table you created in the previous step).

In this example, the target table (github) has the same schema as the ingestion table (github_queue), so the materialized view will do a simple SELECT *.

CREATE MATERIALIZED VIEW github_mv TO github AS
SELECT *
FROM github_queue;

Once created, the materialized view connects to the Kafka table engine and kickstarts data ingestion. This process continues indefinitely: reading new data from the upstream Kafka broker, triggering the materialized view, and inserting data into the target table.

6. Confirm rows have been inserted

To confirm that all messages were processed and stored in ClickHouse, run a count against your target table.

SELECT count() FROM github;

It's important to note that the Kafka table engine processes data in discrete batches (controlled by settings like kafka_max_block_size and kafka_flush_interval_ms), which means that you might see previous state while a batch of rows is being processed, but never partially processed batches. When all data has been processed, you should see 200,000 rows:

┌─count()─┐
│  200000 │
└─────────┘

To monitor ingestion progress and debug errors with the Kafka consumer, you can query the system.kafka_consumers system table. If your deployment has multiple replicas (e.g., ClickHouse Cloud), you must use the clusterAllReplicas table function.

SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;

Common operations

Troubleshooting

System tables

To troubleshoot errors with the Kafka consumer, you can query the system.kafka_consumers system table.

SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;

Log files

Logging for the Kafka table engine is reported in the ClickHouse server logs, which are not exposed to users in ClickHouse Cloud. If you can't track down the issue using the methods above, you should contact the ClickHouse support team for server-level log analysis.

Stopping & restarting message consumption

To stop message consumption, you can detach the Kafka engine table:

DETACH TABLE github_queue;

This will not impact the offsets of the consumer group. To restart consumption, and continue from the previous offset, reattach the table.

ATTACH TABLE github_queue;

Using Kafka metadata

In addition to the message value, the Kafka table engine also exposes Kafka metadata fields like the message key, headers, and others as virtual columns. These virtual columns are prefixed with _ and can be added as columns in your target table on creation.

If you want to add metadata columns to an existing target table, you must first detach the Kafka table to stop data ingestion.

DETACH TABLE github_queue;

To add new columns to the target table for persisting Kafka metadata, use the ALTER TABLE...ADD COLUMN statement. For example:

ALTER TABLE github
   ADD COLUMN topic String,
   ADD COLUMN partition UInt64;

Next, you must adjust the materialized view to start consuming these metadata columns. With the Kafka table detached, it's safe to drop the materialized view, re-attach the table, and re-create the materialized view to resume ingestion.

DROP VIEW github_mv;
ATTACH TABLE github_queue;
CREATE MATERIALIZED VIEW github_mv TO github AS
SELECT *, _topic as topic, _partition as partition
FROM github_queue;

When you select from the target table, you should see the additional columns populated.

SELECT actor_login, event_type, created_at, topic, partition
FROM github
LIMIT 10;

┌─actor_login───┬─event_type─────────┬──────────created_at─┬─topic──┬─partition─┐
│ IgorMinar     │ CommitCommentEvent │ 2011-02-12 02:22:00 │ github │         0 │
│ queeup        │ CommitCommentEvent │ 2011-02-12 02:23:23 │ github │         0 │
│ IgorMinar     │ CommitCommentEvent │ 2011-02-12 02:23:24 │ github │         0 │
│ IgorMinar     │ CommitCommentEvent │ 2011-02-12 02:24:50 │ github │         0 │
│ IgorMinar     │ CommitCommentEvent │ 2011-02-12 02:25:20 │ github │         0 │
│ dapi          │ CommitCommentEvent │ 2011-02-12 06:18:36 │ github │         0 │
│ sourcerebels  │ CommitCommentEvent │ 2011-02-12 06:34:10 │ github │         0 │
│ jamierumbelow │ CommitCommentEvent │ 2011-02-12 12:21:40 │ github │         0 │
│ jpn           │ CommitCommentEvent │ 2011-02-12 12:24:31 │ github │         0 │
│ Oxonium       │ CommitCommentEvent │ 2011-02-12 12:31:28 │ github │         0 │
└───────────────┴────────────────────┴─────────────────────┴────────┴───────────┘

See the reference documentation for a complete list of supported Kafka metadata fields.

Modifying table settings

To modify Kafka table settings, we recommend dropping and recreating the table with the new configuration. The materialized view does not need to be modified - message consumption will automatically resume from the last committed offset when the table is recreated.

While you can use ALTER TABLE...MODIFY SETTING for simple settings like kafka_max_block_size, dropping and recreating is more reliable (and often required) for significant configuration changes such as broker lists, consumer groups, topics, or authentication settings.

Handling malformed messages

Kafka is often used as a "dumping ground" for data, which leads to topics containing mixed message formats and inconsistent field names. Where possible, avoid this by using upstream tooling such as Kafka Streams or ksqlDB to ensure messages are well-formed and consistent before they are inserted into Kafka. If these options are not possible, ClickHouse has some features that can help.

  • Treat the message field as strings. Functions can be used in the materialized view statement to perform cleansing and casting if required. This should not represent a production solution but might assist in one-off ingestion.
  • If you're consuming JSON from a topic, using the JSONEachRow format, use the setting input_format_skip_unknown_fields. When writing data, by default, ClickHouse throws an exception if input data contains columns that do not exist in the target table. However, if this option is enabled, these excess columns will be ignored. Again this is not a production-level solution and might confuse others.
  • Consider the setting kafka_skip_broken_messages. This requires the user to specify the level of tolerance per block for malformed messages, considered in the context of kafka_max_block_size. Up to this many unparseable messages per block are silently skipped; if the tolerance is exceeded, the usual exception behaviour resumes and the insert fails.
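A simplified sketch of how these two settings interact (illustrative Python, not the engine's actual parser): unknown fields are dropped when skipping is enabled, and unparsable messages count against a per-block tolerance.

```python
import json

def parse_block(messages, schema, skip_unknown=True, skip_broken=0):
    """Parse one block of JSON messages against a fixed schema.

    Loosely mimics input_format_skip_unknown_fields and
    kafka_skip_broken_messages: unknown keys are dropped (or rejected), and up
    to `skip_broken` unparseable messages per block are silently skipped
    before an error is raised.
    """
    rows, broken = [], 0
    for msg in messages:
        try:
            obj = json.loads(msg)              # raises ValueError on bad JSON
            extra = set(obj) - set(schema)
            if extra and not skip_unknown:
                raise ValueError(f"unknown fields: {extra}")
            rows.append({k: obj.get(k) for k in schema})
        except ValueError:
            broken += 1
            if broken > skip_broken:           # tolerance exceeded: fail the block
                raise
    return rows
```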
Delivery Semantics and challenges with duplicates

The Kafka table engine has at-least-once semantics. Duplicates are possible in several known rare circumstances. For example, messages could be read from Kafka and successfully inserted into ClickHouse. Before the new offset can be committed, the connection to Kafka is lost. A retry of the block in this situation is required. The block may be de-duplicated using a distributed table or ReplicatedMergeTree as the target table. While this reduces the chance of duplicate rows, it relies on identical blocks. Events such as a Kafka rebalancing may invalidate this assumption, causing duplicates in rare circumstances.
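The "identical blocks" requirement can be sketched as follows (hypothetical helper, not the actual ClickHouse mechanism): the block is identified by a checksum of its contents, so a byte-identical retry is dropped, while a reshuffled block (e.g. after a rebalance changed batching) hashes differently and is inserted again.

```python
import hashlib

def insert_block(table, seen_hashes, rows):
    """Insert a block unless an identical block was recently inserted.

    Approximates ReplicatedMergeTree insert deduplication: a retry of the
    exact same block is a no-op, but a block with the same rows in a
    different order produces a different hash and is inserted again.
    """
    digest = hashlib.sha256(repr(rows).encode()).hexdigest()
    if digest in seen_hashes:
        return False          # duplicate block, dropped
    seen_hashes.add(digest)
    table.extend(rows)
    return True
```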

Quorum based Inserts

You may need quorum-based inserts for cases where higher delivery guarantees are required in ClickHouse. The insert_quorum setting can't be set on the materialized view or the target table; it can, however, be set in a user profile. For example:

<profiles>
  <default>
    <insert_quorum>2</insert_quorum>
  </default>
</profiles>

ClickHouse to Kafka

Note

If you're on ClickHouse Cloud, it's important to note that private network connections are not supported. This means that your broker(s) must be configured for public access.

You can use the Kafka table engine to write data from ClickHouse to Kafka topics. The engine is designed to push messages to a topic when triggered by direct inserts or updates in attached materialized views. For this reason, you should be broadly familiar with materialized views when using the Kafka table engine to write data to Apache Kafka and other Kafka API-compatible brokers.

The engine ensures reliable delivery through at-least-once semantics: when data is inserted into a Kafka table, the operation only succeeds after the data is successfully sent to the Kafka topic. If there is an error sending data to Kafka (e.g., network connectivity issues, Kafka broker unavailability), the engine will automatically handle retries. This means that it is possible to get duplicates in failure scenarios if, for example, data reaches Kafka but the acknowledgment is lost due to network connectivity issues, causing the operation to be retried.
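The lost-acknowledgment scenario can be sketched in Python (hypothetical names; the real producer is librdkafka inside ClickHouse): the message reaches the broker, the ack is lost, and the retry stores a duplicate.

```python
def produce_with_retries(broker, messages, lose_ack_for=()):
    """Send each message, retrying whenever the acknowledgment is lost.

    Models at-least-once delivery: for indices in lose_ack_for, the broker
    receives the message but the ack is dropped once, so the producer resends
    and the broker ends up with a duplicate.
    """
    acked = set()
    for i, msg in enumerate(messages):
        while i not in acked:
            broker.append(msg)          # message reaches the broker
            if i in lose_ack_for:
                lose_ack_for = tuple(j for j in lose_ack_for if j != i)
                continue                # ack lost -> retry the send
            acked.add(i)                # ack received
    return broker
```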

Quickstart

To get started writing data from ClickHouse into Kafka, follow the steps below. If you already have an existing topic you'd like to produce data to, skip to Step 2.

1. Create a target topic

Create a new topic in your target broker. For example, if you're running Kafka locally, you can use the built-in Kafka CLI tools:

bin/kafka-topics.sh --bootstrap-server <host>:<port> --topic github_out --partitions 3

If you're using a hosted service like Confluent Cloud, you can use the Cloud Console or a client like the Confluent CLI:

confluent kafka topic create --if-not-exists github_out --partitions 3

2. Produce data to the target topic

There are two ways to produce data to the target topic: via materialized views, for continuous streaming, or via direct inserts.

Using materialized views

We can utilize materialized views to push messages to a Kafka engine table (and its topic) when rows are inserted into a source table. When rows are inserted into the github table, a materialized view is triggered, which causes the rows to be inserted into a Kafka engine table and, from there, into a new topic.

First, create a Kafka table engine table github_out_queue that points to the github_out topic created in the previous step.

CREATE TABLE github_out_queue
(
    file_time DateTime,
    event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4, 'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
    actor_login LowCardinality(String),
    repo_name LowCardinality(String),
    created_at DateTime,
    updated_at DateTime,
    action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
    comment_id UInt64,
    path String,
    ref LowCardinality(String),
    ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4),
    creator_user_login LowCardinality(String),
    number UInt32,
    title String,
    labels Array(LowCardinality(String)),
    state Enum('none' = 0, 'open' = 1, 'closed' = 2),
    assignee LowCardinality(String),
    assignees Array(LowCardinality(String)),
    closed_at DateTime,
    merged_at DateTime,
    merge_commit_sha String,
    requested_reviewers Array(LowCardinality(String)),
    merged_by LowCardinality(String),
    review_comments UInt32,
    member_login LowCardinality(String)
)
   ENGINE = Kafka('<host>:<port>', 'github_out', 'clickhouse_out', 'JSONEachRow')
   SETTINGS kafka_thread_per_consumer = 0, kafka_num_consumers = 1;

Now create a new materialized view github_out_mv to point at the GitHub table, inserting rows to the above engine when it triggers. Additions to the GitHub table will, as a result, be pushed to our new Kafka topic.

CREATE MATERIALIZED VIEW github_out_mv TO github_out_queue AS
SELECT file_time, event_type, actor_login, repo_name,
       created_at, updated_at, action, comment_id, path,
       ref, ref_type, creator_user_login, number, title,
       labels, state, assignee, assignees, closed_at, merged_at,
       merge_commit_sha, requested_reviewers, merged_by,
       review_comments, member_login
FROM github;

Using direct inserts

First, confirm the count of the target table.

SELECT count() FROM github;

You should have 200,000 rows:

┌─count()─┐
│  200000 │
└─────────┘

Now insert rows from the github target table back into the Kafka table engine github_queue. Note how we LIMIT the select to 100 rows; the messages are produced in the JSONEachRow format configured on the engine table.

INSERT INTO github_queue SELECT * FROM github LIMIT 100;

Recount the rows in the github table to confirm the count has increased by 100. The rows were inserted into Kafka via the Kafka table engine, then re-read by the same engine and inserted into the github target table by our materialized view!

SELECT count() FROM github;

You should see 100 additional rows:

┌─count()─┐
│  200100 │
└─────────┘

Should you insert into the original github topic, created as part of Kafka to ClickHouse, documents will automatically appear in the github_out topic. Confirm this with native Kafka tooling. For example, below, we insert 10 rows onto the github topic using kcat for a Confluent Cloud hosted topic:

head -n 10 github_all_columns.ndjson |
kcat -P \
  -b <host>:<port> \
  -t github \
  -X security.protocol=sasl_ssl \
  -X sasl.mechanisms=PLAIN \
  -X sasl.username=<username> \
  -X sasl.password=<password>

A read on the github_out topic should confirm delivery of the messages.

kcat -C \
  -b <host>:<port> \
  -t github_out \
  -X security.protocol=sasl_ssl \
  -X sasl.mechanisms=PLAIN \
  -X sasl.username=<username> \
  -X sasl.password=<password> \
  -e -q |
wc -l

Although an elaborate example, this illustrates the power of materialized views when used in conjunction with the Kafka engine.

Common operations

Clusters and Performance

Working with ClickHouse Clusters

Through Kafka consumer groups, multiple ClickHouse instances can potentially read from the same topic. Each consumer will be assigned to a topic partition in a 1:1 mapping. When scaling ClickHouse consumption using the Kafka table engine, consider that the total number of consumers within a cluster cannot exceed the number of partitions on the topic. Therefore ensure partitioning is appropriately configured for the topic in advance.

Multiple ClickHouse instances can all be configured to read from a topic using the same consumer group id, specified during Kafka table engine creation. Each instance will then read from one or more partitions, inserting blocks into its local target table. The target tables can, in turn, use a ReplicatedMergeTree engine to handle replication of the data. This approach allows Kafka reads to be scaled with the ClickHouse cluster, provided there are sufficient Kafka partitions.
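The 1:1 consumer-to-partition mapping means any consumers beyond the partition count sit idle. A round-robin sketch of the idea (illustrative; the real assignment is performed by Kafka's group coordinator and depends on the configured assignor):

```python
def assign_partitions(consumers, num_partitions):
    """Assign each partition to exactly one consumer, round-robin.

    Consumers beyond num_partitions receive nothing, which is why the total
    number of consumers across the cluster should not exceed the topic's
    partition count.
    """
    assignment = {c: [] for c in consumers}
    for partition in range(num_partitions):
        assignment[consumers[partition % len(consumers)]].append(partition)
    return assignment
```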

Tuning Performance

Consider the following when looking to increase Kafka Engine table throughput performance:

  • The performance will vary depending on the message size, format, and target table types. 100k rows/sec on a single table engine should be considered obtainable. By default, messages are read in blocks, controlled by the parameter kafka_max_block_size. By default, this is set to the max_insert_block_size, defaulting to 1,048,576. Unless messages are extremely large, this should nearly always be increased. Values between 500k to 1M are not uncommon. Test and evaluate the effect on throughput performance.
  • The number of consumers for a table engine can be increased using kafka_num_consumers. However, by default, inserts will be linearized in a single thread unless kafka_thread_per_consumer is changed from its default value of 0. Set this to 1 to ensure flushes are performed in parallel. Note that creating a Kafka engine table with N consumers (and kafka_thread_per_consumer=1) is logically equivalent to creating N Kafka engines, each with a materialized view and kafka_thread_per_consumer=0.
  • Increasing consumers is not a free operation. Each consumer maintains its own buffers and threads, increasing the overhead on the server. Be conscious of the overhead of consumers and, where possible, scale out across your cluster first.
  • If the throughput of Kafka messages is variable and delays are acceptable, consider increasing the stream_flush_interval_ms to ensure larger blocks are flushed.
  • background_message_broker_schedule_pool_size sets the number of threads performing background tasks. These threads are used for Kafka streaming. This setting is applied at the ClickHouse server start and can't be changed in a user session, defaulting to 16. If you see timeouts in the logs, it may be appropriate to increase this.
  • For communication with Kafka, the librdkafka library is used, which itself creates threads. Large numbers of Kafka tables, or consumers, can thus result in large numbers of context switches. Either distribute this load across the cluster, only replicating the target tables if possible, or consider using a single table engine to read from multiple topics - a list of values is supported. Multiple materialized views can then read from that single table, each filtering to the data from a specific topic.
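The size-or-time flushing behaviour described above can be sketched as follows (illustrative Python, not the engine's implementation): a block is flushed when it reaches the maximum size or when the flush interval elapses, bounding both batch size and latency.

```python
def flush_points(events, max_block_size, flush_interval_ms):
    """Return the sizes of flushed blocks for a stream of (timestamp_ms, msg).

    A block is flushed when it reaches max_block_size rows, or when
    flush_interval_ms has elapsed since the block was opened -- mirroring how
    kafka_max_block_size and stream_flush_interval_ms interact.
    """
    blocks, block, opened = [], [], None
    for ts, msg in events:
        if block and ts - opened >= flush_interval_ms:
            blocks.append(len(block))       # time-based flush
            block, opened = [], None
        if opened is None:
            opened = ts                     # open a new block
        block.append(msg)
        if len(block) >= max_block_size:
            blocks.append(len(block))       # size-based flush
            block, opened = [], None
    if block:
        blocks.append(len(block))           # flush the trailing partial block
    return blocks
```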

Any settings changes should be tested. We recommend monitoring Kafka consumer lags to ensure you are properly scaled.

Additional Settings

Aside from the settings discussed above, the following may be of interest:

  • kafka_max_wait_ms - The wait time in milliseconds for reading messages from Kafka before retrying. Set at a user profile level; defaults to 5000.

All settings from the underlying librdkafka library can also be placed in the ClickHouse configuration files inside a kafka element. Setting names should be XML elements, with periods replaced by underscores, e.g.:

<clickhouse>
   <kafka>
       <enable_ssl_certificate_verification>false</enable_ssl_certificate_verification>
   </kafka>
</clickhouse>

These are expert settings and we'd suggest you refer to the Kafka documentation for an in-depth explanation.