birdiecare / connect-smts

Kafka Connect SMTs
MIT License
17 stars 1 forks source link

JsonDeserializer couldn't work in the kafka sink connector #44

Open RaghadAlkhudhair opened 8 months ago

RaghadAlkhudhair commented 8 months ago

I am using the json deserializer on a kafka connector using debezuim and the kafka is deployed on a docker image.

the plugin was installed successfully but when I try to use the smt using the following code added to the sink connector :

transforms: "json"
transforms.json.type: "com.birdie.kafka.connect.smt.DebeziumJsonDeserializer"
transforms.json.optional-struct-fields: true

I get the following error saying it couldn't cast kafka sink to kafka sourcse:


│       State:  FAILED                                                                                                                                                                                                                          │
│       Trace:  org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler                                                                                                                                           │
│               at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)                                                                                                   │
│               at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)                                                                                                              │
│               at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:54)                                                                                                                                      │
│               at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:552)                                                                                                                           │
│               at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:505)                                                                                                                                     │
│               at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:341)                                                                                                                                                │
│               at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:242)                                                                                                                                           │
│               at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:211)                                                                                                                                             │
│               at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)                                                                                                                                                       │
│               at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)                                                                                                                                                         │
│               at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)                                                                                                                                │
│               at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)                                                                                                                                            │
│               at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)                                                                                                                                                           │
│               at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)                                                                                                                                    │
│               at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)                                                                                                                                    │
│               at java.base/java.lang.Thread.run(Thread.java:833)                                                                                                                                                                              │
│ Caused by: java.lang.ClassCastException: class org.apache.kafka.connect.sink.SinkRecord cannot be cast to class org.apache.kafka.connect.source.SourceRecord (org.apache.kafka.connect.sink.SinkRecord and org.apache.kafka.connect.source.So │
│ urceRecord are in unnamed module of loader 'app')                                                                                                                                                                                             │
│   at com.birdie.kafka.connect.smt.DebeziumJsonDeserializer.apply(DebeziumJsonDeserializer.java:26)                                                                                                                                            │
│   at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)                                                                                                                                                  │
│   at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:54)                                                                                                                                         │
│   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)                                                                                                                     │
│   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)                                                                                                               │
│   ... 15 more

could you support please??

gtsopour commented 2 weeks ago

Hello @RaghadAlkhudhair did you manage to find a solution on this or did you use any other smts? Additionally is this plugin publicly available that I can refer it on my KafkaConnect setup or I have to build/host it by myself?

Thanks in advance for your support.