rsocket / rsocket-java

Java implementation of RSocket
http://rsocket.io
Apache License 2.0

SendUtils onDiscard handler class cast exception #1077

Closed Svenskunganka closed 1 year ago

Svenskunganka commented 1 year ago

I'm using RSocket Java with Spring WebFlux on the back end and RSocket JS on the front end, serving REQUEST_STREAM requests from an infinite Kafka source. The returned Flux receives a large number of small messages and does quite a bit of filtering and buffering. The relevant code in SendUtils performs an unchecked cast of each discarded object to ReferenceCounted, which seems to throw a ClassCastException that is caught and ignored.
The problem is that so many objects are discarded (through filtering, and through buffers dropped on disconnect) that this consumes a substantial amount of CPU, according to the profiler.

[Image: profiler screenshot]

The returned Fluxes are scheduled on Schedulers.boundedElastic(), and for some reason entire threads are started up just for this exception:

[Image: flamegraph]

Expected Behavior

Discarding objects should not use up so much CPU.

Actual Behavior

Discarding objects uses a lot of CPU.

Steps to Reproduce

Possible Solution

Not exactly sure, but perhaps an instanceof check:

if (data instanceof ReferenceCounted) ((ReferenceCounted) data).release();
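To make the difference concrete, here is a minimal sketch comparing the cast-and-catch pattern described above with the proposed instanceof check. It is not copied from SendUtils. `ReferenceCounted` here is a local stand-in for io.netty.util.ReferenceCounted so the example compiles without Netty, and `DiscardSketch`, `CountedBuffer`, and both method names are hypothetical.

```java
// Sketch only. ReferenceCounted below is a local stand-in for
// io.netty.util.ReferenceCounted so the example compiles without Netty.
final class DiscardSketch {

    /** Hypothetical stand-in exposing only the method this sketch needs. */
    interface ReferenceCounted {
        boolean release();
    }

    /** Hypothetical reference-counted object that starts with one reference. */
    static final class CountedBuffer implements ReferenceCounted {
        private int refCnt = 1;

        @Override
        public boolean release() {
            if (refCnt == 0) {
                throw new IllegalStateException("already released");
            }
            refCnt--;
            return refCnt == 0;
        }

        int refCnt() {
            return refCnt;
        }
    }

    /** Cast-and-catch: every non-reference-counted object pays for an exception. */
    static boolean releaseWithCast(Object data) {
        try {
            ((ReferenceCounted) data).release();
            return true;
        } catch (ClassCastException e) {
            return false; // caught and ignored, as described in the report
        }
    }

    /** Type check first: other objects are skipped without throwing. */
    static boolean releaseIfReferenceCounted(Object data) {
        if (data instanceof ReferenceCounted) {
            ((ReferenceCounted) data).release();
            return true;
        }
        return false;
    }
}
```

Both methods release a `CountedBuffer` and leave a plain `String` alone. The difference is that the second one never constructs an exception for the `String`, which is the cost that shows up in the profiler when many ordinary objects are discarded.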

Your Environment

OlegDokuka commented 1 year ago

@Svenskunganka! Sounds relevant!

Feel free to contribute if you see a direct solution; otherwise, I will investigate it!

Best, Oleh

Svenskunganka commented 1 year ago

Hi Oleh!

If my possible solution is acceptable, I can definitely contribute it! But if you know of another way to solve this without performing an instanceof check for every discarded object, that might be a better route to explore.
From my limited experience, the only thing I can think of is using e.g. Mono.using()/Flux.using() when creating a Payload.

input.flatMap(bytes -> Mono.using(
    () -> ByteBufPayload.create(bytes),
    data -> ...,
    Payload::release
));
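The idea behind the snippet above is that release is tied to the scope that created the resource, so nothing downstream has to inspect discarded objects. Below is a plain-Java analogue of that idea, assuming a hypothetical `Resource` class in place of a Payload. It is not Reactor's Mono.using and does not model asynchronous completion or cancellation.

```java
// Sketch only: a plain-Java analogue of scoped release, not Reactor's Mono.using.
final class ScopedReleaseSketch {

    /** Hypothetical stand-in for a releasable resource such as a Payload. */
    static final class Resource {
        private boolean released;

        void release() {
            released = true;
        }

        boolean isReleased() {
            return released;
        }
    }

    /** Runs body with the resource, then releases it even if body throws. */
    static <T> T using(Resource resource,
                       java.util.function.Function<Resource, T> body) {
        try {
            return body.apply(resource);
        } finally {
            resource.release();
        }
    }
}
```

With ownership scoped like this, the resource is released once when the body finishes, whether it returns normally or throws.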

Kind regards, Tom

luczus commented 1 year ago

Hi Oleh, @Svenskunganka!

I've created PR #1079 with a fix. Please let me know what you think about these changes.

Best, Lukasz

tukez commented 1 year ago

I noticed the same problem. In my case, at least, the offending instance is of type org.springframework.core.io.buffer.NettyDataBuffer, which holds a ByteBuf. Should the inner ByteBuf be released? That does not seem possible in rsocket-java, though, other than through reflection?

kmccarp commented 1 year ago

@OlegDokuka are there any plans to integrate the changes made by @luczus? This is heavily polluting our logs.