mp911de / spinach

Scalable Java Disque client
Apache License 2.0

Rx triggers Max CPU and few addjobs #32

Open kiddouk opened 6 years ago

kiddouk commented 6 years ago

Version: 1.0.0-SNAPSHOT
API: reactive
Use case: trying to spawn 6M+ messages to Disque

Reproduction:

        val disqueClient = DisqueClient.create(DisqueURI.create("disque://localhost:7711"))
        val conn = disqueClient.connect().reactive()

        Observable.just("channel")
                .flatMap { _ ->
                    val r = (1..6000000).asIterable().map { it.toString() }
                    Observable.from(r)
                }
                .compose(monitor("About to spool"))
                .flatMap { w -> conn.addjob("test", w, 1000, TimeUnit.MILLISECONDS) }
                .compose(monitor("Spooled"))
                .subscribe(
                        { x -> },
                        { err -> logger.catching(err) },
                        { println("Completed") }
                )

with

    fun <T> monitor(message: String) = Transformer<T, T> { observable ->
        observable.buffer(10, TimeUnit.SECONDS)
                .doOnNext { x -> logger.info("${message} : collected ${x.size}") }
                .flatMap { x -> Observable.from(x) }
    }

What I am experiencing:

A few messages are sent to the Disque server, and after a while (at a random point) spinach stops sending messages. I see 100% CPU on all cores of my machine. A jstack indicates that we are busy in kevent (yes, I am on a Mac).

After reading a bit about Netty, I decided to recompile Lettuce and Spinach against Netty 4.1.28.Final and to add the native kqueue transport to spinach in order to get native polling. I read here and there that Netty has a bug with epoll. I haven't been able to confirm whether that problem also applies to OS X, but since I also need this code to run on Linux (where I experience the same issue), I thought it was worth a try. No luck.

Any idea what is going on?

mp911de commented 6 years ago

Thanks for the reproducer. Looking at your code, you're creating a lot of inner subscriptions. Depending on CPU and memory limits, this can easily eat up a lot of resources, because the flatMap operators create a merged stream that has to track every inner Observable. On my machine, I get the following trace:

  java.lang.Thread.State: RUNNABLE
      at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:691)
      at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:562)
      at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:851)
      at com.lambdaworks.redis.ReactiveCommandDispatcher$ObservableCommand.complete(ReactiveCommandDispatcher.java:127)
      at com.lambdaworks.redis.protocol.CommandHandler.decode(CommandHandler.java:187)
      at com.lambdaworks.redis.protocol.CommandHandler.channelRead(CommandHandler.java:153)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
      at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
      at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
      at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:326)
      at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1320)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:334)
      at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:905)
      at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
      at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:563)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:504)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:418)
      at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:390)
      at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742)
      at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:145)
      at java.lang.Thread.run(Thread.java:745)

After about 75k messages, the merge operator is kept busy iterating over its inner children and consuming a lot of memory, so your CPU is either busy spinning or busy with GC.
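
If it helps, here is a minimal sketch (untested, reusing conn, monitor and logger from the reproducer above; the concurrency limit of 128 is an arbitrary example value) of keeping that merge bounded with RxJava 1's flatMap overload that takes a maxConcurrent argument, combined with Observable.range so the six million strings are not materialized up front:

    Observable.range(1, 6000000)
            .map { it.toString() }
            .compose(monitor("About to spool"))
            // flatMap(Func1, maxConcurrent): at most 128 addjob Observables are
            // subscribed at a time, so the merge only tracks a bounded set of children
            .flatMap({ w -> conn.addjob("test", w, 1000, TimeUnit.MILLISECONDS) }, 128)
            .compose(monitor("Spooled"))
            .subscribe(
                    { },
                    { err -> logger.catching(err) },
                    { println("Completed") }
            )

With the concurrency capped, backpressure from the connection limits how many commands are in flight instead of the merge operator accumulating millions of inner subscriptions.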

Please note that Spinach isn't really maintained anymore, at least until Disque gets reanimated through Redis Modules.