paoloboni / binance-scala-client

A functional Scala client for Binance
MIT License

WS Streams creates memory leak issue #133

Closed Swoorup closed 3 years ago

Swoorup commented 3 years ago

I am running simple code like the following on version 1.3.3:

import cats.effect.{ExitCode, IO, IOApp}
import io.github.paoloboni.binance.BinanceClient
import io.github.paoloboni.binance.common.SpotConfig
import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger

object PriceMonitor extends IOApp {
  val config = SpotConfig.Default[IO](
    apiKey = "***",
    apiSecret = "***"
  )

  override def run(args: List[String]): IO[ExitCode] = {
    given log: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO]
    BinanceClient
      .createSpotClient[IO](config)
      .use { client =>
        for {
          // start six concurrent consumers: one trade stream and one depth stream per symbol
          btc_trades <- client.tradeStreams("btcusdt").evalMap(_ => IO.unit).compile.drain.start
          btc_ob     <- client.diffDepthStream("btcusdt").evalMap(_ => IO.unit).compile.drain.start
          bch_trades <- client.tradeStreams("bchusdt").evalMap(_ => IO.unit).compile.drain.start
          bch_ob     <- client.diffDepthStream("bchusdt").evalMap(_ => IO.unit).compile.drain.start
          xrp_trades <- client.tradeStreams("xrpusdt").evalMap(_ => IO.unit).compile.drain.start
          xrp_ob     <- client.diffDepthStream("xrpusdt").evalMap(_ => IO.unit).compile.drain.start

          // let the streams run until Enter is pressed, then cancel every fiber
          _ <- IO.readLine

          _ <- btc_trades.cancel
          _ <- btc_ob.cancel
          _ <- bch_trades.cancel
          _ <- bch_ob.cancel
          _ <- xrp_trades.cancel
          _ <- xrp_ob.cancel

          _ <- IO.println("Now do a heap dump")
          _ <- IO.readLine
        } yield ()
      }
      .redeemWith(
        // redeemWith (not redeem) so the logging effect is actually sequenced, not discarded
        t => log.error(t)("Something went wrong").as(ExitCode.Error),
        _ => IO.pure(ExitCode.Success)
      )
  }
}

After running for around three hours, I get an OOM.
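For anyone reproducing this: the run below was capped at a 1 GB heap (see the GC warning in the logs), which makes the leak surface sooner. A minimal sbt sketch of that setup, assuming a forked run; the flag values here are illustrative, not from the project's build:

// build.sbt (sketch): fork the run, cap the heap, and dump it automatically on OOM
fork := true
javaOptions ++= Seq("-Xmx1g", "-XX:+HeapDumpOnOutOfMemoryError")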

60.51000000,0E-8), Ask(47767.99000000,7.49637200), Ask(47768.54000000,0E-8), Ask(47774.34000000,0E-8), Ask(47792.81000000,0E-8), Ask(47919.03000000,0.00120000), Ask(49800.00000000,36.23718600), Ask(51600.74000000,0.00211700)))
java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
[error] (io-compute-7) java.lang.OutOfMemoryError: Java heap space
TRADE: TradeStream(trade,1629468372221,BCHUSDT,102933218,669.01000000,1.04021000,2281597111,2281597022,1629468372220,false,true)
[error] (io-blocking-75) java.lang.OutOfMemoryError: Java heap space
[error] (run-main-3) java.lang.OutOfMemoryError: Java heap space
[warn] In the last 10 seconds, 42.075 (455.1%) were spent in GC. [Heap: 0.09GB free of 1.00GB, max 1.00GB] Consider increasing the JVM heap using `-Xmx` or try a different collector, e.g. `-XX:+UseG1GC`, for better performance.
[error] java.lang.OutOfMemoryError: Java heap space
[error] stack trace is suppressed; run last Compile / bgRun for the full output
[error] java.lang.OutOfMemoryError: Java heap space
[error] stack trace is suppressed; run last Compile / bgRun for the full output
[error] java.lang.OutOfMemoryError: Java heap space
[error] stack trace is suppressed; run last Compile / bgRun for the full output
Aug. 21, 2021 12:07:27 AM io.netty.util.HashedWheelTimer$HashedWheelTimeout expire
WARNING: An exception was thrown by TimerTask.
java.lang.OutOfMemoryError: Java heap space
        at java.base/java.util.concurrent.ConcurrentHashMap$ValuesView.iterator(ConcurrentHashMap.java:4732)
        at org.asynchttpclient.netty.channel.DefaultChannelPool$IdleChannelDetector.run(DefaultChannelPool.java:356)
        at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
        at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
        at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)

java.lang.InterruptedException
        at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1343)
        at scala.concurrent.impl.Promise$DefaultPromise.tryAwait0(Promise.scala:207)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:225)
        at scala.concurrent.Await$.$anonfun$result$1(package.scala:201)
        at cats.effect.unsafe.WorkerThread.blockOn(WorkerThread.scala:428)
        at scala.concurrent.Await$.result(package.scala:124)
        at cats.effect.std.DispatcherPlatform.unsafeRunTimed(DispatcherPlatform.scala:61)
        at cats.effect.std.DispatcherPlatform.unsafeRunTimed$(DispatcherPlatform.scala:24)
        at cats.effect.std.Dispatcher$$anon$2.unsafeRunTimed(Dispatcher.scala:190)
        at cats.effect.std.DispatcherPlatform.unsafeRunSync(DispatcherPlatform.scala:52)
        at cats.effect.std.DispatcherPlatform.unsafeRunSync$(DispatcherPlatform.scala:24)
        at cats.effect.std.Dispatcher$$anon$2.unsafeRunSync(Dispatcher.scala:190)
        at sttp.client3.impl.fs2.Fs2SimpleQueue.offer(Fs2SimpleQueue.scala:19)
        at sttp.client3.asynchttpclient.WebSocketImpl.send$$anonfun$1(WebSocketImpl.scala:53)
        at cats.effect.IOFiber.runLoop(IOFiber.scala:385)
        at cats.effect.IOFiber.asyncContinueSuccessfulR(IOFiber.scala:1172)
        at cats.effect.IOFiber.run(IOFiber.scala:129)
        at cats.effect.unsafe.WorkerThread.run(WorkerThread.scala:384)
java.lang.NullPointerException
        at delay @ sttp.client3.impl.cats.CatsMonadError.eval(CatsMonadError.scala:19)
        at flatten @ sttp.client3.impl.cats.CatsMonadError.flatten(CatsMonadError.scala:21)
        at *> @ io.github.paoloboni.http.ratelimit.RateLimiter$.make(RateLimiter.scala:55)
        at flatMap @ fs2.Compiler$TargetLowPriority$MonadErrorTarget.flatMap(Compiler.scala:152)
        at flatMap @ fs2.Compiler$TargetLowPriority$MonadErrorTarget.flatMap(Compiler.scala:152)
        at flatMap @ fs2.Compiler$TargetLowPriority$MonadErrorTarget.flatMap(Compiler.scala:152)
        at flatMap @ fs2.Compiler$TargetLowPriority$MonadErrorTarget.flatMap(Compiler.scala:152)
        at flatMap @ fs2.Compiler$TargetLowPriority$MonadErrorTarget.flatMap(Compiler.scala:152)
        at flatMap @ fs2.Compiler$TargetLowPriority$MonadErrorTarget.flatMap(Compiler.scala:152)
        at flatMap @ fs2.Compiler$TargetLowPriority$MonadErrorTarget.flatMap(Compiler.scala:152)
        at flatMap @ fs2.Compiler$TargetLowPriority$MonadErrorTarget.flatMap(Compiler.scala:152)
        at flatMap @ fs2.Compiler$TargetLowPriority$MonadErrorTarget.flatMap(Compiler.scala:152)
        at flatMap @ fs2.Compiler$TargetLowPriority$MonadErrorTarget.flatMap(Compiler.scala:152)

I can't pinpoint where this is coming from, but VisualVM clearly shows the baseline memory usage growing over time.

[image: VisualVM graph of heap usage over time]

A quick heap dump yields: [image: heap dump summary]

The heapdump is available at: https://drive.google.com/file/d/1qXVp1qeBrsYHcub7b-2AwO__PMGa1KNl/view?usp=sharing

Swoorup commented 3 years ago

Despite updating the code to simply ignore events from the stream, as suggested in https://github.com/typelevel/fs2/issues/2568#issuecomment-903274824, I still run into OOM issues.
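A sketch of what "ignoring the event" amounts to, assuming it means dropping the evalMap stage and draining the stream outright (the exact change isn't shown here, so treat this as an approximation):

// sketch (assumed change): discard elements instead of mapping each one to IO.unit
btc_trades <- client.tradeStreams("btcusdt").drain.compile.drain.start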

Swoorup commented 3 years ago

Tried both a circular buffer (drop earliest) and a bounded queue (backpressure) without any success. Maybe it is sttp-related?

(for {
  _     <- Stream.eval(Logger[F].debug("ws connecting to: " + uri.toString()))
  // buffer incoming frames; circularBuffer drops the oldest frame when full
  queue <- Stream.eval(Queue.circularBuffer[F, Option[DataFrame]](10))
  // run the WS request on a background fiber, feeding frames into the queue
  _ <- Stream.resource(
    F.background(
      basicRequest
        .get(uri)
        .response(asWebSocketStreamAlways(Fs2Streams[F])(webSocketFramePipe(queue)))
        .send(client)
        .flatMap { response =>
          Logger[F].debug("response: " + response)
        }
        .void
    )
  )
} yield queue).flatMap(Stream.fromQueueNoneTerminated(_))
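For reference, the two variants I tried differ only in the queue constructor, both from cats.effect.std.Queue:

// circular (drop earliest): when full, a new frame evicts the oldest buffered frame
Queue.circularBuffer[F, Option[DataFrame]](10)
// bounded (backpressure): when full, the producer blocks until the consumer catches up
Queue.bounded[F, Option[DataFrame]](10)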
paoloboni commented 3 years ago

I don't think the problem is in sttp: if you change the pipe so it doesn't go through the queue and instead consumes the elements directly, you won't see the leak.
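A minimal sketch of such a queue-free pipe; handle is a hypothetical callback, not an existing function in this codebase:

import fs2.Pipe
import sttp.ws.WebSocketFrame

// sketch: process each incoming frame in place instead of routing it through a queue;
// the pipe emits no outgoing frames, which typechecks because Stream is covariant in its output
def directPipe[F[_]](handle: WebSocketFrame.Data[_] => F[Unit]): Pipe[F, WebSocketFrame.Data[_], WebSocketFrame] =
  _.evalMap(handle).drain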

Swoorup commented 3 years ago

Yep, you are right: it's something to do with fs2.Stream.map.

Swoorup commented 3 years ago

Can confirm no further issues, fixed in https://github.com/paoloboni/binance-scala-client/pull/136

paoloboni commented 3 years ago

Fixed in v1.3.4