neo4j-contrib / neo4j-streams

Neo4j Kafka Connector
https://neo4j.com/docs/kafka
Apache License 2.0
173 stars 71 forks source link

properly handle Producer Config for DLQ in kafka-connect environment #238

Closed jexp closed 5 years ago

jexp commented 5 years ago
  1. we shouldn't try to create a producer if the necessary config is not available (short-term) and log a message with the proper configuration
  2. not stop the plugin from starting
  3. make sure to add/use and document the settings needed for properly setting up a Producer for the DLQ in a connect setting.
jexp commented 5 years ago

error with the Kafka Connect Neo4j Sink (issue #229)

Following your advice, we set up the parallelization to False in our config (without applying the new patch 1.0.3 from Dropbox) and it seemed to do the trick.

However, we tried today to test with way more data (around 15 millions messages per connectors - we have 4) and several times we had to restart manually our cluster due to the same error:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: This job has not completed yet
at kotlinx.coroutines.JobSupport.getCompletionExceptionOrNull(JobSupport.kt:1057)
at streams.kafka.connect.sink.Neo4jService$writeData$2.invokeSuspend(Neo4jService.kt:137)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)
at kotlinx.coroutines.selects.SelectBuilderImpl.resumeWith(Select.kt:247)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:45)
at kotlinx.coroutines.DispatchedTask$DefaultImpls.run(Dispatched.kt:235)
at kotlinx.coroutines.DispatchedContinuation.run(Dispatched.kt:81)
at kotlinx.coroutines.scheduling.Task.run(Tasks.kt:94)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:732)

We were using this config :

· Neo4J : 3.5.4-entreprise · Kafka : 5.2.0 · Kafka Neo4j Connector : neo4j-kafka-connect-neo4j-1.0.2

We just tried now with the patch 1.0.3 you kindly provided via Dropbox. I can see the jar are correctly set up in my cluster but I'm getting the following error:

[2019-09-02 14:40:47,575] ERROR WorkerSinkTask{id=kc-compte-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:430)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at streams.service.errors.KafkaErrorService$Companion.producer(KafkaErrorService.kt:25)
at streams.service.errors.KafkaErrorService$Companion.access$producer(KafkaErrorService.kt:18)
at streams.service.errors.KafkaErrorService.<init>(KafkaErrorService.kt:16)
at streams.kafka.connect.sink.Neo4jSinkTask.start(Neo4jSinkTask.kt:31)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:300)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:90)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:49)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:408)
... 14 more

Aside the jar, nothing have been changed, we are using exactly the same configurations for the connectors.

Do you have an idea where this error can come from?

conker84 commented 5 years ago

We can hack the Kafka Connect api in order to extract the retryWithTolerance field from WorkerSinkTask look at:

https://github.com/apache/kafka/blob/237e83dea0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L299

and

https://github.com/apache/kafka/blob/237e83dea081b38d2af8fa465df4f1914747ba6f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L484-L488