reactor / reactor-kafka

Reactive Kafka Driver with Reactor
http://projectreactor.io

KafkaProducer#close() seems to leak Kafka resources. #341

Open basking2 opened 1 year ago

basking2 commented 1 year ago

I create a KafkaSender and give it a Flux that produces records. In the doFinally() of the Flux I close the sender by calling .close().

I observe that the Kafka library says that it prefers the close(Duration) call be used, but closes the underlying object anyway.

I further observe that, as I create and close many senders, my heap use grows and eventually results in an OOM (after ~12 hours of operation).

Using jmap I see Kafka Node objects constantly growing. They seem to eventually collect, but the memory is then used by more generic objects like ByteArrays or similar.

When I use only 1 sender for the application, I do not observe a leak.

Expected Behavior

Calling .close() will reliably make all resources collectable by the GC and OOMs will not happen.

Actual Behavior

Over 12 hours, heap grows to 1.5GB and eventually the JVM exits with an out of heap space error.

Steps to Reproduce

In a loop, create 1 sender, send records to Kafka, and close the sender in the Flux's doFinally() callback.

Repeat this and Kafka seems to leak resources.

Possible Solution

Do we need to use close(Duration) or call flush() first?

artembilan commented 1 year ago

The DefaultKafkaSender has this logic:

    public void close() {
        if (!hasProducer.getAndSet(false)) {
            return;
        }
        producerMono.doOnNext(producer -> producer.close(senderOptions.closeTimeout()))
                    .block();
        if (senderOptions.isTransactional()) {
            senderOptions.scheduler().dispose();
        }
        scheduler.dispose();
    }

So, we really do call close(Duration). It uses Long.MAX_VALUE as the default timeout. But I don't think that is relevant to the problem.
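The guard at the top of the quoted close() can be illustrated in isolation. This is a minimal sketch using only the JDK; `CloseOnceSketch` and its `releaseCount()` helper are hypothetical names, not part of reactor-kafka. It only shows why `getAndSet(false)` makes a second close() call a no-op.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class CloseOnceSketch implements AutoCloseable {

    // True while the (imaginary) underlying resource is still held.
    private final AtomicBoolean open = new AtomicBoolean(true);

    // Counts how many times the release logic actually ran.
    private final AtomicInteger releases = new AtomicInteger();

    @Override
    public void close() {
        // getAndSet returns the previous value, so only the first caller sees true.
        if (!open.getAndSet(false)) {
            return;
        }
        // Stands in for closing the real producer and disposing schedulers.
        releases.incrementAndGet();
    }

    int releaseCount() {
        return releases.get();
    }

    public static void main(String[] args) {
        CloseOnceSketch sketch = new CloseOnceSketch();
        sketch.close();
        sketch.close(); // second call returns early
        System.out.println("release logic ran " + sketch.releaseCount() + " time(s)");
    }
}
```

The guard makes close() idempotent, but it says nothing about which thread is allowed to call it.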

Would be great to have a simple sample from you to reproduce.

basking2 commented 1 year ago

@artembilan , I've packaged up a program that exhibits the behavior in VisualVM here.

Thank you!

artembilan commented 1 year ago

Could we see a memory report showing what you believe is the leak? I'm not sure that I'm going to run your program for 12 hours...

Can this behavior be reproduced with just a plain KafkaProducer? If so, it is outside the scope of this project and has to be reported against Apache Kafka itself.

basking2 commented 1 year ago

@artembilan The sample code takes about 5 minutes to fail with a 300m heap limit. Attached is a VisualVM screenshot.

[Image: reactor-kafka-341]

artembilan commented 1 year ago

OK. When I move the sender.close() call out of doFinally() and make it the last line of the loop body, VisualVM no longer shows any memory problems.

So it looks like doFinally() runs on the producer thread, which means we cannot call close() there while a send is still in progress. See the respective logs:

[2023-04-24 15:13:43,039] [kafka-producer-network-thread | producer-223] [org.apache.kafka.clients.producer.KafkaProducer] INFO [Producer clientId=producer-223] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[2023-04-24 15:13:43,039] [kafka-producer-network-thread | producer-223] [org.apache.kafka.clients.producer.KafkaProducer] WARN [Producer clientId=producer-223] Overriding close timeout 9223372036854775807 ms to 0 ms in order to prevent useless blocking due to self-join. This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.
[2023-04-24 15:13:43,039] [kafka-producer-network-thread | producer-223] [org.apache.kafka.clients.producer.KafkaProducer] INFO [Producer clientId=producer-223] Proceeding to force close the producer since pending requests could not be completed within timeout 9223372036854775807 ms.

So, according to the Kafka client, this doFinally() is called from the mentioned "producer call-back". And even though the rest of the log output looks OK, our memory report shows that the producer is not really closed.

Or, better said, it is deadlocked: we use the producerMono property in the send() operation, and then we call close() on that same producerMono with a block() from within that send() operation.

Technically, it is always better to use a single Kafka producer for the application, but if you cannot, you must close each one when you are really done with it. And, yeah, it turns out that doFinally() on the Flux<SenderResult<T>> returned by send() is not the way to go.

You need to revise your logic to call sender.close() from some other place.
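The difference between closing inside the callback and closing from the caller can be modeled without Kafka. The following is a rough sketch using only the JDK; `MiniProducer` and its `send` and `close` methods are hypothetical stand-ins, not Kafka or reactor-kafka APIs, and the timeout override only imitates what the WARN log line above describes. It shows that a completion callback runs on the producer's own I/O thread, so a close requested there cannot wait for that thread to finish.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SelfJoinSketch {

    /** Hypothetical stand-in for a producer that owns one I/O thread. */
    static final class MiniProducer {
        private volatile Thread ioThread;
        private final ExecutorService io = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "mini-producer-io");
            t.setDaemon(true);
            ioThread = t;
            return t;
        });

        /** Pretends to send a record, then runs the callback on the I/O thread. */
        void send(String record, Runnable onComplete) {
            io.execute(onComplete);
        }

        /** Shuts down and returns the timeout (ms) that was actually honored. */
        long close(long timeoutMillis) {
            // A thread cannot wait for its own termination, so drop the timeout.
            long effective = Thread.currentThread() == ioThread ? 0L : timeoutMillis;
            io.shutdown();
            if (effective > 0) {
                try {
                    io.awaitTermination(effective, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            return effective;
        }
    }

    private static void await(CountDownLatch latch) {
        try {
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for callback");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Closes from inside the send callback, as doFinally() did in this issue. */
    static long closeFromCallback(long requestedMillis) {
        MiniProducer producer = new MiniProducer();
        AtomicLong effective = new AtomicLong(-1);
        CountDownLatch done = new CountDownLatch(1);
        producer.send("record", () -> {
            effective.set(producer.close(requestedMillis));
            done.countDown();
        });
        await(done);
        return effective.get();
    }

    /** Waits for the send to complete, then closes from the calling thread. */
    static long closeFromCaller(long requestedMillis) {
        MiniProducer producer = new MiniProducer();
        CountDownLatch sent = new CountDownLatch(1);
        producer.send("record", sent::countDown);
        await(sent);
        return producer.close(requestedMillis);
    }

    public static void main(String[] args) {
        System.out.println("effective timeout from callback: " + closeFromCallback(1_000));
        System.out.println("effective timeout from caller:   " + closeFromCaller(1_000));
    }
}
```

In this model the callback path always ends up with a zero timeout, while the caller path keeps the requested one. That is the reason for moving sender.close() to the end of the loop body.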

basking2 commented 1 year ago

@artembilan, thanks for the great writeup. We already revised the logic when the leak was attributed to the sender. I opened this issue because this seems like a natural and supported use case when, in fact, it's not. I've seen the logs you shared, but they didn't warn of leaking memory or resources.

I would love to warn future users to not try what I did, but I don't have a great idea of how to do that.

Again, thanks!

artembilan commented 1 year ago

Yeah... I see your point. We will discuss with @garyrussell what we can do about it (e.g. throw an exception) when he comes back from PTO next week.

garyrussell commented 1 year ago

Just curious why you want to close each time; the KafkaProducer javadocs recommend a single producer per app (although when using transactions, you will likely need more if concurrency is needed).

But, yes, it would be nice if we could detect it.
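One possible shape for such detection, purely as an illustration: remember the thread that runs send callbacks and fail fast when close() is invoked on it. This is a sketch using only the JDK; `GuardedSender` is a hypothetical class, and nothing here reflects what reactor-kafka actually does or will do.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CloseGuardSketch {

    /** Hypothetical sender that rejects close() calls made from its own callbacks. */
    static final class GuardedSender {
        private volatile Thread callbackThread;
        private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "guarded-sender-io");
            t.setDaemon(true);
            callbackThread = t;
            return t;
        });

        /** Runs the completion callback on the sender's own thread. */
        void send(Runnable onComplete) {
            worker.execute(onComplete);
        }

        void close() {
            if (Thread.currentThread() == callbackThread) {
                // Fail loudly instead of silently degrading to a forced close.
                throw new IllegalStateException(
                        "close() must not be called from a send callback");
            }
            worker.shutdown();
        }
    }

    /** Attempts a close inside the callback and reports what happened. */
    static String closeInsideCallback() {
        GuardedSender sender = new GuardedSender();
        CompletableFuture<String> outcome = new CompletableFuture<>();
        sender.send(() -> {
            try {
                sender.close();
                outcome.complete("closed");
            } catch (IllegalStateException e) {
                outcome.complete("rejected: " + e.getMessage());
            }
        });
        try {
            return outcome.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            sender.close(); // allowed here: this is the caller's thread
        }
    }

    public static void main(String[] args) {
        System.out.println(closeInsideCallback());
    }
}
```

An exception like this would surface the misuse at the call site, rather than leaving it to show up hours later as heap growth.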

basking2 commented 1 year ago

@garyrussell - Ephemeral senders are not necessary for my use case. This issue is because close() seemed to fail on its promise and caused OOM crashes in early versions of our solution.