microsoft / spring-cloud-azure

Spring Cloud Azure is an open-source project that provides seamless Spring integration with Azure services.
https://microsoft.github.io/spring-cloud-azure
MIT License

Is ListenerContainerWithDlqAndRetryCustomizer supported when using kafka binder API for EventHubs? #1041

Closed: snimmagadda1 closed this issue 1 year ago

snimmagadda1 commented 1 year ago

Query/Question

Hi - I'm using the following starter:

<dependency>
  <groupId>com.azure.spring</groupId>
  <artifactId>azure-spring-cloud-starter-eventhubs-kafka</artifactId>
  <version>2.14.0</version>
</dependency>

I'm looking to implement a retry policy whose total back-off time is greater than the max.poll.interval of the Event Hubs broker. For this scenario, the Spring Cloud Stream Kafka binder documentation recommends a ListenerContainerWithDlqAndRetryCustomizer, as described here.

However, after registering this customizer and retry policy, it looks like the DLQ producer configs are incorrect and fall back to the default Kafka configuration: I see the binder attempting to send DLQ messages to the wrong broker (localhost:9092, and the connection times out). I assume this is because the default KafkaTemplate is being wired in. I'm new to this interface and library specifically; is this DLQ and retry processing interface supported with Event Hubs?
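One way I could imagine sidestepping the auto-configured template would be to hand the recoverer a KafkaTemplate built explicitly against the Event Hubs Kafka endpoint. This is only a rough sketch; KAFKA_BROKER and KAFKA_JAAS_CONFIG are assumed to hold the Event Hubs bootstrap server and JAAS string:

@Bean
KafkaTemplate<byte[], byte[]> dlqTemplate(
        @Value("${KAFKA_BROKER}") String broker,
        @Value("${KAFKA_JAAS_CONFIG}") String jaas) {
    // Sketch only: a producer factory pointed explicitly at the Event Hubs Kafka
    // endpoint, so DLQ publishes do not fall back to localhost:9092.
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, jaas);
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}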

Relevant code & configs, with some properties excluded for brevity:

@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
    return new ListenerContainerWithDlqAndRetryCustomizer() {

        @Override
        public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                String group,
                @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
                @Nullable BackOff backOff) {
            // For the retry topic, install a DefaultErrorHandler that retries with the
            // provided back-off and publishes exhausted records to the DLQ.
            if (destinationName.contains("-retry")) {
                ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template, dlqDestinationResolver);
                container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
            }
        }

        @Override
        public boolean retryAndDlqInBinding(String destinationName, String group) {
            // Let the binder handle retry/DLQ in-binding for everything except the retry topic.
            return !destinationName.contains("-retry");
        }

    };
}
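For context, the retryConsumer-in-0 binding configured in the YAML below corresponds to a functional consumer bean named retryConsumer. A minimal sketch follows; the payload type and processing logic here are assumptions, not taken from the actual application:

@Bean
public Consumer<Message<String>> retryConsumer() {
    // Sketch: throwing from this consumer triggers the DefaultErrorHandler installed
    // by the customizer above (back-off retries, then publish to the DLQ topic).
    return message -> {
        // reprocess the record here
    };
}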
spring:
  cloud:
    stream:
      bindings:
        ...
        retryConsumer-in-0:
          destination: ${APP_DESTINATION_TOPIC}-retry
          group: "appRetry"
          consumer:
            max-attempts: 4
            back-off-initial-interval: 60000
            back-off-max-interval: 600000
            back-off-multiplier: 2.0
            retryable-exceptions:
              com.app.domain.RecordNotFoundException: false
              org.apache.kafka.common.errors.SerializationException: false
              java.lang.NullPointerException: false
              java.lang.IllegalStateException: false
      kafka:
        binder:
          ...
          consumer-properties:
            enable.auto.commit: false
            session.timeout.ms: 30000
            request.timeout.ms: 60000
            max.poll.interval.ms: 300000
            max.poll.records: 5
        bindings:
          retryConsumer-in-0:
            consumer:
              start-offset: earliest
              enable-dlq: true
              dlq-name: ${APP_DESTINATION_TOPIC}-dlt
              auto-commit-offset: false
              ack-mode: BATCH

Why is this not a Bug or a Feature Request? I was unable to find documentation in the Event Hubs-specific docs.


snimmagadda1 commented 1 year ago

Oh interesting, it looks like this just went into Spring Cloud Stream: https://github.com/spring-cloud/spring-cloud-stream/pull/2344

Per this, the KafkaTemplate pulls its configs from the properties set under spring.kafka. This has me wondering whether the equivalent configs can be set for Event Hubs specifically.

chenrujun commented 1 year ago

@yiliuTo, please help handle this issue.

snimmagadda1 commented 1 year ago

I was able to get it working. It is a bit counter-intuitive to have to configure the same properties in a second location (spring.kafka) in addition to spring.cloud.stream:

spring:
  kafka:
    producer:
      bootstrap-servers: ${KAFKA_BROKER}
      client-id: dlqProducerClient
      properties:
        security.protocol: SASL_SSL
        sasl.mechanism: PLAIN
        sasl.jaas.config: ${KAFKA_JAAS_CONFIG}
      template:
        default-topic: ${APP_TOPIC}-dlt
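A quick way to confirm the auto-configured template now targets the Event Hubs endpoint is to dump its producer configuration at startup. This is an illustrative sketch, assuming Spring Boot's auto-configured KafkaTemplate is the one injected into the customizer:

@Bean
ApplicationRunner verifyDlqTemplate(KafkaTemplate<?, ?> template) {
    // Sketch: print the bootstrap servers the DLQ publisher will use; this should be
    // the Event Hubs endpoint from spring.kafka.producer, not localhost:9092.
    return args -> System.out.println("DLQ bootstrap.servers = "
            + template.getProducerFactory().getConfigurationProperties().get("bootstrap.servers"));
}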