spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

ListenerContainerWithDlqAndRetryCustomizer template points to localhost:9092 #1208

Closed sidsamant closed 2 years ago

sidsamant commented 2 years ago

Hi,

I am using the following dependencies:

spring-cloud-stream: 3.2.1
spring-cloud-stream-binder-kafka: 3.2.1
spring-kafka: 2.8.2

To forward deserialization errors to DLQ, I added the following bean:

    @Bean
    ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
        return new ListenerContainerWithDlqAndRetryCustomizer() {
            @Override
            public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                    String group,
                    BiFunction<ConsumerRecord<?, ?>, Exception, org.apache.kafka.common.TopicPartition> dlqDestinationResolver,
                    BackOff backOff) {
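                // Publish failed records to the DLQ destination chosen by the supplied resolver.
                ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template, dlqDestinationResolver);
                // If no back-off is supplied, fall back to FixedBackOff(0L, 1L): zero interval, maxAttempts of 1.
                BackOff effectiveBackOff = backOff == null ? new FixedBackOff(0L, 1L) : backOff;
                container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, effectiveBackOff));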
            }
                        ......
        };
    }

I have the following configuration in application.yml:

    spring.cloud:
      schemaRegistryClient:
        endpoint: .....
      stream:
        default:
          consumer:
            maxAttempts: 1
        function:
          definition: myDataConsumer
        bindings:
          myDataConsumer-in-0:
            destination: input-topic
            ....
          myDataConsumer-output:
            destination: output-topic
            ...
        kafka:
          default:
            consumer:
              useNativeDecoding: true
              enableDlq: true
              dlqName: dlq-topic
          bindings:
            myDataConsumer-in-0:
              consumer:
                ...
          binder:
            brokers: "broker1Url,broker2Url"
            producer-properties:
              ...

When the application runs and a deserialization error occurs, it tries to publish to a non-existent localhost:9092 instead of the brokers in the binder configuration. What am I missing?

Thanks in advance

garyrussell commented 2 years ago

You are wiring in Boot's auto-configured KafkaTemplate. Boot knows nothing about binders or binder configuration; it uses the normal Boot spring.kafka.... properties when configuring that template.

https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html#application-properties.integration.spring.kafka.bootstrap-servers
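
As a rough illustration of the point above, here is a minimal sketch of the Boot-side property that the auto-configured KafkaTemplate reads. It assumes the DLQ should go to the same brokers as the binder, and it reuses the placeholder host names from the configuration in the question; any other producer settings the template may need are not shown.

```yaml
# Sketch only: "broker1Url,broker2Url" are the placeholders from the question, not real hosts.
spring:
  kafka:
    # Read by Boot's auto-configured KafkaTemplate; defaults to localhost:9092 when unset.
    bootstrap-servers: "broker1Url,broker2Url"
```

This sits alongside the existing spring.cloud.stream.kafka.binder.brokers entry rather than replacing it, since the binder and Boot's template are configured independently.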

sidsamant commented 2 years ago

Thanks @garyrussell. It worked after I configured the spring.kafka properties as you mentioned.

I had assumed it would pick up the server settings from the binder configuration, based on https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/main/docs/src/main/asciidoc/overview.adoc#retry-and-dead-letter-processing

garyrussell commented 2 years ago

Reopening; we should improve that documentation.

sobychacko commented 2 years ago

Issue moved to spring-cloud/spring-cloud-stream #2313 via ZenHub

sidsamant commented 2 years ago

A short write-up to make this easier for others: https://medium.com/@sidsamanta/spring-cloud-stream-kafka-errors-to-dead-letter-queue-dlq-bb4078d9c6a0