typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io
Other
2.36k stars 596 forks source link

Performance Megathread #768

Closed djspiewak closed 7 years ago

djspiewak commented 7 years ago

So @rossabaker has been doing a lot of benchmarking lately, with some very minor assistance from myself, and we've been working to analyze the results. We're still lacking a good concurrency benchmark (though I have an idea that I think is reasonable), but we have semi-realistic benchmarks for most of the rest.

You can run the benchmarks yourself by cloning the project and running sbt jmh:run. Suffice it to say: fs2 is slow (sort of).

Stream benchmarking is complex, so an assertion like that really isn't, in and of itself, justifiable. But we can break things down more precisely:

Focusing in on the per-chunk overhead, I've been spending a fair bit of time with StreamCore recently, as well as staring at profiler reports. A couple things jumped out at me:

Data is still coming in. Ideas still flowing. I'm going to do some more in-depth profiling over the weekend to see if I can suss out a clearer picture from all the indirection. But I figured that, at this point, we have enough that it's worth sharing so other people can chime in. The biggest tl;dr really is that fs2's per-chunk overhead is so incredibly high that it entirely washes out the benefits of chunking on many workloads (jury's out on "many" vs "most"). This per-chunk overhead, relative to scalaz-stream, almost certainly comes from the highly indirect and multi-tier algebraic encoding.

Both for reasons of making the codebase less insane and impenetrable, and to improve performance, I think we should seriously explore more direct encodings. Remember that two major things have changed in scalac since @pchiusano did his initial work on this: GADT support improved considerably in 2.11.8 (and incrementally again in 2.12), and SI-2712 was addressed. Those two changes alone likely open up several avenues which were impossible in the early days. Additionally, there are some inner-class skolem techniques which were not explored even at the time, which may be applicable here. In any case, I feel I'm on pretty firm ground saying that the current encoding is causing problems.


For those who just want to look at some profiles, you can dig through traces using Java MissionControl (jmc in JDK 8.0). Here's a tarball from a quick run on my machine.

djspiewak commented 7 years ago

I've updated the tarball link in the OP with some more interesting data. Basically, I gathered up all of the fs2 benchmarks into a single run and profiled them all at once, to amplify the signal a bit more. I also enabled object profiling in JFR, so we could get an idea of the kind of effect heap pressure might be having.

First, the raw timings (the far-right column is op/s ± stddev; higher is better):

Cp.fs2Async                                          N/A  thrpt   20     61.881 ±   4.541  ops/s
Cp.fs2Sync                                           N/A  thrpt   20    118.709 ±   1.974  ops/s
Cp.scalazStreamIo                                    N/A  thrpt   20    150.855 ±   5.223  ops/s
Cp.scalazStreamNio                                   N/A  thrpt   20    127.597 ±   1.762  ops/s
FahrenheitToCelsius.fs2                              N/A  thrpt   20     36.187 ±   0.882  ops/s
FahrenheitToCelsius.scalazStream                     N/A  thrpt   20     38.244 ±   0.481  ops/s
FlatMap.fs2Apply                                   10000  thrpt   20    187.046 ±   7.447  ops/s
FlatMap.fs2Delay                                   10000  thrpt   20   1966.210 ±  24.205  ops/s
FlatMap.fs2Now                                     10000  thrpt   20   2784.915 ± 230.332  ops/s
FlatMap.futureApply                                10000  thrpt   20    247.787 ±   8.785  ops/s
FlatMap.futureSuccessful                           10000  thrpt   20    490.079 ±  26.855  ops/s
FlatMap.futureTrampolineEc                         10000  thrpt   20    494.020 ±  11.999  ops/s
FlatMap.monixApply                                 10000  thrpt   20    184.429 ±   5.517  ops/s
FlatMap.monixDelay                                 10000  thrpt   20   4178.997 ±  23.795  ops/s
FlatMap.monixNow                                   10000  thrpt   20   4920.248 ± 154.035  ops/s
FlatMap.scalazApply                                10000  thrpt   20    226.464 ±   8.621  ops/s
FlatMap.scalazDelay                                10000  thrpt   20   1859.340 ± 244.977  ops/s
FlatMap.scalazNow                                  10000  thrpt   20   2526.526 ±  46.266  ops/s
Sum.fs2Chunked                                     10000  thrpt   20  10850.315 ± 181.473  ops/s
Sum.fs2Iterate                                     10000  thrpt   20      3.317 ±   0.105  ops/s
Sum.fs2IterateEval                                 10000  thrpt   20      2.940 ±   0.056  ops/s
Sum.fs2IterateFold                                 10000  thrpt   20      6.544 ±   0.089  ops/s
Sum.fs2PureIterate                                 10000  thrpt   20      3.396 ±   0.036  ops/s
Sum.monixObservableFromAsyncStateActionGlobal      10000  thrpt   20    119.086 ±   5.526  ops/s
Sum.monixObservableFromIterable                    10000  thrpt   20   4838.098 ±  33.385  ops/s
Sum.monixObservableFromStateActionGlobal           10000  thrpt   20   4996.413 ±  60.220  ops/s
Sum.monixObservableFromStateActionSingleThreaded   10000  thrpt   20   5532.786 ±  29.232  ops/s
Sum.scalaStream                                    10000  thrpt   20   1468.308 ±  16.925  ops/s
Sum.scalazStreamEmitAll                            10000  thrpt   20     60.444 ±   1.432  ops/s
Sum.scalazStreamIterate                            10000  thrpt   20     23.433 ±   0.485  ops/s
Sum.scalazStreamIterateEval                        10000  thrpt   20     15.606 ±   0.290  ops/s
Sum.scalazStreamIterateFold                        10000  thrpt   20     50.661 ±   0.506  ops/s
WordCount.fs2Sequential                              N/A  thrpt   20      1.955 ±   0.106  ops/s
WordCount.monixSequential                            N/A  thrpt   20     26.890 ±   0.487  ops/s
WordCount.scalazStreamParallel                       N/A  thrpt   20      1.859 ±   0.036  ops/s
WordCount.scalazStreamSequential                     N/A  thrpt   20      2.209 ±   0.047  ops/s

Some thoughts from analyzing the flight recorder data (in the tarball):

Nothing really concrete yet, other than Free.Bind being a major hot spot. Heap pressure was less than expected. Really, the best guess is still the massive amounts of indirection (resulting in a larger set of less-frequently invoked functions during stream interpretation), but still working on it.

pchiusano commented 7 years ago

Hey @djspiewak and @rossabaker thanks for looking into this. I'm all for improving FS2's straight-line performance... though not at a cost of messing with the public API.

Had an idea.

The stream interpreter alternates between 1) running some pure code, and then 2) evaluating an effect. The 2) by necessity needs to go through Scope/Free, which seems to be a major source of overhead. But 1) does not, and we can have an optimized code path for the 'do a bunch of pure stuff' part of interpretation. My initial idea for this is to change:

  def step[F[_],O0,O](stack: Stack[F,O0,O])
  : Scope[F,Option[Attempt[(NonEmptyChunk[O],StreamCore[F,O])]]] =

to

  type StepResult[F,O] = Option[Attempt[(NonEmptyChunk[O],StreamCore[F,O])]]
  def step[F[_],O0,O](stack: Stack[F,O0,O])
  : Either[StepResult[F,O], Scope[F,StepResult[F,O]]] =

Then there can be a regular loop that runs step as long as it returns a Left, only going through Scope when needed. And there are probably lots of ways to micro-optimize this idea.

