Using the Kafka table engine
The Kafka table engine can be used to read data from and write data to Apache Kafka and other Kafka API-compatible brokers (e.g., Redpanda, Amazon MSK). This engine is bundled with open-source ClickHouse and is available across all deployment types.
Kafka to ClickHouse
If you're on ClickHouse Cloud, we recommend using ClickPipes instead. ClickPipes natively supports private network connections, scaling ingestion and cluster resources independently, and comprehensive monitoring for streaming Kafka data into ClickHouse.
You can use the Kafka table engine to ingest data from Kafka topics into ClickHouse. The engine is designed to continuously consume and stream messages to attached materialized views, which then insert data into target tables for persistent storage. For this reason, you should be broadly familiar with materialized views and Merge Tree family tables when using the Kafka table engine to read data from Apache Kafka and other Kafka API-compatible brokers.
The engine ensures reliable processing through at-least-once semantics: consumer offsets are only committed to Kafka after all attached materialized views successfully process each batch of messages. If there is an error in any materialized view attached to the engine, the consumer offsets will not be committed, and the same messages will be retried until all materialized views succeed. This means that it is possible to get duplicates in failure scenarios.
Quickstart
To get started ingesting data from Kafka into ClickHouse, follow the steps below. If you already have an existing topic you'd like to consume data from, skip to Step 3.
1. Prepare a sample dataset
To create a new topic with a sample dataset for testing, you can use this GitHub dataset. To download the dataset, run:
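A minimal sketch, assuming the dataset is hosted in the ClickHouse sample-dataset bucket (adjust the URL if it has moved):

```shell
# Download the sample dataset (~200,000 newline-delimited JSON rows).
wget https://datasets-documentation.s3.eu-west-3.amazonaws.com/kafka/github_all_columns.ndjson
```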
This dataset is a subset of the GitHub archive dataset, modified to include only GitHub events for the ClickHouse repository. Most queries published with the dataset can be used with this modified version to explore the data in ClickHouse, once ingested.
2. Create and populate the topic
Next, create a new topic in your target broker. For example, if you're running Kafka locally, you can use the built-in Kafka CLI tools:
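For example, a sketch using the stock Kafka CLI tools (the topic name and partition count are illustrative):

```shell
# Create a 'github' topic on a local broker.
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic github \
  --partitions 1 \
  --replication-factor 1
```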
If you're using a hosted service like Confluent Cloud, you can use the Cloud Console or a client like the Confluent CLI:
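A sketch using the Confluent CLI (assumes an authenticated session against your cluster):

```shell
# Create the topic via the Confluent CLI.
confluent kafka topic create github
```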
To load the sample dataset into the topic, you can then use a command-line tool like kcat. For example, if you're running Kafka locally with authentication disabled:
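A sketch, assuming the `github_all_columns.ndjson` file downloaded in Step 1:

```shell
# Pipe the NDJSON file into the topic, one message per line (-P = produce).
cat github_all_columns.ndjson | kcat -b localhost:9092 -t github -P
```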
Or if you're using a hosted service like Confluent Cloud with SASL authentication:
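A sketch with SASL/SSL credentials (replace the placeholders with your broker address and credentials):

```shell
# Produce with SASL/SSL authentication via librdkafka -X options.
cat github_all_columns.ndjson | kcat -b <broker>:9092 \
  -X security.protocol=sasl_ssl \
  -X sasl.mechanisms=PLAIN \
  -X sasl.username=<username> \
  -X sasl.password=<password> \
  -t github -P
```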
The dataset contains 200,000 rows, so it should be available in the specified topic in a few seconds. If you want to work with a larger dataset, take a look at the large datasets section of the ClickHouse/kafka-samples GitHub repository.
3. Configure data ingestion
Before ClickHouse can ingest data from a Kafka topic, you must first provide details on how to connect to and authenticate with your Kafka broker, as well as how to interpret the data. In this example, the Kafka broker uses simple authentication (SASL), the source data is JSON-encoded and no schema registry is used. For a complete overview of all the supported formats, features, and configuration options, see the reference documentation.
- ClickHouse Cloud
- Self-hosted ClickHouse
In ClickHouse Cloud, you can provide inline credentials in the Kafka table engine CREATE TABLE statement using the SETTINGS clause. See the reference documentation for supported setting configurations.
It is not possible to connect to brokers using TLS/SSL from ClickHouse Cloud, since there is no mechanism to upload and rotate certificates yet — only SASL is supported. If this is a requirement for your use case, we recommend using ClickPipes or the Kafka Connect Sink instead.
In self-hosted ClickHouse, you can configure credentials using configuration files, named collections, or inline in the CREATE TABLE statement using the SETTINGS clause.
Inline credentials are a good fit for prototyping (e.g., to follow the steps in this guide). For production environments, or environments with a large number of tables reading from the same broker, we recommend using configuration files or named collections to manage credentials.
Configuration files
The Kafka table engine supports extended configuration using ClickHouse config files. You can either place the Kafka-specific configuration in a new file under the conf.d/ directory, or append it to existing configuration files.
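For example, a sketch of a Kafka-level configuration fragment (e.g., placed in `conf.d/kafka.xml`); the element names map to librdkafka settings with periods replaced by underscores:

```xml
<clickhouse>
  <kafka>
    <!-- Connection security settings applied to all Kafka engine tables. -->
    <security_protocol>sasl_ssl</security_protocol>
    <sasl_mechanism>PLAIN</sasl_mechanism>
    <sasl_username>username</sasl_username>
    <sasl_password>password</sasl_password>
  </kafka>
</clickhouse>
```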
See the reference documentation for supported configuration keys.
Named collections
You can use named collections to securely store and reuse credentials across multiple CREATE TABLE statements.
You can then inline a named collection in the ENGINE clause of Kafka table engine CREATE TABLE statements.
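A sketch, assuming a collection named `kafka_github` and illustrative broker details:

```sql
-- Define a reusable named collection with connection details.
CREATE NAMED COLLECTION kafka_github AS
    kafka_broker_list = 'broker:9092',
    kafka_topic_list = 'github',
    kafka_group_name = 'clickhouse',
    kafka_format = 'JSONEachRow';

-- Reference the collection in the ENGINE clause.
CREATE TABLE github_queue (message String)
ENGINE = Kafka(kafka_github);
```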
See the reference documentation for supported configuration keys and this guide for a complete walkthrough of using named collections with the Kafka table engine.
Inline
You can provide inline credentials in the Kafka table engine CREATE TABLE statement using the SETTINGS clause. See the reference documentation for supported setting configurations.
It is not possible to configure certificates to connect to brokers using TLS/SSL using this method, since the required options are not exposed via SQL — only SASL is supported. If this is the case, use one of the other methods instead.
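Putting this together for the quickstart dataset, a sketch of the ingestion table with inline settings (the broker address, credentials, and column subset are illustrative):

```sql
CREATE TABLE github_queue
(
    file_time DateTime,
    event_type LowCardinality(String),
    actor_login LowCardinality(String),
    repo_name LowCardinality(String),
    created_at DateTime,
    updated_at DateTime
)
ENGINE = Kafka('broker:9092', 'github', 'clickhouse', 'JSONEachRow')
SETTINGS kafka_security_protocol = 'sasl_ssl',
         kafka_sasl_mechanism = 'PLAIN',
         kafka_sasl_username = 'username',
         kafka_sasl_password = 'password';
```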
It's important to note that creating a Kafka table engine table does not start data ingestion — it simply configures a consumer. After this step, you must create a target table and a materialized view to start data ingestion from the specified topic.
The Kafka table engine is designed for one-time data retrieval. You should never select data from a Kafka table directly, but use a materialized view and query its associated target table instead.
4. Create a target table
Once you define the schema of your Kafka table engine table, you must create a target table that will persist the data in ClickHouse. If the schema of your target table is the same as the schema you defined for the ingestion table (github_queue), you can use the CREATE TABLE AS syntax to copy that schema over.
This table must use an engine of the Merge Tree family. For simplicity, this example uses the MergeTree() engine, but you should evaluate the best fit for your use case.
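A sketch, assuming the `github_queue` ingestion table from the previous step (the ordering key is illustrative):

```sql
-- Copy the schema from the ingestion table, but persist with MergeTree.
CREATE TABLE github AS github_queue
ENGINE = MergeTree()
ORDER BY (event_type, repo_name, created_at);
```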
5. Create a materialized view
To ingest data, the Kafka table engine must be attached to a materialized view. As new messages are detected in the upstream Kafka broker, the materialized view is triggered to insert data into persistent storage (i.e., the target table you created in the previous step).
In this example, the target table (github) has the same schema as the ingestion table (github_queue), so the materialized view will do a simple SELECT *.
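A sketch, assuming the quickstart object names (`github_mv` is an illustrative name):

```sql
-- Trigger on new batches from github_queue and write them into github.
CREATE MATERIALIZED VIEW github_mv TO github AS
SELECT * FROM github_queue;
```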
Once created, the materialized view connects to the Kafka table engine and kickstarts data ingestion. This process continues indefinitely: reading new data from the upstream Kafka broker, triggering the materialized view, and inserting data into the target table.
6. Confirm rows have been inserted
To confirm that all messages were processed and stored in ClickHouse, run a count against your target table.
It's important to note that the Kafka table engine processes data in discrete batches (controlled by settings like kafka_max_block_size and kafka_flush_interval_ms), which means that you might see previous state while a batch of rows is being processed, but never partially processed batches. When all data has been processed, you should see 200,000 rows:
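For example:

```sql
-- Should return 200000 once all batches have been flushed.
SELECT count() FROM github;
```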
To monitor ingestion progress and debug errors with the Kafka consumer, you can query the system.kafka_consumers system table. If your deployment has multiple replicas (e.g., ClickHouse Cloud), you must use the clusterAllReplicas table function.
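A sketch, assuming the cluster is named `default` (as in ClickHouse Cloud):

```sql
-- Inspect consumer state (offsets, exceptions, rebalances) on every replica.
SELECT *
FROM clusterAllReplicas('default', system.kafka_consumers)
WHERE `table` = 'github_queue'
FORMAT Vertical;
```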
Common operations
Troubleshooting
- ClickHouse Cloud
- Self-hosted ClickHouse
System tables
To troubleshoot errors with the Kafka consumer, you can query the system.kafka_consumers system table.
Log files
Logging for the Kafka table engine is reported in the ClickHouse server logs, which are not exposed to users in ClickHouse Cloud. If you can't track down the issue using the methods above, you should contact the ClickHouse support team for server-level log analysis.
System tables
To troubleshoot errors with the Kafka consumer, you can query the system.kafka_consumers system table.
If your deployment has multiple replicas, you must use the clusterAllReplicas table function.
Log files
Logging for the Kafka table engine is reported in the ClickHouse server logs. Errors such as authentication issues are not reported in responses to Kafka engine DDL; for diagnosing issues, we recommend using the main ClickHouse log file clickhouse-server.err.log. You can enable further trace logging for the underlying Kafka client library (librdkafka) through configuration.
Stopping & restarting message consumption
To stop message consumption, you can detach the Kafka engine table:
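For example, assuming the `github_queue` table from the quickstart:

```sql
-- Stop consuming; consumer group offsets in Kafka are left untouched.
DETACH TABLE github_queue;
```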
This will not impact the offsets of the consumer group. To restart consumption and continue from the previous offset, reattach the table.
Using Kafka metadata
In addition to the message value, the Kafka table engine also exposes Kafka metadata fields like the message key, headers, and others as virtual columns. These virtual columns are prefixed with _ and can be added as columns in your target table on creation.
If you want to add metadata columns to an existing target table, you must first detach the Kafka table to stop data ingestion.
To add new columns to the target table for persisting Kafka metadata, use the ALTER TABLE...ADD COLUMN statement. For example:
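For example, adding the topic and partition of each message to the quickstart's target table:

```sql
ALTER TABLE github
    ADD COLUMN topic String,
    ADD COLUMN partition UInt64;
```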
Next, you must adjust the materialized view to start consuming these metadata columns. With the Kafka table detached, it's safe to drop the materialized view, re-attach the table, and re-create the materialized view to resume ingestion.
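A sketch of the sequence, assuming the quickstart object names and that the Kafka table is already detached:

```sql
DROP VIEW github_mv;        -- safe: ingestion is stopped while detached
ATTACH TABLE github_queue;  -- resume the consumer

-- Map the _topic and _partition virtual columns to the new target columns.
CREATE MATERIALIZED VIEW github_mv TO github AS
SELECT *, _topic AS topic, _partition AS partition
FROM github_queue;
```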
When you select from the target table, you should see the additional columns populated.
| actor_login | event_type | created_at | topic | partition |
|---|---|---|---|---|
| IgorMinar | CommitCommentEvent | 2011-02-12 02:22:00 | github | 0 |
| queeup | CommitCommentEvent | 2011-02-12 02:23:23 | github | 0 |
| IgorMinar | CommitCommentEvent | 2011-02-12 02:23:24 | github | 0 |
| IgorMinar | CommitCommentEvent | 2011-02-12 02:24:50 | github | 0 |
| IgorMinar | CommitCommentEvent | 2011-02-12 02:25:20 | github | 0 |
| dapi | CommitCommentEvent | 2011-02-12 06:18:36 | github | 0 |
| sourcerebels | CommitCommentEvent | 2011-02-12 06:34:10 | github | 0 |
| jamierumbelow | CommitCommentEvent | 2011-02-12 12:21:40 | github | 0 |
| jpn | CommitCommentEvent | 2011-02-12 12:24:31 | github | 0 |
| Oxonium | CommitCommentEvent | 2011-02-12 12:31:28 | github | 0 |
See the reference documentation for a complete list of supported Kafka metadata fields.
Modifying table settings
To modify Kafka table settings, we recommend dropping and recreating the table with the new configuration. The materialized view does not need to be modified - message consumption will automatically resume from the last committed offset when the table is recreated.
While you can use ALTER TABLE...MODIFY SETTING for simple settings like kafka_max_block_size, dropping and recreating is more reliable (and often required) for significant configuration changes such as broker lists, consumer groups, topics, or authentication settings.
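For example, a simple in-place change (hedged: support for altering Kafka engine settings depends on your ClickHouse version):

```sql
ALTER TABLE github_queue MODIFY SETTING kafka_max_block_size = 524288;
```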
Handling malformed messages
Kafka is often used as a "dumping ground" for data, which can lead to topics containing mixed message formats and inconsistent field names. Avoid this where possible by using tools such as Kafka Streams or ksqlDB to ensure messages are well-formed and consistent before they are inserted into Kafka. If these options are not available, ClickHouse has some features that can help:
- Treat the message field as a string. Functions can be used in the materialized view statement to perform cleansing and casting if required. This is not a production solution, but it might assist in one-off ingestion.
- If you're consuming JSON from a topic using the JSONEachRow format, use the setting input_format_skip_unknown_fields. By default, ClickHouse throws an exception if input data contains columns that do not exist in the target table; with this option enabled, the excess columns are ignored. Again, this is not a production-level solution and might confuse others.
- Consider the setting kafka_skip_broken_messages. This requires you to specify the level of tolerance per block for malformed messages, considered in the context of kafka_max_block_size. Messages within the tolerance are skipped; if the tolerance is exceeded (measured in absolute messages), the usual exception behaviour resumes.
Delivery Semantics and challenges with duplicates
The Kafka table engine has at-least-once semantics. Duplicates are possible in several known rare circumstances. For example, messages could be read from Kafka and successfully inserted into ClickHouse. Before the new offset can be committed, the connection to Kafka is lost. A retry of the block in this situation is required. The block may be de-duplicated using a distributed table or ReplicatedMergeTree as the target table. While this reduces the chance of duplicate rows, it relies on identical blocks. Events such as a Kafka rebalancing may invalidate this assumption, causing duplicates in rare circumstances.
Quorum based Inserts
You may need quorum-based inserts for cases where higher delivery guarantees are required in ClickHouse. This can't be set on the materialized view or the target table. It can, however, be set for user profiles. For example:
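A sketch of a profile-level configuration fragment, assuming a replicated target table and the default user profile:

```xml
<clickhouse>
  <profiles>
    <default>
      <!-- Require writes to be acknowledged by at least 2 replicas. -->
      <insert_quorum>2</insert_quorum>
    </default>
  </profiles>
</clickhouse>
```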
ClickHouse to Kafka
If you're on ClickHouse Cloud, it's important to note that private network connections are not supported. This means that your broker(s) must be configured for public access.
You can use the Kafka table engine to write data from ClickHouse to Kafka topics. The engine is designed to push messages to a topic when triggered by direct inserts or updates in attached materialized views. For this reason, you should be broadly familiar with materialized views when using the Kafka table engine to write data to Apache Kafka and other Kafka API-compatible brokers.
The engine ensures reliable delivery through at-least-once semantics: when data is inserted into a Kafka table, the operation only succeeds after the data is successfully sent to the Kafka topic. If there is an error sending data to Kafka (e.g., network connectivity issues, Kafka broker unavailability), the engine will automatically handle retries. This means that it is possible to get duplicates in failure scenarios if, for example, data reaches Kafka but the acknowledgment is lost due to network connectivity issues, causing the operation to be retried.
Quickstart
To get started writing data from ClickHouse into Kafka, follow the steps below. If you already have an existing topic you'd like to produce data to, skip to Step 2.
1. Create a target topic
Create a new topic in your target broker. For example, if you're running Kafka locally, you can use the built-in Kafka CLI tools:
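For example, a sketch using the stock Kafka CLI tools (the `github_out` topic name matches the materialized-view example below; the partition count is illustrative):

```shell
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic github_out \
  --partitions 1 \
  --replication-factor 1
```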
If you're using a hosted service like Confluent Cloud, you can use the Cloud Console or a client like the Confluent CLI:
2. Produce data to the target topic
To produce data to the target topic, create a Kafka table engine table that points at it, then write rows through it. Rows can be written either via attached materialized views or via direct inserts; both approaches are described below.
Using materialized views
We can utilize materialized views to push messages to a Kafka engine (and a topic) when rows are inserted into a table. When rows are inserted into the github table, a materialized view is triggered, which causes the rows to be inserted back into a Kafka engine and into a new topic. This is best illustrated with an example:
Create a new Kafka topic github_out or equivalent. Ensure a Kafka table engine github_out_queue points to this topic.
Now create a new materialized view github_out_mv to point at the GitHub table, inserting rows to the above engine when it triggers. Additions to the GitHub table will, as a result, be pushed to our new Kafka topic.
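A sketch of the two objects, assuming the quickstart's github table and illustrative broker details:

```sql
-- Kafka engine table producing to the github_out topic.
CREATE TABLE github_out_queue AS github
ENGINE = Kafka('broker:9092', 'github_out', 'clickhouse_out', 'JSONEachRow');

-- Push every insert into github onward to the github_out topic.
CREATE MATERIALIZED VIEW github_out_mv TO github_out_queue AS
SELECT * FROM github;
```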
Using direct inserts
First, confirm the count of the target table.
You should have 200,000 rows:
Now insert rows from the GitHub target table back into the Kafka table engine github_queue. Note how we utilize JSONEachRow format and LIMIT the select to 100.
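A sketch, assuming the quickstart object names:

```sql
-- Write 100 rows back into the Kafka engine table; the engine serializes
-- them using the JSONEachRow format declared at table creation and
-- produces them to the topic.
INSERT INTO github_queue
SELECT * FROM github
LIMIT 100;
```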
Recount the rows in the github table to confirm the count has increased by 100. The rows were inserted into Kafka via the Kafka table engine, then re-read by the same engine and inserted into the github target table by our materialized view!
You should see 100 additional rows:
Should you insert into the original github topic, created as part of Kafka to ClickHouse, messages will automatically appear in the github_out topic. Confirm this with native Kafka tooling. For example, below, we insert 100 rows into the github topic using kcat for a Confluent Cloud hosted topic:
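A sketch producing 100 rows from the sample dataset file (replace the placeholders with your broker address and credentials):

```shell
head -n 100 github_all_columns.ndjson | kcat -b <broker>:9092 \
  -X security.protocol=sasl_ssl \
  -X sasl.mechanisms=PLAIN \
  -X sasl.username=<username> \
  -X sasl.password=<password> \
  -t github -P
```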
A read on the github_out topic should confirm delivery of the messages.
Although an elaborate example, this illustrates the power of materialized views when used in conjunction with the Kafka engine.
Common operations
Clusters and Performance
Working with ClickHouse Clusters
Through Kafka consumer groups, multiple ClickHouse instances can potentially read from the same topic. Each consumer will be assigned to a topic partition in a 1:1 mapping. When scaling ClickHouse consumption using the Kafka table engine, consider that the total number of consumers within a cluster cannot exceed the number of partitions on the topic. Therefore ensure partitioning is appropriately configured for the topic in advance.
Multiple ClickHouse instances can all be configured to read from a topic using the same consumer group id - specified during the Kafka table engine creation. Therefore, each instance will read from one or more partitions, inserting segments to their local target table. The target tables can, in turn, be configured to use a ReplicatedMergeTree to handle duplication of the data. This approach allows Kafka reads to be scaled with the ClickHouse cluster, provided there are sufficient Kafka partitions.
Tuning Performance
Consider the following when looking to increase Kafka Engine table throughput performance:
- The performance will vary depending on the message size, format, and target table types. 100k rows/sec on a single table engine should be considered obtainable. By default, messages are read in blocks, controlled by the parameter kafka_max_block_size. By default, this is set to the max_insert_block_size, defaulting to 1,048,576. Unless messages are extremely large, this should nearly always be increased. Values between 500k to 1M are not uncommon. Test and evaluate the effect on throughput performance.
- The number of consumers for a table engine can be increased using kafka_num_consumers. However, by default, inserts will be linearized in a single thread unless kafka_thread_per_consumer is changed from its default value of 0. Set this to 1 to ensure flushes are performed in parallel. Note that creating a Kafka engine table with N consumers (and kafka_thread_per_consumer=1) is logically equivalent to creating N Kafka engines, each with a materialized view and kafka_thread_per_consumer=0.
- Increasing consumers is not a free operation. Each consumer maintains its own buffers and threads, increasing the overhead on the server. Be conscious of this overhead and, if possible, scale consumers across your cluster first.
- If the throughput of Kafka messages is variable and delays are acceptable, consider increasing the stream_flush_interval_ms to ensure larger blocks are flushed.
- background_message_broker_schedule_pool_size sets the number of threads performing background tasks. These threads are used for Kafka streaming. This setting is applied at the ClickHouse server start and can't be changed in a user session, defaulting to 16. If you see timeouts in the logs, it may be appropriate to increase this.
- For communication with Kafka, the librdkafka library is used, which itself creates threads. Large numbers of Kafka tables, or consumers, can thus result in large numbers of context switches. Either distribute this load across the cluster, only replicating the target tables if possible, or consider using a single table engine to read from multiple topics - a list of values is supported. Multiple materialized views can read from this single table, each filtering for the data from a specific topic.
Any settings changes should be tested. We recommend monitoring Kafka consumer lags to ensure you are properly scaled.
Additional Settings
Aside from the settings discussed above, the following may be of interest:
- kafka_max_wait_ms - the wait time in milliseconds for reading messages from Kafka before retry. Set at a user profile level; defaults to 5000.
All settings from the underlying librdkafka can also be placed in the ClickHouse configuration files inside a kafka element - setting names should be XML elements with periods replaced with underscores e.g.
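For example, a sketch mapping librdkafka's auto.offset.reset and debug settings (setting names and values are illustrative):

```xml
<clickhouse>
  <kafka>
    <!-- librdkafka's auto.offset.reset, with '.' replaced by '_' -->
    <auto_offset_reset>earliest</auto_offset_reset>
    <!-- Enable verbose librdkafka tracing for troubleshooting -->
    <debug>all</debug>
  </kafka>
</clickhouse>
```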
These are expert settings, and we suggest referring to the librdkafka configuration documentation for an in-depth explanation.