jcustenborder / kafka-connect-transform-common

Common Transforms for Kafka Connect.
Apache License 2.0

MAP support for HeaderToField #71

Open prestonmcgowan opened 3 years ago

prestonmcgowan commented 3 years ago

I am using the HeaderToField transform to try to filter messages based on a message header. The header in question is the provenance header that Replicator adds (__replicator_id).

I added the HeaderToField transform, but the connector failed with the error below. Configuration:

    "transforms" : "headerToField",
    "transforms.headerToField.type" : "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value",
    "transforms.headerToField.header.mappings" : "__replicator_id:MAP"

I also tried STRING and BYTES, for grins, and it still complained that MAP is not a supported type.

Error:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:196)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:122)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:339)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.UnsupportedOperationException: MAP is not a supported type.
    at com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation.processMap(BaseKeyValueTransformation.java:48)
    at com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation.process(BaseKeyValueTransformation.java:130)
    at com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation.apply(BaseKeyValueTransformation.java:192)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
    ... 11 more

andriihrachov commented 2 years ago

Same for me when using the MongoDB sink, even without setting MAP:

    transforms=headerToField
    transforms.headerToField.type=com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value
    transforms.headerToField.header.mappings=x-event-type:BYTES:x_event_type

shberlin commented 2 years ago

Same for me. I assume this happens when the header is given as MAP instead of LIST. Proper MAP support would be awesome.
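
Reading the stack trace above, the exception is thrown from processMap in BaseKeyValueTransformation, which suggests (an assumption, not confirmed in this thread) that the record value reaches the transform as a schemaless Map and HeaderToField has no handling for that case. A minimal sketch of what MAP support could do is below. It is not the library's code: HeaderToMapSketch, copyHeaders, and the plain-Map stand-in for Connect's headers are all hypothetical names used for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch, not the library's implementation: shows what copying
 * headers into a schemaless (Map) record value could look like.
 */
final class HeaderToMapSketch {

    private HeaderToMapSketch() {
    }

    /**
     * Returns a copy of the schemaless value with each mapped header added.
     *
     * @param value    record value as a plain Map (schemaless data)
     * @param headers  header name -> header value (stand-in for Connect's Headers)
     * @param mappings header name -> target field name
     */
    static Map<String, Object> copyHeaders(Map<String, Object> value,
                                           Map<String, Object> headers,
                                           Map<String, String> mappings) {
        // Copy first so the incoming record value is never mutated.
        Map<String, Object> result = new LinkedHashMap<>(value);
        for (Map.Entry<String, String> mapping : mappings.entrySet()) {
            // Skip headers that are absent rather than writing a null field.
            if (headers.containsKey(mapping.getKey())) {
                result.put(mapping.getValue(), headers.get(mapping.getKey()));
            }
        }
        return result;
    }
}
```

With the mapping from the comment above (x-event-type to x_event_type), a value of {"id": 1} and a header x-event-type=created would come back as {"id": 1, "x_event_type": "created"}. Type conversion (STRING, BYTES, and so on) is deliberately left out of the sketch.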

jx2lee commented 11 months ago

Related PR opened (#99).