apache / pekko

Build highly concurrent, distributed, and resilient message-driven applications using Java/Scala
https://pekko.apache.org/
Apache License 2.0

Discussion about restoring statefulMapConcat #1141

Closed: JD557 closed this issue 6 months ago.

JD557 commented 6 months ago

I would like to open a discussion about restoring statefulMapConcat, as a follow-up to its deprecation in https://github.com/apache/incubator-pekko/issues/601 and https://github.com/apache/incubator-pekko-http/issues/395#issuecomment-1916722960.

I understand that it had problems around upstream completion (upstreamFinished), but the statefulMap + mapConcat alternative does not seem like a viable migration path, considering the extra allocations.
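To make the two migration shapes concrete, here is a minimal plain-Scala model (no Pekko dependency; StageModels and its method names are hypothetical) that runs each style over a List. The first mirrors statefulMapConcat, where a factory returns a closure holding mutable state; the second mirrors statefulMap followed by mapConcat, where every element produces a fresh (state, output) tuple and an onComplete hook may emit one last chunk. It only illustrates where the per-element tuple comes from; it says nothing about actual Pekko performance.

```scala
object StageModels {
  // Models statefulMapConcat: the factory is called once (once per materialization in Pekko),
  // and the closure it returns may keep mutable state between elements.
  def statefulMapConcatModel[In, Out](input: List[In])(
      factory: () => In => Iterable[Out]): List[Out] = {
    val f = factory()
    input.flatMap(f)
  }

  // Models statefulMap followed by mapConcat: state is threaded explicitly, each element
  // yields a (newState, outputs) tuple, and onComplete may emit a final chunk.
  def statefulMapThenConcatModel[S, In, Out](input: List[In])(create: () => S)(
      f: (S, In) => (S, Iterable[Out]),
      onComplete: S => Option[Iterable[Out]]): List[Out] = {
    var state = create()
    val chunks = List.newBuilder[Iterable[Out]]
    input.foreach { in =>
      val (next, out) = f(state, in) // a fresh tuple for every element
      state = next
      chunks += out
    }
    onComplete(state).foreach(chunks += _)
    chunks.result().flatten // the mapConcat step
  }

  val letters = List("a", "b", "c")

  // zipWithIndex in statefulMapConcat style: the index lives in a captured var.
  val viaStatefulMapConcat: List[(String, Long)] =
    statefulMapConcatModel[String, (String, Long)](letters) { () =>
      var index = 0L
      (elem: String) => { val out = List((elem, index)); index += 1; out }
    }

  // zipWithIndex in statefulMap + mapConcat style: the index is returned in a tuple each step.
  val viaStatefulMap: List[(String, Long)] =
    statefulMapThenConcatModel[Long, String, (String, Long)](letters)(() => 0L)(
      (index, elem) => (index + 1, List((elem, index))),
      _ => None)
}
```

Both variants produce the same zipWithIndex-style output; the difference under discussion is the extra tuple returned on every step of the second style.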

I believe this issue was also identified in pekko-http in https://github.com/apache/incubator-pekko-http/issues/395, which led to https://github.com/apache/incubator-pekko-http/pull/462.

However, I find it odd that the fix is in pekko-http. It seems to me that something like this should be available to all pekko-streams users and be one of the recommended migrations.

Also, I don't find the behavior of statefulMapConcat regarding upstreamFinished that surprising. Instead of a deprecation, I think it would be enough to add a recommendation to the documentation to use statefulMap + mapConcat if one needs access to the state when the upstream finishes.
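The upstreamFinished point can be illustrated with a hypothetical batching operator, again modeled on plain Lists rather than real Pekko stages (CompletionFlush and its methods are illustrative names). A statefulMapConcat-style closure is only called per element, so a partially filled batch is silently dropped when the input ends; a statefulMap-style version can flush it from onComplete.

```scala
import scala.collection.mutable.ListBuffer

object CompletionFlush {
  // statefulMapConcat style: the per-element closure emits a batch only when it is full.
  // It is never told that the input ended, so a trailing partial batch is lost.
  def batchWithoutFlush(input: List[Int], size: Int): List[List[Int]] = {
    val buffer = ListBuffer.empty[Int]
    input.flatMap { x =>
      buffer += x
      if (buffer.size == size) {
        val batch = buffer.toList
        buffer.clear()
        List(batch)
      } else Nil
    }
  }

  // statefulMap style: the buffer is explicit state, and an onComplete function
  // turns whatever is left into one final batch.
  def batchWithFlush(input: List[Int], size: Int): List[List[Int]] = {
    val (rest, fullBatchesReversed) =
      input.foldLeft((List.empty[Int], List.empty[List[Int]])) { case ((buf, acc), x) =>
        val next = buf :+ x
        if (next.size == size) (Nil, next :: acc) else (next, acc)
      }
    val onComplete: List[Int] => Option[List[Int]] =
      buf => if (buf.isEmpty) None else Some(buf)
    (onComplete(rest).toList ::: fullBatchesReversed).reverse
  }
}
```

With input 1 to 5 and a batch size of 2, the first version yields only the two full batches, while the second also emits the trailing batch containing 5.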

pjfanning commented 6 months ago

@JD557 We'll discuss it, but be aware that we will not remove the statefulMapConcat method any time soon anyway.

He-Pin commented 6 months ago

I know the problem. The only problem with the current implementation is the extra tuple allocation, which is the same as with mapAccumulate in fs2 and ZIO.

And after JIT compilation, the JVM may avoid allocating this object on the heap at all, thanks to escape analysis.

I would like to add the same thing as the PR in pekko-http, but I can't find a good name for it yet.

On performance: in my tests of the zipWithIndex migration, I saw a performance boost.

I agree with you on most points, but I don't think we should remove the deprecation.

And the allocation can be avoided by using an object instead of a tuple, which is how the new gather operation of Java Streams is implemented in JDK 22.
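One way to read the "object instead of a tuple" suggestion is a push-style step: the state lives in a mutable object created once per run, and outputs are handed to a callback rather than returned as (state, output). The sketch below is hypothetical (StatefulStep, run and LineSplitter are not Pekko APIs) and is only loosely inspired by how a JDK 22 Gatherer integrator pushes elements to a downstream. The emitted values are still allocated, but no tuple is created per element.

```scala
import scala.collection.mutable

object PushStyleSketch {
  // Hypothetical interface, not a Pekko API: state lives in a mutable object created once
  // per run, and results are pushed to `emit` instead of being returned in a tuple.
  trait StatefulStep[In, Out] {
    def onElement(in: In, emit: Out => Unit): Unit
    def onComplete(emit: Out => Unit): Unit = ()
  }

  // Drives a step over a finite input, roughly the way a stream stage would.
  def run[In, Out](input: Iterable[In])(create: () => StatefulStep[In, Out]): Vector[Out] = {
    val step = create()
    val out = Vector.newBuilder[Out]
    val emit: Out => Unit = o => { out += o; () } // one closure per run, not per element
    input.foreach(in => step.onElement(in, emit))
    step.onComplete(emit)
    out.result()
  }

  // Example step: splits text chunks into lines and flushes the trailing partial line on completion.
  final class LineSplitter extends StatefulStep[String, String] {
    private val pending = new mutable.StringBuilder
    def onElement(chunk: String, emit: String => Unit): Unit =
      chunk.foreach { c =>
        if (c == '\n') { emit(pending.result()); pending.clear() }
        else pending.append(c)
      }
    override def onComplete(emit: String => Unit): Unit =
      if (pending.nonEmpty) emit(pending.result())
  }
}
```

Because the step object owns its state, the per-element call returns Unit, and completion handling is just another method on the same object.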

JD557 commented 6 months ago

For reference, here are some benchmarks (based on the original ZIO Streams benchmarks): https://gist.github.com/JD557/251467aac30860dca88dc3e697dfb381

You can quickly run them with Scala CLI: scala-cli --power --jmh https://gist.github.com/JD557/251467aac30860dca88dc3e697dfb381

Here I compare implementations using scan+mapConcat (pekkoScan), statefulMapConcat (pekkoStatefulMapConcat), and statefulMap+mapConcat (pekkoStatefulMapMapConcat).
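For readers unfamiliar with the scan+mapConcat variant, the pattern is roughly this: each scan step builds a new accumulator carrying both the running state and the elements to emit for the current input, and mapConcat then extracts those elements (in Pekko this would look like .scan(zero)(f).mapConcat(_.emit)). The sketch below applies that pattern to a List to drop consecutive duplicates. It is not the benchmark code from the gist, and Acc and dedupConsecutive are hypothetical names.

```scala
object ScanThenConcat {
  // Accumulator holding the running state (last element seen) and the outputs
  // to emit for the element that produced it.
  final case class Acc[A](last: Option[A], emit: List[A])

  def dedupConsecutive[A](input: List[A]): List[A] =
    input
      // scan step: a new Acc is built for every element
      .scanLeft(Acc[A](None, Nil)) { (acc, a) =>
        if (acc.last.contains(a)) Acc(acc.last, Nil) else Acc(Some(a), List(a))
      }
      // mapConcat step: flatten the per-element outputs
      .flatMap(_.emit)
}
```

Note that the initial accumulator is also emitted by the scan, which is why it carries an empty emit list.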

The tests were run on an Apple M3 Max using OpenJDK 21.0.2+13-LTS.

Results:

Benchmark                                      (chunkSize)  (cols)  (rows)   Mode  Cnt   Score   Error  Units
CSVStreamBenchmarks.pekkoScan                         5000     100     100  thrpt    5  27.785 ± 0.080  ops/s
CSVStreamBenchmarks.pekkoStatefulMapConcat            5000     100     100  thrpt    5  72.419 ± 0.308  ops/s
CSVStreamBenchmarks.pekkoStatefulMapMapConcat         5000     100     100  thrpt    5  42.094 ± 0.744  ops/s

He-Pin commented 6 months ago

I will take care of it this weekend, we will definitely come up with a solution 😀

He-Pin commented 6 months ago

So the quick fix seems to be to just remove the deprecation and add a more detailed log in 1.0.x. We will also need a new building block for this that does not introduce any allocation.

Otherwise, adding overloads like these:

  def statefulMap[S, T](create: () => S)(f: (S, Out) => (S, T), onComplete: S => Option[T]): Repr[T] = ???

  def statefulMap[T](functionConstructor: () => Out => T): Repr[T] = ???

would cause a lot of confusion, and wouldn't work either :(

He-Pin commented 6 months ago

@JD557 Hi, the related change has been merged, thanks.