typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io

Broadcast. High CPU/Memory usage #2178

Open iRevive opened 3 years ago

iRevive commented 3 years ago

Using Stream#broadcastTo with a high-message-rate source leads to increased CPU usage. In the same scenario, Akka Streams shows 2-3x lower CPU usage.

I tried different JVMs: OpenJDK 11, OpenJDK 14, OpenJDK 14 OpenJ9. The CPU usage is more or less the same on every virtual machine.

Prerequisites

JVM: OpenJDK 64-Bit Server VM (11.0.4+11, mixed mode)
fs2: 2.4.6
akka-streams: 2.6.6

Scenario 1. fs2. map. ~15% CPU usage

import cats.effect.{ExitCode, IO, IOApp}
import fs2._

object CPUTest extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val source: Stream[Pure, Int] = Stream.range(1, Int.MaxValue)
    val discard: Pipe[IO, Int, Unit] = _.map(_ => ())

    source.covary[IO].through(discard).compile.drain.as(ExitCode.Success)
  }
}

Scenario 2. fs2. Broadcast to 1 pipe. ~28% CPU usage

import cats.effect.{ExitCode, IO, IOApp}
import fs2._

object CPUTest extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val source: Stream[Pure, Int] = Stream.range(1, Int.MaxValue)
    val discard: Pipe[IO, Int, Unit] = _.map(_ => ())

    source.covary[IO].broadcastTo(discard).compile.drain.as(ExitCode.Success)
  }
}

Scenario 3. fs2. Broadcast to 100 pipes. 50-80% CPU usage

import cats.effect.{ExitCode, IO, IOApp}
import fs2._

object CPUTest extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val source: Stream[Pure, Int] = Stream.range(1, Int.MaxValue)
    val discard: Pipe[IO, Int, Unit] = _.map(_ => ())
    val pipes: List[Pipe[IO, Int, Unit]] = List.fill(100)(discard)

    source.broadcastTo(pipes: _*).compile.drain.as(ExitCode.Success)
  }
}

Scenario 4. Akka. Broadcast to 100 pipes. ~28% CPU usage

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ClosedShape, Materializer}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, RunnableGraph, Sink, Source}

object AkkaCPUTest {

  def main(args: Array[String]): Unit = {
    implicit val actorSystem: ActorSystem = ActorSystem()
    implicit val mat: Materializer = Materializer(actorSystem)

    val source: Source[Int, NotUsed] = Source(Range(1, Int.MaxValue))
    val discard: Flow[Int, Unit, NotUsed] =  Flow[Int].map(_ => ())
    val pipes: List[Flow[Int, Unit, NotUsed]] = List.fill(100)(discard)
    val sinks = pipes.map(_.toMat(Sink.ignore)(Keep.right))

    val broadcastGraph = GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[Int](pipes.size))

      source ~> broadcast
      sinks.foreach(sink => broadcast ~> sink)

      ClosedShape
    }

    val _ = RunnableGraph.fromGraph(broadcastGraph).run()
  }

}

Related issues: https://github.com/typelevel/fs2/issues/1406 https://github.com/typelevel/fs2/issues/1469

iRevive commented 3 years ago

The use-case

The service consumes messages from the queue/socket (Kafka, ZMQ, etc) and broadcasts every message to N different pipelines.

Perhaps I'm doing it wrong and I don't need broadcastTo(...) at all? Can I achieve similar behavior using a different API? I was thinking about fs2.concurrent.Queue per pipeline as an alternative solution.
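For reference, the queue-per-pipeline idea mentioned above could look roughly like the sketch below. It assumes fs2 2.4.x with Cats Effect 2; the `fanOut` helper, the `QueuePerPipeline` object, and the `bufferSize` value are hypothetical names chosen for illustration, and nothing here has been benchmarked against `broadcastTo`.

```scala
import cats.effect.{Concurrent, ExitCode, IO, IOApp}
import cats.implicits._
import fs2.{Pipe, Stream}
import fs2.concurrent.Queue

object QueuePerPipeline extends IOApp {

  // Hypothetical helper: copies every element of `source` into one bounded queue per pipe.
  def fanOut[F[_]: Concurrent, A](
      source: Stream[F, A],
      pipes: List[Pipe[F, A, Unit]],
      bufferSize: Int
  ): Stream[F, Unit] =
    Stream
      .eval(pipes.traverse(_ => Queue.boundedNoneTerminated[F, A](bufferSize)))
      .flatMap { queues =>
        // Publish each element to all queues, then send None so every consumer terminates.
        val producer: Stream[F, Unit] =
          source.evalMap(a => queues.traverse_(_.enqueue1(Some(a)))) ++
            Stream.eval(queues.traverse_(_.enqueue1(None)))

        // Each pipe reads from its own queue; all consumers run concurrently.
        val consumers: Stream[F, Unit] =
          Stream
            .emits(queues.zip(pipes))
            .covary[F]
            .map { case (queue, pipe) => queue.dequeue.through(pipe) }
            .parJoinUnbounded

        consumers.concurrently(producer)
      }

  override def run(args: List[String]): IO[ExitCode] = {
    val discard: Pipe[IO, Int, Unit] = _.map(_ => ())
    val source = Stream.range(1, 1000000).covary[IO]
    fanOut(source, List.fill(100)(discard), bufferSize = 64).compile.drain.as(ExitCode.Success)
  }
}
```

Because the queues are bounded, a slow pipeline still slows down the producer, so the back-pressure behaviour stays close to that of a broadcast. Whether this reduces CPU usage would have to be measured.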

nikiforo commented 3 years ago

I haven't measured the performance of akka-streams yet, but I've reproduced almost everything you've written about fs2 using jmh. This is the benchmark:

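import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import cats.effect.{Concurrent, ContextShift, IO}
import fs2.{INothing, Pipe, Stream}
import org.openjdk.jmh.annotations.{Benchmark, Param, Scope, State, TearDown}

@State(Scope.Thread)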
class BroadcastBenchmark {
  val executor = Executors.newSingleThreadExecutor
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.fromExecutor(executor))
  implicit val concurrent: Concurrent[IO] = IO.ioConcurrentEffect

  @Param(Array("1000", "100000"))
  var streamSize: Int = _

  val fs2Stream = Stream.range(1, streamSize).covary[IO] // Every number goes to its own chunk
  val drainPipe: Pipe[IO, Int, INothing] = _.drain

  @Benchmark
  def noPipe(): Unit =
    fs2Stream.compile.drain.unsafeRunSync()

  @Benchmark
  def through(): Unit =
    fs2Stream.through(drainPipe).compile.drain.unsafeRunSync()

  @Benchmark
  def broadcastTo1(): Unit =
    fs2Stream.broadcastThrough(1)(drainPipe).compile.drain.unsafeRunSync()

  @Benchmark
  def broadcastTo10(): Unit =
    fs2Stream.broadcastThrough(10)(drainPipe).compile.drain.unsafeRunSync()

  @Benchmark
  def broadcastTo100(): Unit =
    fs2Stream.broadcastThrough(100)(drainPipe).compile.drain.unsafeRunSync()

  @TearDown
  def shutdown(): Unit = executor.shutdown()
}

This is the result:

[info] Benchmark                           (streamSize)   Mode  Cnt        Score        Error  Units
[info] AkkaStreamBenchmark.broadcastTo1            1000  thrpt    5     4245,855 ±   1822,801  ops/s
[info] AkkaStreamBenchmark.broadcastTo1          100000  thrpt    5     4369,746 ±    117,701  ops/s
[info] AkkaStreamBenchmark.broadcastTo10           1000  thrpt    5     1097,015 ±     32,263  ops/s
[info] AkkaStreamBenchmark.broadcastTo10         100000  thrpt    5     1104,885 ±    113,161  ops/s
[info] AkkaStreamBenchmark.broadcastTo100          1000  thrpt    5      119,786 ±      1,063  ops/s
[info] AkkaStreamBenchmark.broadcastTo100        100000  thrpt    5      121,689 ±     19,877  ops/s
[info] AkkaStreamBenchmark.noPipe                  1000  thrpt    5  1028299,135 ± 148425,624  ops/s
[info] AkkaStreamBenchmark.noPipe                100000  thrpt    5  1054733,488 ±  66014,908  ops/s
[info] AkkaStreamBenchmark.through                 1000  thrpt    5   405338,190 ±   4622,959  ops/s
[info] AkkaStreamBenchmark.through               100000  thrpt    5   382236,503 ±  37407,147  ops/s

The most interesting part of the result is that broadcastTo1 is roughly one hundred times slower than through, in spite of being semantically equivalent.

iRevive commented 3 years ago

@nikiforo thank you for the benchmark. The difference between through and broadcast is significant.

balthz commented 3 years ago

Am I reading the benchmark output lines for streamSize ∈ {1000, 100000} correctly that even though we're processing 100 times more elements in the streamSize = 100000 run, the runtime is more or less the same as with streamSize = 1000?

nikiforo commented 3 years ago

Am I reading the benchmark output lines for streamSize ∈ {1000, 100000} correctly that even though we're processing 100 times more elements in the streamSize = 100000 run, the runtime is more or less the same as with streamSize = 1000?

@balthz, :blush: it seems that fs2Stream should be set in @Setup. I'll fix it and resubmit.
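Why the first benchmark did not react to streamSize can be illustrated without JMH. The sketch below assumes that JMH assigns @Param fields only after the state object has been constructed; the classes `Eager` and `Deferred` are hypothetical stand-ins that mimic the two ways of defining the stream, with a plain `Range` in place of an fs2 `Stream`.

```scala
// Hypothetical stand-ins for the benchmark state class; no JMH involved.
class Eager {
  var streamSize: Int = _                  // holds the default 0 until something assigns it
  val range: Range = Range(1, streamSize)  // evaluated once, during construction
}

class Deferred {
  var streamSize: Int = _
  def range: Range = Range(1, streamSize)  // re-evaluated on every call
}

object ParamInitDemo {
  def main(args: Array[String]): Unit = {
    val eager = new Eager
    eager.streamSize = 100000              // mimics the parameter being set after construction
    println(eager.range.size)              // 0: the range was built while streamSize was still 0

    val deferred = new Deferred
    deferred.streamSize = 100000
    println(deferred.range.size)           // 99999: the range is built after the assignment
  }
}
```

With the eager `val`, every benchmark method operates on an empty stream regardless of the parameter, which would explain identical scores for both sizes.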

nikiforo commented 3 years ago

@balthz, I've changed two things in the Benchmark:

The benchmark:

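import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import cats.effect.{Blocker, Concurrent, ContextShift, IO}
import fs2.{INothing, Pipe, Stream}
import org.openjdk.jmh.annotations.{Benchmark, Param, Scope, Setup, State, TearDown}

@State(Scope.Thread)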
class BroadcastBenchmark {
  val jExecutor = Executors.newSingleThreadExecutor
  val sExecutor = ExecutionContext.fromExecutor(jExecutor)
  val blocker = Blocker.liftExecutionContext(sExecutor)
  implicit val cs: ContextShift[IO] = IO.contextShift(sExecutor)
  implicit val concurrent: Concurrent[IO] = IO.ioConcurrentEffect

  val drainPipe: Pipe[IO, Int, INothing] = _.drain

  @Param(Array("10", "1000", "100000"))
  var streamSize: Int = _

  var fs2Stream: Stream[IO, Int] = _

  @Setup
  def start(): Unit = {
    fs2Stream = Stream.range(1, streamSize).covary[IO] // Every number goes to its own chunk
  }

  @Benchmark
  def noPipe(): Unit = execute(fs2Stream)

  @Benchmark
  def through(): Unit = execute(fs2Stream.through(drainPipe))

  @Benchmark
  def broadcastTo1(): Unit = execute(fs2Stream.broadcastThrough(1)(drainPipe))

  @Benchmark
  def broadcastTo10(): Unit = execute(fs2Stream.broadcastThrough(10)(drainPipe))

  @Benchmark
  def broadcastTo100(): Unit = execute(fs2Stream.broadcastThrough(100)(drainPipe))

  @TearDown
  def shutdown(): Unit = jExecutor.shutdown()

  private def execute[O](s: Stream[IO, O]) = blocker.blockOn(s.compile.drain).unsafeRunSync()
}

The result

[info] Benchmark                          (streamSize)   Mode  Cnt      Score      Error  Units
[info] BroadcastBenchmark.broadcastTo1              10  thrpt    7   3011,053 ±  168,120  ops/s
[info] BroadcastBenchmark.broadcastTo1            1000  thrpt    7    107,267 ±    3,405  ops/s
[info] BroadcastBenchmark.broadcastTo1          100000  thrpt    7      1,110 ±    0,023  ops/s
[info] BroadcastBenchmark.broadcastTo10             10  thrpt    7    642,015 ±   37,393  ops/s
[info] BroadcastBenchmark.broadcastTo10           1000  thrpt    7     15,928 ±    0,631  ops/s
[info] BroadcastBenchmark.broadcastTo10         100000  thrpt    7      0,159 ±    0,006  ops/s
[info] BroadcastBenchmark.broadcastTo100            10  thrpt    7     51,828 ±    3,519  ops/s
[info] BroadcastBenchmark.broadcastTo100          1000  thrpt    7      0,809 ±    0,077  ops/s
[info] BroadcastBenchmark.broadcastTo100        100000  thrpt    7      0,008 ±    0,001  ops/s
[info] BroadcastBenchmark.noPipe                    10  thrpt    7  86335,522 ± 2813,905  ops/s
[info] BroadcastBenchmark.noPipe                  1000  thrpt    7   8454,641 ±  520,836  ops/s
[info] BroadcastBenchmark.noPipe                100000  thrpt    7     96,398 ±    1,572  ops/s
[info] BroadcastBenchmark.through                   10  thrpt    7  55143,756 ± 2378,198  ops/s
[info] BroadcastBenchmark.through                 1000  thrpt    7   2022,979 ±   62,640  ops/s
[info] BroadcastBenchmark.through               100000  thrpt    7     20,612 ±    0,794  ops/s

The ratio of throughputs of BroadcastBenchmark.broadcastTo1 to BroadcastBenchmark.through is now roughly 1 : 20.
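The ratio can be checked directly against the table. The sketch below only restates that arithmetic; the scores are copied from the result above with decimal commas written as points, and the object and method names are hypothetical.

```scala
object ThroughputRatio {
  // (streamSize, broadcastTo1 ops/s, through ops/s), copied from the result table
  val rows: List[(Int, Double, Double)] = List(
    (10,     3011.053, 55143.756),
    (1000,    107.267,  2022.979),
    (100000,    1.110,    20.612)
  )

  // How many times slower broadcastTo1 is than through for one row
  def slowdown(broadcast: Double, through: Double): Double = through / broadcast

  def main(args: Array[String]): Unit =
    rows.foreach { case (size, b, t) =>
      println(f"streamSize=$size%6d  through/broadcastTo1 = ${slowdown(b, t)}%.1f")
    }
}
```

For all three stream sizes the factor lies between 18 and 19, which is consistent with the rounded 1 : 20 figure.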

diesalbla commented 3 years ago

@nikiforo I ran the benchmarks today using the current main branch (the future 3.0.0 release). It seems that the problem has been solved in this release. Given the differences between these two branches, I am not sure whether any fix can easily be backported.

[info] Benchmark                          (streamSize)   Mode  Cnt      Score       Error  Units
[info] BroadcastBenchmark.broadcastTo1              10  thrpt    7  59585.114 ±   779.182  ops/s
[info] BroadcastBenchmark.broadcastTo1            1000  thrpt    7  59281.966 ±   414.307  ops/s
[info] BroadcastBenchmark.broadcastTo1          100000  thrpt    7  57560.715 ±  5457.228  ops/s
[info] BroadcastBenchmark.broadcastTo10             10  thrpt    7  58253.115 ±   779.112  ops/s
[info] BroadcastBenchmark.broadcastTo10           1000  thrpt    7  57614.055 ±  3648.170  ops/s
[info] BroadcastBenchmark.broadcastTo10         100000  thrpt    7  54289.644 ± 11283.495  ops/s
[info] BroadcastBenchmark.broadcastTo100            10  thrpt    7  56528.557 ±   324.216  ops/s
[info] BroadcastBenchmark.broadcastTo100          1000  thrpt    7  57334.595 ±  1157.871  ops/s
[info] BroadcastBenchmark.broadcastTo100        100000  thrpt    7  57276.760 ±   558.260  ops/s
[info] BroadcastBenchmark.noPipe                    10  thrpt    7  60658.243 ±  1057.896  ops/s
[info] BroadcastBenchmark.noPipe                  1000  thrpt    7  60790.862 ±   363.710  ops/s
[info] BroadcastBenchmark.noPipe                100000  thrpt    7  61149.630 ±   466.613  ops/s
[info] BroadcastBenchmark.through                   10  thrpt    7  59219.496 ±   581.634  ops/s
[info] BroadcastBenchmark.through                 1000  thrpt    7  59507.022 ±   326.828  ops/s
[info] BroadcastBenchmark.through               100000  thrpt    7  58832.862 ±   248.395  ops/s

SystemFw commented 3 years ago

Well, I'm about to reimplement broadcastThrough anyway (and remove broadcast and Broadcast). I do wonder why the performance has changed; I don't think we've touched it yet. Maybe it's just CE3 being faster.

SystemFw commented 3 years ago

val executor = Executors.newSingleThreadExecutor

mmm

diesalbla commented 3 years ago

@SystemFw So the benchmarks that I ran with the new version may have differed because of the translation from CE2 to CE3: https://github.com/typelevel/fs2/pull/2296. It does not seem that the single-threaded executor was applied to them.

nikiforo commented 3 years ago

val executor = Executors.newSingleThreadExecutor

mmm

Actually, that was intentional! :))) And I want to discuss this a bit, if you don't mind. I'm not sure whether it is correct. Moreover, I've seen that .global is usually used in benchmarks. I've read the recommendation to have at least two threads to avoid stalls (but from my POV this rule should apply to real code, not benchmarks). However, I think it's fairer to compare performance (at least in this particular example) using a single-threaded executor.

I've also tried to google something about jmh, thread pools, etc., but found nothing. Do you have any guidelines or recommendations about jmh benchmarking and thread pools? And why is singleThreadExecutor a no-go?

@SystemFw @diesalbla

nikiforo commented 3 years ago

@nikiforo I have run today the benchmarks, using the current main branch (to be 3.0.0 release). It seems that the problem has been solved in this release.

@diesalbla, I expected to see broadcastTo100 ten times slower than broadcastTo10, which in turn should be ten times slower than broadcastTo1 :confused: Moreover, I thought that the time should increase linearly with the size of the supplied stream :fearful:
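This expectation can be turned into a quick sanity check: if the time per run grows linearly with streamSize, then ops/s multiplied by streamSize should stay roughly constant. The sketch below applies that to the broadcastTo1 scores from the main-branch run quoted above; the object and method names are hypothetical, and the code only restates the arithmetic without diagnosing a cause.

```scala
object ScalingCheck {
  // (streamSize, broadcastTo1 ops/s) copied from the main-branch run quoted above
  val broadcastTo1: List[(Int, Double)] = List(
    (10,     59585.114),
    (1000,   59281.966),
    (100000, 57560.715)
  )

  // Element rate implied if each operation really consumed about `size` elements
  def elementsPerSecond(size: Int, opsPerSecond: Double): Double = size * opsPerSecond

  def main(args: Array[String]): Unit =
    broadcastTo1.foreach { case (size, ops) =>
      println(f"streamSize=$size%6d  implied elements/s = ${elementsPerSecond(size, ops)}%.0f")
    }
}
```

The implied element rate grows by a factor of more than 9,000 between streamSize = 10 and streamSize = 100000, which is not what linear scaling would produce.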

SystemFw commented 3 years ago

@nikiforo The reason I commented on that is that a large share of the Cats Effect 3 performance improvements (in real-world scenarios, not just flatMap loops) is due to its new work-stealing thread pool, so if you don't use it you might miss out on significant performance gains.

nikiforo commented 3 years ago

@diesalbla @SystemFw,

package fs2
package benchmark

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import org.openjdk.jmh.annotations.{Benchmark, Param, Scope, State}

@State(Scope.Thread)
class BroadcastBenchmark {

  val drainPipe: Pipe[IO, Int, INothing] = _.drain

  @Param(Array("10", "1000", "100000"))
  var streamSize: Int = _

  def fs2Stream: Stream[IO, Int] = Stream.range(1, streamSize).covary[IO] // Every number goes to its own chunk

  @Benchmark
  def noPipe(): Unit = execute(fs2Stream)

  @Benchmark
  def through(): Unit = execute(fs2Stream.through(drainPipe))

  @Benchmark
  def broadcastTo1(): Unit = execute(fs2Stream.broadcastThrough(drainPipe))

  @Benchmark
  def broadcastTo10(): Unit = execute(fs2Stream.broadcastThrough(Seq.fill(10)(drainPipe): _*))

  @Benchmark
  def broadcastTo100(): Unit = execute(fs2Stream.broadcastThrough(Seq.fill(100)(drainPipe): _*))

  private def execute[O](s: Stream[IO, O]) = s.compile.drain.unsafeRunSync()
}

commit 8711623915614d1a789acf88c5e488c735cf32f7 (HEAD, origin/main)
Merge: a95ee4491 25a9cfa7f
Author: Lars Hupel <lars.hupel@mytum.de>
Date:   Wed Mar 10 20:37:19 2021 +0100

benchmark/ jmh:run -i 5 -wi 3 -f1 -t1 fs2.benchmark.BroadcastBenchmark
[info] Benchmark                          (streamSize)   Mode  Cnt      Score       Error  Units
[info] BroadcastBenchmark.broadcastTo1              10  thrpt    5   2417,671 ±   250,849  ops/s
[info] BroadcastBenchmark.broadcastTo1            1000  thrpt    5     65,955 ±    24,632  ops/s
[info] BroadcastBenchmark.broadcastTo1          100000  thrpt    5      0,809 ±     0,209  ops/s
[info] BroadcastBenchmark.broadcastTo10             10  thrpt    5   1040,351 ±   108,177  ops/s
[info] BroadcastBenchmark.broadcastTo10           1000  thrpt    5     26,999 ±     3,637  ops/s
[info] BroadcastBenchmark.broadcastTo10         100000  thrpt    5      0,272 ±     0,057  ops/s
[info] BroadcastBenchmark.broadcastTo100            10  thrpt    5    136,757 ±    21,407  ops/s
[info] BroadcastBenchmark.broadcastTo100          1000  thrpt    5      2,850 ±     0,404  ops/s
[info] BroadcastBenchmark.broadcastTo100        100000  thrpt    5      0,031 ±     0,003  ops/s
[info] BroadcastBenchmark.noPipe                    10  thrpt    5  87944,581 ± 11867,569  ops/s
[info] BroadcastBenchmark.noPipe                  1000  thrpt    5  10571,977 ±   778,623  ops/s
[info] BroadcastBenchmark.noPipe                100000  thrpt    5    118,984 ±     7,147  ops/s
[info] BroadcastBenchmark.through                   10  thrpt    5  63908,399 ±  2748,942  ops/s
[info] BroadcastBenchmark.through                 1000  thrpt    5   3379,372 ±    34,668  ops/s
[info] BroadcastBenchmark.through               100000  thrpt    5     37,355 ±     1,078  ops/s

The ratio of throughputs of BroadcastBenchmark.broadcastTo1 to BroadcastBenchmark.through is now 1 : 25 :worried: :scream:

He-Pin commented 2 years ago

Hi team, is there any update on this? Thanks.

diesalbla commented 2 years ago

I reran the benchmarks today on my local laptop, with the current main branch:

Benchmark       (streamSize)   Mode  Cnt      Score      Error  Units
broadcastTo1              10  thrpt    5   1610.128 ±  981.404  ops/s
broadcastTo1            1000  thrpt    5     69.026 ±    5.545  ops/s
broadcastTo1          100000  thrpt    5      0.564 ±    0.297  ops/s
broadcastTo10             10  thrpt    5    703.255 ±  106.253  ops/s
broadcastTo10           1000  thrpt    5     17.701 ±    3.800  ops/s
broadcastTo10         100000  thrpt    5      0.169 ±    0.032  ops/s
broadcastTo100            10  thrpt    5     99.769 ±    2.559  ops/s
broadcastTo100          1000  thrpt    5      1.997 ±    0.185  ops/s
broadcastTo100        100000  thrpt    5      0.019 ±    0.003  ops/s
noPipe                    10  thrpt    5  84179.766 ± 1327.783  ops/s
noPipe                  1000  thrpt    5   5933.968 ±   66.318  ops/s
noPipe                100000  thrpt    5     56.078 ±    0.651  ops/s
through                   10  thrpt    5  54918.753 ±  255.205  ops/s
through                 1000  thrpt    5   1702.249 ±   37.491  ops/s
through               100000  thrpt    5     15.439 ±    0.072  ops/s

There is still a performance difference between through and broadcastTo1.