spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

Infinite loop when retryable topic #3187

Closed jozbar closed 5 months ago

jozbar commented 6 months ago

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.1.3

Describe the bug

I have an application that consumes events from one topic (with a String deserializer), converts them to a proto model, and sends them to another topic. I want to use a retryable topic so that, if an error occurs, the event is forwarded to the DLT topic after a few retries. The problem is that I'm getting an error:

Can't convert value of class java.lang.String to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer

which then turns into an infinite loop.

I noticed that the problem occurs when the producer's value serializer is set to ByteArraySerializer. If I comment that out, or change it to a String serializer, it works fine, but I need the ByteArraySerializer because the message should be sent as a proto model.

I think that DefaultKafkaConsumerFactory and DefaultKafkaProducerFactory somehow interact with each other.

Here are the snippets of my code:

```java
@Slf4j
@Component
public class Consumer {

    @RetryableTopic(
        attempts = "4",
        backoff = @Backoff(delay = 1000, multiplier = 2.0),
        autoCreateTopics = "false",
        topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
    @KafkaListener(topics = {"first-topic"}, groupId = "kafka-retryable-group-id")
    public void consume(ConsumerRecord<String, String> consumerRecord,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(KafkaHeaders.OFFSET) long offset) {
        log.info("Kafka consumer received message {}", consumerRecord.value());
        throw new RuntimeException("test message");
    }

    @DltHandler
    public void dlt(ConsumerRecord<String, String> message,
                    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                    @Header(KafkaHeaders.OFFSET) long offset) {
        log.error("DLT Received: {} from {} offset {}", message, topic, offset);
    }
}
```

```java
@Configuration
public class ConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```

```java
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, byte[]> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, byte[]> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```

and log:

```
2024-04-10T13:28:46.733+02:00 ERROR 32096 --- [kafka-retryable] [ntainer#0-0-C-1] o.s.k.l.DeadLetterPublishingRecoverer : Dead-letter publication to call-center-collector-actions-eventreleased-output-local-retry-0 failed for: call-center-collector-actions-eventreleased-output-local-0@16

org.apache.kafka.common.errors.SerializationException: Can't convert value of class java.lang.String to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1003) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:947) ~[kafka-clients-3.6.1.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1050) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:804) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:773) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:577) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:689) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.send(DeadLetterPublishingRecoverer.java:598) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.sendOrThrow(DeadLetterPublishingRecoverer.java:564) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:533) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.listener.FailedRecordTracker.attemptRecovery(FailedRecordTracker.java:228) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.listener.FailedRecordTracker.recovered(FailedRecordTracker.java:188) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.listener.SeekUtils.lambda$doSeeks$5(SeekUtils.java:107) ~[spring-kafka-3.1.3.jar:3.1.3]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1596) ~[na:na]
    at org.springframework.kafka.listener.SeekUtils.doSeeks(SeekUtils.java:104) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:207) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:168) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2917) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2789) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2625) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2511) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2153) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1493) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1458) ~[spring-kafka-3.1.3.jar:3.1.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1328) ~[spring-kafka-3.1.3.jar:3.1.3]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1000) ~[kafka-clients-3.6.1.jar:na]
    ... 27 common frames omitted
```

To Reproduce

  1. Create a consumer config with a String deserializer
  2. Create a producer config with ByteArraySerializer
  3. Create a consumer with a retryable topic
  4. Send an event to the topic

Expected behavior

The retry mechanism should work, and after a few retries the message should be forwarded to the DLT.

Sample

https://github.com/jozbar/kafka-retryable-infinite-loop

sobychacko commented 6 months ago

@jozbar Thanks for the sample app to reproduce. We will look at this and get back to you.

sobychacko commented 6 months ago

@jozbar, could you add an ApplicationRunner bean to your sample application that sends some data to the topic where the consumer/retry is failing? Or create a reproducible test. Thanks!

Wzy19930507 commented 6 months ago

Hello @jozbar @sobychacko, after reproducing this, it appears to be caused by the following:

  1. The producer sends with ByteArraySerializer, while the consumer receives with StringDeserializer.
  2. When the consumer throws an exception, DeadLetterPublishingRecoverer uses the producer to send the ConsumerRecord to the retry topic. That producer serializes the record value with ByteArraySerializer, so casting the String value to byte[] fails with a ClassCastException; see SeekUtils.doSeeks, line 108: https://github.com/spring-projects/spring-kafka/blob/5446be0d0c5328e4cc2904a7c8a26198907428a8/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java#L98-L135

Then see line 121: skipped.set(false) is called, so the failed record is never skipped and keeps being reprocessed.
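The failing cast can be shown with a plain-Java sketch (no Kafka broker needed). The class and method names below are illustrative, not Spring Kafka's: the point is that ByteArraySerializer effectively assumes the record value is already a byte[], which cannot hold when the value was deserialized as a String.

```java
public class CastDemo {
    // Illustrative analogue of what happens inside ByteArraySerializer.serialize
    // when handed an untyped record value: it is cast straight to byte[].
    static byte[] serializeLikeByteArraySerializer(Object value) {
        return (byte[]) value; // throws ClassCastException when value is a String
    }

    public static void main(String[] args) {
        Object recordValue = "hello"; // the retried record's value is a String here
        try {
            serializeLikeByteArraySerializer(recordValue);
            System.out.println("serialized");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```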


Creating the producer config with StringOrBytesSerializer solves this problem. For details, see https://github.com/Wzy19930507/kafka_retryable_infinite_loop.
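Why that serializer avoids the loop can be sketched in plain Java. The class below is an illustrative analogue of StringOrBytesSerializer's behavior, not the actual Spring Kafka implementation: it accepts both value types, so fresh proto byte[] payloads and String-valued records replayed to the retry/DLT topics both serialize cleanly.

```java
import java.nio.charset.StandardCharsets;

public class DualSerializerSketch {
    // Illustrative analogue of StringOrBytesSerializer: branch on the value's
    // runtime type instead of blindly casting to byte[].
    static byte[] serialize(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof byte[]) {
            return (byte[]) value; // proto payloads pass through unchanged
        }
        if (value instanceof String) {
            // retried records whose value was read with StringDeserializer
            return ((String) value).getBytes(StandardCharsets.UTF_8);
        }
        throw new IllegalArgumentException("Unsupported value type: " + value.getClass());
    }

    public static void main(String[] args) {
        System.out.println(serialize("retried record").length); // prints 14
        System.out.println(serialize(new byte[]{1, 2, 3}).length); // prints 3
    }
}
```

In the reporter's config, the corresponding change would be swapping the ByteArraySerializer entry under VALUE_SERIALIZER_CLASS_CONFIG for StringOrBytesSerializer, as the linked fork does.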

sobychacko commented 6 months ago

@Wzy19930507 Thanks for putting this together. I just ran your forked app and verified that the fix with StringOrBytesSerializer works fine.

@jozbar Could you verify whether that is something you can use as a proper solution for this issue, i.e., using StringOrBytesSerializer in your producer? See this for more details.

Since it seems like nothing needs to be done from the framework side, we can close this issue once you confirm. Thanks!

sobychacko commented 5 months ago

Closing the issue as there is nothing to fix in the framework. Please see the suggestions above from @Wzy19930507 for solutions if anyone runs into this situation. Feel free to re-open the issue if need be.