kafka-connect-datagen is a Kafka Connect connector for generating mock data for testing; it is not suitable for production scenarios. It is available on Confluent Hub.

There are multiple released versions of this connector, starting with 0.1.0. The instructions below use version 0.4.0 as an example, but you can substitute any other released version. Unless specified otherwise, we recommend using the latest released version to get all of the features and bug fixes.
You can install a released version of the kafka-connect-datagen connector from Confluent Hub, or build it from source. To run the connector, you can use either a local Confluent Platform installation or a Docker container.
You can install the kafka-connect-datagen connector from Confluent Hub using the Confluent Hub client. To install a specific release version, run:

```shell
confluent-hub install confluentinc/kafka-connect-datagen:0.4.0
```

or, to install the latest released version:

```shell
confluent-hub install confluentinc/kafka-connect-datagen:latest
```
Alternatively, you can build and install the kafka-connect-datagen connector from the latest code. Here we use v0.4.0 to reference the git tag for the 0.4.0 version, but the same pattern works for all released versions.

```shell
git checkout v0.4.0
mvn clean package
confluent-hub install target/components/packages/confluentinc-kafka-connect-datagen-0.4.0.zip
```
Here is an example of how to run kafka-connect-datagen on a local Confluent Platform after it has been installed. Configuration details are provided below.

```shell
confluent local start connect
confluent local config datagen-pageviews -- -d config/connector_pageviews.config
confluent local status connectors
confluent local consume test1 --value-format avro --max-messages 5 --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --from-beginning
```
A Docker image based on Kafka Connect with the kafka-connect-datagen plugin is already available on Docker Hub, ready for you to use. If you want to build a local copy of the Docker image with kafka-connect-datagen, this project provides a Dockerfile that you can reference. You can create a Docker image packaged with the locally built source by running, for example with the 5.5.0 version of Confluent Platform:

```shell
make build-docker-from-local CP_VERSION=5.5.0
```
This builds the connector from source and creates a local image with an aggregate version number. The aggregate version number is the kafka-connect-datagen connector version number and the Confluent Platform version number, separated by a -. The local kafka-connect-datagen version number is defined in the pom.xml file, and the Confluent Platform version is defined in the Makefile. An example aggregate version number is 0.4.0-6.1.0.
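As a quick sketch, the aggregate tag composition described above can be expressed as follows (the variable names here are illustrative; the real values come from pom.xml and the Makefile):

```python
# Illustration of the aggregate image version scheme:
# <kafka-connect-datagen version>-<Confluent Platform version>
connector_version = "0.4.0"  # defined in pom.xml
cp_version = "6.1.0"         # defined in the Makefile (CP_VERSION)

aggregate_tag = f"{connector_version}-{cp_version}"
print(aggregate_tag)
```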
Alternatively, you can install the kafka-connect-datagen connector from Confluent Hub into a Docker image by running:

```shell
make build-docker-from-released CP_VERSION=5.5.0
```
The Makefile contains some default variables that affect the version numbers of both the installed kafka-connect-datagen and the base Confluent Platform. The variables are located near the top of the Makefile, with the following names and current default values:

```make
CP_VERSION ?= 6.1.0
KAFKA_CONNECT_DATAGEN_VERSION ?= 0.4.0
```

These values can be overridden with variable declarations before the make command. For example:

```shell
KAFKA_CONNECT_DATAGEN_VERSION=0.3.2 make build-docker-from-released
```
Here is an example of how to run kafka-connect-datagen with the provided docker-compose.yml file. If you wish to use a different Docker image tag, be sure to modify it appropriately in the docker-compose.yml file.

```shell
docker-compose up -d --build
curl -X POST -H "Content-Type: application/json" --data @config/connector_pageviews.config http://localhost:8083/connectors
docker-compose exec connect kafka-console-consumer --topic pageviews --bootstrap-server kafka:29092 --property print.key=true --max-messages 5 --from-beginning
```
See all Kafka Connect configuration parameters.

| Parameter | Description | Default |
|---|---|---|
| kafka.topic | Topic to write to | |
| max.interval | Max interval between messages (ms) | 500 |
| iterations | Number of messages to send from each task, or less than 1 for unlimited | -1 |
| schema.string | The literal JSON-encoded Avro schema to use. Cannot be set with schema.filename or quickstart | |
| schema.filename | Filename of schema to use. Cannot be set with schema.string or quickstart | |
| schema.keyfield | Name of field to use as the message key | |
| quickstart | Name of quickstart to use. Cannot be set with schema.string or schema.filename | |

See the config folder for sample configurations.
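Putting these parameters together, a minimal connector configuration might look like the following sketch (the connector name, topic, and parameter values here are illustrative; see the config folder for the actual sample configurations):

```json
{
  "name": "datagen-pageviews",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "pageviews",
    "quickstart": "pageviews",
    "max.interval": 100,
    "iterations": 10000000,
    "tasks.max": "1"
  }
}
```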
Kafka Connect supports Converters, which can be used to convert record key and value formats when reading from and writing to Kafka. As of the 5.5 release, Confluent Platform packages Avro, JSON, and Protobuf converters (earlier versions package just Avro converters). For an example of using the Protobuf converter with kafka-connect-datagen, see this example configuration. Take note of the required use of the SetSchemaMetadata transformation, which addresses a compatibility issue between schema names used by kafka-connect-datagen and Protobuf. See the "Schema names are not compatible with Protobuf" issue for details.
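As a sketch, the relevant converter and transformation settings might look like the following (the transform alias and schema name below are illustrative assumptions; refer to the example configuration for the exact values):

```json
{
  "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
  "value.converter.schema.registry.url": "http://localhost:8081",
  "transforms": "SetSchemaName",
  "transforms.SetSchemaName.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
  "transforms.SetSchemaName.schema.name": "Pageview"
}
```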
There are a few quickstart schema specifications bundled with kafka-connect-datagen, and they are listed in this directory. To use one of these bundled schemas, refer to this mapping and, in the configuration file, set the parameter quickstart to the associated name. For example:

```json
...
"quickstart": "users",
...
```
You can also define your own schema specification if you want to customize the fields and their values to be more domain specific or to match what your application is expecting. Under the hood, kafka-connect-datagen uses Avro Random Generator, so the only constraint in writing your own schema specification is that it is compatible with Avro Random Generator.

To define your own schema:

1. Create your own schema file /path/to/your_schema.avsc that is compatible with Avro Random Generator.
2. In the connector configuration, remove the parameter quickstart and add the parameters schema.filename (which should be the absolute path) and schema.keyfield:

```json
...
"schema.filename": "/path/to/your_schema.avsc",
"schema.keyfield": "<field representing the key>",
...
```

The custom schema can be used at runtime; it is not necessary to recompile the connector.
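As an illustrative sketch, a custom schema file might look like the following (the record and field names here are invented for the example; "range" and "options" are among the value rules supported by Avro Random Generator):

```json
{
  "namespace": "example",
  "name": "order",
  "type": "record",
  "fields": [
    {"name": "orderid", "type": {
      "type": "int",
      "arg.properties": {"range": {"min": 1, "max": 1000}}
    }},
    {"name": "status", "type": {
      "type": "string",
      "arg.properties": {"options": ["CREATED", "SHIPPED", "DELIVERED"]}
    }}
  ]
}
```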
You can control the keys that the connector publishes with its records via the schema.keyfield property. If it is set, the connector looks for a field with that name in the top-level Avro records that it generates, and uses the value and schema of that field for the key of the message it publishes to Kafka. Keys can be of any type (string, int, record, etc.) and can also be nullable. If no schema.keyfield is provided, the key will be null with an optional string schema.
To define the set of "rules" for the mock data, kafka-connect-datagen uses Avro Random Generator. The configuration parameter quickstart or schema.filename specifies the Avro schema, or set of "rules", which declares a list of primitives or more complex data types, length of data, and other properties of the generated mock data. Examples of these schema files are listed in this directory.

Do not confuse the above terminology with Avro and schemas used in a different context, as described below. The Avro schemas for generating mock data are independent of (1) the format of the data produced to Kafka and (2) the schema in Confluent Schema Registry.
To produce messages to Kafka where the message value format is Avro, set the value.converter and value.converter.schema.registry.url parameters:

```json
...
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
...
```

Or, to produce messages to Kafka where the message value format is JSON, set the value.converter parameter:

```json
...
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
...
```
For example, here is a snippet of a schema specification for generating mock data with a single field userid:

```json
...
{"name": "userid", "type": {
  "type": "string",
  "arg.properties": {
    "regex": "User_[1-9]{0,1}"
  }
}},
...
```
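To illustrate the value space that this regex allows, here is a quick check in Python (the candidate strings are invented for the example):

```python
import re

# The schema snippet above constrains "userid" with the regex "User_[1-9]{0,1}":
# "User_" followed by at most one digit from 1-9, i.e. ten possible values.
pattern = re.compile(r"User_[1-9]{0,1}")

candidates = ["User_", "User_3", "User_9", "User_0", "User_10"]
matches = [c for c in candidates if pattern.fullmatch(c)]
print(matches)  # "User_0" and "User_10" do not match the pattern
```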
If you are using Avro format for producing data to Kafka, here is the corresponding field in the registered schema in Confluent Schema Registry:

```json
{"name": "userid", "type": ["null", "string"], "default": null},
```

If you are not using Avro format for producing data to Kafka, there will be no schema in Confluent Schema Registry.
The Datagen connector captures details about a record's generation in the headers of the records it produces. The following fields are populated:

| Header Key | Header Value |
|---|---|
| task.generation | Task generation number (starts at 0, incremented each time the task restarts) |
| task.id | Task id number (0 up to tasks.max - 1) |
| current.iteration | Record iteration number (starts at 0, incremented each time a record is generated) |
Note: The following instructions are only relevant if you are an administrator of this repository and have push access to the https://hub.docker.com/r/cnfldemos/kafka-connect-datagen/ repository. The local Docker daemon must be logged into the proper Docker Hub account.

To release new versions of the Docker images to Docker Hub (https://hub.docker.com/r/cnfldemos/kafka-connect-datagen/ and https://hub.docker.com/r/cnfldemos/cp-server-connect-operator-with-datagen), use the respective targets in the Makefile. The Makefile contains some default variables that affect the version numbers of both the installed kafka-connect-datagen and the base Confluent Platform. The variables are located near the top of the Makefile, with the following names and current default values:

```make
CP_VERSION ?= 6.1.0
KAFKA_CONNECT_DATAGEN_VERSION ?= 0.4.0
OPERATOR_VERSION ?= 0 # Operator is a 'rev' version appended at the end of the CP version, like so: 5.5.0.0
```
To publish the https://hub.docker.com/r/cnfldemos/kafka-connect-datagen/ image:

```shell
make push-from-released
```

To override the Confluent Platform version and the kafka-connect-datagen version, you can run something similar to:

```shell
CP_VERSION=5.5.0 KAFKA_CONNECT_DATAGEN_VERSION=0.1.4 make publish-cp-kafka-connect-confluenthub
```

To override the CP version and the Operator version, which may happen if Operator releases a patch version, you could run something similar to:

```shell
CP_VERSION=5.5.0 OPERATOR_VERSION=1 KAFKA_CONNECT_DATAGEN_VERSION=0.1.4 make push-cp-server-connect-operator-from-released
```

This results in a Docker image tagged as cp-server-connect-operator-datagen:0.1.4-5.5.0.1 and pushed to Docker Hub.