ReactiveX / RxNetty

Reactive Extension (Rx) Adaptor for Netty
Apache License 2.0
1.38k stars 255 forks source link

Possible memory Leak #583

Closed jonhkr closed 7 years ago

jonhkr commented 7 years ago

I'm testing RxNetty and got the following log message:

Apr 23, 2017 9:44:25 PM io.netty.util.ResourceLeakDetector reportUntracedLeak
SEVERE: LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced leak reporting, specify the JVM option '-Dio.netty.leakDetection.level=advanced' or call ResourceLeakDetector.setLevel() See http://netty.io/wiki/reference-counted-objects.html for more information.

The implementation is as follows:

import io.netty.buffer.ByteBuf;
import io.netty.handler.logging.LogLevel;
import io.reactivex.netty.protocol.tcp.server.TcpServer;
import lombok.Data;
import rx.Observable;

import java.nio.charset.Charset;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class App {
    public static void main(String[] args) {

        Events events = new Events();

        TcpServer<ByteBuf, ByteBuf> server = TcpServer.newServer(8080)
                .enableWireLogging("echo-server", LogLevel.DEBUG)
                .start(c -> {
                    c.getInput()
                            .map(bb -> bb.toString(Charset.defaultCharset()))
                            .subscribe(m -> events.event(c.unsafeNettyChannel().remoteAddress().toString(), m));

                    return c.writeStringAndFlushOnEach(events.events
                            .filter(e -> e.getFrom() == null || !e.getFrom().equals(c.unsafeNettyChannel().remoteAddress().toString()))
                            .map(Events.Event::getEvent));
                });

        server.awaitShutdown();
    }

    static class Events {
        BlockingQueue<Event> queue = new ArrayBlockingQueue<>(200);
        Observable<Event> events;

        public Events() {
            events = Observable.<Event>create( e -> new Thread(() -> {
                while(true) {
                    try {
                        e.onNext(queue.take());
                    } catch (InterruptedException e1) {
                        e.onError(e1);
                    }
                }
            }).start()).publish().autoConnect();
        }

        public void event(String event) {
            queue.offer(new Event(null, event));
        }

        public void event(String from, String event) {
            queue.offer(new Event(from, event));
        }

        @Data
        class Event {
            private final String from;
            private final String event;
        }
    }
}

It is a simple chat application, every message received is broadcasted to everyone connected to the server.

I'm using tcpkali (https://github.com/machinezone/tcpkali) to make requests to the server and I get that error when I run the following command:

$ tcpkali -em "hi" -r 10 127.0.0.1:8080 -c 10

You can see messages coming by running:

telnet 127.0.0.1 8080

Am I doing something wrong or is that a bug in RxNetty?

jonhkr commented 7 years ago

These are the dependencies:

compile 'org.projectlombok:lombok:1.16.16'
compile 'io.reactivex:rxnetty-tcp:0.5.2'
jamesgorman2 commented 7 years ago

ByteBufs are stored off-heap by default and need to be manually released. You're leak is at:

c.getInput()
   .map(bb -> bb.toString(Charset.defaultCharset()))
   .subscribe(m -> events.event(c.unsafeNettyChannel().remoteAddress().toString(), m));

You can use #autoRelease() to free up the memory automatically

c.getInput().autoRelease()
  .map(bb -> bb.toString(Charset.defaultCharset()))
  .subscribe(m -> events.event(c.unsafeNettyChannel().remoteAddress().toString(), m));

You may also want to look at RxJavaString to re-frame incoming events in case a message is broken across two TCP frames (in particular #split(...) and #byLine()). This would look like:

c.getInput().autoRelease()
  .map(bb -> bb.toString(Charset.defaultCharset()))
  .compose(StringObservable::byLine)
  .subscribe(m -> events.event(c.unsafeNettyChannel().remoteAddress().toString(), m));
jonhkr commented 7 years ago

Thank you @jamesgorman2 for the quick reply. The #autoRelease() method did the trick. 😃

jamesgorman2 commented 7 years ago

np @jonhkr been there, done that when I was learning this :-)