---
title: Integrate Apache Kafka with Tiger Cloud | Tiger Data Docs
description: Stream, process, and analyze real-time event data from Apache Kafka topics
---

[Apache Kafka](https://kafka.apache.org/documentation/) is a distributed event streaming platform used for high-performance data pipelines, streaming analytics, and data integration. [Apache Kafka Connect](https://docs.confluent.io/platform/current/connect/index.html) is a tool to scalably and reliably stream data between Apache Kafka® and other data systems. Kafka Connect is an ecosystem of pre-written and maintained Kafka Producers (source connectors) and Kafka Consumers (sink connectors) for data products and platforms like databases and message brokers.

This guide explains how to set up Kafka and Kafka Connect to stream data from a Kafka topic into your Tiger Cloud service.

## Prerequisites

To follow the procedure on this page, you need to:

- Create a [target Tiger Cloud service](/docs/get-started/quickstart/create-service/index.md).

  This procedure also works for [self-hosted TimescaleDB](/docs/get-started/choose-your-path/install-timescaledb/index.md).

- Install [Java 8 or higher](https://www.oracle.com/java/technologies/downloads/) to run Apache Kafka.

## Install and configure Apache Kafka

To install and configure Apache Kafka:

1. **Extract the Kafka binaries to a local folder**

   ```
   curl https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz | tar -xzf -
   cd kafka_2.13-3.9.0
   ```

   From now on, the folder where you extracted the Kafka binaries is called `<KAFKA_HOME>`.
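
   To confirm the download, you can print the Kafka version; for example:

   ```
   ./bin/kafka-topics.sh --version
   ```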

2. **Configure and run Apache Kafka**

   ```
   KAFKA_CLUSTER_ID="$(./bin/kafka-storage.sh random-uuid)"
   ./bin/kafka-storage.sh format --standalone -t "$KAFKA_CLUSTER_ID" -c config/kraft/reconfig-server.properties
   ./bin/kafka-server-start.sh config/kraft/reconfig-server.properties
   ```

   Use the `-daemon` flag to run this process in the background.
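
   For example, to start the broker as a background daemon:

   ```
   ./bin/kafka-server-start.sh -daemon config/kraft/reconfig-server.properties
   ```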

3. **Create Kafka topics**

   In another terminal window, navigate to `<KAFKA_HOME>`, then call `kafka-topics.sh` to create the following topics:

   - `accounts`: receives JSON messages that the timescale-sink connector consumes and inserts into your Tiger Cloud service.
   - `deadletter`: stores messages that cause errors and that Kafka Connect workers cannot process.

   ```
   ./bin/kafka-topics.sh \
        --create \
        --topic accounts \
        --bootstrap-server localhost:9092 \
        --partitions 10

   ./bin/kafka-topics.sh \
        --create \
        --topic deadletter \
        --bootstrap-server localhost:9092 \
        --partitions 10
   ```
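
   To verify that both topics exist, describe them; for example:

   ```
   ./bin/kafka-topics.sh --describe --topic accounts --bootstrap-server localhost:9092
   ./bin/kafka-topics.sh --describe --topic deadletter --bootstrap-server localhost:9092
   ```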

4. **Test that your topics are working correctly**

   1. Run `kafka-console-producer` to send messages to the `accounts` topic:

      ```
      ./bin/kafka-console-producer.sh --topic accounts --bootstrap-server localhost:9092
      ```

   2. Send some events. For example, type the following:

      ```
      >Tiger Cloud
      >How Cool
      ```

   3. In another terminal window, navigate to `<KAFKA_HOME>`, then run `kafka-console-consumer` to consume the events you just sent:

      ```
      ./bin/kafka-console-consumer.sh --topic accounts --from-beginning --bootstrap-server localhost:9092
      ```

      You see:

      ```
      Tiger Cloud
      How Cool
      ```

Keep these terminals open; you use them to test the integration later.

## Install the sink connector to communicate with Tiger Cloud

To set up the Kafka Connect server, plugins, drivers, and connectors:

1. **Install the PostgreSQL connector**

   In another terminal window, navigate to `<KAFKA_HOME>`, then download and configure the PostgreSQL sink and driver.

   ```
   mkdir -p "plugins/camel-postgresql-sink-kafka-connector"
   curl https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-postgresql-sink-kafka-connector/3.21.0/camel-postgresql-sink-kafka-connector-3.21.0-package.tar.gz \
   | tar -xzf - -C "plugins/camel-postgresql-sink-kafka-connector" --strip-components=1
   curl -H "Accept: application/zip" https://jdbc.postgresql.org/download/postgresql-42.7.5.jar -o "plugins/camel-postgresql-sink-kafka-connector/postgresql-42.7.5.jar"
   echo "plugin.path=`pwd`/plugins/camel-postgresql-sink-kafka-connector" >> "config/connect-distributed.properties"
   echo "plugin.path=`pwd`/plugins/camel-postgresql-sink-kafka-connector" >> "config/connect-standalone.properties"
   ```
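
   To confirm that the connector and driver are in place, you can list the plugin directory and check the appended `plugin.path` entries; a quick sanity check:

   ```
   ls plugins/camel-postgresql-sink-kafka-connector | head
   tail -n 2 config/connect-standalone.properties
   ```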

2. **Start Kafka Connect**

   ```
   export CLASSPATH=`pwd`/plugins/camel-postgresql-sink-kafka-connector/*
   ./bin/connect-standalone.sh config/connect-standalone.properties
   ```

   Use the `-daemon` flag to run this process in the background.

3. **Verify Kafka Connect is running**

   In yet another terminal window, run the following command:

   ```
   curl http://localhost:8083
   ```

   You see something like:

   ```
   {"version":"3.9.0","commit":"a60e31147e6b01ee","kafka_cluster_id":"J-iy4IGXTbmiALHwPZEZ-A"}
   ```
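
   To confirm that Kafka Connect loaded the PostgreSQL sink plugin, you can also query the `connector-plugins` endpoint:

   ```
   curl http://localhost:8083/connector-plugins
   ```

   The response should include `CamelPostgresqlsinkSinkConnector`.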

## Create a table in your service to ingest Kafka events

To prepare your Tiger Cloud service for Kafka integration:

1. **[Connect](/docs/build/data-management/run-queries-from-tiger-console/index.md) to your Tiger Cloud service**

2. **Create a hypertable to ingest Kafka events**

   ```
   CREATE TABLE accounts (
      created_at TIMESTAMPTZ DEFAULT NOW(),
      name TEXT,
      city TEXT
   ) WITH (
      tsdb.hypertable
   );
   ```

   When you create a hypertable using [CREATE TABLE … WITH …](/docs/reference/timescaledb/hypertables/create_table/index.md), the default partitioning column is the first column with a timestamp data type. TimescaleDB also creates a [columnstore policy](/docs/reference/timescaledb/hypercore/add_columnstore_policy/index.md) that automatically converts your data to the columnstore after an interval equal to the [chunk\_interval](/docs/reference/timescaledb/hypertables/set_chunk_time_interval/index.md), defined through `after` in the policy. This columnar format enables fast scanning and aggregation, optimizing performance for analytical workloads while also saving significant storage space: during the columnstore conversion, hypertable chunks are compressed by up to 98% and organized for efficient, large-scale queries.

   You can customize this policy later using [alter\_job](/docs/reference/timescaledb/jobs-automation/alter_job/index.md). However, to change `after` or `created_before`, the compression settings, or the hypertable the policy is acting on, you must [remove the columnstore policy](/docs/reference/timescaledb/hypercore/remove_columnstore_policy/index.md) and [add a new one](/docs/reference/timescaledb/hypercore/add_columnstore_policy/index.md).

   You can also manually [convert chunks](/docs/reference/timescaledb/hypercore/convert_to_columnstore/index.md) in a hypertable to the columnstore.
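
   To inspect the policy that TimescaleDB created for the `accounts` hypertable, you can query the jobs view; a minimal check (the `proc_name` value may differ across TimescaleDB versions):

   ```
   SELECT job_id, proc_name, schedule_interval
   FROM timescaledb_information.jobs
   WHERE hypertable_name = 'accounts';
   ```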

## Create the Tiger Cloud sink

To create a Tiger Cloud sink in Apache Kafka:

1. **Create the connection configuration**

   1. In the terminal running Kafka Connect, stop the process by pressing `Ctrl+C`.

   2. Write the following configuration to `<KAFKA_HOME>/config/timescale-standalone-sink.properties`, then update the `<properties>` with your [connection details](/docs/integrate/find-connection-details/index.md).

      ```
      name=timescale-standalone-sink
      connector.class=org.apache.camel.kafkaconnector.postgresqlsink.CamelPostgresqlsinkSinkConnector
      errors.tolerance=all
      errors.deadletterqueue.topic.name=deadletter
      tasks.max=10
      value.converter=org.apache.kafka.connect.storage.StringConverter
      key.converter=org.apache.kafka.connect.storage.StringConverter
      topics=accounts
      camel.kamelet.postgresql-sink.databaseName=<dbname>
      camel.kamelet.postgresql-sink.username=<user>
      camel.kamelet.postgresql-sink.password=<password>
      camel.kamelet.postgresql-sink.serverName=<host>
      camel.kamelet.postgresql-sink.serverPort=<port>
      camel.kamelet.postgresql-sink.query=INSERT INTO accounts (name,city) VALUES (:#name,:#city)
      ```
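
      In the `query` property, `:#name` and `:#city` are named placeholders that the Camel PostgreSQL sink binds to the same-named keys in each JSON message it consumes. For example, the message `{"name":"Lola","city":"Copacabana"}` is inserted as one row with `name = 'Lola'` and `city = 'Copacabana'`.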

   3. Restart Kafka Connect with the new configuration:

      ```
      export CLASSPATH=`pwd`/plugins/camel-postgresql-sink-kafka-connector/*
      ./bin/connect-standalone.sh config/connect-standalone.properties config/timescale-standalone-sink.properties
      ```

2. **Test the connection**

   To see your sink, send a GET request to the `/connectors` endpoint:

   ```
   curl -X GET http://localhost:8083/connectors
   ```

   You see:

   ```
   ["timescale-standalone-sink"]
   ```
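
   To check that the sink and its tasks are running, you can also query its status endpoint:

   ```
   curl http://localhost:8083/connectors/timescale-standalone-sink/status
   ```

   The `state` fields in the response should read `RUNNING`.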

## Test the integration with Tiger Cloud

To test this integration, send some messages to the `accounts` topic. You can do this using the `kafka-console-producer` terminal you opened earlier, or a utility such as kafkacat (kcat), shown as an alternative below.

1. **In the terminal running `kafka-console-producer.sh`, enter the following JSON strings**

   ```
   {"name":"Lola","city":"Copacabana"}
   {"name":"Holly","city":"Miami"}
   {"name":"Jolene","city":"Tennessee"}
   {"name":"Barbara Ann ","city":"California"}
   ```

   Look in your terminal running `kafka-console-consumer` to see the messages being processed.
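
   Alternatively, if you have [kcat](https://github.com/edenhill/kcat) installed, you can produce a message without the console producer; for example:

   ```
   echo '{"name":"Lola","city":"Copacabana"}' | kcat -P -b localhost:9092 -t accounts
   ```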

2. **Query your Tiger Cloud service for all rows in the `accounts` table**

   ```
   SELECT * FROM accounts;
   ```

   You see something like:

   | created\_at                   | name        | city       |
   | ----------------------------- | ----------- | ---------- |
   | 2025-02-18 13:55:05.147261+00 | Lola        | Copacabana |
   | 2025-02-18 13:55:05.216673+00 | Holly       | Miami      |
   | 2025-02-18 13:55:05.283549+00 | Jolene      | Tennessee  |
   | 2025-02-18 13:55:05.35226+00  | Barbara Ann | California |

You have successfully integrated Apache Kafka with Tiger Cloud.
