camunda-community-hub / kafka-connect-zeebe

Kafka Connect for Zeebe.io
Apache License 2.0
96 stars 52 forks

Zeebe Sink Connector changes to "Degraded" if an empty message is published. #51

Closed Sargastico closed 3 years ago

Sargastico commented 3 years ago

Hi, while testing the Zeebe connector for Kafka, I ran into something that caused me some trouble and may be a bug or undesired behavior.

If an empty message is published to the Kafka topic, the sink connector consumes it and raises the following exception:

Running on Kubernetes from GCP (GKE):

org.apache.kafka.connect.errors.ConnectException: java.lang.IllegalArgumentException: json string can not be null or empty
    at io.zeebe.kafka.connect.sink.ZeebeSinkTask.put(ZeebeSinkTask.java:65)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: json string can not be null or empty
    at com.jayway.jsonpath.internal.Utils.notEmpty(Utils.java:386)
    at com.jayway.jsonpath.internal.ParseContextImpl.parse(ParseContextImpl.java:36)
    at com.jayway.jsonpath.JsonPath.parse(JsonPath.java:599)
    at io.zeebe.kafka.connect.sink.message.JsonRecordParser.parseDocument(JsonRecordParser.java:79)
    at io.zeebe.kafka.connect.sink.message.JsonRecordParser.parse(JsonRecordParser.java:53)
    at io.zeebe.kafka.connect.sink.ZeebeSinkTask.preparePublishRequest(ZeebeSinkTask.java:94)
    at io.zeebe.kafka.connect.sink.ZeebeSinkTask.lambda$publishMessages$1(ZeebeSinkTask.java:84)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
    at io.zeebe.kafka.connect.sink.ZeebeSinkTask.publishMessages(ZeebeSinkTask.java:87)
    at io.zeebe.kafka.connect.sink.ZeebeSinkTask.lambda$put$0(ZeebeSinkTask.java:57)
    at io.zeebe.kafka.connect.util.ManagedClient.withClient(ManagedClient.java:53)
    at io.zeebe.kafka.connect.sink.ZeebeSinkTask.put(ZeebeSinkTask.java:57)

Followed by:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.lang.IllegalArgumentException: json string can not be null or empty
    at io.zeebe.kafka.connect.sink.ZeebeSinkTask.put(ZeebeSinkTask.java:65)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
    ... 10 more
Caused by: java.lang.IllegalArgumentException: json string can not be null or empty
    at com.jayway.jsonpath.internal.Utils.notEmpty(Utils.java:386)
    at com.jayway.jsonpath.internal.ParseContextImpl.parse(ParseContextImpl.java:36)
    at com.jayway.jsonpath.JsonPath.parse(JsonPath.java:599)
    at io.zeebe.kafka.connect.sink.message.JsonRecordParser.parseDocument(JsonRecordParser.java:79)
    at io.zeebe.kafka.connect.sink.message.JsonRecordParser.parse(JsonRecordParser.java:53)
    at io.zeebe.kafka.connect.sink.ZeebeSinkTask.preparePublishRequest(ZeebeSinkTask.java:94)
    at io.zeebe.kafka.connect.sink.ZeebeSinkTask.lambda$publishMessages$1(ZeebeSinkTask.java:84)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
    at io.zeebe.kafka.connect.sink.ZeebeSinkTask.publishMessages(ZeebeSinkTask.java:87)
    at io.zeebe.kafka.connect.sink.ZeebeSinkTask.lambda$put$0(ZeebeSinkTask.java:57)
    at io.zeebe.kafka.connect.util.ManagedClient.withClient(ManagedClient.java:53)
    at io.zeebe.kafka.connect.sink.ZeebeSinkTask.put(ZeebeSinkTask.java:57)

The logs are from Kafka Connect. The connector does not recover; it stays "Degraded" (status from Confluent Control Center) and a fresh deploy is needed.

berndruecker commented 3 years ago

Thanks for reporting @Sargastico, and sorry for the delay - my GitHub notifications didn't come through to my email inbox. Is this still relevant or urgent for you? I will try to look at this over the next week.

Sargastico commented 3 years ago

@berndruecker It's relevant. I would appreciate it if you could figure out the problem. That was a while ago, and I don't know if anything has changed in the meantime.

berndruecker commented 3 years ago

Hi @Sargastico - sorry for the long delay. I am looking at it now!

You send in an empty message, which means the connector can't derive the information it needs to route the message to Zeebe (such as the message name or correlation key).

What behavior would you expect in this case? The connector can't handle empty messages, so throwing an exception might be just right.

Note that you can configure how Kafka Connect itself handles failure situations, see https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/. There you can choose to ignore such messages or send them to a dead letter queue. Is this what you were looking for?
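For reference, the dead-letter-queue approach from that article looks roughly like this in the sink connector configuration. These are standard Kafka Connect error-handling properties; the DLQ topic name here is just an example:

```json
{
  "errors.tolerance": "all",
  "errors.log.enable": true,
  "errors.log.include.messages": true,
  "errors.deadletterqueue.topic.name": "zeebe-sink-dlq",
  "errors.deadletterqueue.topic.replication.factor": 1,
  "errors.deadletterqueue.context.headers.enable": true
}
```

One caveat: `errors.tolerance` covers failures in the converter and transform stages of the pipeline. An exception thrown inside the sink task's `put()` (as in the stack trace above) may still fail the task unless the connector reports the record via the errant-record reporter introduced in KIP-610 (Kafka 2.6+), so whether this helps depends on the Kafka Connect version and the connector implementation.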

berndruecker commented 3 years ago

The above article is also linked in the readme: https://github.com/camunda-community-hub/kafka-connect-zeebe#configuring-error-handling-of-kafka-connect-eg-logging-or-dead-letter-queues. I would close this issue - feel free to reopen if you have good input on how the connector itself should be improved!
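If the empty records are tombstones (null values), another option, not specific to this connector, is to drop them before the sink task ever sees them, using Kafka Connect's built-in `Filter` transform with the `RecordIsTombstone` predicate (available since Kafka 2.6). This is only a sketch and does not catch records with an empty-string value:

```json
{
  "transforms": "dropTombstones",
  "transforms.dropTombstones.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.dropTombstones.predicate": "isTombstone",
  "predicates": "isTombstone",
  "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
}
```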

I'm also always interested if you can share some info about your use case - maybe in the forum: https://forum.camunda.io/?

Best Bernd