This project contains common transformations for everyday use cases with Kafka Connect.
The following command can be used to install the plugin directly from the Confluent Hub using the Confluent Hub Client.
confluent-hub install jcustenborder/kafka-connect-transform-common:latest
The zip file that is deployed to the Confluent Hub is available under target/components/packages/.
You can manually extract this zip file, which includes all dependencies. All the dependencies
that are required to deploy the plugin are under target/kafka-connect-target as well. Make sure
that you include all the dependencies that are required to run the plugin, and place them in a
directory under plugin.path on your Connect worker.
Key
com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Value
charset
The charset to use when creating the output string.
Importance: HIGH
Type: STRING
Default Value: UTF-8
fields
The fields to transform.
Importance: HIGH
Type: LIST
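As a minimal sketch, the BytesToString settings above could be wired into a connector configuration like this. The alias bytesToString and the field name payload are hypothetical; only the class name and the charset and fields keys come from the listing above.

```properties
# Hypothetical alias; any name works as long as it is used consistently.
transforms=bytesToString
transforms.bytesToString.type=com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Value
# Charset used when creating the output string (UTF-8 is the documented default).
transforms.bytesToString.charset=UTF-8
# "payload" is an assumed field name in the record value.
transforms.bytesToString.fields=payload
```

Use the $Key variant of the class instead if the bytes live in the record key.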
Key
com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value
from
The format to move from
Importance: HIGH
Type: STRING
Validator: Matches: LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE
to
Importance: HIGH
Type: STRING
Validator: Matches: LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE
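A ChangeCase configuration might look like the following sketch. It assumes the transformation renames the fields of the record value; the listing above does not say so explicitly. The alias changeCase is hypothetical.

```properties
# Hypothetical alias for the transformation.
transforms=changeCase
transforms.changeCase.type=com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value
# Both values must be one of the formats accepted by the validator.
transforms.changeCase.from=UPPER_UNDERSCORE
transforms.changeCase.to=LOWER_CAMEL
```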
com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase
This transformation is used to change the case of a topic.
This transformation will convert a topic name like 'TOPIC_NAME' to topicName or topic_name.
from
The format of the incoming topic name.
LOWER_CAMEL = Java variable naming convention, e.g., "lowerCamel".
LOWER_HYPHEN = Hyphenated variable naming convention, e.g., "lower-hyphen".
LOWER_UNDERSCORE = C++ variable naming convention, e.g., "lower_underscore".
UPPER_CAMEL = Java and C++ class naming convention, e.g., "UpperCamel".
UPPER_UNDERSCORE = Java and C++ constant naming convention, e.g., "UPPER_UNDERSCORE".
Importance: HIGH
Type: STRING
Validator: Matches: LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE
to
The format of the outgoing topic name.
LOWER_CAMEL = Java variable naming convention, e.g., "lowerCamel".
LOWER_HYPHEN = Hyphenated variable naming convention, e.g., "lower-hyphen".
LOWER_UNDERSCORE = C++ variable naming convention, e.g., "lower_underscore".
UPPER_CAMEL = Java and C++ class naming convention, e.g., "UpperCamel".
UPPER_UNDERSCORE = Java and C++ constant naming convention, e.g., "UPPER_UNDERSCORE".
Importance: HIGH
Type: STRING
Validator: Matches: LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE
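The 'TOPIC_NAME' to topicName conversion described for ChangeTopicCase could be configured roughly as follows. The alias changeTopicCase is hypothetical; the class name and the from and to keys are taken from the listing above.

```properties
# Hypothetical alias for the transformation.
transforms=changeTopicCase
transforms.changeTopicCase.type=com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase
# Incoming topic names look like TOPIC_NAME.
transforms.changeTopicCase.from=UPPER_UNDERSCORE
# Outgoing topic names look like topicName; use LOWER_UNDERSCORE for topic_name.
transforms.changeTopicCase.to=LOWER_CAMEL
```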
Key
com.github.jcustenborder.kafka.connect.transform.common.ExtractNestedField$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.ExtractNestedField$Value
input.inner.field.name
The field on the child struct containing the field to be extracted. For example, if you wanted to extract address.state, you would use state.
Importance: HIGH
Type: STRING
input.outer.field.name
The field on the parent struct containing the child struct. For example, if you wanted to extract address.state, you would use address.
Importance: HIGH
Type: STRING
output.field.name
The field to place the extracted value into.
Importance: HIGH
Type: STRING
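Following the address.state example, an ExtractNestedField configuration might be sketched as below. The alias extractState and the output field name state are hypothetical choices for illustration.

```properties
# Hypothetical alias for the transformation.
transforms=extractState
transforms.extractState.type=com.github.jcustenborder.kafka.connect.transform.common.ExtractNestedField$Value
# Parent struct field that holds the child struct.
transforms.extractState.input.outer.field.name=address
# Field inside the child struct to extract.
transforms.extractState.input.inner.field.name=state
# Assumed name of the top-level field that receives the extracted value.
transforms.extractState.output.field.name=state
```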
Key
com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value
This transformation uses a field from the input data to override the timestamp of the record.
field.name
The field to pull the timestamp from. This must be an int64 or a timestamp.
Importance: HIGH
Type: STRING
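For illustration, an ExtractTimestamp setup could look like this sketch. The alias extractTimestamp and the field name created_at are assumptions; the field must hold an int64 or a timestamp, as stated above.

```properties
# Hypothetical alias for the transformation.
transforms=extractTimestamp
transforms.extractTimestamp.type=com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value
# "created_at" is an assumed field in the record value (int64 or timestamp).
transforms.extractTimestamp.field.name=created_at
```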
Key
com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value
header.mappings
The mapping of the header to the field in the message.
Importance: HIGH
Type: LIST
Key
com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema$Value
This transformation is used to convert older schema versions to the latest schema version. It works by keying all of the schemas that come into the transformation by their schema name and comparing the version() of each schema. Schemas are discovered as they flow through the transformation, and the latest version seen for a schema is the one that is used.
Key
com.github.jcustenborder.kafka.connect.transform.common.PatternFilter$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.PatternFilter$Value
pattern
The regex to test the message with.
Importance: HIGH
Type: STRING
Validator: com.github.jcustenborder.kafka.connect.utils.config.validators.PatternValidator
fields
The fields to transform.
Importance: HIGH
Type: LIST
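A PatternFilter configuration might be sketched as follows. The alias patternFilter, the regex, and the field name name are hypothetical. The listing above does not state what happens to records that match, so check that behavior before relying on it.

```properties
# Hypothetical alias for the transformation.
transforms=patternFilter
transforms.patternFilter.type=com.github.jcustenborder.kafka.connect.transform.common.PatternFilter$Value
# Example regex; any pattern accepted by the validator can be used here.
transforms.patternFilter.pattern=^internal-.*
# "name" is an assumed field in the record value to test against the pattern.
transforms.patternFilter.fields=name
```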
Key
com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Value
field.pattern
Importance: HIGH
Type: STRING
field.replacement
Importance: HIGH
Type: STRING
field.pattern.flags
Importance: LOW
Type: LIST
Default Value: [CASE_INSENSITIVE]
Validator: [UNICODE_CHARACTER_CLASS, CANON_EQ, UNICODE_CASE, DOTALL, LITERAL, MULTILINE, COMMENTS, CASE_INSENSITIVE, UNIX_LINES]
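The PatternRename options have no descriptions in the listing above, so the following is only a sketch. It assumes the pattern is matched against field names and that matches are substituted with the replacement. The alias patternRename and the legacy_ and current_ prefixes are hypothetical.

```properties
# Hypothetical alias for the transformation.
transforms=patternRename
transforms.patternRename.type=com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Value
# Assumed behavior: field names starting with legacy_ get the prefix current_ instead.
transforms.patternRename.field.pattern=^legacy_
transforms.patternRename.field.replacement=current_
# Optional; CASE_INSENSITIVE is the documented default.
transforms.patternRename.field.pattern.flags=CASE_INSENSITIVE
```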
Key
com.github.jcustenborder.kafka.connect.transform.common.SchemaNameToTopic$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.SchemaNameToTopic$Value
This transformation is used to take the name from the schema for the key or value and replace the topic with this value.
Key
com.github.jcustenborder.kafka.connect.transform.common.SetMaximumPrecision$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.SetMaximumPrecision$Value
This transformation is used to ensure that all decimal fields in a struct are below the maximum precision specified.
The Confluent AvroConverter uses a default precision of 64 which can be too large for some database systems.
precision.max
The maximum precision allowed.
Importance: HIGH
Type: INT
Validator: [1,...,64]
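To cap decimal precision for a database that cannot handle the default of 64, a configuration along these lines could be used. The alias setMaximumPrecision and the value 32 are illustrative assumptions; any value from 1 to 64 passes the validator.

```properties
# Hypothetical alias for the transformation.
transforms=setMaximumPrecision
transforms.setMaximumPrecision.type=com.github.jcustenborder.kafka.connect.transform.common.SetMaximumPrecision$Value
# Example limit; pick the maximum precision your target system supports.
transforms.setMaximumPrecision.precision.max=32
```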
Key
com.github.jcustenborder.kafka.connect.transform.common.SetNull$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.SetNull$Value
com.github.jcustenborder.kafka.connect.transform.common.TimestampNow
This transformation is used to override the timestamp of the incoming record to the time the record is being processed.
Key
com.github.jcustenborder.kafka.connect.transform.common.TimestampNowField$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.TimestampNowField$Value
This transformation is used to set a field with the current timestamp of the system running the transformation.
fields
The field(s) to populate with the timestamp of the system.
Importance: HIGH
Type: LIST
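A TimestampNowField configuration could look like the sketch below. The alias timestampNowField and the field name processed_at are hypothetical.

```properties
# Hypothetical alias for the transformation.
transforms=timestampNowField
transforms.timestampNowField.type=com.github.jcustenborder.kafka.connect.transform.common.TimestampNowField$Value
# "processed_at" is an assumed field name; several fields can be listed, separated by commas.
transforms.timestampNowField.fields=processed_at
```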
Key
com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value
output.schema.type
The connect schema type to output the converted JSON as.
Importance: MEDIUM
Type: STRING
Default Value: STRING
Validator: [STRING, BYTES]
schemas.enable
Flag to determine if the JSON data should include the schema.
Importance: MEDIUM
Type: BOOLEAN
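As a rough example, ToJSON could be configured to emit the value as a JSON string without an embedded schema. The alias toJson is hypothetical; the keys and allowed values come from the listing above.

```properties
# Hypothetical alias for the transformation.
transforms=toJson
transforms.toJson.type=com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value
# STRING is the documented default; BYTES is the other accepted value.
transforms.toJson.output.schema.type=STRING
# Leave the schema out of the generated JSON.
transforms.toJson.schemas.enable=false
```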
Key
com.github.jcustenborder.kafka.connect.transform.common.ToLong$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.ToLong$Value
fields
The fields to transform.
Importance: HIGH
Type: LIST
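A ToLong configuration might be sketched like this. The alias toLong and the field names count and total are assumptions for illustration.

```properties
# Hypothetical alias for the transformation.
transforms=toLong
transforms.toLong.type=com.github.jcustenborder.kafka.connect.transform.common.ToLong$Value
# Assumed field names in the record value; list entries are separated by commas.
transforms.toLong.fields=count,total
```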
Key
com.github.jcustenborder.kafka.connect.transform.common.TopicNameToField$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.TopicNameToField$Value
field
The field to insert the topic name into.
Importance: HIGH
Type: STRING
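To copy the topic name into the record value, a TopicNameToField configuration could look roughly like this. The alias topicNameToField and the field name topic are hypothetical.

```properties
# Hypothetical alias for the transformation.
transforms=topicNameToField
transforms.topicNameToField.type=com.github.jcustenborder.kafka.connect.transform.common.TopicNameToField$Value
# "topic" is an assumed name for the field that receives the topic name.
transforms.topicNameToField.field=topic
```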
mvn clean package
Contributions are always welcome! Before you start any development, please create an issue and
start a discussion. Then create a pull request against your newly created issue, and we're happy
to see if we can merge it. First and foremost, any time you're adding code to the code base
you need to include test coverage. Make sure that you run mvn clean package before submitting your
pull request to ensure that all of the tests and checkstyle rules pass and the package can be built successfully.