Also, I suspect that optimizing the interpretation of pure segments of the stream is going to be the biggest win, compared to optimizing how it handles effects... which are often doing some sort of IO that will be more of a bottleneck than the stream interpreter.

djspiewak commented 7 years ago

I'm all for improving FS2's straight-line performance... though not at a cost of messing with the public API.

This is my premise as well, though it remains to be seen whether or not Pull and Handle are efficient enough for how they are used. That's basically my secret worry in all this: we may find that some of the straight-line performance problems are fundamental in that API. No direct evidence of it yet, but it's on the table, and if it came to pass we would need to make a choice.

As it stands though, plenty of room to improve entirely internally. As I said, I'm generally a lot more concerned about the algebraic representation (i.e. the internals) than the API itself.

Then there can be a regular loop that runs step as long as it returns a Left, only going through Scope when needed. And there are probably lots of ways to micro-optimize this idea.

That sounds like a good idea, IMO, and a worthy experiment either way. I'm concerned that it might result in some extra allocations, but we can golf that down quite a bit if necessary at the cost of some casting.

which are often doing some sort of IO that will be more of a bottleneck than the stream interpreter.

I wouldn't be so sure. When we're looking at straight-line interpreter performance on the order of 3-5 op/s, it takes one hell of an IO effect to be the bottleneck. To put that in perspective, it's analogous to a series of IO effects which represent reading from a remote server with 250 milliseconds of latency! The interpreter is slow to an order of magnitude which renders moot most "IO dominates" arguments, which is why it's so concerning right now.

pchiusano commented 7 years ago

Wait you are getting 2-3 FS2 steps per second? That would be absurd. Is anyone claiming that? Make sure you actually look at the code backing the benchmarks to see what it is measuring. I was seeing I think 100k-1million steps per second in development. At 1 microsecond, there are very few IO operations that can complete in that time... On Sat, Nov 19, 2016 at 2:06 PM Daniel Spiewak notifications@github.com wrote:

I'm all for improving FS2's straight-line performance... though not at a cost of messing with the public API.

This is my premise as well, though it remains to be seen whether or not Pull and Handle are efficient enough for how they are used. That's basically my number one concern in all this: we may find that some of the straight-line performance problems are fundamental in that API. No direct evidence of it yet, but it's on the table, and if it came to pass we would need to make a choice.

As it stands though, plenty of room to improve entirely internally. As I said, I'm generally a lot more concerned about the algebraic representation (i.e. the internals) than the API itself.

Then there can be a regular loop that runs step as long as it returns a Left, only going through Scope when needed. And there are probably lots of ways to micro-optimize this idea.

That sounds like a good idea, IMO, and a worthy experiment either way. I'm concerned that it might result in some extra allocations, but we can golf that down quite a bit if necessary at the cost of some casting.

which are often doing some sort of IO that will be more of a bottleneck than the stream interpreter.

I wouldn't be so sure. When we're looking at straight-line interpreter performance on the order of 3-5 op/s, it takes one hell of an IO effect to be the bottleneck. To put that in perspective, it's analogous to a series of IO effects which represent reading from a remote server with 250 milliseconds of latency! The interpreter is slow to an order of magnitude which renders moot most "IO dominates" arguments, which is why it's so concerning right now.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/functional-streams-for-scala/fs2/issues/768#issuecomment-261733006, or mute the thread https://github.com/notifications/unsubscribe-auth/AAArQm-ANkciLCZCrG7ZroP371j-eUm8ks5q_0jPgaJpZM4K2xcs .

djspiewak commented 7 years ago

@pchiusano It depends considerably on the benchmark. There is a tarball with extensive results linked from the OP. Or, you know, in the second post from the OP. The iterate benchmarks are very informative, not because they're representative of practical throughput, but because they're effectively only measuring the per-chunk interpreter throughput. Which is 3-5 op/s.

Basically, FS2 only does well when things become hilariously heavily chunked. Under all other circumstances, it is about 7-10x slower than scalaz-stream, and somewhere on the order of 1500x slower than Monix. 3-5 op/s is actually generous on some of the measurements.

pchiusano commented 7 years ago

Could you point us to one where you are seeing extremely low numbers of STEPS per second? Like a 1k element stream takes 300 seconds to process or something crazy like that. On Sat, Nov 19, 2016 at 2:18 PM Daniel Spiewak notifications@github.com wrote:

@pchiusano https://github.com/pchiusano It depends considerably on the benchmark. There is a tarball with extensive results linked from the OP.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/functional-streams-for-scala/fs2/issues/768#issuecomment-261733639, or mute the thread https://github.com/notifications/unsubscribe-auth/AAArQi6Y21ORNWJgvnSJ_iAcNLXvUfbUks5q_0uKgaJpZM4K2xcs .

djspiewak commented 7 years ago

@pchiusano You'll need to be clear about what you mean by steps. As I said, I would consider the iterate benchmarks to be a relatively pure test of the straight-line interpreter throughput, since they disable chunking by definition and just walk through a stream.

All of our data is up, and linked from the OP. And all of the sources on the benchmarks are linked. I don't mean to be terse, but I have provided the evidence you're asking for, if you follow the links.


Edit Upon rereading your post, I think I understand what happened here. We're using a slightly definition of step, and it doesn't help that I accidentally used your definition when constructing my IO strawman (which was an inaccurate strawman). Still, look at the iterate benchmark.

It looks like the interpreter can handle about 35k steps/s on my laptop, based on those benchmarks and what I believe is your definition of step.

pchiusano commented 7 years ago

By a step I'll say the time to produce a single element of the output stream, assuming no elements are being filtered out of the stream. On Sat, Nov 19, 2016 at 2:27 PM Daniel Spiewak notifications@github.com wrote:

@pchiusano https://github.com/pchiusano You'll need to be clear about what you mean by steps. As I said, I would consider the iterate benchmarks to be a relatively pure test of the straight-line interpreter throughput, since they disable chunking by definition and just walk through a stream.

All of our data is up, and linked from the OP. And all of the sources on the benchmarks are linked. I don't mean to be terse, but I have provided the evidence you're asking for, if you follow the links.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/functional-streams-for-scala/fs2/issues/768#issuecomment-261734153, or mute the thread https://github.com/notifications/unsubscribe-auth/AAArQnTVZOkMZqLfPcW8GPMlA94k5Zy-ks5q_021gaJpZM4K2xcs .

djspiewak commented 7 years ago

@pchiusano I ninja-edited my previous post before you replied. tl;dr: definition cross-up, and the interpreter is doing about 35k steps/s. That's still at least an order of magnitude less than what you thought, but it's a far cry from 3-5.

Did a fresh run against master:

