spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0
331 stars 302 forks source link

publishing to incorrect partition in DLQ #1181

Closed karthik86 closed 2 years ago

karthik86 commented 2 years ago

I am facing issues with DLQ configuration, In my project DLQ topic has 5 partitions where as consumer topic has 20 partitions. When there is an error in consuming record we are pushing such message to DLQ topic but here it is failing because it is trying to push message to 17th partition (original message was consumed at 17th partition) of DLQ topic which does not exist

I am using ListenerContainerWithDlqAndRetryCustomizer to customize error handling. I tried fixing partition issue with DlqPartitionFunction but that didn't work. please find following code snippet.

We are using custom error handler to shutdown container if we there is an infra error from third party services.

@Override
            public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group, BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver, BackOff backOff) {
                logger.info("consumer-handler.."+count++);
                ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(new KafkaTemplate(getProducerFactory(destinationName)),
                dlqDestinationResolver);
                CustomErrorHandler handler = new CustomErrorHandler(dlpr, backOff, bindingsEndpoint, functionsTopicMap);
                Map<Class<? extends Throwable>, Boolean> retrybleExceptionMap = new HashMap<>();
                retrybleExceptionMap.put(Exception.class, true);
                handler.setClassifications(retrybleExceptionMap, false);
                container.setCommonErrorHandler(handler);
            }
@Bean
    public DlqPartitionFunction partitionFunction(){
        Random randomGenerator = new Random();
        return (group, record, throwable) -> {
            logger.info("partition-dlq"+record.partition());
            if(record.partition() > 5){
                Integer partitionNUmber = randomGenerator.nextInt(5)+1;
                logger.info("partition-id"+partitionNUmber);
                return partitionNUmber;
            }else{
                return record.partition();
            }
        };
    }
sobychacko commented 2 years ago

@karthik86 Apologies for the delay in responding. DlqPartitionFunction is a binder level interface and the DeadLetterPublishingRecoverer (which is from Spring for Apache Kafka) does not use this. I see that in your configure method, you are passing a BiFunction for the destination resolver. The BiFunction takes the ConsumerRecord and the exception as inputs and then produces a TopicPartition as the destination. This is where you need to configure your target partition where the consumer record in error will land. See if that works in your scenario.

sobychacko commented 2 years ago

Closing this issue due to no activity. Please feel free to re-open if you are still facing this issue.