typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io

Topic suffers under heavy load, `Set` comparison CPU intensive #1406

Open · SystemFw opened this issue 5 years ago

SystemFw commented 5 years ago

There is a problem with Topic (and I suspect with Signal as well), probably due to how CPU intensive Set comparison is. One of my teams had problems where reasonably heavy traffic was causing thread starvation due to the CPU work needed for that comparison. I was able to fix it by porting the old Topic (without PubSub), but we need to address this because it's a pretty serious regression right now. I'm trying to come up with a reproducible example, which isn't super easy.
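To give a feel for the cost involved, here is a standalone sketch (not fs2 code; sizes and names are made up) of why structural Set equality gets expensive: == falls back to subsetOf, which hashes and looks up every element, so comparing subscriber state on every PubSub transition scales with the number of entries.

object SetEqualityCost extends App {
  // two structurally equal but distinct sets of tuples, roughly the shape of
  // the keys being hashed in the stack trace below
  val a: Set[(Int, String)] = (1 to 100000).map(i => i -> i.toString).toSet
  val b: Set[(Int, String)] = (1 to 100000).map(i => i -> i.toString).toSet

  val start = System.nanoTime()
  val equal = a == b // size check, then subsetOf: one hash + lookup per element
  println(s"equal=$equal in ${(System.nanoTime() - start) / 1e6} ms")
}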

SystemFw commented 5 years ago

In the meantime, I can share the stack trace commonly seen in the problematic scenario

"ForkJoinPool-2-worker-6" #83 daemon prio=5 os_prio=0 tid=0x00007f56fb6d3000 nid=0x13bc runnable [0x00007f56db7fd000]
   java.lang.Thread.State: RUNNABLE
        at scala.runtime.ScalaRunTime$._hashCode(ScalaRunTime.scala:149)
        at scala.Tuple2.hashCode(Tuple2.scala:24)
        at scala.runtime.Statics.anyHash(Statics.java:122)
        at scala.collection.immutable.HashMap.elemHashCode(HashMap.scala:87)
        at scala.collection.immutable.HashMap.computeHash(HashMap.scala:96)
        at scala.collection.immutable.HashMap.contains(HashMap.scala:59)
        at scala.collection.MapLike$DefaultKeySet.contains(MapLike.scala:176)
        at scala.collection.GenSetLike.apply(GenSetLike.scala:48)
        at scala.collection.GenSetLike.apply$(GenSetLike.scala:48)
        at scala.collection.AbstractSet.apply(Set.scala:51)
        at scala.collection.immutable.MapLike$ImmutableDefaultKeySet.apply(MapLike.scala:112)
        at scala.collection.Iterator.forall(Iterator.scala:953)
        at scala.collection.Iterator.forall$(Iterator.scala:951)
        at scala.collection.AbstractIterator.forall(Iterator.scala:1429)
        at scala.collection.IterableLike.forall(IterableLike.scala:77)
        at scala.collection.IterableLike.forall$(IterableLike.scala:76)
        at scala.collection.AbstractIterable.forall(Iterable.scala:56)
        at scala.collection.GenSetLike.subsetOf(GenSetLike.scala:107)
        at scala.collection.GenSetLike.subsetOf$(GenSetLike.scala:107)
        at scala.collection.AbstractSet.subsetOf(Set.scala:51)
        at scala.collection.GenSetLike.liftedTree1$1(GenSetLike.scala:124)
        at scala.collection.GenSetLike.equals(GenSetLike.scala:124)
        at scala.collection.GenSetLike.equals$(GenSetLike.scala:119)
        at scala.collection.AbstractSet.equals(Set.scala:51)
        at fs2.concurrent.Topic$.$anonfun$apply$1(Topic.scala:89)
        at fs2.concurrent.Topic$.$anonfun$apply$1$adapted(Topic.scala:89)
        at fs2.concurrent.Topic$$$Lambda$4862/705260399.apply(Unknown Source)
        at cats.kernel.Eq$$anon$106.eqv(Eq.scala:85)
        at fs2.concurrent.PubSub$Strategy$Inspectable$$anon$7.get(PubSub.scala:662)
        at fs2.concurrent.PubSub$Strategy$Inspectable$$anon$7.get(PubSub.scala:637)
        at fs2.concurrent.PubSub$.go$1(PubSub.scala:123)
        at fs2.concurrent.PubSub$.consumeSubscribers$1(PubSub.scala:132)
        at fs2.concurrent.PubSub$.loop$1(PubSub.scala:171)
        at fs2.concurrent.PubSub$.$anonfun$apply$13(PubSub.scala:196)
        at fs2.concurrent.PubSub$$$Lambda$5107/462313517.apply(Unknown Source)
        at cats.effect.concurrent.Ref$SyncRef.spin$1(Ref.scala:249)
        at cats.effect.concurrent.Ref$SyncRef.$anonfun$modify$1(Ref.scala:253)

As the number of subscribers grows, more and more threads end up doing this most of the time, until complete starvation (in our case, Kubernetes was restarting the service since the healthcheck endpoint would become unresponsive).

vladimir-popov commented 5 years ago

We have the same issue in our team. In our case we have an incoming stream with ~1k elements per second and ~100 subscribers that just put every element into a cache. A simple example can look like:

import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._
import scala.util.Random

object Example extends IOApp {
  def doSomething(x: Any) = IO(println(x))
  def arbitraryData = IO(Random.nextInt(100))

  override def run(args: List[String]): IO[ExitCode] = {
    for {
      // topic ← CustomTopic[IO, Int] <- will be used in comparison with the default implementation
      topic ← fs2.concurrent.Topic[IO, Int](Random.nextInt(100))
      subscriptions = (1 to 100).map(
        i ⇒ topic.subscribe(1).collect({ case d if d == i ⇒ d }).evalMap(doSomething)
      )
      _ ← (Stream.fixedDelay(10.millis).evalMap(_ ⇒ arbitraryData).to(topic.publish) merge Stream
        .emits(subscriptions)
        .parJoinUnbounded).compile.drain
    } yield {
      ExitCode.Success
    }
  }
}

We tried to solve it with an implementation of Topic based on fs2.concurrent.Queue (not a full example):

import cats.effect.Concurrent
import cats.effect.concurrent.Ref
import cats.implicits._
import fs2.{Sink, Stream}
import fs2.concurrent.{Queue, Topic}

// Not a full example: the remaining Topic methods are omitted.
class CustomTopic[F[_], A](cache: Ref[F, List[Queue[F, A]]])(implicit C: Concurrent[F])
    extends Topic[F, A] {
  override def publish: Sink[F, A] =
    _.evalMap(publish1)

  // publishing pushes the element into every currently registered subscriber queue
  override def publish1(a: A): F[Unit] =
    cache.get.flatMap { subscribers ⇒
      subscribers.traverse(_.enqueue1(a)).void
    }

  // each subscriber gets its own bounded queue, registered in the shared Ref
  override def subscribe(maxQueued: Int): Stream[F, A] =
    emptyQueue(maxQueued).evalTap(q ⇒ cache.update(_ :+ q)).flatMap(_.dequeue)

  // the bracket deregisters the queue when the subscriber terminates
  private def emptyQueue(maxQueued: Int): Stream[F, Queue[F, A]] =
    Stream.bracket(Queue.bounded[F, A](maxQueued))(
      queue ⇒ cache.update(_.filter(_ ne queue))
    )
}

object CustomTopic {
  def apply[F[_], A](implicit C: Concurrent[F]): F[CustomTopic[F, A]] =
    Ref.of[F, List[Queue[F, A]]](List.empty).map(new CustomTopic(_))
}

When we use our custom topic in the example above, we see a very different picture in the profiler.

Results with the default Topic:

[profiler screenshot: results with default Topic]

Results with our implementation:

[profiler screenshot: results with custom Topic]

Update: I created a PR to illustrate our full solution: #1407

vladimir-popov commented 5 years ago

Besides that, we noticed that the current implementation of Topic in our case generates a lot of live instances of immutable.Queue and PubSub.Strategy.Inspectable.State:

[screenshot, 2019-01-29 20:05]

pchlupacek commented 5 years ago

@SystemFw the current implementation of Topic won't perform super well under heavy contention between a busy publisher and a lot of subscribers. The older implementation was slightly better, specifically because each subscriber had its own queue that elements were fed into; that queue was registered with the state and set during publish.

The observation of a lot of queues and states is correct: what you see is a lot of spins and attempts to fight for setting that single ref, which holds its own single state. Simply too many subscribers :-)

I think there is a workaround: reimplement Topic with a slightly modified State and PubSub, where, again, part of the registration is actually the subscriber's queue.

Another alternative with the current implementation:

Change the topic to a deeper hierarchy, i.e. one topic handling 16 subscribers, each of which is itself a topic handling, say, another 16 subscribers; two such levels give you 16^2 = 256 subscribers with really low contention (a rough sketch of this striping idea follows below).

I wouldn't yet suggest giving up and going back to the old implementation, as it had different problems, e.g. flow control.
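For illustration, a minimal sketch of that striping idea, assuming the fs2 1.x Topic API (StripedTopic and its shape are hypothetical; lifecycle and error handling are glossed over):

import cats.effect.Concurrent
import cats.effect.concurrent.Ref
import cats.implicits._
import fs2.Stream
import fs2.concurrent.Topic

// A root Topic fans out to `fanOut` inner Topics, and each new subscriber is
// assigned round-robin to one inner Topic, so no single PubSub state is
// contended by all subscribers at once.
final class StripedTopic[F[_]: Concurrent, A] private (
    root: Topic[F, A],
    inners: Vector[Topic[F, A]],
    counter: Ref[F, Int]
) {
  def publish1(a: A): F[Unit] = root.publish1(a)

  // pick the next inner topic round-robin and subscribe to it
  def subscribe(maxQueued: Int): Stream[F, A] =
    Stream
      .eval(counter.modify(n => (n + 1, inners(n % inners.size))))
      .flatMap(_.subscribe(maxQueued))

  // must run concurrently with publishers/subscribers so the inner topics stay fed
  def fanOutStream(maxQueued: Int): Stream[F, Unit] =
    Stream
      .emits(inners)
      .map(inner => root.subscribe(maxQueued).through(inner.publish))
      .parJoinUnbounded
}

object StripedTopic {
  def apply[F[_]: Concurrent, A](initial: A, fanOut: Int): F[StripedTopic[F, A]] =
    for {
      root    <- Topic[F, A](initial)
      inners  <- Vector.fill(fanOut)(Topic[F, A](initial)).sequence
      counter <- Ref.of[F, Int](0)
    } yield new StripedTopic(root, inners, counter)
}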

pchlupacek commented 5 years ago

@vladimir-popov I am not surprised; the current implementation of Topic will perform poorly under your load. This is sort of a worst case for the current implementation. However, I am quite confident Topic can be improved with PubSub to perform similarly to your solution.

pchlupacek commented 5 years ago

Also @SystemFw, @vladimir-popov, perhaps if we just change the internals of how PubSub works, we get better performance without changing anything in the state logic. Essentially, instead of using a Ref to store the state, we could try a lock with a guarded var for the state. That would waste no state calculations and could perhaps be almost on par with the good paths of PubSub.
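For illustration, a minimal sketch of that lock-plus-guarded-var idea, assuming cats-effect 2's Semaphore (GuardedState and its methods are hypothetical names, not PubSub's actual internals):

import cats.effect.{Concurrent, Sync}
import cats.effect.concurrent.Semaphore
import cats.implicits._

// A single permit guards a plain var, so each state transition is computed
// exactly once, instead of being retried (and recomputed) under CAS
// contention as with Ref#modify.
final class GuardedState[F[_], S] private (lock: Semaphore[F], private var state: S)(
    implicit F: Sync[F]
) {
  def modify[A](f: S => (S, A)): F[A] =
    lock.withPermit(F.delay {
      val (next, out) = f(state)
      state = next
      out
    })
}

object GuardedState {
  def of[F[_]: Concurrent, S](initial: S): F[GuardedState[F, S]] =
    Semaphore[F](1).map(new GuardedState(_, initial))
}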

SystemFw commented 5 years ago

Actually I'm not sure the problem is contention on the Ref, and I don't even know whether it's necessarily a problem with PubSub in general. I can't be sure because I don't have an exact reproduction yet, but the initial hypothesis is simply that relying on Set comparison, like the Strategy for Topic does, wastes a lot of CPU cycles.

The older implementation was slightly better,

Well, the thing is that in my scenario the old implementation just works with no issues, whereas the new one chokes and causes the service to restart continuously, so that's more than slightly better :P

I wouldn't yet suggest giving up and going back to the old implementation, as it had different problems, e.g. flow control.

Well, that's a hot fix that I need to do right now, otherwise everything just chokes. Ideally I would want something based on PubSub, but more scalable, so I agree with you on that. Whether that is striping with 16 subs or something else, I guess we will have to see, but for now I just wanted to flag this, given that it's effectively a regression compared to the old implementation.

pchlupacek commented 5 years ago

@SystemFw I think the issue demonstrated here is that you have so many concurrent attempts to CAS the State that the overhead is killing the whole thing.

I hope to squeeze in some time over the weekend to take a look at this, but I feel there are two approaches that may fix it.

pchlupacek commented 5 years ago

@SystemFw, @vladimir-popov, I have dived a bit into it and wrote a simple benchmark, run with 8 threads, 500 messages in the topic, and 1 to 128 subscribers.

The code is here (https://gist.github.com/pchlupacek/f033993302ee2a741a6473286306c9b3); the results are below:

[info] Benchmark                          Mode  Cnt      Score       Error  Units
[info] TopicBenchmark.topicBroadcast_1    avgt    3      1.124 ±     0.059  ms/op
[info] TopicBenchmark.topicBroadcast_4    avgt    3      5.128 ±     0.905  ms/op
[info] TopicBenchmark.topicBroadcast_8    avgt    3     20.289 ±     2.331  ms/op
[info] TopicBenchmark.topicBroadcast_16   avgt    3     27.693 ±     7.103  ms/op
[info] TopicBenchmark.topicBroadcast_32   avgt    3    200.506 ±   106.002  ms/op
[info] TopicBenchmark.topicBroadcast_64   avgt    3   1965.638 ±   316.895  ms/op
[info] TopicBenchmark.topicBroadcast_128  avgt    3  12560.988 ± 17843.409  ms/op

From the code you can see there are no operations on the messages, so this should measure overhead only.

Since this is run on a 6-core / 12-hyperthread machine, but on 8 JVM threads, I would expect the sweet spot to be somewhere around 8-16 subscribers. For larger numbers of subscribers, the ideal execution time should be something like (Nx/8)*T8, where Nx is the number of subscribers and T8 is the time for 8 subscribers.
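(As a rough check against the table above: for 128 subscribers that ideal would be about (128/8) × 20.3 ms ≈ 325 ms, while the measured value is ~12,561 ms, roughly 40x worse.)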

I do not really understand the Error ranges; they seem a bit off to me. When I compared the individual results they were usually within a +/- 5% range, nothing like the more than 100% shown for 128. So I suspect something is wrong with JMH, or with my understanding of it.

As you can see, the results are roughly as expected for 1-16 subscribers, where the differences come mostly from setting up the program and the subscribers. The difference between 8 and 16 subscribers is interesting, as it gets close to the ideal.

Anything after that is a complete disaster and clearly demonstrates the concurrency issue. I think it confirms my original suspicion of a single-point-of-contention problem.

I will follow up with more experiments to see how we can avoid the problem.

SystemFw commented 5 years ago

Thanks for looking into this @pchlupacek :)

FYI, the old implementation in the service I mentioned above is handling 1400 subs at peak load without breaking a sweat, on a 2 CPU machine which also has some blocking code on it

pchlupacek commented 5 years ago

@SystemFw I am not surprised that the old implementation performs much better. I would say the only limit is RAM, and obviously more subscribers -> more time to publish one element. The problem with that implementation is that it is highly specialised, and your only control over OOM in the case of a slow subscriber is to use bounded queues.

pchlupacek commented 5 years ago

OK, so I have changed PubSub to use a lock + var, and this is the result:

[info] Benchmark                          Mode  Cnt     Score    Error  Units
[info] TopicBenchmark.topicBroadcast_1    avgt    3     7.103 ±  0.588  ms/op
[info] TopicBenchmark.topicBroadcast_4    avgt    3    22.579 ±  2.968  ms/op
[info] TopicBenchmark.topicBroadcast_8    avgt    3    42.171 ±  1.685  ms/op
[info] TopicBenchmark.topicBroadcast_16   avgt    3    83.709 ± 13.964  ms/op
[info] TopicBenchmark.topicBroadcast_32   avgt    3   187.778 ± 11.095  ms/op
[info] TopicBenchmark.topicBroadcast_64   avgt    3   438.190 ± 36.365  ms/op
[info] TopicBenchmark.topicBroadcast_128  avgt    3  1192.197 ± 94.964  ms/op

Now this is much better; however, you can still see that we are blocked by the single point of contention, which is why you see almost linear time instead of benefiting from parallelism. You will also see that this uses only about 1.5 cores, clearly showing that the bottleneck is the single ref. However, there are now no wasted CPU cycles, resulting in much better overall performance.

SystemFw commented 5 years ago

The non-lock implementation has the advantage of being... well, lock-free, so I'd prefer it if possible. Are there any other strategies to improve this case, for example striping the topic as you suggested above to reduce contention?

I really like the abstraction given by PubSub; however, the old Topic was both lock-free and had lower contention, so I feel that anything we do should be proven to be better than the old implementation.

pchlupacek commented 5 years ago

@SystemFw this is still lock-free. It uses a semaphore, so it is semantically locking.

I am not sure that the old Topic had lower contention per se. It just distributed the tasks a bit better, so in situations where you have one publisher and many subscribers it performed better.

SystemFw commented 5 years ago

If it uses a semaphore it's not blocking, but it's not lock-free either: if the thread holding the lock cannot proceed, you get stuck all the same. More seriously, in the current cancelation model, writing safe code with a semaphore is hard or even impossible in some cases (that is a problem per se, but for another day).

I am not sure that the old Topic had lower contention per se. It just distributed the tasks a bit better, so in situations where you have one publisher and many subscribers it performed better.

You're probably right: it still has a single Ref, but it spends less time on it because after that you go through the queues. The thing is that one publisher and many subscribers is like the perfect Topic use case, and we're talking an order of magnitude better, so that's great (and after all, you wrote both :P )

pchlupacek commented 5 years ago

@SystemFw yes, and I think there is a solution for this, even with the Ref implementation. I'll update you. Essentially, we use PubSub to implement Topic in the old way :-)

SystemFw commented 5 years ago

Essentially, we use PubSub to implement Topic in the old way

that would be ideal :)

pchlupacek commented 5 years ago

@SystemFw I think the current PubSub signatures don't allow for the more concurrent subscriber behaviour. I have some ideas on improving it.

I think the improved implementation with locking reduces contention significantly. However, as you have pointed out, publishing one element through pub/sub still takes about 2*N+1 exclusive accesses on the single contended ref, whereas the former implementation had only one exclusive access on it, plus N exclusive accesses distributed over the individual subscribers, resulting in much better contention control.
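(As a rough illustration with the benchmark sizes above: for N = 128 subscribers that is about 2*128+1 = 257 exclusive accesses to the one shared ref per published element, versus one shared access plus 128 accesses spread across the per-subscriber queues in the old design.)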

I need some time to think about how to improve PubSub to get the same performance characteristics, whilst not losing the features it has today.

pchlupacek commented 5 years ago

@SystemFw I went ahead a bit and created a benchmark of the queue-based Topic implementation. I took the liberty of using the code submitted by @vladimir-popov as the reference point. I have also added 1k and 2k subscriber benchmarks.

These are the results:

[info] Benchmark                           Mode  Cnt     Score      Error  Units
[info] TopicBenchmark.topicBroadcast_1     avgt    3     5.468 ±    1.231  ms/op
[info] TopicBenchmark.topicBroadcast_4     avgt    3    13.200 ±    1.766  ms/op
[info] TopicBenchmark.topicBroadcast_8     avgt    3    23.985 ±    2.859  ms/op
[info] TopicBenchmark.topicBroadcast_16    avgt    3    41.069 ±    7.160  ms/op
[info] TopicBenchmark.topicBroadcast_32    avgt    3    78.257 ±   16.757  ms/op
[info] TopicBenchmark.topicBroadcast_64    avgt    3   137.253 ±   80.961  ms/op
[info] TopicBenchmark.topicBroadcast_128   avgt    3   276.561 ±   10.986  ms/op
[info] TopicBenchmark.topicBroadcast_512   avgt    3  1198.688 ±  450.535  ms/op
[info] TopicBenchmark.topicBroadcast_1024  avgt    3  2518.192 ± 1873.308  ms/op
[info] TopicBenchmark.topicBroadcast_2048  avgt    3  5719.329 ± 1568.653  ms/op

One interesting observation I made is that this implementation was not able to fully utilize the cores either. Also, from a performance standpoint, it is interesting that this is not that much better compared to the semantically locking solution. That means there is a good chance we can get on-par performance with PubSub.

pchlupacek commented 5 years ago

Update: somehow the current implementation, when the semantic lock is put in place, always uses a single thread. This should not be the case, as F.start is used on completion of a subscriber. I am investigating this more.

pchlupacek commented 5 years ago

Update: these are the top contributors when running a lot of subscribers under pub/sub:

[profiler screenshot, 2019-02-10 09:32]

Neither of them is related to concurrency. My theory is that the cost of looking up the subscriber status in the map is extraordinarily high.

pchlupacek commented 5 years ago

Just a quick update on this one. I am still experimenting with ways to tackle it. It seems that I may need to touch the PubSub strategy signature a bit, but I am not yet 100% sure how.

Not sure about the urgency of this one, but I am targeting it to be resolved by the next major release.

In the meantime there is a workaround with a manual implementation.

Let me know if that works for you.

SystemFw commented 5 years ago

It does work for me, but perhaps we should make it a bit easier for other people to grab the old implementation if they stumble upon this

mpilquist commented 5 years ago

@pchlupacek Do you think this is solvable in the next week or so? We'll have a minor API-breaking release in 1.1 where you could change the PubSub signature if needed.

epifab commented 3 years ago

It does work for me, but perhaps we should make it a bit easier for other people to grab the old implementation if they stumble upon this

Yes, this would be ideal. @SystemFw any pointers? I'd love to find a workaround while this gets fixed properly.

SystemFw commented 3 years ago

I think the latest commit with the old implementation is this one https://github.com/typelevel/fs2/blob/4ddd75a2dc032b7604dc1205c86d7d6adc993859/core/shared/src/main/scala/fs2/concurrent/Topic.scala

It should be mostly source compatible. Are you hitting a problem because of Topic under load?

epifab commented 3 years ago

@SystemFw awesome, thanks a lot! I wasn't 100% sure that the poor performance was indeed caused by Topic under heavy load, but I am now. Blindly reverting to the old implementation made a huge impact; we're talking about 2x or 3x better...

epifab commented 3 years ago

Actually, the performance boost is even more impressive than that: I think I can handle approximately 5x more messages. What I haven't discovered yet, though, are the implications of this workaround.

SystemFw commented 3 years ago

FYI in fs2 3.0 we have reinstated the implementation strategy of the old version of Topic

vasilmkd commented 3 years ago

As far as I understand, this issue is now present only on series/2.5.x?