spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

BUG: DefaultKafkaProducerFactory creates producers which are never closed #1727

Closed: NilValue closed this issue 3 years ago

NilValue commented 3 years ago

I am using spring-kafka version: 2.6.3 (spring-boot-starter-parent:2.4.0) but the problem should be present in the current version as well.

Preconditions:

Problem: The DefaultKafkaProducerFactory creates a new KafkaProducer in createRawProducer():

protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
    KafkaProducer<K, V> kafkaProducer =
            new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
    this.postProcessors.forEach(pp -> pp.apply(kafkaProducer));
    return kafkaProducer;
}

The KafkaProducer constructor starts a background thread that loops until the producer is closed. Immediately afterwards, doCreateTxProducer() calls KafkaProducer.initTransactions() on the new producer:

private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
        BiPredicate<CloseSafeProducer<K, V>, Duration> remover) {
    Producer<K, V> newProducer;
    Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
    String txId = prefix + suffix;
    newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txId);
    if (this.clientIdPrefix != null) {
        newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
                this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
    }
    checkBootstrap(newProducerConfigs);
    newProducer = createRawProducer(newProducerConfigs);
    newProducer.initTransactions();
    CloseSafeProducer<K, V> closeSafeProducer =
            new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName);
    this.listeners.forEach(listener -> listener.producerAdded(closeSafeProducer.clientId, closeSafeProducer));
    return closeSafeProducer;
}

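If the Kafka broker is not available, the initTransactions() method throws a TimeoutException. The exception propagates out of doCreateTxProducer() before the producer is wrapped in a CloseSafeProducer, so the only reference to the producer is lost. As a result, the producer is never closed and its background thread keeps running indefinitely. The application does not recover from this even if the Kafka broker comes back, because the DefaultKafkaProducerFactory holds no reference to these orphaned producers and cannot close them. Every failed attempt leaves another "dead" thread running in the background.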
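The leak can be reproduced without a broker by simulating the same control flow. The sketch below is only an illustration: FakeTxProducer, LeakDemo and the OPEN counter are hypothetical names that do not exist in spring-kafka or kafka-clients, and the simulated failure is a plain IllegalStateException rather than Kafka's TimeoutException.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for KafkaProducer: starts a background thread when constructed. */
class FakeTxProducer implements AutoCloseable {
    /** Number of instances that have been created but not yet closed. */
    static final AtomicInteger OPEN = new AtomicInteger();

    private volatile boolean running = true;
    private final Thread ioThread;

    FakeTxProducer() {
        ioThread = new Thread(() -> {
            // Loops until close() flips the flag, like the thread described above.
            while (running) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }, "fake-producer-io");
        ioThread.setDaemon(true); // lets this demo JVM exit despite the leak
        OPEN.incrementAndGet();
        ioThread.start();
    }

    /** Simulates the broker being unreachable (assumption: any RuntimeException will do). */
    void initTransactions() {
        throw new IllegalStateException("simulated timeout: broker unavailable");
    }

    @Override
    public void close() {
        running = false;
        ioThread.interrupt();
        try {
            ioThread.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        OPEN.decrementAndGet();
    }
}

class LeakDemo {
    /** Same shape as the unguarded flow: the only reference is lost when init throws. */
    static FakeTxProducer createUnguarded() {
        FakeTxProducer producer = new FakeTxProducer();
        producer.initTransactions(); // throws; nobody can call close() afterwards
        return producer;
    }

    public static void main(String[] args) {
        try {
            createUnguarded();
        } catch (IllegalStateException e) {
            System.out.println("init failed, producers still open: " + FakeTxProducer.OPEN.get());
        }
    }
}
```

After the failed call the counter is still above zero, and the caller has no way to reach the instance that owns the running thread.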

Solution: Wrap newProducer.initTransactions() in a try-catch for any RuntimeException. In the catch block, close the producer and rethrow the exception.

private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
                                                   BiPredicate<CloseSafeProducer<K, V>, Duration> remover) {
    Producer<K, V> newProducer;
    Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
    String txId = prefix + suffix;
    newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txId);
    if (this.clientIdPrefix != null) {
        newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
                               this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
    }
    checkBootstrap(newProducerConfigs);
    newProducer = createRawProducer(newProducerConfigs);
    try {
        newProducer.initTransactions();
    } catch (RuntimeException e) {
        // Close the producer so its background thread stops; otherwise it is leaked.
        newProducer.close();
        throw e;
    }
    CloseSafeProducer<K, V> closeSafeProducer =
            new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName);
    this.listeners.forEach(listener -> listener.producerAdded(closeSafeProducer.clientId, closeSafeProducer));
    return closeSafeProducer;
}
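
One detail the snippet above leaves open is what happens if close() itself throws: the original initTransactions() failure would then be replaced by the close failure. A minimal sketch of one way to keep both, assuming a generic helper is acceptable, is shown below. InitGuard, initOrClose and TrackedResource are hypothetical names for illustration and are not part of spring-kafka.

```java
import java.util.function.Consumer;

/** Hypothetical helper: initialise a resource, closing it if initialisation fails. */
final class InitGuard {
    private InitGuard() {
    }

    static <T extends AutoCloseable> T initOrClose(T resource, Consumer<T> init) {
        try {
            init.accept(resource);
            return resource;
        } catch (RuntimeException initFailure) {
            try {
                resource.close();
            } catch (Exception closeFailure) {
                // Keep the init failure as the primary error and attach the close failure.
                initFailure.addSuppressed(closeFailure);
            }
            throw initFailure;
        }
    }
}

/** Hypothetical resource used only to exercise the helper. */
class TrackedResource implements AutoCloseable {
    boolean closed;
    private final boolean failOnClose;

    TrackedResource(boolean failOnClose) {
        this.failOnClose = failOnClose;
    }

    @Override
    public void close() {
        closed = true;
        if (failOnClose) {
            throw new IllegalStateException("close failed");
        }
    }
}

class InitGuardDemo {
    public static void main(String[] args) {
        TrackedResource resource = new TrackedResource(true);
        try {
            InitGuard.initOrClose(resource, r -> {
                throw new IllegalStateException("init failed");
            });
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + ", closed=" + resource.closed
                    + ", suppressed=" + e.getSuppressed().length);
        }
    }
}
```

Since Kafka's Producer interface extends Closeable, the same idea could presumably be applied in doCreateTxProducer() as initOrClose(createRawProducer(newProducerConfigs), Producer::initTransactions), though that call is an untested assumption and not the fix that was merged.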

NilValue commented 3 years ago

Thank you for the quick response and fix. This is amazing. Can I expect this fix to be included in 2.7.0-RC1? If so, when will it be released? I want to remove my own workaround from our code base ASAP :)

garyrussell commented 3 years ago

@NilValue You should check the milestones for scheduled release dates, or the Spring Calendar.

2.7.0-RC1 and 2.6.7 are currently scheduled for March 17.

NilValue commented 3 years ago

@garyrussell Okay thank you very much :)