bitrich-info / xchange-stream

XChange-stream is a Java library providing a simple and consistent streaming API for interacting with Bitcoin and other cryptocurrency exchanges via the WebSocket protocol. It is built on top of the XChange library, providing new interfaces for the streaming API. Users can subscribe to live updates via reactive streams of the RxJava library.
Apache License 2.0

Exchange doesn't disconnect gracefully #35

Open jpink opened 6 years ago

jpink commented 6 years ago

I follow these steps, as in the example:

  1. Connect to GDAX exchange.
  2. Subscribe to trades.
  3. Check that trades are really coming in.
  4. Dispose all my subscriptions.
  5. Disconnect from exchange.
  6. Program main loop runs to its end.

But the program doesn't stop! There are still threads running somewhere.

I'm using a Spring Boot web app and I'm stopping the web server via a POST /shutdown request. All other services are then shut down.

This is not a big deal. Currently I call System.exit() 15 s after that request. But this is annoying, and it smells like there is a thread leak somewhere.
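
One way to narrow down a suspected leak like this is to list the threads that are still alive and not daemons, since those are what keep a JVM from exiting. The following is a minimal diagnostic sketch using only the JDK; the class and method names (`LiveThreadReport`, `nonDaemonThreadNames`) are made up for illustration and are not part of xchange-stream.

```java
import java.util.List;
import java.util.stream.Collectors;

// Diagnostic sketch: report the live non-daemon threads, which are the ones
// that prevent the JVM from exiting once main() has returned.
class LiveThreadReport {

    /** Names of all live, non-daemon threads other than the calling thread. */
    static List<String> nonDaemonThreadNames() {
        Thread current = Thread.currentThread();
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.isAlive() && !t.isDaemon() && t != current)
                .map(Thread::getName)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // In a real program, call this after disconnecting from the exchange.
        for (String name : nonDaemonThreadNames()) {
            System.out.println("still running: " + name);
        }
    }
}
```

Calling `nonDaemonThreadNames()` right after the disconnect step should show which thread names survive; a thread dump (for example with `jstack`) gives the same information with stack traces.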

dozd commented 6 years ago

@jpink Yeah, I can reproduce that, and it's a bug in exchanges based on NettyStreamingService.

dcarr45 commented 6 years ago

Did anyone ever do anything about this? It's the nioEventLoopGroup thread that's still running I think, but I don't know enough about these things. I got around it by assigning the result of new NioEventLoopGroup() in connect() to a class variable and calling eventLoopGroup.shutdownGracefully() on disconnect(), but I'm not sure if that's the most elegant solution
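
The workaround described above (keep the thread pool in a field on connect, shut it down on disconnect) can be sketched as follows. This is not the library's code: a plain JDK `ExecutorService` stands in for Netty's `NioEventLoopGroup` so the example runs without dependencies, and `OwnedWorkerService` is a hypothetical name. With Netty, the call corresponding to `shutdown()` would be the `eventLoopGroup.shutdownGracefully()` mentioned in the comment.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Stand-in for a streaming service. The ExecutorService plays the role of
// the event loop group; the class name is hypothetical.
class OwnedWorkerService {
    // Kept in a field so that disconnect() can reach it later.
    private ExecutorService workers;

    synchronized void connect() {
        if (workers != null) {
            return; // already connected; do not create a second pool
        }
        // Executors.newSingleThreadExecutor() uses non-daemon threads by default,
        // so forgetting to shut it down keeps the JVM alive.
        workers = Executors.newSingleThreadExecutor();
        workers.execute(() -> { /* socket reads would happen here */ });
    }

    /** Stops the pool; returns true if it terminated within the timeout. */
    synchronized boolean disconnect(long timeoutMillis) throws InterruptedException {
        if (workers == null) {
            return true; // nothing to stop
        }
        ExecutorService toStop = workers;
        workers = null;
        toStop.shutdown(); // accept no new tasks, let running ones finish
        return toStop.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

The point of the design is ownership: whichever object creates the pool also holds the reference needed to stop it, so every `connect()` has a matching shutdown path.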

bryantharris commented 6 years ago

This one is pretty annoying. Sure, I can add a System.exit() call so that I'm not chasing down PIDs, but it breaks WAR hot deploy: I need to stop Tomcat and restart it to pick up code changes, when I should be able to just trigger the deploy by rebuilding.

bryantharris commented 6 years ago

This issue also seems to affect Gemini.

badgerwithagun commented 6 years ago

Possibly related - I get an NPE whenever I disconnect from GDAX due to a background socket thread:

java.lang.NullPointerException: null
    at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleChannelMessage(NettyStreamingService.java:288)
    at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleMessage(NettyStreamingService.java:278)
    at info.bitrich.xchangestream.gdax.GDAXStreamingService.handleMessage(GDAXStreamingService.java:84)
    at info.bitrich.xchangestream.gdax.GDAXStreamingService.handleMessage(GDAXStreamingService.java:21)
    at info.bitrich.xchangestream.service.netty.JsonNettyStreamingService.messageHandler(JsonNettyStreamingService.java:42)
    at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:79)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1388)
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1159)
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1202)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1414)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:945)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:141)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:745)

badgerwithagun commented 6 years ago

I'm trying to make some progress with all this stuff on #191 (a little bit blindly as I'm not familiar with Netty) and am keeping the PR rebased against develop.

In the meantime, in the absence of PR approvals right now, I am producing snapshot builds myself which pull together develop, all the unmerged PRs on here (which I like and need), XChange 4.3.6 support, updated dependencies, plus my work in progress. You're welcome to use it:

    <repositories>
        <repository>
            <id>xchange-stream-mvn-repo</id>
            <url>https://raw.github.com/badgerwithagun/xchange-stream/mvn-repo/</url>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
            </snapshots>
        </repository>
    </repositories>

Just make sure you're running your Maven build with --update-snapshots.

badgerwithagun commented 6 years ago

Test to simulate thread leak:

  @Test
  public void testSimulateMemoryLeak() throws InterruptedException, IOException {
    try {
      // Connect, read a few order book updates, then disconnect; repeat five times.
      for (int i = 0; i < 5; i++) {
        StreamingExchange exchange = StreamingExchangeFactory.INSTANCE.createExchange(GDAXStreamingExchange.class.getName());
        exchange.connect(ProductSubscription.create().addAll(BTC_USD).build()).blockingAwait();
        try {
          // Wait for three order book updates, then drop the subscription.
          CountDownLatch latch = new CountDownLatch(3);
          Disposable sub = exchange.getStreamingMarketDataService().getOrderBook(BTC_USD).subscribe(o -> latch.countDown());
          latch.await();
          sub.dispose();
        } finally {
          // Always disconnect, even if the subscription failed.
          exchange.disconnect().blockingAwait();
        }
        // Pause between rounds.
        Thread.sleep(2000);
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    // Keep the VM alive so that any leftover threads can be inspected.
    System.out.println("Stopped. Press a key to terminate VM");
    System.in.read();
  }


badgerwithagun commented 6 years ago

Now, for no apparent reason, it's behaving. Heisenbug.

tomislav011 commented 6 years ago

Try updating the netty service dependency:

    <dependency>
        <groupId>info.bitrich.xchange-stream</groupId>
        <artifactId>service-netty</artifactId>
        <version>4.3.3-SNAPSHOT</version>
    </dependency>

pchertalev commented 5 years ago

Looks like I have fixed the problem here: https://github.com/bitrich-info/xchange-stream/pull/225

dpisklov commented 5 years ago

Not sure if this is related, but I just got my process dying with Too many open files, and it turns out Netty spun up a LOT of threads. At the moment 651 are alive, with names of the form:

nioEventLoopGroup-4-1
...
nioEventLoopGroup-4208277-1

and clearly a lot more were actually created, about 4 million judging by the thread names above, all with the same stack trace:

"nioEventLoopGroup-4600-1" #4144 prio=10 os_prio=0 tid=0x00007f67f01ca000 nid=0x192c0 runnable [0x00007f668d153000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x00000000b26c7a38> (a io.netty.channel.nio.SelectedSelectionKeySet)
        - locked <0x00000000b26c7a28> (a java.util.Collections$UnmodifiableSet)
        - locked <0x00000000b26c7a50> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at io.netty.channel.nio.SelectedSelectionKeySetSelector.select(SelectedSelectionKeySetSelector.java:62)
        at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:786)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:434)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

and 4150 (!) sockets opened...
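
The estimate above comes from reading the numbers in the thread names. A small sketch of that arithmetic, assuming (as the comment infers) that the middle number in `nioEventLoopGroup-<n>-<m>` grows by one for every group created in the JVM; `EventLoopNameStats` and its methods are hypothetical helper names, not part of Netty or xchange-stream.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: compare how many event loop threads are alive with the highest
// group number seen in their names.
class EventLoopNameStats {
    // Matches names such as "nioEventLoopGroup-4600-1".
    private static final Pattern NAME = Pattern.compile("nioEventLoopGroup-(\\d+)-(\\d+)");

    /** Highest group number found in the names, or 0 if none match. */
    static long highestGroupNumber(List<String> threadNames) {
        long highest = 0;
        for (String name : threadNames) {
            Matcher m = NAME.matcher(name);
            if (m.matches()) {
                highest = Math.max(highest, Long.parseLong(m.group(1)));
            }
        }
        return highest;
    }

    /** Number of names that look like event loop threads. */
    static long aliveCount(List<String> threadNames) {
        return threadNames.stream().filter(n -> NAME.matcher(n).matches()).count();
    }
}
```

Feeding it the names from `Thread.getAllStackTraces().keySet()` gives both figures at once: a highest group number far above the alive count suggests groups are being created over and over, for example on every reconnect.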