cujojs / most

Ultra-high performance reactive programming
MIT License

How to natively implement partition #393

Closed akloeber closed 7 years ago

akloeber commented 7 years ago

I need to process data in chunks, and therefore I need to partition the values into buckets (like RxJS's Observable#bufferWithCount or Highland's Stream#batch). I did not find a suitable function in Most.js to implement that directly (most#reduce reduces all values into a single value, and most#map produces an equal number of values), so instead I had to integrate with transducers-js's partitionAll via most#transduce(transducers.partitionAll(CHUNK_SIZE)).

What would the direct approach look like against the Most.js API, without using a third-party transducer library? Is this only possible via most#transduce?

TylorS commented 7 years ago

Hello @akloeber, thank you for your interest in most.js. To better understand how we can help, could you clarify what exactly you'd like to have done? Is helping you create an equivalent of sorts to bufferWithCount enough?

If you need more help than that, with creating an entire partition implementation, it'd be very helpful if you could provide more information about how you'd like for this operation to work. For instance, given a stream most.from([1, 2, 3, 4, 5, 6, 7]) what output would you like to see? If you're familiar with creating diagrams with the notation we use in our documentation, that would also be very helpful!

akloeber commented 7 years ago

Hello @TylorS, thanks for picking this up.

Let me summarize my real-world use case:
I have a large amount of in-memory numeric data stored in a tree-like structure that offers an ES6 Iterable interface. This data should be stored via HTTP requests in a time series database (InfluxDB). A constraint of InfluxDB is that the number of values written in a single request should not exceed 2000, so the data has to be transmitted in chunks of 2000 values. For the "chunking/partitioning" I need a transformation that collects up to n values and then emits an array of those n values downstream, which is what RxJS's Observable#buffer[With]Count, Highland's Stream#batch, and Kefir's Observer#bufferWithCount do.
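To make the desired semantics concrete, here is a minimal plain-JS sketch (not the most.js API, just an illustration over an array) of the chunking transformation described above: collect up to n values, then emit them as one array.

```javascript
// Plain-JS illustration of the chunking/partitioning semantics:
// split an array of values into consecutive groups of at most n.
function chunk(values, n) {
  const out = [];
  for (let i = 0; i < values.length; i += n) {
    out.push(values.slice(i, i + n));
  }
  return out;
}

// e.g. with n = 3:
// chunk([1, 2, 3, 4, 5, 6, 7], 3) → [[1, 2, 3], [4, 5, 6], [7]]
```

The stream version does the same thing over time: buffer incoming events and emit the buffer downstream whenever it reaches n items (plus a final partial buffer at end-of-stream).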

I started evaluating different stream libraries to support that use case, where the Iterable is the producer and an already available InfluxConnector that actually executes the HTTP requests via InfluxConnector#write(chunk) : Promise is the consumer. I implemented this in Highland, Kefir, RxJS 4 and 5, as well as in Most.js, where the (simplified) version looks like this:

most.from(ITERABLE)
  .transduce(transducers.partitionAll(CHUNK_SIZE))
  .concatMap(chunk => most.fromPromise(influxConnector.write(chunk)))
  .drain()

As a first step, I wanted to get rid of the extra dependency on transducers-js and implement this directly against Most's API, which is what this issue is about.

In the next step (different issue, or maybe directly via gitter) I want to dig deeper into performance characteristics, since in my initial implementation Most.js was the slowest (which is surprising when compared with the benchmark numbers published by Most.js itself):

Highland [back pressure] x 53.44 ops/sec ±3.28% (61 runs sampled)
RxJS 4 [flow control on stream] x 15.93 ops/sec ±5.93% (55 runs sampled)
RxJS 4 [flow control on source] x 49.09 ops/sec ±3.88% (71 runs sampled)
RxJS 5 [no flow control] x 203 ops/sec ±3.01% (72 runs sampled)
RxJS 5 [flow control on source] x 169 ops/sec ±2.70% (68 runs sampled)
Kefir [flow control on source] x 132 ops/sec ±2.68% (72 runs sampled)
Most.js [no flow control] x 10.15 ops/sec ±3.04% (50 runs sampled)
Most.js [flow control on source] x 67.02 ops/sec ±4.20% (62 runs sampled)

(The above snippet is represented by Most.js [no flow control], ITERABLE provides 200,000 values, CHUNK_SIZE is 2000)

davidchase commented 7 years ago

@akloeber we just released https://github.com/mostjs-community/most-chunksOf — check it out, maybe it's what you need?

Instead of using observe you can use reduce to obtain a single list, as produced by partitionAll.

Let us know if this helps.
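A plain-JS sketch of the reduce step described above: folding a stream of chunks back into a single list of lists, matching what transducers.partitionAll would yield. (The actual most.js call would be roughly reduce((acc, chunk) => acc.concat([chunk]), [], chunksOf(n, stream)), with chunksOf from most-chunksOf; the exact signature is an assumption here, check the repo's README.)

```javascript
// What chunksOf(2, ...) might emit over time, shown as an array:
const chunks = [[1, 2], [3, 4], [5]];

// The reduce step: fold each emitted chunk into one accumulated list,
// yielding a single list-of-lists like partitionAll produces.
const asSingleList = chunks.reduce((acc, c) => acc.concat([c]), []);
// asSingleList → [[1, 2], [3, 4], [5]]
```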

briancavalier commented 7 years ago

@akloeber most-chunksOf is a great simple example of a new custom "native" mostjs combinator in a pretty small amount of code. Well done, @davidchase. Because it uses most's native architecture rather than the 3rd party partitionAll, my guess is that it will perform significantly better.

It'd be cool to hear how it works for you.
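For readers unfamiliar with most's internals, here is a minimal self-contained sketch of the Sink pattern that native most 1.x combinators (like most-chunksOf) are built on. The event/end/error method names follow the most.js source; the details here are a simplified assumption, not the library's actual code.

```javascript
// A ChunkSink buffers up to n values and forwards each full buffer
// downstream as a single event; end() flushes any partial remainder.
class ChunkSink {
  constructor(n, sink) {
    this.n = n;
    this.sink = sink;      // downstream sink
    this.buffer = [];
  }
  event(t, x) {
    this.buffer.push(x);
    if (this.buffer.length === this.n) {
      this.sink.event(t, this.buffer);
      this.buffer = [];
    }
  }
  end(t, x) {
    if (this.buffer.length > 0) this.sink.event(t, this.buffer); // flush remainder
    this.sink.end(t, x);
  }
  error(t, e) { this.sink.error(t, e); }
}

// Drive it by hand with a recording downstream sink:
const events = [];
const downstream = {
  event: (t, x) => events.push(x),
  end: () => events.push('end'),
  error: () => {}
};
const sink = new ChunkSink(2, downstream);
[1, 2, 3, 4, 5].forEach((x, i) => sink.event(i, x));
sink.end(5);
// events → [[1, 2], [3, 4], [5], 'end']
```

In real most.js code this sink would be wrapped by a Source whose run(sink, scheduler) subscribes it to the upstream stream, which is why such combinators avoid the overhead of going through a third-party transducer protocol.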

akloeber commented 7 years ago

@davidchase thanks a lot, that was exactly what I had been looking for. I couldn't implement it myself because I did not know about this internal most API.

Unfortunately, the performance numbers only improved in the benchmark with flow control (i.e. the Iterable is not passed directly to most.from; instead a Flowable wrapper is used that allows requesting items explicitly as they are consumed downstream. Upon request, the Flowable pushes the requested number of items into the stream via most-subject).
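To make the flow-control setup above concrete, here is a hypothetical minimal sketch of the Flowable idea: wrap an iterator and push items downstream only when emit(n) is called. The real version pushes into the stream via most-subject; this sketch uses a plain callback so it stays self-contained.

```javascript
// Hypothetical Flowable: a pull-driven wrapper around an ES6 Iterable.
// emit(n) pushes up to n items downstream, then waits for the next request.
class Flowable {
  constructor(iterable, onValue) {
    this.iterator = iterable[Symbol.iterator]();
    this.onValue = onValue;   // in the real version: a most-subject's next()
  }
  emit(n) {
    for (let i = 0; i < n; i++) {
      const { value, done } = this.iterator.next();
      if (done) return false; // source exhausted
      this.onValue(value);
    }
    return true;              // more items may remain
  }
}

const received = [];
const f = new Flowable([10, 20, 30], v => received.push(v));
f.emit(2);  // received → [10, 20]
f.emit(2);  // received → [10, 20, 30], returns false (exhausted)
```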

These are the current numbers:

$ node test/perf.test.js 
Highland [back pressure] x 75.62 ops/sec ±1.41% (70 runs sampled)
RxJS 4 [flow control on stream] x 18.59 ops/sec ±3.06% (67 runs sampled)
RxJS 4 [flow control on source] x 75.26 ops/sec ±2.22% (70 runs sampled)
RxJS 5 [no flow control] x 312 ops/sec ±1.12% (82 runs sampled)
RxJS 5 [flow control on source] x 258 ops/sec ±1.35% (78 runs sampled)
Kefir [flow control on source] x 166 ops/sec ±1.58% (70 runs sampled)
Most.js [no flow control] x 9.13 ops/sec ±1.19% (47 runs sampled)
Most.js [flow control on source] x 113 ops/sec ±1.86% (74 runs sampled)
Fastest is 'RxJS 5 [no flow control]'

(node v4.4.7, Mac OS 10.12.2, Quadcore Intel Core i5 at 2.9 GHz)

Note: The overall numbers are higher than the ones given in https://github.com/cujojs/most/issues/393#issuecomment-271139677 because the previous benchmark was executed on a less performant MacBook. Apart from this, they are reproducible.

You can have a look into the sources at https://github.com/akloeber/stream-eval.

briancavalier commented 7 years ago

Hi @akloeber. Thanks for opening this issue, and for posting a link to your perf tests. The test helped us discover some old code that needs to be updated.

See #395 for more detail. Basically, fromIterable(), which is called by from(), was originally written at a time when we believed that being concurrency-friendly was important for (potentially infinite) iterables, and without as much concern for performance. So, we've simplified it to the obvious implementation, which should cover the vast majority of iterable use cases. We'll release an update soon.
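As a rough sketch of what "the obvious implementation" means here (this is an assumption for illustration, not the actual most.js source): instead of scheduling work per element to stay concurrency-friendly, simply walk the iterator and emit each value into the sink, then signal end.

```javascript
// Simplified illustration of a direct fromIterable-style drain:
// iterate synchronously, emitting each value into the sink.
function runIterable(iterable, sink, t = 0) {
  for (const x of iterable) {
    sink.event(t, x);
  }
  sink.end(t);
}

const seen = [];
runIterable([1, 2, 3], {
  event: (t, x) => seen.push(x),
  end: () => seen.push('end')
});
// seen → [1, 2, 3, 'end']
```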

There are a couple other issues with the test that I'd like to point out.

First, the tests seem to be designed to require imperative event pushing, for example, by using an rxjs-like subject, such as most-subject. Most encourages declarative approaches over imperative ones, and subjects are intentionally not part of the core package. Most-subject is a community project and thus has different goals, and performance is not one of them. As I understand it, most-subject's goals are correctness and avoiding race conditions that can be common with subjects.

Second, the test is using concatMap + fromPromise, which is not an idiomatic way to achieve the apparent desired test behavior using most's API. The idiomatic approach is map + await. Note: The stream.awaitPromises() alias isn't released yet (it will be released along with the fromIterable update mentioned above), so until then, use stream.await().
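The shape of the change suggested above, shown side by side (the pipeline is illustrative, reusing the influxConnector from the earlier comment), followed by a tiny self-contained analogy:

```javascript
// Illustrative most.js 1.x shapes (not runnable here without most):
//
//   // before: concatMap + fromPromise
//   stream.concatMap(chunk => most.fromPromise(influxConnector.write(chunk)))
//
//   // after (idiomatic): map + await
//   stream.map(chunk => influxConnector.write(chunk)).await()
//
// map pairs each chunk with its pending promise; await() then emits each
// fulfillment value in stream order. Self-contained analogy over an array,
// where each "write" is a promise and order is preserved:
const writes = [1, 2, 3].map(chunk => Promise.resolve(chunk * 10));
const result = Promise.all(writes); // resolves to [10, 20, 30]
```

(The analogy is loose: await() emits fulfillments one by one as stream events rather than collecting them, but the map-then-await shape is the same.)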

As a slight aside, the most tests use concatMap, which concatenates, while the other libs use the respective merging operation. For example, the rxjs 5 tests use mergeMap, which is distinctly different from concatMap. I don't believe this is a significant source of difference in test results, and again, neither concatMap nor chain appear to be an idiomatic use of most's API for this case. However, the difference indicates that there may be still more differences in the tests that make them apples-to-oranges comparisons.

We appreciate all the info and the perf test that helped us re-discover and update this old fromIterable code. I'll ping this issue again once that's been released.

briancavalier commented 7 years ago

@akloeber We just released most 1.2.0 with the update to from/fromIterable.

akloeber commented 7 years ago

@briancavalier Thanks a lot for the update of the fromIterable implementation. After integrating the new version, the benchmark of the direct version (no flow control) shows a throughput of over 300 ops/sec, which is around 30 times faster than before.

Regarding your concern about the performance of most-subject: I implemented a new Flowable which can be passed directly to most.from as an Observable but has a Flowable#emit(n) method for flow control. It does indeed increase throughput compared with the most-subject approach.

Furthermore, following your suggestion, I switched from concatMap to map + await, which again improved the throughput.

Here are the current numbers:

$ node test/perf.test.js
Highland [back pressure] x 56.61 ops/sec ±1.83% (71 runs sampled)
RxJS 4 [flow control on stream] x 18.04 ops/sec ±2.70% (82 runs sampled)
RxJS 4 [flow control on source with Flowable] x 76.74 ops/sec ±1.22% (76 runs sampled)
RxJS 5 [no flow control] x 316 ops/sec ±0.92% (83 runs sampled)
RxJS 5 [flow control on source with Flowable] x 288 ops/sec ±1.34% (81 runs sampled)
RxJS 5 [flow control on source with NEW Flowable] x 254 ops/sec ±1.04% (81 runs sampled)
Kefir [flow control on source with Flowable] x 242 ops/sec ±1.71% (84 runs sampled)
Kefir [flow control on source with NEW Flowable] x 206 ops/sec ±2.30% (78 runs sampled)
Most.js [no flow control] x 312 ops/sec ±2.01% (83 runs sampled)
Most.js [flow control on source with most-subject] x 126 ops/sec ±1.60% (81 runs sampled)
Most.js [flow control on source with most-subject and NEW Flowable] x 118 ops/sec ±1.61% (77 runs sampled)
Most.js [flow control on source with NEW Flowable] x 192 ops/sec ±1.59% (83 runs sampled)
Most.js [flow control on source with NEW Flowable and await] x 202 ops/sec ±1.42% (82 runs sampled)
Fastest is 'RxJS 5 [no flow control]'

(node v4.4.7, Mac OS 10.12.2, Quadcore Intel Core i5 at 2.9 GHz)