reactor / reactor

Reactor Bill Of Materials (tracking reactor-core, reactor-netty and more)
https://projectreactor.io
Apache License 2.0

Fully Support Algorithmic Skeletons / Parallel Patterns #338

Closed sirinath closed 10 years ago

sirinath commented 10 years ago

Hi,

Since you are considering mapreduce (https://github.com/reactor/reactor/issues/91), it might be worth investigating going beyond this and implementing a full set of Algorithmic Skeletons / Parallel Patterns, of which mapreduce is one pattern.

Suminda

michaelklishin commented 10 years ago

@sirinath can you please be more specific? How full is "full support"? And if algorithmic skeletons are just high-level algorithms for parallel computation, which ones do you suggest Reactor should support, given it has a really small team working on it?

sirinath commented 10 years ago

When I said full support, what I meant was: do not limit yourselves to mapreduce, but be open to supporting other patterns.

To start with, it would be good to have what was in the old Skandium package (https://github.com/mleyton/Skandium). They are fairly simple to implement: a few hundred lines of code at most, with documentation comments (see https://github.com/mleyton/Skandium/tree/master/core/src/main/java/cl/niclabs/skandium/skeletons, https://github.com/mleyton/Skandium/tree/master/core/src/main/java/cl/niclabs/skandium/muscles). You could have modified and reused the above code if it were not GPL, but you would have to adapt it to Reactor's use case anyway.

This will be further simplified if you can group and sequence consumers like in Disruptor (https://github.com/LMAX-Exchange/disruptor/wiki/Disruptor-Wizard). Essentially, that is all there is to this; Parallel Patterns / Algorithmic Skeletons are the same idea in different bottles (abstractions).

Maybe with a bit of additional work this can be extended to your upcoming streaming API, with new patterns added on and off as time permits.

smaldini commented 10 years ago

I think we will evaluate this separately in the Streams API. We have parallel(), merge(), groupBy(), window(), and pretty soon we will have zip(), shrink() (reduce parallelism), and maybe join().

sirinath commented 10 years ago

Looks like Reactor is turning into a great library!

Some frameworks and patterns are covered in the wiki article: http://en.wikipedia.org/wiki/Algorithmic_skeleton

Also, Intel Cilk Plus and Intel TBB cover some patterns.

sirinath commented 10 years ago

Also, if possible, since you mentioned joins, can you evaluate adding some advanced join patterns?

smaldini commented 10 years ago

In my mind I have the following:

- join() : List<?>
  Joins N Streams from the parent Stream<Stream<?>> and passes the coordinated result as a List (or Tuple, not decided yet). Can be combined with a throttle() or timeout() downstream to join up to a time period T.
- zip(Function<List<?>, V> mapper) : V
  Joins N Streams from the parent Stream<Stream<?>> and transforms the value before passing it on (not decided about List vs Tuple yet). Can be combined with a throttle() or timeout() downstream to join up to a time period T.
- zipWindow(Function<Stream<?>, Stream<V>>) : Stream<Stream<V>>
  Same as zip() but doesn't buffer values into a list; instead it creates a new window for each synchronized merged stream.
- parallelMerge(int) : Stream<Stream<?>>
  Reduces parallelism by merging upstream Streams (Stream<Stream<?>>) into a predefined number of output streams.

I don't plan more for M1, which is due now, but let me know what you think about these (in addition to the other changes).
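To make the join()/zip() semantics described above concrete, here is a minimal, hypothetical sketch over plain lists standing in for buffered streams (class and method names are illustrative, not the proposed Reactor API): join() packs the next value from each source into a List, and zip() applies a mapper to each packed List before emitting it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model of the proposed join()/zip() coordination, using lists as
// stand-ins for streams. Names are hypothetical, not Reactor's API.
public class ZipSketch {

    // join(): take the i-th value from each source and emit them as a List.
    static <T> List<List<T>> join(List<List<T>> sources) {
        int n = sources.stream().mapToInt(List::size).min().orElse(0);
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            List<T> slot = new ArrayList<>();
            for (List<T> s : sources) slot.add(s.get(i));
            out.add(slot);
        }
        return out;
    }

    // zip(mapper): the same coordination, but each packed List is
    // transformed before being passed downstream.
    static <T, V> List<V> zip(List<List<T>> sources, Function<List<T>, V> mapper) {
        List<V> out = new ArrayList<>();
        for (List<T> slot : join(sources)) out.add(mapper.apply(slot));
        return out;
    }

    public static void main(String[] args) {
        // prints [[1, 4], [2, 5], [3, 6]]
        System.out.println(join(List.of(List.of(1, 2, 3), List.of(4, 5, 6))));
    }
}
```

In a real push-based stream the coordination would buffer until one value per source is available; the list model only shows the packing order.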

sirinath commented 10 years ago

Just to make it easy, I am listing the operators; perhaps you can see if all bases are covered. This does not relate to any implementation, only to the concepts.

All operators need to work on nested streams and on multiple non-nested streams. The nested stream needs to subclass Stream, as some of the operations will not be relevant to non-nested streams.

With regard to some of the operators above, the following cases need to be supported: 1) select when all values are available; 2) select when any value is available, with the others as defaults or nulls, or just sending the subset; 3) select when a specific set of values is available, replacing the rest with defaults or nulls, or just sending the subset. For the above cases there are these packing orders: 1) defined order (the value from each window goes into a specific slot); 2) availability order (the first available is the first added); 3) using a mapping function which accepts a stream with the available subset of values.

Also, looking at the above, you will need a function to create windows from multiple streams so they can be used with the operators. Some operators can therefore also be declared as static methods to achieve this.

Also, for accumulation / reduction it would help to have a version which takes in 2 additional streams: 1) n items of input history; 2) n items of output history. In this case the seed value is not needed. In addition, there should be one variant that outputs just the final result and another that also outputs intermediate results as a stream. Choose / switch can have a similar scheme.

Mapping functions can take 2 forms: 1) taking only the relevant set of parameters; 2) one which additionally receives n items of history.
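The two accumulation variants described above (final result only, versus intermediate results as a stream) can be sketched as a seedless reduce and a scan; this is a toy illustration over lists with hypothetical names, not a proposed signature, and the history-stream variant would just widen the accumulator function's parameters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Toy model of the two accumulation variants: reduce() emits only the
// final result, scan() also emits every intermediate accumulation.
// Seedless: the first element serves as the initial accumulator.
public class AccumulateSketch {

    static <T> T reduce(List<T> input, BinaryOperator<T> op) {
        T acc = input.get(0);
        for (int i = 1; i < input.size(); i++) acc = op.apply(acc, input.get(i));
        return acc;
    }

    static <T> List<T> scan(List<T> input, BinaryOperator<T> op) {
        List<T> out = new ArrayList<>();
        T acc = input.get(0);
        out.add(acc);
        for (int i = 1; i < input.size(); i++) {
            acc = op.apply(acc, input.get(i));
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        // prints 10 then [1, 3, 6, 10]
        System.out.println(reduce(List.of(1, 2, 3, 4), Integer::sum));
        System.out.println(scan(List.of(1, 2, 3, 4), Integer::sum));
    }
}
```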

If you have these, you have covered all the bases. Any further additions will not be critical, just desirable bells and whistles. Perhaps you can take the opportunity to cut down and streamline the implementation in version 2.0.

Most of the functionality in Rx can be boiled down to a much smaller yet composable subset of operators.

I can brainstorm with you on this if you are looking to do it.

sirinath commented 10 years ago

With the right high-level functionality you can do everything Rx does, and much more.

Frameworks like GPars and Hopac can do all that Rx does, and much more, with fewer operations. Perhaps you can have a look at them as well.

If everything takes and returns a stream, the API becomes simpler, and finite streams can also gain random-access capabilities, like a list.

smaldini commented 10 years ago

I come from the GPars world, but even in Rx you don't need that much. We're quite close to the selection you presented here, apart from Zip and Switch. Our Split also has a different meaning (transforming a Stream<Iterable> into a Stream), and we also have a Parallel() operation which is a bit like Switch but less flexible (the switching between generated sub-streams is done for you, for round-robin parallelism).

smaldini commented 10 years ago

From your list I am considering switch() (which, again, is close to parallel but more flexible; I think parallel will be based on switch() if I introduce it). Maybe also a multiSelect() version to return more than 1 result. I am not going to 2.0 M1 without finishing the static version for every action too; the challenging one, for its typing nightmare, is zip() of course.

sirinath commented 10 years ago

Historic state above refers to: 1) the past value; 2) n past outputs; 3) n past inputs, where 1 is the traditional reduce usage.

A lot of people have argued with me about the merits of unzip, but it is desirable to have it as well:

Stream: <Stream: <A1, A2>, Stream: <B1, B2>>

to

Stream: <Stream: <A1, B1>, Stream: <A2, B2>>

back to

Stream: <Stream: <A1, A2>, Stream: <B1, B2>>
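The round trip above is a transposition of the nested structure: zip packs the i-th element of every inner stream together, and unzip is the same transposition applied again. A toy sketch using lists in place of streams (the name `transpose` is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: zip/unzip on a nested structure is matrix transposition,
// so applying it twice restores the original nesting.
public class UnzipSketch {

    static <T> List<List<T>> transpose(List<List<T>> in) {
        int n = in.get(0).size();
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            List<T> row = new ArrayList<>();
            for (List<T> inner : in) row.add(inner.get(i));
            out.add(row);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> ab = List.of(List.of("A1", "A2"), List.of("B1", "B2"));
        // prints [[A1, B1], [A2, B2]]
        System.out.println(transpose(ab));
        // prints [[A1, A2], [B1, B2]] -- the original, restored
        System.out.println(transpose(transpose(ab)));
    }
}
```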

One thing to think about is whether you really need all of the above. Beyond a point you lose track of what each of these does exactly. One name or idiom with many overloads is fine.

Select and Operator without their limitations (synchronisation, ordering, control over selection, etc.), windowing, grouping and a few other idioms may be just enough to get all the work done. Maintenance and optimisation become a problem when you have too many.

There is also no need for a separate idiom for multi-select values.

Make version 2.0 an opportunity to critically re-evaluate the design and API.

sirinath commented 10 years ago

A few more powerful and flexible idioms are better than many overlapping idioms.

sirinath commented 10 years ago

Also, you are missing groupings and group-level reduces.

sirinath commented 10 years ago

Also, you do not have things like Ambiguous in the above list. Maybe one method to handle all such use cases would be good enough.

smaldini commented 10 years ago

I don't think ambiguous is that critical, as you can build it from switch or groupBy depending on the strategy. What group-level reduce do you mean that is different from the current reduce() or merge()?

sirinath commented 10 years ago

Basically this takes 2 forms: 1) selecting between values within nested streams / multiple streams; 2) selecting between streams, in nested as well as non-nested cases, where once a stream is selected all values come from the selected stream.

Additional scenarios extending this are: 3) select values in availability order; 4) select values in order, in a round-robin manner; 5) select values based on a selector function invoked for each new set of values; 6) select values based on a selector function invoked on start-up; 7) select the streams in order, where all values from the selected stream are read before the other stream is read; 8) choose order based on an indexing function invoked once at the start; 9) select streams in first-available order.
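To make one of these scenarios concrete, here is a toy sketch of round-robin selection (scenario 4) over buffered lists; the name and list-based model are illustrative only, since a real stream implementation would be push-based and availability-driven.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of round-robin selection: take one value from each source
// in turn, skipping sources that are exhausted. Names are hypothetical.
public class SelectSketch {

    static <T> List<T> roundRobin(List<List<T>> sources) {
        List<T> out = new ArrayList<>();
        int max = sources.stream().mapToInt(List::size).max().orElse(0);
        for (int i = 0; i < max; i++)
            for (List<T> s : sources)
                if (i < s.size()) out.add(s.get(i));
        return out;
    }

    public static void main(String[] args) {
        // prints [1, 2, 3, 4]
        System.out.println(roundRobin(List.of(List.of(1, 3), List.of(2, 4))));
    }
}
```

Availability order (scenario 3) differs in that arrival time, not position, decides the interleaving, which is why it cannot be modelled faithfully with plain lists.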

2 is not critical but desirable, and would make things complete.

All of the above boils down to Select- and Operator-like operations (with more flexibility than GPars) covering: 1) selecting the stream itself; 2) selecting values between streams; 3) packing the selected values.

If you look at it, even zip and unzip are contained within this latter scenario.

With regard to reduce on groupings: there is a key involved, on which the grouping is done.

Also, for me merge is a zip variation. Maybe just have it as zip without a mapper function.

Your life might get easier if you have a special class which holds nested streams. E.g. zip has no meaning if the stream does not contain any inner streams.

smaldini commented 10 years ago

merge() streams values, whereas zip buffers values; zip() without a function mapper is a join() to me.

For the additional scenarios extending this:

3) select values in availability order --> window()

4) select values in order, in a round-robin manner --> parallel()

5) select values based on a selector function invoked for each new set of values --> groupBy()

6) select values based on a selector function invoked on start-up --> merge()

7) select the streams in order, where all values from the selected stream are read before the other stream is read --> zip()

8) choose order based on an indexing function invoked once at the start --> sort()

9) select streams in first-available order --> ..

Rest is just flatMap or zip.

sirinath commented 10 years ago

I think we are talking about 2 different things.

E.g. take 3, which is a plain select (http://gpars.org/guide/guide/dataflow.html#dataflow_selects). I am wondering how this functionality can be provided by window() (at least in the Rx sense). Similarly, I have some doubts about the other items.

smaldini commented 10 years ago

Oh yes, this select is a Promise use case, covered by Promises.any(), or if you want to do it with Streams: stream.merge().next(), where "stream" streams Streams.

sirinath commented 10 years ago

I don't think we are on the same page yet. Let's sleep on this. In the meantime, if you have some time, go over the following book:

http://www.amazon.com/Concurrent-Programming-ML-John-Reppy/dp/0521714729

jbrisbin commented 10 years ago

Other than it being a "nice to have", I don't see a clear ROI in going through all the work required to understand and implement these algorithms correctly in the core, given we have so many other, smaller features that really are required by applications we're already trying to build.

Reactor isn't a replacement for RxJava. It also doesn't make sense for us to lose our DNA in trying to be like some other library X. It seems to me that a "real" parallel library based on Reactor would be more beneficial than continually adding this and that kitchen-sink operation to Stream. We should be really careful not to treat Stream as the end-all-be-all of functional processing in Reactor. It might actually be simpler to implement some of these a little differently than the way Stream processes values.

smaldini commented 10 years ago

I second Jon, and that's the reason behind having the Consumer interface and operations like toBroadcastComplete|Next|ErrorConsumer for the missing bit: you can bridge a plain reactor and fully flexible strategies from there. Refactoring Stream to fit the spec had the side effect of making operations much easier to implement, and I've carried on tracking and evaluating core operations, or the ones that can't be built without introspection or package visibility, since we try to encourage devs to use the public API.

The way it is now we have:

sirinath commented 10 years ago

What I was saying is very similar: it is best to simplify by having atomic (smallest possible) operations that still cover all the bases (6 or 7 max), rather than Rx-like operations. (That way there will also be interoperability in the future.) Any other operation alias should be expressible through a combination of the atomic operations, and any current Rx operation should be expressible through the atomic operations if desired. It is best to have the atomic operations in place as Lego bricks for building the complex operators. Then there is a sound, easily testable and verifiable basis for the implementation. Maintenance will also be easy, as you only have to fix bugs (if any) in the basic operators, and the complex operators fix themselves unless the bug is in how the composition is done.

Though I started this thread about Parallel Skeletons, the more recent discussion has been about streamlining the Stream API rather than Skeletons, so don't be confused by the Skeleton aspect; just keep the API Skeleton-friendly. (This will not be a problem if you have versions of some of the operators returning both Stream<?> and Stream<Stream<?>>.)

See http://en.wikipedia.org/wiki/Process_calculus#Mathematics_of_processes

A small, focused, non-overlapping yet comprehensive set of operations is better than a large number of complex, overlapping operators, which also force you to keep adding new operators because the existing ones are not atomic.