[info] Benchmark                                         (size)   Mode  Cnt      Score     Error  Units
[info] Compress.fs2Gzip                                     N/A  thrpt   20     69.063 ±   1.688  ops/s
[info] Compress.fs2GzipGunzip                               N/A  thrpt   20     29.528 ±   1.338  ops/s
[info] Compress.fs2NoIo                                     N/A  thrpt   20     42.912 ±   2.177  ops/s
[info] Compress.fs2Text                                     N/A  thrpt   20    158.135 ±  16.176  ops/s
[info] Compress.scalazStreamGzip                            N/A  thrpt   20    104.161 ±   1.834  ops/s
[info] Compress.scalazStreamGzipGunzip                      N/A  thrpt   20     90.635 ±   0.759  ops/s
[info] Compress.scalazStreamNoIo                            N/A  thrpt   20    233.130 ±  15.283  ops/s
[info] Compress.scalazStreamText                            N/A  thrpt   20     97.866 ±   2.864  ops/s
[info] Cp.fs2Async                                          N/A  thrpt   20     77.739 ±   1.621  ops/s
[info] Cp.fs2Sync                                           N/A  thrpt   20    123.240 ±   1.683  ops/s
[info] Cp.monixIo                                           N/A  thrpt   20    456.148 ±  16.508  ops/s
[info] Cp.scalazStreamIo                                    N/A  thrpt   20    133.946 ±   1.955  ops/s
[info] Cp.scalazStreamNio                                   N/A  thrpt   20    126.973 ±   3.199  ops/s
[info] FahrenheitToCelsius.fs2                              N/A  thrpt   20     42.936 ±   0.319  ops/s
[info] FahrenheitToCelsius.scalazStream                     N/A  thrpt   20     36.936 ±   2.034  ops/s
[info] FlatMap.fs2Apply                                   10000  thrpt   20    265.104 ± 106.108  ops/s
[info] FlatMap.fs2Delay                                   10000  thrpt   20   2876.940 ± 666.090  ops/s
[info] FlatMap.fs2Now                                     10000  thrpt   20   3565.698 ± 750.616  ops/s
[info] FlatMap.futureApply                                10000  thrpt   20     96.812 ±  23.907  ops/s
[info] FlatMap.futureSuccessful                           10000  thrpt   20    384.691 ± 179.041  ops/s
[info] FlatMap.futureTrampolineEc                         10000  thrpt   20    274.442 ± 130.966  ops/s
[info] FlatMap.monixApply                                 10000  thrpt   20     89.764 ±  23.964  ops/s
[info] FlatMap.monixDelay                                 10000  thrpt   20   4066.693 ± 334.661  ops/s
[info] FlatMap.monixNow                                   10000  thrpt   20   5379.569 ±  70.541  ops/s
[info] FlatMap.scalazApply                                10000  thrpt   20    270.350 ±   3.842  ops/s
[info] FlatMap.scalazDelay                                10000  thrpt   20   2487.882 ±  69.783  ops/s
[info] FlatMap.scalazNow                                  10000  thrpt   20   3049.922 ± 108.906  ops/s
[info] Sum.fs2Chunked                                     10000  thrpt   20  13215.398 ± 340.507  ops/s
[info] Sum.fs2Iterate                                     10000  thrpt   20      4.188 ±   0.091  ops/s
[info] Sum.fs2IterateEval                                 10000  thrpt   20      3.708 ±   0.076  ops/s
[info] Sum.fs2IterateFold                                 10000  thrpt   20      8.166 ±   0.262  ops/s
[info] Sum.fs2PureIterate                                 10000  thrpt   20      4.162 ±   0.146  ops/s
[info] Sum.fs2Unfold                                      10000  thrpt   20     11.353 ±   0.294  ops/s
[info] Sum.monixObservableFromAsyncStateActionGlobal      10000  thrpt   20    131.783 ±   5.047  ops/s
[info] Sum.monixObservableFromIterable                    10000  thrpt   20   5214.097 ±  87.598  ops/s
[info] Sum.monixObservableFromStateActionGlobal           10000  thrpt   20   5572.336 ±  27.349  ops/s
[info] Sum.monixObservableFromStateActionSingleThreaded   10000  thrpt   20   6029.938 ±  89.808  ops/s
[info] Sum.scalaStream                                    10000  thrpt   20   1470.258 ±  98.170  ops/s
[info] Sum.scalazStreamEmitAll                            10000  thrpt   20     32.650 ±  12.749  ops/s
[info] Sum.scalazStreamIterate                            10000  thrpt   20     16.677 ±   5.483  ops/s
[info] Sum.scalazStreamIterateEval                        10000  thrpt   20     12.738 ±   2.212  ops/s
[info] Sum.scalazStreamIterateFold                        10000  thrpt   20     37.363 ±   7.191  ops/s
[info] Sum.scalazStreamUnfold                             10000  thrpt   20     29.524 ±   4.729  ops/s
[info] Text.fs2EncodeDecode                                 N/A  thrpt   20    226.524 ± 123.176  ops/s
[info] Text.fs2Wc                                           N/A  thrpt   20    335.747 ±  27.656  ops/s
[info] Text.scalazStreamEncodeDecode                        N/A  thrpt   20     93.357 ±   3.133  ops/s
[info] Text.scalazStreamWc                                  N/A  thrpt   20      3.128 ±   0.073  ops/s
[info] WordCount.fs2Sequential                              N/A  thrpt   20      2.518 ±   0.065  ops/s
[info] WordCount.monixSequential                            N/A  thrpt   20     28.943 ±   1.387  ops/s
[info] WordCount.scalazStreamParallel                       N/A  thrpt   20      2.253 ±   0.015  ops/s
[info] WordCount.scalazStreamSequential                     N/A  thrpt   20      2.322 ±   0.055  ops/s

I think my machine went to sleep once or twice in the middle there, which might explain some of the high standard deviations. I was just letting it run unattended.

pchiusano commented 7 years ago

Okay, that makes more sense.

Interesting that the sum unfold version is much faster, I'd guess that's just because it's basically fusing the iterate and take stages.

djspiewak commented 7 years ago

@pchiusano That was the general intent behind the separate unfold version. @rossabaker suspected that the take might be adding some extra overhead, and we wanted a test which just purely hit the per-chunk overhead of the interpreter in a practical context.

The thing that bothers me, even more than the Monix performance disparity on that test (which is several orders of magnitude), is the scalaz-stream disparity. By the unfold test, scalaz-stream's interpreter is about 2.5x faster than fs2's on a per-chunk basis, which is terrifying given that scalaz-stream's interpreter performance has been a serious problem on several production projects that I know of. I think there are ways to improve things (your idea and others), but the performance as it stands seems problematic. Hence, the work to improve it.

mpilquist commented 7 years ago

Remember that two major things have changed in scalac since @pchiusano did his initial work on this: GADT support improved considerably in 2.11.8 (and incrementally again in 2.12), and SI-2712 was addressed. Those two changes alone likely open up several avenues which were impossible in the early days. Additionally, there are some inner-class skolem techniques which were not explored even at the time, which may be applicable here.

@djspiewak Any advice on potential avenues of investigation here? I may spend some time on experimenting with Free optimizations over the next few days and I'm happy to try other encodings.

mpilquist commented 7 years ago

Here's my first pass at Free improvements -- https://github.com/functional-streams-for-scala/fs2/pull/775. Nothing major but a nice 3x increase in map-heavy structures.

pchlupacek commented 7 years ago

@mpilquist not completely sure if this is right step, but any ideas if adding more pipes is degrading performance in any predictable pattern? I would love to see overhead of each nested Pulls. That perhaps can give us the cost of each Pull vs cost of StreamCore.

I am just thinking of following

Stream.iterate

Stream.iterate.take(1m) Stream.iterate.take(1m).take(1m) Stream.iterate.take(1m).take(1m).take(1m)

Perhaps running these can give us glue where we lost the most of resources.

djspiewak commented 7 years ago

Any advice on potential avenues of investigation here?

My advice would be to flip Stack back to a concrete ADT (rather than the catamorphic encoding) and see where things go south (assuming -Ypartial-unification). I would do the encoding experiments on 2.12.0, just to level the playing field in that regard. I would expect that at least a few of the problems Paul ran into are no longer relevant (e.g. the higher-order existential instantiated with a type lambda issue). I would also expect that fewer (if any) equality proofs are required now.

