reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

BlockingSingleSubscriber causes memory leak #3371

Open manoj-mathivanan opened 1 year ago

manoj-mathivanan commented 1 year ago

BlockingSingleSubscriber.java accumulates each new failure as a suppressed exception on the same error object, so the suppressed list grows over time and leads to a memory leak.

Expected Behavior

A new error object should be created for each call; accumulating suppressed exceptions on the old error object should be avoided.

Actual Behavior

The error object is cached internally in the Kafka sender and reused as the parent error. Every subsequent send operation that fails gets accumulated as a suppressed exception inside that parent error.

Steps to Reproduce

Create a normal Kafka sender. Introduce a configuration error like the one below:

delivery.timeout.ms=1200
max.block.ms=1000
request.timeout.ms=2000
linger.ms=10

This will throw an error when the Kafka sender is used - org.apache.kafka.common.config.ConfigException: delivery.timeout.ms should be equal to or larger than linger.ms + request.timeout.ms. Next, send some messages using the Kafka sender:

kafkaSender.send(Mono.just(SenderRecord.create(
                new ProducerRecord<>("topic", "test"), 1)))
        .doOnError((Throwable throwable) -> {
            // swallow the error; the suppressed list still grows
        })
        .blockFirst();

Possible Solution

Your Environment

reactor-core:3.4.16. At 200 messages/sec, 200 exceptions per second are added to the suppressed exception list. This keeps growing, resulting in a memory leak.

OlegDokuka commented 1 year ago

@manoj-mathivanan do you have a pointer to an exception which is accumulating errors?

if it is outside of reactor-core then we can do nothing about it

manoj-mathivanan commented 1 year ago

@OlegDokuka This is the place where the accumulation happens: https://github.com/reactor/reactor-core/blob/main/reactor-core/src/main/java/reactor/core/publisher/BlockingSingleSubscriber.java#L99 which is in reactor core.
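For context, the linked line calls `addSuppressed` on whatever `Throwable` the subscriber received, so if the upstream keeps re-emitting the same cached exception instance, every blocking call mutates that one object. A minimal plain-Java sketch of the mechanism (class and method names are illustrative, not reactor's actual API):

```java
// Sketch: a single cached exception shared across calls accumulates
// one suppressed entry per blocking call, which is the leak described above.
public class SuppressedLeakDemo {

    // Simulates the error object cached by the sender and re-emitted on every send.
    static final RuntimeException CACHED = new RuntimeException("config error");

    // Mimics what the blocking subscriber does on error: it decorates the
    // received Throwable with a suppressed marker exception before throwing.
    static RuntimeException blockingGet() {
        RuntimeException re = CACHED; // same instance every time
        re.addSuppressed(new Exception("#block terminated with an error"));
        return re;
    }

    // Returns how many suppressed exceptions the cached error holds after N calls.
    static int suppressedCountAfter(int calls) {
        for (int i = 0; i < calls; i++) {
            blockingGet();
        }
        return CACHED.getSuppressed().length;
    }
}
```

Because the suppressed array lives on the cached exception, it never becomes garbage-collectable while the sender holds that exception.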

OlegDokuka commented 1 year ago

Right, but that should be okay unless the propagated exception is a static one. That said, it could be a problem in a different library

manoj-mathivanan commented 1 year ago

@OlegDokuka, you are right. The propagated exception is always the same instance, and the new errors keep getting added to its suppressed exception list. Let me try to explain with a scenario.

I create a Kafka sender in my bean with an invalid configuration:

SenderOptions<String, String> senderOptions = SenderOptions.<String, String>create(producerConfiguration);
KafkaSender<String, String> kafkaSender = KafkaSender.create(senderOptions);

The kafkaSender above is initialized in the bean and is, by default, lazily initialized when the first message is sent.

Now when I try to send a message like below:

try {
    kafkaSender.send(Mono.just(SenderRecord.create(new ProducerRecord<>("topic", "test"), 1)))
            .blockFirst();
} catch (Exception ex) {
    logger.error(ex.getMessage(), ex);
}

The same ex object is thrown every time I use the above code to send a message. Since the application is up and running, it needs to send around 200 messages/sec. Every time a message is sent using the above code, another entry is added to the suppressed exception list inside the same ex object.

In 5 minutes, roughly 60K messages are attempted; all of them end up in error, so there are 60K suppressed exceptions in the list, each with its own complete stack trace. This memory keeps on growing.

In the above example, I am logging the errors. Apart from the memory, the logs also grow exponentially, because every error carries the details of all previous errors. If there is a new log appender, the issue gets amplified even more.

OlegDokuka commented 1 year ago

Related: https://github.com/reactor/reactor-core/issues/1872

OlegDokuka commented 1 year ago

@manoj-mathivanan if we fix that at the reactor level now, it is going to be a breaking change. I'm not sure we can do it safely now. However, we can probably make it into 3.6.x/4.x.

In the meantime, I suggest you use the .onErrorMap() operator to explicitly map your error into a different representation to avoid the leak
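As a sketch of why that helps: an `onErrorMap(e -> new RuntimeException(e))` hands each subscriber a fresh exception instance, so the suppressed marker lands on a short-lived wrapper instead of the cached original. A plain-Java illustration of the principle (the `block` helper below simulates the blocking subscriber; it is not reactor's internals):

```java
import java.util.function.Function;

// Sketch: wrapping the cached error in a fresh exception per call (what
// onErrorMap enables) keeps the cached instance's suppressed list empty.
public class OnErrorMapSketch {

    // Simulates the error object cached by the sender.
    static final RuntimeException CACHED = new RuntimeException("config error");

    // Simulates the blocking subscriber decorating whatever error it receives.
    static RuntimeException block(Function<Throwable, RuntimeException> errorMapper) {
        RuntimeException seen = errorMapper.apply(CACHED);
        seen.addSuppressed(new Exception("#block terminated with an error"));
        return seen;
    }

    // Returns the cached error's suppressed count after N calls, with or
    // without a fresh wrapper per call.
    static int cachedSuppressedAfter(int calls, boolean wrap) {
        Function<Throwable, RuntimeException> mapper = wrap
                ? t -> new RuntimeException(t)  // fresh wrapper per call, like onErrorMap
                : t -> (RuntimeException) t;    // identity: mutates the cached instance
        for (int i = 0; i < calls; i++) {
            block(mapper);
        }
        return CACHED.getSuppressed().length;
    }
}
```

With the wrapper, each decorated exception becomes garbage-collectable after the call returns, so neither memory nor log output accumulates.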

manoj-mathivanan commented 1 year ago

@OlegDokuka Thanks for picking this up for a future release, and thanks a lot for the suggestion too.