Apache Kafka is a distributed event streaming platform used for high-performance data pipelines, streaming analytics, and data integration. Apache Kafka Connect is a tool to scalably and reliably stream data between Apache Kafka® and other data systems. Kafka Connect is an ecosystem of pre-written and maintained Kafka Producers (source connectors) and Kafka Consumers (sink connectors) for data products and platforms like databases and message brokers.

This guide explains how to set up Kafka and Kafka Connect to stream data from a Kafka topic into your Tiger Cloud service.

To follow the steps on this page:

Create a target Tiger Cloud service with the Real-time analytics capability. You need your connection details. This procedure also works for self-hosted TimescaleDB.

Install Java 8 or higher to run Apache Kafka.
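You can check the Java prerequisite from a shell before you install Kafka. The following is a minimal sketch and not part of the original procedure: `java_major_version` is a hypothetical helper name, and it assumes the version string follows the usual `1.8.0_392` (Java 8) or `17.0.9` (Java 9 and later) patterns.

```shell
# Hypothetical helper: print the major Java version for a version string.
# "1.8.0_392" -> 8 (legacy 1.x scheme), "17.0.9" -> 17.
java_major_version() {
  local version="$1"
  local first="${version%%.*}"          # text before the first dot
  if [ "$first" = "1" ]; then
    local rest="${version#1.}"          # drop the leading "1."
    echo "${rest%%.*}"                  # second component is the major version
  else
    echo "${first%%[!0-9]*}"            # strip suffixes such as "-ea"
  fi
}

# Usage: read the installed version, if Java is on the PATH.
if command -v java >/dev/null 2>&1; then
  installed="$(java -version 2>&1 | sed -n 's/.*version "\([^"]*\)".*/\1/p' | head -n 1)"
  if [ -n "$installed" ] && [ "$(java_major_version "$installed")" -ge 8 ]; then
    echo "Java $installed is recent enough for Apache Kafka"
  else
    echo "Java 8 or higher is required" >&2
  fi
fi
```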

To install and configure Apache Kafka:

1. Extract the Kafka binaries to a local folder

       curl https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz | tar -xzf -
       cd kafka_2.13-3.9.0

   From now on, the folder where you extracted the Kafka binaries is called <KAFKA_HOME>.

2. Configure and run Apache Kafka

       KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
       ./bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
       ./bin/kafka-server-start.sh config/kraft/reconfig-server.properties

   Use the -daemon flag to run this process in the background.

3. Create Kafka topics

   In another Terminal window, navigate to <KAFKA_HOME>, then call kafka-topics.sh and create the following topics:

   - accounts: publishes JSON messages that are consumed by the timescale-sink connector and inserted into your Tiger Cloud service.
   - deadletter: stores messages that cause errors and that Kafka Connect workers cannot process.

       ./bin/kafka-topics.sh \
           --create \
           --topic accounts \
           --bootstrap-server localhost:9092 \
           --partitions 10

       ./bin/kafka-topics.sh \
           --create \
           --topic deadletter \
           --bootstrap-server localhost:9092 \
           --partitions 10

4. Test that your topics are working correctly

   a. Run kafka-console-producer to send messages to the accounts topic:

          bin/kafka-console-producer.sh --topic accounts --bootstrap-server localhost:9092

   b. Send some events. For example, type the following:

          >Tiger
          >How Cool

   c. In another Terminal window, navigate to <KAFKA_HOME>, then run kafka-console-consumer to consume the events you just sent:

          bin/kafka-console-consumer.sh --topic accounts --from-beginning --bootstrap-server localhost:9092

      You see:

          Tiger
          How Cool
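The two topic creation calls above can be wrapped in a small loop. This is a sketch under stated assumptions rather than part of the original procedure: `create_topics` and the `KAFKA_TOPICS_CMD` override are hypothetical names, and the `--if-not-exists` flag is added so that the command can be rerun without failing on a topic that already exists.

```shell
# Hypothetical helper: create each named topic with 10 partitions.
# KAFKA_TOPICS_CMD is an assumed override; set it to "echo" for a dry run
# that prints the arguments instead of contacting the broker.
create_topics() {
  local cmd="${KAFKA_TOPICS_CMD:-./bin/kafka-topics.sh}"
  local topic
  for topic in "$@"; do
    "$cmd" --create --if-not-exists \
      --topic "$topic" \
      --bootstrap-server localhost:9092 \
      --partitions 10
  done
}
```

From <KAFKA_HOME>, `create_topics accounts deadletter` creates both topics. Running `KAFKA_TOPICS_CMD=echo create_topics accounts deadletter` first shows the arguments that would be passed.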

Keep these terminals open; you use them to test the integration later.

To set up the Kafka Connect server, plugins, drivers, and connectors:

1. Install the Postgres connector

   In another Terminal window, navigate to <KAFKA_HOME>, then download and configure the Postgres sink and driver.

       mkdir -p "plugins/camel-postgresql-sink-kafka-connector"
       curl https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-postgresql-sink-kafka-connector/3.21.0/camel-postgresql-sink-kafka-connector-3.21.0-package.tar.gz \
           | tar -xzf - -C "plugins/camel-postgresql-sink-kafka-connector" --strip-components=1
       curl -H "Accept: application/zip" https://jdbc.postgresql.org/download/postgresql-42.7.5.jar -o "plugins/camel-postgresql-sink-kafka-connector/postgresql-42.7.5.jar"
       echo "plugin.path=`pwd`/plugins/camel-postgresql-sink-kafka-connector" >> "config/connect-distributed.properties"
       echo "plugin.path=`pwd`/plugins/camel-postgresql-sink-kafka-connector" >> "config/connect-standalone.properties"

2. Start Kafka Connect

       export CLASSPATH=`pwd`/plugins/camel-postgresql-sink-kafka-connector/*
       ./bin/connect-standalone.sh config/connect-standalone.properties

   Use the -daemon flag to run this process in the background.

3. Verify Kafka Connect is running

   In yet another Terminal window, run the following command:

       curl http://localhost:8083

   You see something like:

       {"version":"3.9.0","commit":"a60e31147e6b01ee","kafka_cluster_id":"J-iy4IGXTbmiALHwPZEZ-A"}
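Kafka Connect can take a few seconds before it answers on port 8083. The following sketch shows how a script might wait for it and read the version from the JSON response shown above. The function names `wait_for_connect` and `connect_version` are hypothetical, and the `sed` expression assumes the compact JSON layout of that response rather than parsing JSON in general.

```shell
# Hypothetical helper: extract the "version" field from the Connect root
# response read on stdin. Assumes compact JSON as in the example output.
connect_version() {
  sed -n 's/.*"version":"\([^"]*\)".*/\1/p'
}

# Hypothetical helper: poll the Connect REST endpoint until it answers,
# trying once per second up to the given number of attempts (default 30).
# Returns 0 when the endpoint responds, 1 if it never does.
wait_for_connect() {
  local url="${1:-http://localhost:8083}"
  local attempts="${2:-30}"
  local i=0
  while [ "$i" -lt "$attempts" ]; do
    if curl -sf "$url" >/dev/null 2>&1; then
      return 0
    fi
    i=$((i + 1))
    sleep 1
  done
  return 1
}
```

For example, `wait_for_connect && curl -s http://localhost:8083 | connect_version` prints the version once the server is up.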

To prepare your Tiger Cloud service for Kafka integration:

1. Connect to your Tiger Cloud service

2. Create a hypertable to ingest Kafka events

       CREATE TABLE accounts (
           created_at TIMESTAMPTZ DEFAULT NOW(),
           name TEXT,
           city TEXT
       ) WITH (
           tsdb.hypertable
       );

   When you create a hypertable using CREATE TABLE ... WITH ..., the default partitioning column is automatically the first column with a timestamp data type. Also, TimescaleDB creates a columnstore policy that automatically converts your data to the columnstore after an interval equal to the value of the chunk_interval, defined through compress_after in the policy. This columnar format enables fast scanning and aggregation, optimizing performance for analytical workloads while also saving significant storage space. In the columnstore conversion, hypertable chunks are compressed by up to 98% and organized for efficient, large-scale queries.

   You can customize this policy later using alter_job. However, to change after or created_before, the compression settings, or the hypertable the policy is acting on, you must remove the columnstore policy and add a new one.

   You can also manually convert chunks in a hypertable to the columnstore.
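Removing the columnstore policy and adding a new one, as described above, might look like the following sketch. It assumes a TimescaleDB version that provides the `remove_columnstore_policy` and `add_columnstore_policy` procedures; `columnstore_policy_sql` is a hypothetical helper name, the `7 days` interval in the usage note is only an illustration, and `TARGET` is an assumed variable holding your connection string.

```shell
# Hypothetical helper: print SQL that replaces the columnstore policy on a
# hypertable. Arguments: table name, new "after" interval (for example "7 days").
# Assumes the table name and interval contain no single quotes.
columnstore_policy_sql() {
  local table="$1"
  local after="$2"
  printf "CALL remove_columnstore_policy('%s');\n" "$table"
  printf "CALL add_columnstore_policy('%s', after => INTERVAL '%s');\n" "$table" "$after"
}
```

To apply the change, pipe the output to psql, for example `columnstore_policy_sql accounts "7 days" | psql "$TARGET"`. Printing the statements first lets you review them before they run.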

To create a Tiger Cloud sink in Apache Kafka:

1. Create the connection configuration

   a. In the terminal running Kafka Connect, stop the process by pressing Ctrl+C.

   b. Write the following configuration to <KAFKA_HOME>/config/timescale-standalone-sink.properties, then update the <properties> with your connection details.

          name=timescale-standalone-sink
          connector.class=org.apache.camel.kafkaconnector.postgresqlsink.CamelPostgresqlsinkSinkConnector
          errors.tolerance=all
          errors.deadletterqueue.topic.name=deadletter
          tasks.max=10
          value.converter=org.apache.kafka.connect.storage.StringConverter
          key.converter=org.apache.kafka.connect.storage.StringConverter
          topics=accounts
          camel.kamelet.postgresql-sink.databaseName=<dbname>
          camel.kamelet.postgresql-sink.username=<user>
          camel.kamelet.postgresql-sink.password=<password>
          camel.kamelet.postgresql-sink.serverName=<host>
          camel.kamelet.postgresql-sink.serverPort=<port>
          camel.kamelet.postgresql-sink.query=INSERT INTO accounts (name,city) VALUES (:#name,:#city)

   c. Restart Kafka Connect with the new configuration:

          export CLASSPATH=`pwd`/plugins/camel-postgresql-sink-kafka-connector/*
          ./bin/connect-standalone.sh config/connect-standalone.properties config/timescale-standalone-sink.properties

2. Test the connection

   To see your sink, query the /connectors route in a GET request:

       curl -X GET http://localhost:8083/connectors

   You see:

       ["timescale-standalone-sink"]
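The placeholders in the sink configuration correspond to the parts of a Postgres connection URL. The sketch below assumes your connection details are available as a URL of the form `postgres://<user>:<password>@<host>:<port>/<dbname>`, optionally followed by query parameters. `pg_url_to_sink_properties` is a hypothetical helper name, and it does not handle URL-encoded characters or passwords that contain `@`, `:` or `/`.

```shell
# Hypothetical helper: print the camel.kamelet.postgresql-sink.* connection
# properties for a URL like postgres://user:password@host:port/dbname?params.
# Assumes no URL-encoded characters and no "@", ":" or "/" in the password.
pg_url_to_sink_properties() {
  local url="$1"
  local rest="${url#*://}"        # user:password@host:port/dbname?params
  local creds="${rest%%@*}"       # user:password
  local target="${rest#*@}"       # host:port/dbname?params
  local hostport="${target%%/*}"  # host:port
  local db="${target#*/}"         # dbname?params
  db="${db%%\?*}"                 # dbname
  local prefix="camel.kamelet.postgresql-sink"
  printf '%s.databaseName=%s\n' "$prefix" "$db"
  printf '%s.username=%s\n' "$prefix" "${creds%%:*}"
  printf '%s.password=%s\n' "$prefix" "${creds#*:}"
  printf '%s.serverName=%s\n' "$prefix" "${hostport%%:*}"
  printf '%s.serverPort=%s\n' "$prefix" "${hostport##*:}"
}
```

For example, `pg_url_to_sink_properties "$SERVICE_URL"` prints the five connection lines, which you can paste over the placeholder lines in timescale-standalone-sink.properties. `SERVICE_URL` is an assumed variable name.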

To test this integration, send some messages onto the accounts topic. The steps below use the kafka-console-producer you started earlier; you can also do this using the kafkacat or kcat utility.

1. In the terminal running kafka-console-producer.sh, enter the following JSON strings:

       {"name":"Lola","city":"Copacabana"}
       {"name":"Holly","city":"Miami"}
       {"name":"Jolene","city":"Tennessee"}
       {"name":"Barbara Ann ","city":"California"}

   Look in your terminal running kafka-console-consumer to see the messages being processed.

2. Query your Tiger Cloud service for all rows in the accounts table

       SELECT * FROM accounts;

   You see something like:

       created_at                    | name        | city
       ------------------------------+-------------+------------
       2025-02-18 13:55:05.147261+00 | Lola        | Copacabana
       2025-02-18 13:55:05.216673+00 | Holly       | Miami
       2025-02-18 13:55:05.283549+00 | Jolene      | Tennessee
       2025-02-18 13:55:05.35226+00  | Barbara Ann | California
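Instead of typing the test messages by hand, you can generate them and pipe them to the console producer, which reads one message per line from standard input. This is a minimal sketch: `account_json` is a hypothetical helper name, and it assumes the name and city values contain no double quotes or backslashes, because it does no JSON escaping.

```shell
# Hypothetical helper: print one JSON message for the accounts topic.
# Assumes name and city contain no double quotes or backslashes.
account_json() {
  printf '{"name":"%s","city":"%s"}\n' "$1" "$2"
}

# Usage, from <KAFKA_HOME> with the broker running:
#   { account_json Lola Copacabana; account_json Holly Miami; } \
#     | ./bin/kafka-console-producer.sh --topic accounts --bootstrap-server localhost:9092
```

Each generated line matches the `name` and `city` fields that the sink query inserts into the accounts table.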

You have successfully integrated Apache Kafka with Tiger Cloud.