spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

Handling connection errors with KafkaTemplate when cluster not available #2251

Open garyrussell opened 2 years ago

garyrussell commented 2 years ago

Discussed in https://github.com/spring-projects/spring-kafka/discussions/2250

Originally posted by **Walnussbaer** May 3, 2022

Hi everyone, I'm really struggling with some basic functionality that I would like to achieve using `KafkaTemplate`, and I hope someone can help me out. I already created a Stack Overflow entry but didn't get much help there yet (https://stackoverflow.com/questions/72055135/spring-apache-kafka-onfailure-callback-of-kafkatemplate-not-fired-on-connection/72059673?noredirect=1#comment127371838_72059673). The Spring for Apache Kafka docs weren't much help either, nor was a very extensive Google search.

Consider the following basic scenario: I have a simple `KafkaTemplate` that shall send data to a Kafka cluster. Now consider that the Kafka cluster goes down (not just temporarily). How can I configure the `KafkaTemplate` so that the wrapped producer stops trying to connect to the cluster to fetch the metadata for the given topic when the cluster is not reachable?

Why do I want to achieve that? In a production environment it can always happen that the Kafka cluster goes down for some reason, and I want to be able to detect that during the sending process. The worst problem is that, as of now, my producer thread starves to death because it goes into an infinite loop trying to connect to the broker.

This is my producer config:

```
@Configuration
public class KafkaProducerConfig {

    private String bootstrapServers = "[::1]:9091"; // wrong port to simulate unavailable connection

    @Bean
    public Map<String, Object> producerConfig() {
        // config settings for creating producers
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
        configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 4000);
        configProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 6000);
        configProps.put(ProducerConfig.RETRIES_CONFIG, 0);
        return configProps;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        // creates a kafka producer
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean("kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        // template which abstracts sending data to kafka
        return new KafkaTemplate<>(producerFactory());
    }
}
```

This is my service. My first approach, `sendMessageWithCallback()`, does not work because `onFailure()` is not invoked when the `KafkaProducer` cannot establish a connection to the Kafka cluster. Using my second service method, `sendMessageWithProperErrorHandling()`, I can at least catch the `TimeoutException` thrown by the `KafkaProducer` when the metadata for the topic could not be fetched within `MAX_BLOCK_MS_CONFIG`, but I still can't stop the producer from going into an infinite loop after that first timeout. Below you also find a picture of the infinite loop: the `KafkaProducer` will essentially keep trying to connect to the Kafka cluster for the rest of its life, leaving the thread starving. It also looks like it completely ignores my `RETRIES_CONFIG`, which is set to zero retries.
```
@Service
public class KafkaSenderService {

    Logger logger = LoggerFactory.getLogger(KafkaSenderService.class);

    @Qualifier("kafkaTemplate")
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaSenderService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message, String topicName) {
        kafkaTemplate.send(topicName, message);
    }

    public void sendMessageWithCallback(String message, String topicName) {
        // possibility to add callbacks to define what shall happen in success/error case
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);

        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override // DOES NOT WORK IF THE BROKER IS NOT AVAILABLE
            public void onFailure(Throwable ex) {
                logger.warn("Message could not be delivered. " + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                logger.info("Your message was delivered with following offset: " + result.getRecordMetadata().offset());
            }
        });
    }

    public void sendMessageWithProperErrorHandling(String message, String topicName) {
        // TODO use AdminClient to check connectivity --> senseless, what if the cluster goes down after the check was made?!
        try {
            SendResult<String, String> sendResult = kafkaTemplate.send(topicName, message).get(5000, TimeUnit.MILLISECONDS);
        } catch (Exception te) {
            System.out.println("Could not connect: " + te.getMessage());
            te.printStackTrace();
        }
    }
}
```

Now my simple question: what is the best practice to detect connection errors during a send and stop sending when such an error occurs?

The infinite loop I mentioned can be seen here:

![grafik](https://user-images.githubusercontent.com/49414693/166446208-04545bcc-fe4e-4acc-ac05-4052a2090a78.png)
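A note on the behaviour described above: the `retries` producer property only governs re-sending of failed produce requests; the connection attempts visible in the screenshot come from the producer's network client trying to reach the bootstrap servers, which is controlled by the reconnect backoff settings instead. As a rough sketch (standard Apache Kafka producer properties, with illustrative values), these additions to `producerConfig()` space the attempts out, but they do not stop them; the producer keeps reconnecting until it is closed:

```
// illustrative additions to producerConfig(); these only slow the reconnect
// attempts down, they do not stop the producer from retrying to reach the cluster
configProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 1000);      // wait 1s before retrying a connection
configProps.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 10000); // back off exponentially up to 10s
```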
sjvolpe commented 1 year ago

Did you ever find a solution to this, Gary? I'm facing the same issue.

garyrussell commented 1 year ago

Not yet. See the linked discussion for a workaround.
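For anyone landing here: the workaround in the linked discussion amounts to closing the cached producer once a send fails, so that its network client stops the reconnect loop; the next send then creates a fresh producer. A minimal sketch, assuming the `DefaultKafkaProducerFactory` from the configuration above is exposed as a bean (class and method names here are illustrative, not an official API):

```
import java.util.concurrent.TimeUnit;

import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class ResettingKafkaSenderService {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final DefaultKafkaProducerFactory<String, String> producerFactory;

    public ResettingKafkaSenderService(KafkaTemplate<String, String> kafkaTemplate,
            DefaultKafkaProducerFactory<String, String> producerFactory) {
        this.kafkaTemplate = kafkaTemplate;
        this.producerFactory = producerFactory;
    }

    public void send(String message, String topicName) {
        try {
            // bounded wait; max.block.ms already caps the metadata fetch inside send()
            kafkaTemplate.send(topicName, message).get(15, TimeUnit.SECONDS);
        } catch (Exception ex) {
            // close the cached producer so its reconnect loop stops;
            // the next send() will create a new producer
            producerFactory.reset();
            throw new IllegalStateException("Kafka cluster unreachable, message not sent", ex);
        }
    }
}
```

Whether resetting on every failure is acceptable depends on the application; it trades the endless reconnect loop for the cost of re-creating the producer on the next send.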

sjvolpe commented 1 year ago

Okay that should work - thanks for the quick response

JooHyukKim commented 7 months ago

I don't know if I should file a new feature request or start from here 🤔 Can we consider a new feature? I am facing the same issue (or personal struggle). The suggested workaround (calling reset()) does work, but it would be nice to have some sort of strategy for this, especially when you need to implement the detection + reset functionality across multiple applications.

Combining keywords like broker + disconnection + detection + strategy + listener, we may be able to come up with something :)
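To make that concrete, one shape such a strategy could take (purely a hypothetical sketch; none of these types exist in spring-kafka today) is a small callback interface invoked when a send fails with a connectivity-style error, with a reset-the-factory implementation as the obvious default:

```
import org.springframework.kafka.core.ProducerFactory;

// hypothetical API sketch, not part of spring-kafka
@FunctionalInterface
public interface BrokerDisconnectionStrategy {

    /**
     * Invoked when a send fails with what looks like a connectivity problem,
     * e.g. the metadata fetch timing out after max.block.ms.
     */
    void onBrokerUnreachable(ProducerFactory<?, ?> factory, Exception cause);

    /** Example strategy: reset the factory so the cached producer stops reconnecting. */
    static BrokerDisconnectionStrategy resetOnDisconnect() {
        return (factory, cause) -> factory.reset();
    }
}
```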