Some random things to keep in mind…

case class Foo[E, A](f: E => A, e: E) extends Something[A]

def bar[A](s: Something[A]) = s match {
  case Foo(f, e) => f(e)
  case f: Foo[e, A] => f.f(f.e)
}

Note that the e in the type parameter block is case-sensitive.

Sometimes the second case works while the first does not. The pattern matcher seems to be a lot better at managing path-dependent types in scopes than it does at unifying unbound existential type variables (not surprising, really). Most of these inconsistencies were fixed in 2.11.8 (and by extension, 2.12.0), but it's worth keeping in mind.

Another thing to keep in mind is that scalac is enormously better at tracking universals than it is at tracking existentials. This is helpful because you can always convert an existential to a universal by skolemizing into an inner class or function within a scope (classes are managed slightly differently by the typer, and usually behave better). So if you run into an issue where the compiler is just steadfastly refusing to track an existential, try converting to a universal within an inner class within that scope and see how far you get. I have an old ticket (filed against scalaz-stream) where I demonstrate operator fusion against the old algebra. In order to implement operator fusion, I needed to traverse a type-aligned seq, and in order to do that, I needed the skolem trick I'm referencing now.

Another random thought I had is that it might be possible to collapse some of the algebras together. Right now, fs2 is implemented with (as far as I can tell) three connected algebras, not including Free. That introduces a considerable amount of wrapping and indirection, which might be a source of performance problems. In theory, it should be possible to converge some of this stuff, especially if the compiler is now more cooperative. Jury's out on that one…

pchiusano commented 7 years ago

I guess I am less optimistic about changing the basic encoding. Even if you can actually write the pattern matching without Scala complaining, the interpreter creates nontrivial equality proofs which I can't imagine Scala would infer (would be happy to be proven wrong though). And I don't want an interpreter with a bunch of casts in it. I like that the current interpreter fits together like a puzzle. It may be an ugly encoding, but it's pretty hard to screw things up when making tweaks to it; the types are very strong. We really should try to preserve this property if at all possible regardless of encoding.

There also seems to be a lot of low-hanging fruit - when I have a chance I'm going to try doing the optimization for pure code. And I was saying to @mpilquist that adding some primitives for looping might be good too. Using recursion for looping everywhere means you have to repeat the "compilation" of the stream being looped. If it's a primitive, you can do some of this work once. It reminds me of what people do with parser combinators, where for a few combinators (like many), you provide an optimized implementation, even though in theory it could be expressed with recursion.

On Wed, Nov 23, 2016 at 1:19 PM Daniel Spiewak notifications@github.com wrote:

Any advice on potential avenues of investigation here?

My advice would be to flip Stack back to a concrete ADT (rather than the catamorphic encoding) and see where things go south (assuming -Ypartial-unification). I would do the encoding experiments on 2.12.0, just to level the playing field in that regard. I would expect that at least a few of the problems Paul ran into are no longer relevant (e.g. the higher-order existential instantiated with a type lambda issue). I would also expect that fewer (if any) equality proofs are required now.

Some random things to keep in mind…

case class Foo[E, A](f: E => A, e: E) extends Something[A] def bar[A](s: Something[A]) = s match { case Foo(f, e) => f(e) case f: Foo[e, A] => f.f(f.e) }

Note that the e in the type parameter block is case-sensitive.

Sometimes the second case works while the first does not. The pattern matcher seems to be a lot better at managing path-dependent types in scopes than it does at unifying unbound existential type variables (not surprising, really). Most of these inconsistencies were fixed in 2.11.8 (and by extension, 2.12.0), but it's worth keeping in mind.

Another thing to keep in mind is that scalac is enormously better at tracking universals than it is at tracking existentials. This is helpful because you can always convert an existential to a universal by skolemizing into an inner class or function within a scope (classes are managed slightly differently by the typer, and usually behave better). So if you run into an issue where the compiler is just steadfastly refusing to track an existential, try converting to a universal within an inner class within that scope and see how far you get. I have an old ticket (filed against scalaz-stream) where I demonstrate operator fusion against the old algebra. In order to implement operator fusion, I needed to traverse a type-aligned seq, and in order to do that, I needed the skolem trick I'm referencing now.

Another random thought I had is that it might be possible to collapse some of the algebras together. Right now, fs2 is implemented with (as far as I can tell) three connected algebras, not including Free. That introduces a considerable amount of wrapping and indirection, which might be a source of performance problems. In theory, it should be possible to converge some of this stuff, especially if the compiler is now more cooperative. Jury's out on that one…

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/functional-streams-for-scala/fs2/issues/768#issuecomment-262592897, or mute the thread https://github.com/notifications/unsubscribe-auth/AAArQikWI81q4nbO_llfXDM7qePPYpcbks5rBIO9gaJpZM4K2xcs .

djspiewak commented 7 years ago

I guess I am less optimistic about changing the basic encoding. Even if you can actually write the pattern matching without Scala complaining, the interpreter creates nontrivial equality proofs which I can't imagine Scala would infer (would be happy to be proven wrong though).

I'm very confident we can go quite far in this direction. The compiler is a lot more powerful than you're giving it credit for (albeit highly mercurial and inconsistent). Even beyond that, scalac's type equivalence mechanism is extensible (in arbitrary code if we need it), which gives us a lot of flexibility. Basically my point is that we don't need to compromise the encoding to make it fit the compiler's basic view of unification, even assuming that basic view of unification is insufficient.

And I don't want an interpreter with a bunch of casts in it.

No one wants that, though ultimately, if it's the last piece of fruit left on the vine, and we can measure that a castless interpreter is causing problems, it's still internal code. I can tell you which side of that debate I will come down on, if it happens.

But I don't think it's going to happen. You can go a heck of a long way using type-level extensible resolution.

It may be an ugly encoding

Do not underestimate the cost of this ugliness. I'm not the only one who has avoided contributing to the major parts of fs2, and not for lack of time investment in trying to understand it! I can think of at least two other Scala developers just off-hand who are in the same boat. These are real contributors who would have contributed usefully in some fashion who are not, solely due to the encoding. This point remains regardless of whether or not the current encoding can be made fast (which we have by no means precluded).

There also seems to be a lot of low-hanging fruit

+1 And I think we should absolutely see how far the low-hanging stuff can take us before we do anything drastic.

pchiusano commented 7 years ago

Maybe there will be a point where we'll disagree about what tradeoffs to make but sounds like we are on the same page basically about what to do next.

GO TEAM.

On Wed, Nov 23, 2016 at 4:02 PM Daniel Spiewak notifications@github.com wrote:

I guess I am less optimistic about changing the basic encoding. Even if you can actually write the pattern matching without Scala complaining, the interpreter creates nontrivial equality proofs which I can't imagine Scala would infer (would be happy to be proven wrong though).

I'm very confident we can go quite far in this direction. The compiler is a lot more powerful than you're giving it credit for (albeit highly mercurial and inconsistent). Even beyond that, scalac's type equivalence mechanism is extensible (in arbitrary code if we need it), which gives us a lot of flexibility. Basically my point is that we don't need to compromise the encoding to make it fit the compiler's basic view of unification, even assuming that basic view of unification is insufficient.

And I don't want an interpreter with a bunch of casts in it.

