⚠ This project is discontinued in favor of exasol/kafka-connector-extension. This repository is archived.
Exasol database dialect example setup for Kafka Confluent JDBC Connector.
Please bear in mind that this only works with Kafka Connect JDBC version 5.0+.
If you already have a running Confluent Kafka Connect cluster, you need to set up an Exasol source or sink configuration (or both). You can find example configurations for exasol-source and exasol-sink. Please upload these to Kafka Connect connectors, for example:
curl -X POST \
-H "Content-Type: application/json" \
--data @exasol-source.json kafka.connect.host:8083/connectors
Additionally, you need to upload the Exasol JDBC jars to the Connect plugin path. The plugin paths are possibly `/usr/share/java` or `/etc/kafka-connect/jars`. However, please check that these paths are on the Kafka classpath.
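As a rough way to check where the driver jars ended up, the sketch below lists matching jar files in the candidate plugin directories mentioned above. The function name `find_jdbc_jars`, the `JAR_PATTERN` variable, and the default file name pattern are assumptions for illustration; adjust the pattern to the actual name of your Exasol JDBC jar.

```shell
# List jar files matching a name pattern in each candidate plugin directory.
# The default pattern '*exa*jdbc*.jar' is an assumption about the driver's file name.
find_jdbc_jars() {
  pattern="${JAR_PATTERN:-*exa*jdbc*.jar}"
  for dir in "$@"; do
    # Skip directories that do not exist on this host.
    [ -d "$dir" ] || continue
    find "$dir" -maxdepth 1 -type f -name "$pattern"
  done
}

# Check the two paths suggested above; no output means no matching jar was found.
find_jdbc_jars /usr/share/java /etc/kafka-connect/jars
```

Finding the jar only shows that the file is present; whether the directory is actually on the Kafka Connect classpath still depends on your worker configuration.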
You can find more information in the Confluent documentation.
For testing we are going to use docker and docker-compose. Please set them up accordingly on your local machine. For running Exasol docker-db you need root privileges.
Additionally, if you are using a non-Linux machine, please obtain the IP address of docker or docker-machine. For example, on macOS, use the following command:
docker-machine ip
For the rest of the documentation, when we refer to `localhost`, substitute it with the IP address returned by the above command.
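To avoid editing every command by hand, the substitution can be captured once in a shell variable. This is a minimal sketch: the function name `docker_host_ip` and the variable `CONNECT_URL` are hypothetical, and the fallback to `localhost` assumes that a missing or inactive docker-machine means Docker runs natively.

```shell
# Resolve the host to use in place of localhost.
# Falls back to "localhost" when docker-machine is absent or reports no machine.
docker_host_ip() {
  ip="$(docker-machine ip 2>/dev/null)" || ip=""
  echo "${ip:-localhost}"
}

# Hypothetical variable holding the Kafka Connect REST endpoint (port 8083 as used in this guide).
CONNECT_URL="$(docker_host_ip):8083"
echo "Kafka Connect REST endpoint: $CONNECT_URL"
```

The later `curl` commands could then target `"$CONNECT_URL/connectors"` instead of `localhost:8083/connectors`.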
We need to open several terminals for dockerized testing.
git clone https://github.com/EXASOL/kafka-connect-jdbc-exasol.git
cd kafka-connect-jdbc-exasol
# If you're running docker in a virtual environment you might
# need to run the following command before docker-compose up:
# export COMPOSE_TLS_VERSION=TLSv1_2
docker-compose up
The following step creates a `country` table inside `country_schema` and inserts a couple of records into it.
This step should happen before the Kafka connector configuration setup, because Kafka immediately starts to look for the Exasol tables.
docker exec -it exasol-db exaplus -c localhost:8563 -u sys -P exasol -f /test/country.sql
# Create and add a new Kafka Connect Source
curl -X POST \
-H "Content-Type: application/json" \
--data @exasol-source.json localhost:8083/connectors
# You can see all available connectors with the command:
curl localhost:8083/connectors/
# Similarly, you can see the status of a connector with the command:
curl localhost:8083/connectors/exasol-source/status
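Since a connector may need a moment to start, it can help to poll the status endpoint instead of checking it once. The sketch below is one possible approach, not part of this repository: the function names are hypothetical, and the parsing assumes that the connector-level `"state"` field is the first `"state"` in the response, which is how the Kafka Connect status JSON is usually ordered.

```shell
# Extract the first "state" value from a Kafka Connect status response read on stdin.
# Assumption: the connector-level state appears before the per-task states.
connector_state() {
  grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"' \
    | head -n 1 \
    | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Poll the status endpoint until the connector reports RUNNING (up to 30 attempts).
# Usage: wait_for_connector exasol-source [host:port]
wait_for_connector() {
  name="$1"
  host="${2:-localhost:8083}"
  state=""
  i=0
  while [ "$i" -lt 30 ]; do
    state="$(curl -s "$host/connectors/$name/status" | connector_state)"
    [ "$state" = "RUNNING" ] && return 0
    i=$((i + 1))
    sleep 2
  done
  echo "connector $name did not reach RUNNING (last state: ${state:-unknown})" >&2
  return 1
}
```

For example, `wait_for_connector exasol-source` could be run right after the `POST` request and before consuming from the topic. Note that this checks only the connector state; individual tasks can still fail and are worth inspecting in the full status output.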
docker exec -it kafka02 /bin/bash
# List available Kafka topics, we should see the 'EXASOL_COUNTRY' listed.
kafka-topics --list --zookeeper zookeeper.internal:2181
# Start kafka console consumer
kafka-console-consumer \
--bootstrap-server kafka01.internal:9092 \
--from-beginning \
--topic EXASOL_COUNTRY
You should see the two records inserted from the other terminal. Similarly, if you insert new records into the `country` table in Exasol, they should be listed on the Kafka consumer console.
To see the console results in a structured way, you can consume them using the Avro console consumer from the `schema-registry` container:
docker exec -it schema-registry /bin/bash
kafka-avro-console-consumer \
--bootstrap-server kafka01.internal:9092 \
--from-beginning \
--topic EXASOL_COUNTRY
The following step creates a `country_population` table inside `country_schema`, which will be the destination for the Kafka topic records.
This step should happen before the Kafka connector configuration setup, otherwise the connector will not find the sink table in Exasol.
docker exec -it exasol-db exaplus -c localhost:8563 -u sys -P exasol -f /test/country_population.sql
curl -X POST \
-H "Content-Type: application/json" \
--data @exasol-sink.json \
localhost:8083/connectors
## Check that sink connector is running
curl localhost:8083/connectors/exasol-sink/status
Produce some records for `country_population`:
docker exec -it schema-registry /bin/bash
kafka-avro-console-producer \
--broker-list kafka01.internal:9092 \
--topic COUNTRY_POPULATION \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"COUNTRY_NAME","type":"string"},{"name":"POPULATION", "type": "long"}]}'
{"COUNTRY_NAME": "France", "POPULATION": 67}
{"COUNTRY_NAME": "Croatia", "POPULATION": 4}
* In another terminal, ensure that the records are available in the Exasol table:
```bash
docker exec -it exasol-db bash -c 'exaplus -c localhost:8563 -u sys -P exasol -sql "SELECT * FROM country_schema.country_population;"'
```
This example setup depends on several jar files. Running `mvn clean package` creates a jar file in `target/`. Then copy it into `kafka-connect-image/jars/`.

Additionally, we are using docker-compose based Exasol and Kafka Connect services. Kafka Connect is configured for distributed mode.
| Service Name | Versions | Description |
|---|---|---|
| exasol-db | dockerhub/exasol/docker-db | An Exasol docker db. Please note that we use stand-alone cluster mode. |
| zookeeper | dockerhub/confluentinc/cp-zookeeper | A single node zookeeper instance. |
| kafka | dockerhub/confluentinc/cp-kafka | A kafka instance. We have a three node kafka setup. |
| schema-registry | dockerhub/confluentinc/cp-schema-registry | A schema-registry instance. |
| kafka-connect | dockerhub/confluentinc/cp-kafka-connect | Custom configured kafka-connect instance. |
There are several tips and tricks to consider when setting up the Kafka Exasol connector.
* The timestamp and incrementing column names should be in upper case, for example, `"timestamp.column.name": "UPDATED_AT"`. This is because Exasol makes all field names upper case and the Kafka connector is case sensitive.
* In production systems, `tasks.max` should be more than the number of tables. In JDBC connectors each table is sourced as one partition, and each partition is then handled by a single task.
* The `incrementing` or `timestamp` columns named in the Kafka Connect configuration should have a `NOT NULL` constraint in the table definition.
* Batch mode together with upsert is not supported at the moment. We transform the Kafka upserts into Exasol specific `MERGE` statements, which do not support batches. You can read more about it at issue #5.
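To illustrate the first two tips, here is a hedged sketch of what a source connector configuration might look like. It is not the `exasol-source.json` shipped with this repository: the function name `source_config`, the `ID` column, and the `tasks.max` value are assumptions, while the host, port, credentials, schema, and topic prefix follow the values used in the test setup above. The property names are standard Kafka Connect JDBC source settings.

```shell
# Print a hypothetical source connector configuration to stdout.
# Column names are upper case because Exasol stores identifiers in upper case.
source_config() {
  cat <<'EOF'
{
  "name": "exasol-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "2",
    "connection.url": "jdbc:exa:exasol-db:8563;schema=country_schema",
    "connection.user": "sys",
    "connection.password": "exasol",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "UPDATED_AT",
    "incrementing.column.name": "ID",
    "table.whitelist": "COUNTRY",
    "topic.prefix": "EXASOL_"
  }
}
EOF
}

# Usage sketch: pipe the configuration to the Kafka Connect REST API.
# source_config | curl -X POST -H "Content-Type: application/json" --data @- localhost:8083/connectors
```

With a single whitelisted table, `"tasks.max": "2"` follows the tip of setting it higher than the number of tables, and the prefix `EXASOL_` combined with the `COUNTRY` table yields the `EXASOL_COUNTRY` topic used earlier.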