opentracing-contrib / java-kafka-client

OpenTracing Instrumentation for Apache Kafka Client
Apache License 2.0
125 stars 64 forks source link

Infinite retry by using DeadLetterPublishingRecoverer with TracingProducerFactory #89

Open fahtom94 opened 3 years ago

fahtom94 commented 3 years ago

Hi, I found exception which causes to infinite retry by using DeadLetterPublishingRecoverer with TracingProducerFactory together. If TracingProducerFactory injects to DeadLetterPublishingRecoverer then by handling error in consumer will be java.lang.UnsupportedOperationException thrown and it causes to infinite publishing message to the dlq. It happens because in TracingProducerFactory wasn't implemented org.springframework.kafka.core.ProducerFactory#getConfigurationProperties method

Here is example of kafkaListenerContainerFactory's configuration

@Bean
    fun kafkaListenerContainerFactory(template: KafkaTemplate<String, String>): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory()
        factory.containerProperties
        factory.setConcurrency(concurrency.toInt())
        factory.setErrorHandler(
            SeekToCurrentErrorHandler(
                DeadLetterPublishingRecoverer(
                    template,
                    { record: ConsumerRecord<*, *>?, ex: Exception? ->
                        TopicPartition(
                            "test_topic",
                            -1
                        )
                    }), exponentialBackoff()))
        return factory
    }

exception's stacktrace

{"log":"{\"ts\":\"2021-09-25T09:21:37.836Z\",\"level\":\"ERROR\",\"logger_name\":\"org.springframework.kafka.listener.SeekToCurrentErrorHandler\",\"message\":\"Failed to determine if this record (test_record-2@0) should be recovererd, including in seeks\",\"root_stack_trace_element\":{\"class_name\":\"org.springframework.kafka.core.ProducerFactory\",\"method_name\":\"getConfigurationProperties\"},\"stack_trace\":\"java.lang.UnsupportedOperationException: This implementation doesn't support this method\\n\
at org.springframework.kafka.core.ProducerFactory.getConfigurationProperties(ProducerFactory.java:119)\\n\
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.determineSendTimeout(DeadLetterPublishingRecoverer.java:507)\\n\
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.verifySendResult(DeadLetterPublishingRecoverer.java:488)\\n\
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:480)\\n\
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.send(DeadLetterPublishingRecoverer.java:382)\\n\
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.sendOrThrow(DeadLetterPublishingRecoverer.java:351)\\n\
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:323)\\n\
at org.springframework.kafka.listener.FailedRecordTracker.attemptRecovery(FailedRecordTracker.java:227)\\n\
at org.springframework.kafka.listener.FailedRecordTracker.recovered(FailedRecordTracker.java:182)\\n\
at org.springframework.kafka.listener.SeekUtils.lambda$doSeeks$5(SeekUtils.java:105)\\n\
at java.base/java.util.ArrayList.forEach(Unknown Source)\\n\
at org.springframework.kafka.listener.SeekUtils.doSeeks(SeekUtils.java:102)\\n\
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:205)\\n\
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112)\\n\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2360)\\n\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2229)\\n\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2143)\\n\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2025)\\n\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1707)\\n\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1274)\\n\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1266)\\n\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)\\n\
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)\\n\
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)\\n\
0x0000-dot-ru commented 2 years ago

Looks like duplicate #87