reactor / reactor-netty

TCP/HTTP/UDP/QUIC client/server with Reactor over Netty
https://projectreactor.io
Apache License 2.0
2.61k stars 646 forks source link

Spring Stomp Websocket over RabbitMQ LEAK: ByteBuf.release() was not called before it's garbage-collected #1863

Closed jan-schumacher closed 3 years ago

jan-schumacher commented 3 years ago

I recently got this Exception while using ReactorNettyTcpClient with RabbitMQ as Stomp Broker Relay.

LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:174)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:108)
    org.springframework.messaging.tcp.reactor.ReactorNettyTcpConnection.send(ReactorNettyTcpConnection.java:60)
    org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$RelayConnectionHandler.forward(StompBrokerRelayMessageHandler.java:896)
    org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.handleMessageInternal(StompBrokerRelayMessageHandler.java:579)
    org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler.handleMessage(AbstractBrokerMessageHandler.java:281)
    org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:144)
    com.stuff.tracing.domain.ExecutionCtxTaskDecorator.lambda$createNewRunnable$0(ExecutionCtxTaskDecorator.java:38)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:829)

Since i have no clue of whats going on behind the scene in spring, I hope you guys can help a bit.

This is my configuration:

    override fun configureMessageBroker(config: MessageBrokerRegistry) {

        publicLog.info { "load stomp broker relay configuration $stompBrokerProperties" }
        val sslContextBuilder = stompBrokerProperties.ssl?.let(this::generateSslContext)

        val client = ReactorNettyTcpClient({ tcpClient ->
            tcpClient
                .host(stompBrokerProperties.host)
                .port(stompBrokerProperties.port)
        }, StompReactorNettyCodec())

        config.setApplicationDestinationPrefixes("/app")
            .enableStompBrokerRelay("/topic")
            .setTaskScheduler(stompHeartbeatTaskScheduler)
            .setTcpClient(client)
            .setRelayHost(stompBrokerProperties.host)
            .setRelayPort(stompBrokerProperties.port)
            .setSystemLogin(stompBrokerProperties.username)
            .setSystemPasscode(stompBrokerProperties.password)
            .setClientLogin(stompBrokerProperties.username)
            .setClientPasscode(stompBrokerProperties.password)
    }

Your Environment

violetagg commented 3 years ago

@jan-schumacher We need from you to reproduce with

violetagg commented 3 years ago

@jan-schumacher Were you able to test with the extended logging from my previous comment?

violetagg commented 3 years ago

@jan-schumacher I'm closing this. If you are able to provide the requested information, we can reopen it.