No one wants that, though ultimately, if it's the last piece of fruit left on the vine, and we can measure that a castless interpreter is causing problems, it's still internal code. I can tell you which side of that debate I will come down on, if it happens.

But I don't think it's going to happen. You can go a heck of a long way using type-level extensible resolution.

It may be an ugly encoding

Do not underestimate the cost of this ugliness. I'm not the only one who has avoided contributing to the major parts of fs2, and not for lack of time investment in trying to understand it! I can think of at least two other Scala developers just off-hand who are in the same boat. These are real contributors who would have contributed usefully in some fashion who are not, solely due to the encoding. This point remains regardless of whether or not the current encoding can be made fast (which we have by no means precluded).

There also seems to be a lot of low-hanging fruit

+1 And I think we should absolutely see how far the low-hanging stuff can take us before we do anything drastic.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/functional-streams-for-scala/fs2/issues/768#issuecomment-262629220, or mute the thread https://github.com/notifications/unsubscribe-auth/AAArQkBwAFLWuyxrxRfZ1x-QgRT8EWkuks5rBKn4gaJpZM4K2xcs .

djspiewak commented 7 years ago

@pchiusano Oh, btw, I forgot to mention…  I took a crack myself at doing the optimization for pure code. One of the things I noticed is that your proposal will slightly change the semantics for stream interruption (the types catch this). If I understand things correctly (which is a big "if"), it is currently possible for a stream to be interrupted in the middle of a chain of pure operations. Under your proposal, this will become impossible. I'm concerned about what implications that will have on infinite pure streams.

pchiusano commented 7 years ago

Wait you implemented it already? Link to code?

Might be as simple as checking interruption signal every N pure steps. N could be large. You just want to prevent zombie process from running indefinitely. I'll have to think about whether there are any cases where this would hold up closing a stream... I'd care more about latency there than I do about the CPU heating up a bit more than necessary.

On Wed, Nov 23, 2016 at 4:22 PM Daniel Spiewak notifications@github.com wrote:

@pchiusano https://github.com/pchiusano Oh, btw, I forgot to mention… I took a crack myself at doing the optimization for pure code. One of the things I noticed is that your proposal will slightly change the semantics for stream interruption (the types catch this). If I understand things correctly (which is a big "if"), it is currently possible for a stream to be interrupted in the middle of a chain of pure operations. Under your proposal, this will become impossible. I'm concerned about what implications that will have on infinite pure streams.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/functional-streams-for-scala/fs2/issues/768#issuecomment-262633042, or mute the thread https://github.com/notifications/unsubscribe-auth/AAArQnVGjz7RwmE4tParDx3dqKh21taSks5rBK6bgaJpZM4K2xcs .

djspiewak commented 7 years ago

@pchiusano No, I tried to implement it. :-) Didn't finish. Got far enough to realize a) the interruption problem, and b) the encoding still makes me sad.

Might be as simple as checking interruption signal every N pure steps. N could be large. You just want to prevent zombie process from running indefinitely. I'll have to think about whether there are any cases where this would hold up closing a stream... I'd care more about latency there than I do about the CPU heating up a bit more than necessary.

