dotnet / reactive

The Reactive Extensions for .NET
http://reactivex.io
MIT License

4.x: possible new operators based on RxJava's feature set #489

Open akarnokd opened 6 years ago

akarnokd commented 6 years ago

I've collected the operators (and their overloads) that are available in RxJava but (likely) not yet in Rx.NET, to enable discussion about feature synchronization.

Operators

| Operator in RxJava | Description |
| --- | --- |
| combineLatestDelayError | Keep combining sources as long as possible, then signal the aggregate exception at the end. |
| concatDelayError | Keep concatenating and signal the aggregate exception at the end. |
| concatEager | Run many or all source observables concurrently, but provide the items from the sources in their original order. |
| concatEagerDelayError | Run many or all source observables concurrently, provide items in order and signal the aggregate exception at the end. |
| error(Callable) | Signal an onError with the generated exception per observer. |
| fromCallable | Invoke a function to generate a single item & completion per observer. |
| generate | Generate events in a callback fashion with an observer-abstraction and per observer state. |
| intervalRange | Produce a range of numbers with an initial delay for the first followed by a periodic signal of the rest. |
| mergeDelayError | Merges some or all sources into one sequence and signals an aggregate exception when all of them have terminated. |
| switchOnNextDelayError | Switch to newer sources, signal the aggregate exception at the end. |
| zip(delayError) | Zips sources into one sequence and signals the error after the last row could be combined. |
| blockingIterable | Turn a sequence into a blocking enumerable sequence. |
| blockingSubscribe | Observe on the caller thread in a blocking fashion. |
| cache | Consume the upstream once, cache and replay all items to observers. |
| collect | Aggregate items into a single, custom container per observer with the help of a collector action and signal the container as a single result. |
| compose | Applies a transformation function from observable to observable when the sequence is constructed. |
| concatMap | Maps the upstream items into observables and concatenates them in order; an in-sequence version of concat. Can be implemented more efficiently than concat(map(source, f)). |
| concatMapDelayError | Maps the upstream items into observables and concatenates them in order, signaling an aggregate exception at the end. |
| concatMapEager | Maps the upstream items into observables, runs some or all of them eagerly but emits their items in order; an in-sequence version of concatEager. |
| concatMapEagerDelayError | Maps the upstream items into observables, runs some or all of them eagerly but emits their items in order while signaling an aggregate exception at the end. |
| concatMapIterable | Maps the upstream items into enumerations and emits their items in order; more efficient than concatMapping an Iterable turned into an observable. |
| doAfterNext | Call an action with the current item after it has been emitted to the downstream but before the next value from upstream. |
| doFinally | Perform an action exactly once per observer when it either completes normally, with an error or gets disposed. |
| doOnDispose | Perform an action when the sequence gets disposed. |
| doOnSubscribe | Perform an action when the subscription happens but before items start flowing. |
| flatMap(delayError) | Maps the upstream items into observables, merges them into one sequence and signals the aggregate error at the end. Can be implemented more efficiently than merge(map(source, f)). |
| forEachWhile | Consumes the sequence via a predicate; when the predicate returns false, the sequence is disposed. |
| reduceWith | Start with a per observer initial value as the accumulator and apply a function that takes that accumulator, the upstream item and returns a new accumulator value, then emit the last accumulator value as a single result. |
| repeat(BooleanSupplier) | Repeatedly subscribe to a source if the boolean predicate at each completion allows that. |
| repeatWhen | Repeatedly subscribe to a source when, after the current completion, a secondary per observer sequence signals an item. |
| retry(Predicate) | Retry a failed sequence if the predicate returns true for the exception. |
| retry(BiPredicate) | Retry a failed sequence if the predicate returns true for the exception and the total retry count so far. |
| retryUntil(BooleanSupplier) | Retry a failed sequence if the boolean predicate allows it. |
| retryWhen | Retry a failed sequence when the secondary per observer sequence signals an item in response to the current failure. |
| sample(emitLast) | Sample the source sequence and optionally emit the very last unsampled item when the sequence completes before the sampler signals again. |
| scanWith | Starts with a per observer generated initial accumulator value and emits a combination of this accumulator and the upstream item which also becomes the new accumulator value. |
| switchIfEmpty | If the main sequence turns out to be empty, a fallback sequence is used to stream events instead. |
| switchMap | Maps the upstream item into an observable and keeps switching to newer ones as those come along, disposing the previous ones. Can be implemented more efficiently than switchOnNext(map(source, f)). |
| switchMapDelayError | Maps the upstream item into an observable and keeps switching to newer ones as those come along, disposing the previous ones and signaling the aggregated exception at the end. |
| takeUntil(Predicate) | Emits items from the upstream and after each, it runs a predicate to determine if the sequence should stop just after the current item. |
| observeOn(delayError) | Observe items on another thread and optionally keep the total event order or have the errors cut ahead. |
| unsubscribeOn | When the sequence is disposed, the dispose call traveling upstream is executed on a scheduler. |
| withLatestFrom(Observable...) | Combines the items of the main source with the latest items of multiple other sources. |
| autoConnect | Automatically connects once to a ConnectableObservable after the specified number of observers have subscribed. |
| refCount(n, time) | Connect to a ConnectableObservable after the specified number of observers have subscribed and/or disconnect after a grace period (under review). |
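
A few of the rows above can probably be approximated today by composing operators that Rx.NET already ships. The sketch below illustrates that idea for fromCallable, reduceWith and intervalRange. It assumes the System.Reactive package; the class `RxJavaStyleSketches` and its method names are hypothetical and are not part of Rx.NET or of any proposal in this thread.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical helpers for illustration only; none of these names exist in Rx.NET.
public static class RxJavaStyleSketches
{
    // fromCallable: Defer runs the function once per observer, then emits its result and completes.
    public static IObservable<T> FromCallable<T>(Func<T> callable) =>
        Observable.Defer(() => Observable.Return(callable()));

    // reduceWith: Defer makes the seed factory run once per observer,
    // so a mutable seed (for example a list) is not shared between observers.
    public static IObservable<TAcc> ReduceWith<T, TAcc>(
        this IObservable<T> source,
        Func<TAcc> seedFactory,
        Func<TAcc, T, TAcc> accumulator) =>
        Observable.Defer(() => source.Aggregate(seedFactory(), accumulator));

    // intervalRange: the first value arrives after initialDelay, the rest once per period.
    public static IObservable<long> IntervalRange(
        long start, int count, TimeSpan initialDelay, TimeSpan period) =>
        Observable.Timer(initialDelay, period).Take(count).Select(i => start + i);
}
```

A dedicated operator could avoid the extra Defer and Select layers, which is the kind of efficiency argument the table makes for concatMap and switchMap.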

Subjects

RxJava has one additional subject type: UnicastSubject. It buffers items until one observer subscribes some time later and then drains the buffer into the observer.

In addition, Subjects in RxJava have additional state-peeking methods available to all variants:

| Method | Description |
| --- | --- |
| hasObservers() (all subjects) | true if there are currently any observers subscribed to the subject. |
| hasComplete() (all subjects) | true if the subject terminated normally. |
| hasThrowable() (all subjects) | true if the subject terminated exceptionally. |
| getThrowable() (all subjects) | Returns the exception if the subject terminated exceptionally. |
| toSerialized() | Makes the onXXX methods threadsafe to call from any thread. |
| hasValue() | true if a BehaviorSubject or ReplaySubject has items in its buffer. |
| getValue() | Returns the current item of a BehaviorSubject. |
| getValues() | Returns the currently buffered items in a ReplaySubject. |

I'm not sure how interface evolution works in C#, so assuming it would break System.Reactive.Interfaces users, the methods could be added to a new interface derived from ISubject or just onto the various Rx Subject types.
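
To make the "new interface derived from ISubject" option concrete, here is a minimal sketch. The interface `IInspectableSubject<T>`, the wrapper `InspectableSubject<T>` and their member names are hypothetical, chosen only for illustration. The wrapper records the terminal event and otherwise delegates to any existing `ISubject<T>`, so existing implementers of `ISubject<T>` are untouched. It assumes the System.Reactive package.

```csharp
using System;
using System.Reactive.Subjects;

// Hypothetical interface: adds RxJava-style state peeking without changing ISubject<T>.
public interface IInspectableSubject<T> : ISubject<T>
{
    bool HasCompleted { get; }
    bool HasException { get; }
    Exception Error { get; }   // null unless the subject terminated with an error
}

// Hypothetical wrapper: remembers the terminal event, then forwards it to the inner subject.
// Minimal sketch: it does not guard against multiple terminal calls.
public sealed class InspectableSubject<T> : IInspectableSubject<T>
{
    private readonly ISubject<T> _inner;
    private volatile bool _completed;
    private volatile Exception _error;

    public InspectableSubject(ISubject<T> inner) => _inner = inner;

    public bool HasCompleted => _completed;
    public bool HasException => _error != null;
    public Exception Error => _error;

    public void OnNext(T value) => _inner.OnNext(value);
    public void OnError(Exception error) { _error = error; _inner.OnError(error); }
    public void OnCompleted() { _completed = true; _inner.OnCompleted(); }
    public IDisposable Subscribe(IObserver<T> observer) => _inner.Subscribe(observer);
}
```

Adding the members directly to the concrete subject types, the other option mentioned above, would avoid the wrapper but would not help code that only sees `ISubject<T>`.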

ghuntley commented 6 years ago

Here's a bookmark for some of the side-convo that is going on in the Slack channel.

https://reactivex.slack.com/archives/C02B9R3QA/p1525433788000119

I'd appreciate it if, over time as we figure out how to work together, fewer discussions happened on Slack and more on GitHub. It's early days of rebuilding the community, so right now we don't need much structure, but progressing something like this will require a more formal structure.

We definitely want to be conservative when adding operators, and each one will need to be debated on its merits in its own separate issue. We aren't ready for separate issues though. Let's use this thread as the baseline for discussing parity: whether we even need it (vs. creating libraries), how it could be done, and when would be best to do it.

You are doing a fantastic job. Thank you for your help so far, and thank you for doing this audit @akarnokd.

RivaCode commented 6 years ago

I would like to propose yet another operator, this one from RxJS, which I find highly useful in UI applications: exhaustMap. It is a higher-order operator which, unlike switch (and switchMap), ignores any new values pushed from the upstream until the current inner observable has completed.
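
For readers unfamiliar with the operator, its behavior can be sketched with existing Rx.NET operators. This is a rough illustration under stated assumptions, not how RxJS or RxJava implement it: the name `ExhaustMap` and the class `ExhaustMapSketch` are hypothetical, and it assumes the System.Reactive package.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical extension; Rx.NET does not define ExhaustMap.
public static class ExhaustMapSketch
{
    public static IObservable<TResult> ExhaustMap<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, IObservable<TResult>> selector)
    {
        // Defer gives every observer its own "busy" flag.
        return Observable.Defer(() =>
        {
            var busy = 0; // 1 while an inner observable is active

            return source
                // Drop upstream items while an inner observable is still running.
                .Where(_ => Interlocked.CompareExchange(ref busy, 1, 0) == 0)
                // Release the flag when the inner observable terminates or is disposed.
                .Select(x => selector(x).Finally(() => Interlocked.Exchange(ref busy, 0)))
                // At most one inner is active at a time, so Merge simply relays it.
                .Merge();
        });
    }
}
```

In a UI, `clicks.ExhaustMap(_ => CreateResource())` (with `CreateResource` standing in for any observable-returning call) would ignore further clicks until the running creation has finished.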

akarnokd commented 6 years ago

@RivaCode. We have it as flatMapDrop or flatMapLatest in the RxJava 2 Extensions project. I didn't include those because they look rare to esoteric, even in RxJava's quite liberal eyes.

RivaCode commented 6 years ago

Can't say I agree.

For example, when clicking a button causes the creation of a resource and you must make sure that operation is atomic, exhaustMap (or flatMapDrop/flatMapLatest) comes in quite handy!

akarnokd commented 6 years ago

As I mentioned in the Slack channel, thanks to C# extension methods, these or any other operators can live in any 3rd party library, outside of Rx.NET. The drawbacks are the possible difficulty of discovering them, distrust of anything non-standard, and naming conflicts between multiple 3rd party extension methods.

akarnokd commented 6 years ago

Added section about subjects in RxJava.

quinmars commented 6 years ago

Would it be possible to apply the performance improvements of concatMap behind the scenes? So that if you write Concat().Select(x => ...) it will use the improved implementation. Similar to how it is done for Skip(4).Skip(2), which is collapsed into Skip(6).

akarnokd commented 6 years ago

It's Select(x => IObservable).Concat() but you'd have to typematch the Select instance with the appropriate Func<T, IObservable<R>> signature.

glopesdev commented 6 years ago

I am hesitant about introducing the vocabulary of other Rx variants into the dotnet repo, since it will introduce ambiguity. For example, flatMap already exists, it's called SelectMany, and I don't think introducing aliases will help .NET developers understand the duality between IEnumerable and IObservable.

I think concatMap and switchMap are really useful, giving you more intuition of the possible ways to reduce the sequence, so I think they do have a place in this library. I would align their names with SelectMany though, and call them ConcatMany and SwitchMany.

quinmars commented 6 years ago

@glopesdev It probably should be ConcatSelect because the counterpart of map is Select. Or SelectConcat because flatMap translates to SelectMany and not ManySelect.

@akarnokd I see. So you would need a type variable in the type check, which is not possible, like (pseudo code):

if (source is Select<x, y>.Selector && y is IObservable) ...

glopesdev commented 6 years ago

@quinmars ConcatSelect breaks the correspondence as you pointed out, so it doesn't really help users immediately relate to these variants. SelectConcat could be a reasonable option, but it doesn't sound right because it doesn't preserve the original flavour of SelectMany, which comes from SQL:

from x select one turned into from x select (many).

In the case of Concat, it feels more intuitive to read: from x concat (many).

glopesdev commented 6 years ago

Another way to think about this:

Select doesn't really translate cleanly to Map. It comes from SQL, where it originally meant selecting which columns of the data frame to include in the query result. Gradually it was realized it could be much more powerful and extended to include derived columns created from "mapping" or "transforming" the columns of each row.

In a similar way, the Many in SelectMany doesn't really mean Merge or Flat, but simply indicates that you are selecting multiple rows, rather than just one row. What to do with these rows is left up to imagination. In database "pull" world, there is usually only one way to think about it, because time is irrelevant, so the only thing you can do with Many is "select" them.

In the reactive world, there are many more things you can do with Many: you can ConcatMany or SwitchMany, among other possibilities.

quinmars commented 6 years ago

I'm aware of the meaning of SelectMany. But as you said: it's called SelectMany, because you select not only one element but many elements. With Switch you switch to the latest observable, but with SwitchMany you do not switch to many observables. The only difference is that you pass a selector to select the latest observable. IMO Many does not make any sense here. Same with Concat: you always concatenate many observables, even if the method is only called Concat. ConcatMany would be just confusing.

I see that the concatenation of two verbs like in ConcatSelect or SelectConcat is not ideal, but SwitchMany feels simply wrong. Maybe SelectingSwitch and SelectingConcat? Or do we really need an extra name? Couldn't an additional overload do the same work? Like .Switch(v => v.Observable) or .Concat(async x => await GetAsync(x)).
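
The overload idea could look roughly like the sketch below: selector-taking `Switch` and `Concat` extension methods that simply compose the existing `Select` with the existing flattening operators. The class name `SelectorOverloadSketch` is hypothetical, the overloads are not part of Rx.NET, and the System.Reactive package is assumed. A dedicated implementation could be more efficient than this composition; the sketch only shows the API shape.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical overloads for illustration; Rx.NET does not define selector-taking Switch/Concat.
public static class SelectorOverloadSketch
{
    // Equivalent to source.Select(selector).Switch():
    // always follow the inner observable produced for the latest upstream item.
    public static IObservable<TResult> Switch<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, IObservable<TResult>> selector) =>
        Observable.Switch(source.Select(selector));

    // Equivalent to source.Select(selector).Concat():
    // run the inner observables one after another, in upstream order.
    public static IObservable<TResult> Concat<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, IObservable<TResult>> selector) =>
        Observable.Concat(source.Select(selector));
}
```

With these in scope, `source.Switch(v => v.Observable)` reads like the existing operator while taking the place of a separately named switchMap.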

glopesdev commented 6 years ago

> Couldn't an additional overload do the same work? Like .Switch(v => v.Observable) or .Concat(async x => await GetAsync(x))

I think this really nails it 👍 I very much support the overload idea; there is no need to confuse people with more names that seem to imply new concepts.

glopesdev commented 6 years ago

Hmmm, although I still agree with @quinmars's solution of adding an overload, I wanted to present another perspective on using the Many suffix.

For me, what Many represents is really the selector, as was pointed out. Specifically, Many in this case indicates a constraint on the selector: it cannot be just any selector, in fact it has to be a selector "from one to many", it has to return an IObservable or IEnumerable.

Basically Many represents the qualification on the return type of the selector, while the verb itself represents what you are doing with those Many: in the case of SelectMany you are simply emitting all of them, which is why Merge is used; in the case of ConcatMany or SwitchMany you are applying the Concat or Switch operator to the result of the selector.

I think it's equally valid to think about things this way, and as long as this perspective is made clear, and used consistently, I would still accept ConcatMany and SwitchMany.

The selector overload idea is not too bad, but it will beg the question of why SelectMany is not simply an overload of Merge...

bartdesmet commented 5 years ago

I think it'd be good to consider new operators on a case-by-case basis. So, I'd be all for seeing a set of issues that propose addition of operators in a more fine-grained manner.

A few answers:

weitzhandler commented 4 years ago

Just bumped into the necessity for a SwitchIfEmpty. Has been asked on SO too.

Posted separately: #1076.
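
Until such an operator exists, the behavior described in the table (fall back to another sequence if the main one turns out to be empty) can be approximated with existing operators. The following is a minimal sketch, assuming the System.Reactive package; the extension method and class name are hypothetical and do not represent what #1076 proposes.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical extension for illustration only.
public static class SwitchIfEmptySketch
{
    public static IObservable<T> SwitchIfEmpty<T>(
        this IObservable<T> source,
        IObservable<T> fallback) =>
        Observable.Defer(() =>
        {
            var sawItem = false; // per-observer flag, set by the first upstream item

            return source
                .Do(_ => sawItem = true)
                // Evaluated only after source completes: continue with the fallback
                // if nothing was emitted, otherwise with an empty sequence.
                .Concat(Observable.Defer(() => sawItem ? Observable.Empty<T>() : fallback));
        });
}
```

If the source fails, the error passes through and the fallback is never subscribed, since Concat only moves on after a normal completion.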

Liero commented 2 years ago

What we really need in order to move forward with adding new extensions is a code analysis of existing repositories to find out which operators are used most.

BTW, I would love to see feature parity with RxJS rather than RxJava, as it is used much more often.