spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

dlqProducerProperties does not work #1077

Closed: jokefaker closed this issue 3 years ago

jokefaker commented 3 years ago

Spring Boot version: 2.3.10
Spring Cloud Starter Stream Kafka version: 3.0.11

I want to set retention.ms on the DLQ topic. Here is the config file:

spring:
  cloud:
    stream:
      default:
        producer:
          partitionCount: 4
      kafka:
        binder:
          brokers: localhost:9092
        bindings:
          input:
            consumer:
              enableDlq: true
              dlqName: contract-service-error.dlq
              dlqPartitions: 1
              dlqProducerProperties:
                topic:
                  properties:
                    retention:
                      ms: 86400000
      bindings:
        input:
          destination: input-topic
          group: contract-service
  application:
    name: creams-contract-service
server:
  port: 8082

contract-service-error.dlq is created without the retention setting.
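
For reference, the nested `retention:` / `ms:` keys in the config above are meant to reach the binder as a single `retention.ms` entry in the topic properties map. The sketch below only illustrates that flattening with plain Java maps; the class and method names are hypothetical and it is not the code Spring Boot actually runs when binding the properties.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlattenTopicProperties {

    /** Recursively joins nested map keys with '.', collecting leaf values as strings. */
    static void flatten(String prefix, Map<String, Object> node, Map<String, String> out) {
        for (Map.Entry<String, Object> e : node.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            Object value = e.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) value;
                flatten(key, child, out);
            } else {
                out.put(key, String.valueOf(value));
            }
        }
    }

    public static void main(String[] args) {
        // Mirrors the YAML under dlqProducerProperties.topic.properties
        Map<String, Object> retention = new LinkedHashMap<>();
        retention.put("ms", 86400000);
        Map<String, Object> properties = new LinkedHashMap<>();
        properties.put("retention", retention);

        Map<String, String> flat = new LinkedHashMap<>();
        flatten("", properties, flat);
        System.out.println(flat); // prints {retention.ms=86400000}
    }
}
```

So the config itself expresses the intended Kafka topic setting; the question is whether that map is the one used when the DLQ topic is created.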

I found the relevant code in org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.java:

    private ConsumerDestination createDlqIfNeedBe(AdminClient adminClient, String name,
            String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties,
            boolean anonymous, int partitions) {

        if (properties.getExtension().isEnableDlq() && !anonymous) {
            String dlqTopic = StringUtils.hasText(properties.getExtension().getDlqName())
                    ? properties.getExtension().getDlqName()
                    : "error." + name + "." + group;
            int dlqPartitions = properties.getExtension().getDlqPartitions() == null
                    ? partitions
                    : properties.getExtension().getDlqPartitions();
            try {
                createTopicAndPartitions(adminClient, dlqTopic, dlqPartitions,
                        properties.getExtension().isAutoRebalanceEnabled(),
                        properties.getExtension().getTopic());
            }
            catch (Throwable throwable) {
                if (throwable instanceof Error) {
                    throw (Error) throwable;
                }
                else {
                    throw new ProvisioningException("Provisioning exception encountered for " + name, throwable);
                }
            }
            return new KafkaConsumerDestination(name, partitions, dlqTopic);
        }
        return null;
    }

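createTopicAndPartitions is called with the consumer's topic properties (properties.getExtension().getTopic()), so the topic settings under dlqProducerProperties are never applied to the DLQ. I think it should use the topic properties from dlqProducerProperties instead.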
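
To make the difference concrete, here is a minimal sketch using hypothetical stand-in classes (`TopicProps`, `ConsumerExtension`) rather than the binder's real types. It only models which properties map ends up being used for the DLQ topic; it is not the actual patch.

```java
import java.util.HashMap;
import java.util.Map;

public class DlqTopicPropertiesSketch {

    /** Hypothetical stand-in for the binder's topic settings (only the properties map). */
    static class TopicProps {
        final Map<String, String> properties = new HashMap<>();
    }

    /** Hypothetical stand-in for the consumer extension: its own topic settings plus the DLQ producer's. */
    static class ConsumerExtension {
        final TopicProps topic = new TopicProps();
        final TopicProps dlqProducerTopic = new TopicProps();
    }

    /** Behavior described in this issue: the DLQ is provisioned with the consumer's topic settings. */
    static Map<String, String> dlqConfigAsReported(ConsumerExtension ext) {
        return ext.topic.properties;
    }

    /** Proposed behavior: the DLQ is provisioned with the DLQ producer's topic settings. */
    static Map<String, String> dlqConfigAsProposed(ConsumerExtension ext) {
        return ext.dlqProducerTopic.properties;
    }

    public static void main(String[] args) {
        ConsumerExtension ext = new ConsumerExtension();
        // Same setting as in the YAML above, configured only for the DLQ producer
        ext.dlqProducerTopic.properties.put("retention.ms", "86400000");

        System.out.println("reported: " + dlqConfigAsReported(ext).get("retention.ms")); // prints reported: null
        System.out.println("proposed: " + dlqConfigAsProposed(ext).get("retention.ms")); // prints proposed: 86400000
    }
}
```

With the reported behavior the retention setting is silently dropped, which matches the topic being created without it.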

sobychacko commented 3 years ago

@jokefaker Thanks for spotting this. Fixed it now.