That's probably the answer, and there's nothing in the types which precludes this (interruption checking just becomes an artificial effect that doesn't "need" to be injected). I do care about the CPU heating up though, especially if it's going to literally heat up forever. I can't see how this wouldn't prevent closure, but then my understanding of the encoding is not 100%.

pchlupacek commented 7 years ago

I had spent last few days to look after some characteristics of the fs2._ in regards to (near) real time data processing.

I had simple scenario

Now I wanted to get answer on how much streams I can run in parallel before the processing time of single sample will exceed ~20ms.

I created four tests

  1. imperative implementation with scheduler (baseline)
  2. fs2.Task - only (to see overhead of task)
  3. fs2.Stream with time.sleep, run by fs2.Task in parallel
  4. fs2.Stream with time.sleep, run by concurrent.join

I was expecting not so great fs2 performance, but I must say I was kind of shocked by overhead fs2 is introducing nowadays.

The results are here : https://gist.github.com/pchlupacek/a2adba7bcbc35cf805915d62d07775ca The code is in here : https://gist.github.com/pchlupacek/03139e83dc47c51fd58f39b0c6b8b40a Tests were performed on 2014 MB Pro

As you may see the baseline for the test (imperative solution) was able to process these samples in expected ~2s.

The fs2.Task performed in pair with imperative solution, as sort of expected, with saturation starting to appear around 10k concurrent tasks.

However fs2.Stream showed drastic perfromance degradation from 250+ concurrent streams. To have it worse, introducing concurrent.join nearly doubled execution times at 1k+ concurrent streams.

I think the costs of interpreting the stream is now extraordinary high, introducing the overhead exceeding few thousands pct in some scenarios.

Maybe I am reading something bad or misinterpreting, but if not, I think we should look what can be done on performance field before releasing 1.0. As of today, any near-real-time processing is way far from being realistic.

I would like to hear your opinion @djspiewak @pchiusano @mpilquist. What do you think chances are improving here?

pchiusano commented 7 years ago

So in general I think yeah let's try to improve performance as much as reasonably possible. I have a todo to investigate some of the low hanging fruit (like optimizing for pure segments of the stream).

In this specific case I suspect something else is going on though - obvious thing is can we make sure the streams have the same number of threads to run on? You need a pool as big as the amount of parallel tasks.

The other thing I immediately thought of was contention on some of the async primitives used by fs2. If there is contention it does retries. Might want to find ways to reduce contention in implementations...

On Fri, Dec 23, 2016 at 5:15 AM Pavel Chlupacek notifications@github.com wrote:

I had spent last few days to look after some characteristics of the fs2._ in regards to (near) real time data processing.

I had simple scenario

  • each stream was continuos data with sample frequency of 20 ms, with 100 samples each
  • I have simulated some computation per sample.

Now I wanted to get answer on how much streams I can run in parallel before the processing time of single sample will exceed ~20ms.

I created four tests

  1. imperative implementation with scheduler (baseline)
  2. fs2.Task - only (to see overhead of task)
  3. fs2.Stream with time.sleep, run by fs2.Task in parallel
  4. fs2.Stream with time.sleep, run by concurrent.join

I was expecting not so great fs2 performance, but I must say I was kind of shocked by overhead fs2 is introducing nowadays.

The results are here : https://gist.github.com/pchlupacek/a2adba7bcbc35cf805915d62d07775ca The code is in here : https://gist.github.com/pchlupacek/03139e83dc47c51fd58f39b0c6b8b40a Tests were performed on 2014 MB Pro

As you may see the baseline for the test (imperative solution) was able to process these samples in expected ~2s.

The fs2.Task performed in pair with imperative solution, as sort of expected, with saturation starting to appear around 10k concurrent tasks.

However fs2.Stream showed drastic perfromance degradation from 250+ concurrent streams. To have it worse, introducing concurrent.join nearly doubled execution times at 1k+ concurrent streams.

I think the costs of interpreting the stream is now extraordinary high, introducing the overhead exceeding few thousands pct in some scenarios.

Maybe I am reading something bad or misinterpreting, but if not, I think we should look what can be done on performance field before releasing 1.0. As of today, any near-real-time processing is way far from being realistic.

I would like to hear your opinion @djspiewak https://github.com/djspiewak @pchiusano https://github.com/pchiusano @mpilquist https://github.com/mpilquist. What do you think chances are improving here?

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/functional-streams-for-scala/fs2/issues/768#issuecomment-268968163, or mute the thread https://github.com/notifications/unsubscribe-auth/AAArQgfvcv3dgO57NT09h5mSRvVV8ndmks5rK58pgaJpZM4K2xcs .

pchiusano commented 7 years ago

As a quick test of the contention theory, just spin up N threads to increment/decrement an fs2 Semaphore and compare that to time needed for N threads to do nothing. I would guess that as N gets large contention becomes more of a bottleneck.

If this is a problem I think it is very fixable. Think like LongAdder vs AtomicLong. On Fri, Dec 23, 2016 at 5:38 PM Paul Chiusano paul.chiusano@gmail.com wrote:

So in general I think yeah let's try to improve performance as much as reasonably possible. I have a todo to investigate some of the low hanging fruit (like optimizing for pure segments of the stream).

In this specific case I suspect something else is going on though - obvious thing is can we make sure the streams have the same number of threads to run on? You need a pool as big as the amount of parallel tasks.

The other thing I immediately thought of was contention on some of the async primitives used by fs2. If there is contention it does retries. Might want to find ways to reduce contention in implementations...

On Fri, Dec 23, 2016 at 5:15 AM Pavel Chlupacek notifications@github.com wrote:

I had spent last few days to look after some characteristics of the fs2._ in regards to (near) real time data processing.

I had simple scenario

  • each stream was continuos data with sample frequency of 20 ms, with 100 samples each
  • I have simulated some computation per sample.

Now I wanted to get answer on how much streams I can run in parallel before the processing time of single sample will exceed ~20ms.

I created four tests

  1. imperative implementation with scheduler (baseline)
  2. fs2.Task - only (to see overhead of task)
  3. fs2.Stream with time.sleep, run by fs2.Task in parallel
  4. fs2.Stream with time.sleep, run by concurrent.join

I was expecting not so great fs2 performance, but I must say I was kind of shocked by overhead fs2 is introducing nowadays.

The results are here : https://gist.github.com/pchlupacek/a2adba7bcbc35cf805915d62d07775ca The code is in here : https://gist.github.com/pchlupacek/03139e83dc47c51fd58f39b0c6b8b40a Tests were performed on 2014 MB Pro

As you may see the baseline for the test (imperative solution) was able to process these samples in expected ~2s.

The fs2.Task performed in pair with imperative solution, as sort of expected, with saturation starting to appear around 10k concurrent tasks.

However fs2.Stream showed drastic perfromance degradation from 250+ concurrent streams. To have it worse, introducing concurrent.join nearly doubled execution times at 1k+ concurrent streams.

I think the costs of interpreting the stream is now extraordinary high, introducing the overhead exceeding few thousands pct in some scenarios.

Maybe I am reading something bad or misinterpreting, but if not, I think we should look what can be done on performance field before releasing 1.0. As of today, any near-real-time processing is way far from being realistic.

I would like to hear your opinion @djspiewak https://github.com/djspiewak @pchiusano https://github.com/pchiusano @mpilquist https://github.com/mpilquist. What do you think chances are improving here?

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/functional-streams-for-scala/fs2/issues/768#issuecomment-268968163, or mute the thread https://github.com/notifications/unsubscribe-auth/AAArQgfvcv3dgO57NT09h5mSRvVV8ndmks5rK58pgaJpZM4K2xcs .

djspiewak commented 7 years ago

@pchlupacek I think the odds of improving are very good. The odds of improving significantly in terms of contention also seem very good, though it might take some seriously looking at Task.Ref. The odds of improving significantly in terms of throughput don't seem great.

Don't get me wrong, I think that @pchiusano's pure segments idea is going to be a win. I also think that there are probably several unexplored areas where we can get a lot more performance. Relatively speaking, comparatively little effort has gone into this area, so I'm sure there's more to be done. But I don't see orders of magnitude improvement in performance. The interpreter will likely always remain quite chunky (no pun intended).

Regarding concurrent.join, I would guess in broad strokes that the current implementation is quite a bit worse than the old njoin implementation in scalaz-stream. The new implementation is more elegant, and has better fairness and backpressure properties, but it also is a lot stricter about how and when streams are allowed to run (this is part of how it improves in the first two areas), and that strictness results in much more back-and-forth through the concurrency primitives. So we may need to make some changes there.

All in all, I think there's still plenty of room for improvement, likely by several constant factors in throughput and perhaps an order of magnitude (or more) in contention. The overall interpreter is probably well within an order of magnitude of its optimum though.

djspiewak commented 7 years ago

With that said… I do think it's important to keep some of this in context. Questions about contention and concurrency primitives aside, the straight-line performance of the fs2 interpreter now appears to be basically comparable to that of scalaz-stream. Given fs2's native chunking, it seems likely that in many or most realistic workloads, fs2's interpreter will be substantially faster and impose less overhead than what we're accustomed to from scalaz-stream.

Pull-based streaming just isn't going to be that efficient. I don't believe it to be worse, since the properties you can achieve with a primary-pull model are extremely desirable and useful, but the overhead will always be there.

pchlupacek commented 7 years ago

@djspiewak @pchiusano thanks for comments and suggestions. I think if we reduce contention, that could help. What worried me most was non-linear cost of adding stream, that points surely to contention, thanks.

Re the performance, I think cost of anything up to 100% and linear is ok for me. Anything above that I consider problematic. If you look in results we are pretty ok up to 500 streams, after that fs2 is just wild.

So I think we can have quick wins on concurrent.join and Task.Ref as you suggested. I shall be able to look on these in few days. I am in sort of inclining to re-implement concurrent.join to just queue like it was in 0.8 that shall reduce overhead imho, but will see.

I maybe add these performance tests as PR to benchmarks what do you think?

pchlupacek commented 7 years ago

Ok @pchiusano @djspiewak on the other thought I think the contention on Ref shouldn't be the cause here. The only asynchronous part there is the time.sleep that really shouldn't have any contention whatsoever as it is running in single given stream. I would assume there is chance to bring the concurrent.join to be in pair with asynchronously running stream by reimplementing the concurrent.join, but still I have no good answer why this (https://gist.github.com/pchlupacek/03139e83dc47c51fd58f39b0c6b8b40a#file-realtimebenchmark-scala-L78) is behaving with such a poor performance. Note the streams are runing completely in parallel by forking just that task that runs them.

I also think the scenarios are quite relevant and are very close to the production usage.

pchiusano commented 7 years ago

Yeah that is weird. Did you make sure that the stuff after the sleep is running on its own thread? If it were running on the scheduler thread I could see that causing problems.

Also I'm not sure how the scheduler works but it also probably requires sufficient threads... On Sat, Dec 24, 2016 at 8:46 AM Pavel Chlupacek notifications@github.com wrote:

Ok @pchiusano https://github.com/pchiusano @djspiewak https://github.com/djspiewak on the other thought I think the contention on Ref shouldn't be the cause here. The only asynchronous part there is the time.sleep that really shouldn't have any contention whatsoever as it is running in single given stream. I would assume there is chance to bring the concurrent.join to be in pair with asynchronously running stream by reimplementing the concurrent.join, but still I have no good answer why this ( https://gist.github.com/pchlupacek/03139e83dc47c51fd58f39b0c6b8b40a#file-realtimebenchmark-scala-L78) is behaving with such a poor performance. Note the streams are runing completely in parallel by forking just that task that runs them.

I also think the scenarios are quite relevant and are very close to the production usage.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/functional-streams-for-scala/fs2/issues/768#issuecomment-269085052, or mute the thread https://github.com/notifications/unsubscribe-auth/AAArQutuj1c8aDJMXpTF9LXFMOMykm3eks5rLSI0gaJpZM4K2xcs .

djspiewak commented 7 years ago

I thought about thread continuation as a possible problem, but remember that Task.async now thread-shifts by default, so that shouldn't be an issue. Scheduler works by running one or more threads that are slept for the minimum interval before the next required tick, depending on the current intersecting schedules. Once the thread awakes, it checks for the callbacks it needs to run, dispatches them, and then sleeps again. It gets a little more complicated than that with fairness and algorithms to ensure thread utilization on a larger pool with higher contention, but that's the general idea.

Critically though, that notification comes back to us and we immediately shift back to the main pool. So our scheduler pool only needs a single thread and should never be contended.

With that said, 20.millis is a very short time period. And since it's fixed between the streams, it's possible that the sleeps are getting shorter and shorter to the point where the explicit lack of precise bounding on sleep timing (it can go nondeterministically over by small amounts, and under by large amounts) is starting to dominate.

pchiusano commented 7 years ago

Ok I'd still like to verify, by like printing the thread name or something, just to rule it out.

Just curious - What if a 20ms sleep request comes in, the pool says great, starts sleeping for 20ms, then after 5ms, another 3ms sleep comes in? Will that second sleep spin up a new thread? Or will it be blocked until 20ms? In this case all the sleeps are for 20ms so I don't think would matter but just wondering.

The other thing it could be is the fs2 thread pool used. Is it a fixed number of threads? If so then that could be bottleneck.

The overhead of just stepping fs2 itself doesn't seem like it should be a major factor here. On Sat, Dec 24, 2016 at 2:17 PM Daniel Spiewak notifications@github.com wrote:

I thought about thread continuation as a possible problem, but remember that Task.async now thread-shifts by default, so that shouldn't be an issue. Scheduler works by running one or more threads that are slept for the minimum interval before the next required tick, depending on the current intersecting schedules. Once the thread awakes, it checks for the callbacks it needs to run, dispatches them, and then sleeps again. It gets a little more complicated than that with fairness and algorithms to ensure thread utilization on a larger pool with higher contention, but that's the general idea.

Critically though, that notification comes back to us and we immediately shift back to the main pool. So our scheduler pool only needs a single thread and should never be contended.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/functional-streams-for-scala/fs2/issues/768#issuecomment-269096947, or mute the thread https://github.com/notifications/unsubscribe-auth/AAArQmnkQUH_OjjhktM1Wd9ZdinTQ98bks5rLW_CgaJpZM4K2xcs .

djspiewak commented 7 years ago

Just curious - What if a 20ms sleep request comes in, the pool says great, starts sleeping for 20ms, then after 5ms, another 3ms sleep comes in? Will that second sleep spin up a new thread? Or will it be blocked until 20ms? In this case all the sleeps are for 20ms so I don't think would matter but just wondering.

That's a good question. Reading the sources briefly, it looks like this is where the heavy-lifting is done: http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/java/util/concurrent/DelayQueue.java#DelayQueue

Edit It looks like it's relying on interrupts. Specifically here

pchlupacek commented 7 years ago

@djspiewak , @pchiusano thanks for suggestions.

Just give you an brief idea what 20ms is. This is sort of by default sampling interval for anything working with voice samples (not music). So every 20ms I simulate new sample by scheduling emit of new element and performing simple mathematical operation. My believe is that these 20ms are always forked in streams (in fact there would be rescheduled on default thread from the scheduler thread).

@pchiusano my understanding how the scheduler works is that 20ms time is measured from the next schedule. So I spawn it immediately after computation is done. I agree that to simulate true real-time I will need to assure that I get sample every 20ms. But in this scenario, this will just put more heats on the DelayedQueue imho, having zero or minimal impact on the tests.

To make sure that I have no JVM infrastructure related issues in play I did following:

All the tests using thread pool form benchmarks, that means num of cores for both Scheduler and for Async.

I made a branch here with tests so you can see : https://github.com/functional-streams-for-scala/fs2/tree/bench/concurrent

As regards the wonder if I run this on all threads, I believe so, because it really heats up all my cores, so I have no cycles idle. In fact when I remove Blackhole form the tests to simulate some computation, and just have no-op instead of them the test's won't change drastically, still almost same numbers. I also can confirm this by running tests in profiler and I see all Async threads are used.

@djspiewak I agree with your reasoning that the non-linear performance is the reason that we are not able to process stream overhead in 20ms, and as such it starts to queue up so we see that poor performance. But that is exactly the problem, the overhead of Stream is astronomical now, anything above 250 concurrent streams is not possible. that gives you approximately 1/10th of throughput compare to solution with Task or with imperative code, and I simply think that overhead is unacceptable.

For the sake of clarity I rerun this with 200ms and 10 samples, and still the performance starts to degrade around 2k concurrent streams, whereas baseline is still ok about 100k+. So this shows that likely more streams is involved the larger overhead is incurred, what is inline with my expectation.

djspiewak commented 7 years ago

As regards the wonder if I run this on all threads, I believe so, because it really heats up all my cores, so I have no cycles idle. In fact when I remove Blackhole form the tests to simulate some computation, and just have no-op instead of them the test's won't change drastically, still almost same numbers. I also can confirm this by running tests in profiler and I see all Async threads are used.

This is really interesting, since it indicates that the raw overhead of the framework is dominating even over Blackhole, which is sort of insane.

For the sake of clarity I rerun this with 200ms and 10 samples, and still the performance starts to degrade around 2k concurrent streams, whereas baseline is still ok about 100k+. So this shows that likely more streams is involved the larger overhead is incurred, what is inline with my expectation.

Great test. This and your raw-JVM test definitively rules out ScheduledExecutorService as the source of the contention.

Sifting this down, it appears we have the following conclusions (relative to your test):

Is that more or less a fair summary?

The first point seems intuitive enough, and I think it shouldn't be a problem to find significant wins in that code. The second point though is immensely confusing. I can't think of what in the interpreter would be contending with itself in completely independent concurrent stream runs (i.e. not concurrent.join, but literally unsafeRunAsync). The only resource two independent streams in the same JVM would share is the heap, but for heap allocation alone to be the limiting factor at only 250 concurrent streams would require legitimately astronomical allocation rates.

I think we should probably do some more detailed profiling of the interpreter than what JFR can spit out. Do we have a YourKit license handy? If not, it should be easy enough to snag one for use on this project. They're very quick to hand out licenses for open source projects, usually in exchange for a quick "profiled and tuned using YourKit" at the bottom of the readme, or something.

pchlupacek commented 7 years ago

@djspiewak I agree with your conclusions. Some more remmarks there:

I run the benchmarks in 4G JVM with G1 turned on 200ms GC delay. Even-though the spike of eden was high (as expected for functional style) the GC pressure was mediocre to low. in fact the whole GC time was less then few ms per each invocation. So I would rule out any allocation cost as well.

The JVM runtime overhead was about 5-20% of the whole time. So still this doesn't explain the rest. Java app was always taking the whole remaining of the CPU.

My bets are the issue is in execution time itself. I saw stacks of few hundreds of depth while interpreting the simple stream in benchmark, which is to me really too much.

I think we shall perhaps apply for YourKit. This will give us more relevant diagnostics, that we really need to. I just do that right away.

djspiewak commented 7 years ago

@pchlupacek Good to hear that heap pressure wasn't as much of a problem as I suspected. Allocations are usually harder to remove than stack frames.

But with that said, massive stack depth doesn't really explain why stream interpretation would have mutual interference.

pchlupacek commented 7 years ago

@djspiewak exactly. My only bet is we do still too much operations to produce single value than we are supposed to do.

pchiusano commented 7 years ago

Something I just noticed is that time.sleep is defined as

awakeEvery(d).take(1).drain

where awakeEvery is implemented via bracket and a Signal... which might be somewhat high overhead. I feel like a more direct implementation using Scheduler and Async.async would be simpler and faster.

On Mon, Dec 26, 2016 at 2:21 AM, Pavel Chlupacek notifications@github.com wrote:

@djspiewak https://github.com/djspiewak exactly. My only bet is we do still too much operations to produce single value than we are supposed to do.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/functional-streams-for-scala/fs2/issues/768#issuecomment-269175702, or mute the thread https://github.com/notifications/unsubscribe-auth/AAArQlFMqcB9pa0z4iU88egWAIYE68Hfks5rL2sDgaJpZM4K2xcs .

djspiewak commented 7 years ago

@pchiusano That's actually been the case for a long time. I think every project I've ever worked on that uses scalaz-stream has a replacement for time.sleep defined somewhere.

djspiewak commented 7 years ago

@pchlupacek Can you try again with #786?

pchlupacek commented 7 years ago

@djspiewak @pchiusano actually helped a bit :-). That means we have to perhaps look more in depth on the cost of individual combinators?

before :

[info] # Run complete. Total time: 00:03:14
[info] 
[info] Benchmark                           Mode  Cnt   Score   Error  Units
[info] RealTimeBenchmark.streamAsync_1     avgt        2.245           s/op
[info] RealTimeBenchmark.streamAsync_100   avgt        2.200           s/op
[info] RealTimeBenchmark.streamAsync_1000  avgt       11.408           s/op
[info] RealTimeBenchmark.streamAsync_2000  avgt       15.539           s/op
[info] RealTimeBenchmark.streamAsync_250   avgt        3.093           s/op
[info] RealTimeBenchmark.streamAsync_4     avgt        2.250           s/op
[info] RealTimeBenchmark.streamAsync_500   avgt        4.881           s/op
[info] RealTimeBenchmark.streamAsync_5000  avgt       42.160           s/op
[success] Total time: 203 s, completed Dec 25, 2016 9:11:17 AM

after

[info] Benchmark                           Mode  Cnt  Score   Error  Units
[info] RealTimeBenchmark.streamAsync_1     avgt       2.381           s/op
[info] RealTimeBenchmark.streamAsync_100   avgt       2.377           s/op
[info] RealTimeBenchmark.streamAsync_1000  avgt       2.286           s/op
[info] RealTimeBenchmark.streamAsync_2000  avgt       4.062           s/op
[info] RealTimeBenchmark.streamAsync_250   avgt       2.359           s/op
[info] RealTimeBenchmark.streamAsync_4     avgt       2.423           s/op
[info] RealTimeBenchmark.streamAsync_500   avgt       2.300           s/op
[info] RealTimeBenchmark.streamAsync_5000  avgt       7.844           s/op

also as side note to #786, I think the implementation of sleep shall spawn the result on the other thread than the scheduler, ideally borrowed from Async[F]

pchiusano commented 7 years ago

I think #786 is okay - the callback just sets a Ref in the implementation of async, so rest of computation should happen elsewhere. On Mon, Dec 26, 2016 at 11:54 PM Pavel Chlupacek notifications@github.com wrote:

@djspiewak https://github.com/djspiewak @pchiusano https://github.com/pchiusano didn't help too much, in contrary is a bit slower.

before :

[info] # Run complete. Total time: 00:01:08 [info] [info] Benchmark Mode Cnt Score Error Units [info] RealTimeBenchmark.streamAsync_1 avgt 2.034 s/op [info] RealTimeBenchmark.streamAsync_100 avgt 2.075 s/op [info] RealTimeBenchmark.streamAsync_1000 avgt 2.269 s/op [info] RealTimeBenchmark.streamAsync_2000 avgt 4.020 s/op [info] RealTimeBenchmark.streamAsync_250 avgt 2.076 s/op [info] RealTimeBenchmark.streamAsync_4 avgt 2.037 s/op [info] RealTimeBenchmark.streamAsync_500 avgt 2.101 s/op [info] RealTimeBenchmark.streamAsync_5000 avgt 5.684 s/op [success] Total time: 72 s, completed Dec 26, 2016 9:19:20 AM

after

[info] Benchmark Mode Cnt Score Error Units [info] RealTimeBenchmark.streamAsync_1 avgt 2.381 s/op [info] RealTimeBenchmark.streamAsync_100 avgt 2.377 s/op [info] RealTimeBenchmark.streamAsync_1000 avgt 2.286 s/op [info] RealTimeBenchmark.streamAsync_2000 avgt 4.062 s/op [info] RealTimeBenchmark.streamAsync_250 avgt 2.359 s/op [info] RealTimeBenchmark.streamAsync_4 avgt 2.423 s/op [info] RealTimeBenchmark.streamAsync_500 avgt 2.300 s/op [info] RealTimeBenchmark.streamAsync_5000 avgt 7.844 s/op

also as side note to #786 https://github.com/functional-streams-for-scala/fs2/pull/786, I think the implementation of sleep shall spawn the result on the other thread that the scheduler, ideally borrowed from Async[F]

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/functional-streams-for-scala/fs2/issues/768#issuecomment-269269778, or mute the thread https://github.com/notifications/unsubscribe-auth/AAArQlNlULV6WJDvJXDRKO1RZ30XSme5ks5rMJodgaJpZM4K2xcs .

djspiewak commented 7 years ago

@pchlupacek That's still a very small change. Given the fluctuation between different test runs, it's basically rounding-error within the other overhead.

pchlupacek commented 7 years ago

Just take a look on github page. I posted originaly incorrect former results ( that were 10x200ms ones). In fact the increase was significant.

Sent from my iPhone

On 27. 12. 2016, at 22:53, Daniel Spiewak notifications@github.com wrote:

@pchlupacek That's still a very small change. Given the fluctuation between different test runs, it's basically rounding-error within the other overhead.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub, or mute the thread.

djspiewak commented 7 years ago

@pchlupacek Were the 1k, 2k and 5k results from the "before" consistent?

scottcarey commented 7 years ago

FWIW, I did a major overhaul of an app using scalaz-stream to make its Process flow more chunky and have a less complicated merge/wye graph, and the result was MASSIVE performance improvement -- about 10x less CPU. This was entangled with also making many individual Tasks chunkier (fewer Tasks are constructed by flatMaping little ones together) so the improvement is partly due to that.

I am gun-shy on attempting any migration to fs2 given the performance sensitivity of my use cases. This includes upgrading to whatever http4s version contains its update to fs2.

At this time, I am keeping my eye on Monix. Do note that a pull-model flavor of tools are being developed there as well, though there is no planned merge/wye/tee type stuff so it may not ever directly compete.

mpilquist commented 7 years ago

Re; performance of awakeEvery, #924 adds a light weight alternative called awakeDelay along with lower level yet more commonly needed fixedRate and fixedDelay methods.

Any objections to closing this issue since it is mostly overcome by the 0.10 design? We could always open a new issue with any 0.10 performance findings.

pchlupacek commented 7 years ago

@mpilquist +1 to close