ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0

combineWithMostRecent #405

Closed samuelgruetter closed 9 years ago

samuelgruetter commented 10 years ago

I'm looking for an operation which does the following:

Whenever Observable o1 emits an item, combine this item with the most recent item of Observable o2.

Illustration:

----A-------B------C----->  o1

--0----1-2----3-4-------->  o2

    |       |      |
    V       V      V

  (A,0)   (B,2)  (C,4)

I can't find a nice way of doing this. Can anyone help me? Or do we need to add a new operation to rx.Observable?
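
The requested behavior can be pinned down with a small plain-Java model. This is a sketch under stated assumptions, not RxJava code. The two streams are flattened into one time-ordered list, the names `MostRecentModel` and `combineWithMostRecent` are hypothetical, and o1 items that arrive before o2 has emitted anything are assumed to be dropped (the diagram doesn't say).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "combineWithMostRecent"; not an RxJava API.
class MostRecentModel {
    // One event on the merged timeline: which stream fired, and its value.
    static final class Event {
        final boolean fromO1;
        final String value;
        Event(boolean fromO1, String value) { this.fromO1 = fromO1; this.value = value; }
    }

    static Event o1(String v) { return new Event(true, v); }
    static Event o2(String v) { return new Event(false, v); }

    // Emits "(a,b)" each time o1 fires, using the latest o2 value seen so far.
    // Assumption: o1 items that arrive before o2 has emitted are dropped.
    static List<String> combineWithMostRecent(List<Event> timeline) {
        List<String> out = new ArrayList<>();
        String latestO2 = null;
        for (Event e : timeline) {
            if (!e.fromO1) {
                latestO2 = e.value;          // remember it, but do not emit
            } else if (latestO2 != null) {
                out.add("(" + e.value + "," + latestO2 + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same ordering as the diagram: 0, A, 1, 2, B, 3, 4, C
        List<Event> timeline = List.of(
            o2("0"), o1("A"), o2("1"), o2("2"), o1("B"), o2("3"), o2("4"), o1("C"));
        System.out.println(combineWithMostRecent(timeline)); // [(A,0), (B,2), (C,4)]
    }
}
```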

jmhofer commented 10 years ago

Actually, combineLatest together with distinctUntilChanged, with equality based on the tuple projection, would work here, I think. Maybe there's an easier way, but this is what first came to my mind...

samuelgruetter commented 10 years ago

But what if o1 emits two equal items in sequence?

abersnaze commented 10 years ago

If that is part of your problem, we're discussing the distinct operator here: https://github.com/Netflix/RxJava/issues/395#issuecomment-24858966

jmhofer commented 10 years ago

Maybe this is a Join use case? I found a description here.

headinthebox commented 10 years ago

Join is hardly ever used, and its semantics are subtle. Because of all the functions you need to pass, it is also pretty nasty to use if you don't have groupBy query comprehension syntax.

But implementing it would be a fun task for anyone that wants to dig down to the next level of detail.

Watch http://channel9.msdn.com/Series/Rx-Workshop/Rx-Workshop-7-Reactive-Coincidence first ...

On Sep 24, 2013, at 7:32 AM, Joachim Hofer wrote: "Maybe this is a Join use case? I found a description here."

jmhofer commented 10 years ago

So maybe it's a good idea to add combineWithMostRecent (as described by @samuelgruetter) as an operator? To me it sounds like a relatively frequent use case.

zorba128 commented 10 years ago

I have a similar problem and came up with:

public static <T, U, R> Observable<R> enrich(Observable<T> source, Observable<U> data, Func2<T, U, R> f) {
    return Observable.zip(source, data.sample(source), f);
}

but noticed that sample() doesn't really sample its input: it doesn't emit the last value again when nothing changed between two ticks, which causes the zip operation to get out of sync.

Isn't that a bug (or at least a serious documentation problem) with sample()?

marcin
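
A plain-Java model of why `zip(source, data.sample(source))` drifts. It assumes the sample behavior described above (a tick emits only if `data` produced something new since the previous tick) and that `zip` pairs items strictly by position. The class and method names are hypothetical, and this is not RxJava code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not RxJava) of enrich = zip(source, data.sample(source)).
class ZipSampleModel {
    static final class Event {
        final boolean isSource;
        final String value;
        Event(boolean isSource, String value) { this.isSource = isSource; this.value = value; }
    }

    static Event src(String v) { return new Event(true, v); }
    static Event data(String v) { return new Event(false, v); }

    static List<String> enrichViaZip(List<Event> timeline) {
        List<String> sourceItems = new ArrayList<>(); // left input of zip
        List<String> sampled = new ArrayList<>();     // right input of zip
        String latest = null;
        boolean fresh = false;                        // has data emitted since the last tick?
        for (Event e : timeline) {
            if (!e.isSource) {
                latest = e.value;
                fresh = true;
                continue;
            }
            sourceItems.add(e.value);
            if (fresh) {                              // sample(): no repeat if nothing new
                sampled.add(latest);
                fresh = false;
            }
        }
        // zip pairs the i-th item of each side, regardless of when it arrived
        List<String> out = new ArrayList<>();
        for (int i = 0; i < Math.min(sourceItems.size(), sampled.size()); i++) {
            out.add("(" + sourceItems.get(i) + "," + sampled.get(i) + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        // data 0, then A and B with no new data in between, then data 1, then C
        List<Event> timeline = List.of(data("0"), src("A"), src("B"), data("1"), src("C"));
        System.out.println(enrichViaZip(timeline)); // [(A,0), (B,1)]
    }
}
```

In this model B is paired with 1, which arrived after B, and C is left waiting for a sample that never comes; the intended result would be (A,0), (B,0), (C,1).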

samuelgruetter commented 10 years ago

I agree, that's inconsistent: sample(long, TimeUnit) emits the last value multiple times if the source observable didn't emit a new value between two ticks, but sample(Observable<U>) does not repeat it, and this line suggests it was done on purpose. But I think sample(Observable<U>) should be changed to match the behavior of sample(long, TimeUnit).

benjchristensen commented 10 years ago

How about this?

    Observable.combineLatest(a, b, { x, y -> [x, y] })
        .distinctUntilChanged({ tuple -> tuple[0]})
        .distinctUntilChanged({ tuple -> tuple[1]})
        .toBlockingObservable().forEach({ v-> println(v)})

samuelgruetter commented 10 years ago

Regarding issue 1 (how to get combineWithMostRecent/enrich behavior): what if the source Observable a emits two equal values in succession? We still want to see both in the resulting Observable, so we can't use an approach based on distinctUntilChanged.
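
The objection can be checked with a plain-Java model of `combineLatest` followed by `distinctUntilChanged` keyed on the first half of each pair. This is a sketch, not RxJava code, and the names are hypothetical. It filters on the `a` half only; adding a second filter on the `b` half, as in the Groovy snippet above, can only drop more items, so it cannot bring the lost one back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical model (not RxJava): combineLatest(a, b), then distinctUntilChanged
// keyed on the "a" half of each pair.
class DistinctApproachModel {
    static final class Event {
        final boolean fromA;
        final String value;
        Event(boolean fromA, String value) { this.fromA = fromA; this.value = value; }
    }

    static Event a(String v) { return new Event(true, v); }
    static Event b(String v) { return new Event(false, v); }

    static List<String> combineLatestThenDistinctOnA(List<Event> timeline) {
        List<String> out = new ArrayList<>();
        String latestA = null, latestB = null, lastEmittedA = null;
        boolean emittedAny = false;
        for (Event e : timeline) {
            if (e.fromA) latestA = e.value; else latestB = e.value;
            if (latestA == null || latestB == null) continue; // combineLatest waits for both sides
            if (emittedAny && Objects.equals(latestA, lastEmittedA)) continue; // same "a": dropped
            out.add("(" + latestA + "," + latestB + ")");
            lastEmittedA = latestA;
            emittedAny = true;
        }
        return out;
    }

    public static void main(String[] args) {
        // a emits X twice; the desired result would be [(X,0), (X,1)]
        List<Event> timeline = List.of(b("0"), a("X"), b("1"), a("X"));
        System.out.println(combineLatestThenDistinctOnA(timeline)); // [(X,0)]
    }
}
```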

Regarding issue 2: is the behavior of sample(Observable<U>) correct? @benjchristensen, what do you think?

benjchristensen commented 10 years ago

The sample operator was fixed in v0.18.2.

samuelgruetter commented 9 years ago

Note: The sample operator was indeed fixed, but not the way @zorba128 and me would have expected. sample(Observable<U>) and sample(long, TimeUnit) are now consistent, and neither emits the last value again if it hasn't changed between two sampler ticks.

[I've reread this thread because this question was asked on stackoverflow].

benjchristensen commented 9 years ago

Would something like the following work:

observableA.combineWithMostRecentFrom(observableB)

This would be different from combineLatest, as it would not emit all permutations. It would only emit when A emits and take whatever the last value from B was.

Then the A observable could be the "slow one" and we just grab whatever the last from B was.

staltz commented 9 years ago

Guys, guys, we need to solve this one. I've been using this "combineWithMostRecent" or "enrich" (I call it "combinePrev") many times in an Android project in production. It's very useful. Basically what we need is the asymmetric version of combineLatest. I used to have an implementation of it based on join, and I informed @mattpodwysocki that it would be good if we could do that in RxJS too, see https://github.com/Reactive-Extensions/RxJS/issues/335#issuecomment-60421335.

My implementation of the operator with join is this:

  /**
   * Similar to combineLatest, except it joins the latest items from the two source
   * observables only when the first observable emits an item.
   * 
   * first:  ------f----f-------------f--------------f----|>
   * second: ---s1----------s---s--s3-----s-s-s-s4--------|>
   * result: ------R1---R1------------R3-------------R4---|>
   * 
   * @param first
   *        The first source Observable
   * @param second
   *        The second source Observable
   * @param combineFunction
   *        the aggregation function used to combine the items emitted by the source
   *        Observables
   * @return an Observable that emits items that are the result of combining the items
   *         emitted by the source Observables by means of the given function
   */
  public static <TFirst, TSecond, R> Observable<R> combinePrev(
    final Observable<TFirst> first, 
    final Observable<TSecond> second, 
    final Func2<TFirst, TSecond, R> combineFunction) 
  {
    final Observable<TSecond> hotSecond = second.publish().refCount();
    return first.join(hotSecond, new Func1<TFirst, Observable<Object>>() {
      public Observable<Object> call(final TFirst it) {
        return Observable.<Object>empty();
      }
    }, new Func1<TSecond, Observable<TSecond>>() {
      public Observable<TSecond> call(final TSecond it) {
        return hotSecond;
      }
    }, combineFunction);
  }

But I just figured out a much simpler implementation based only on map and switch, basically this:

A.map({a -> B.map({b -> [a, b]})}).switch()

See this StackOverflow answer I wrote.
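
The map/switch idea can be modeled without RxJava using a tiny hand-rolled hot source. In this sketch `HotSubject` and `SwitchModel` are hypothetical names. Each `A` item replaces the previous inner subscription to `B` (the `switch` step), and output is produced when `B` emits, paired with the latest `A`; that is why `B` has to be hot, since each inner subscription must only see `B` items emitted after it. In RxJava itself the flattening step would be `Observable.switchOnNext(...)`, or the whole expression `switchMap`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal hot source: an item reaches only the listeners subscribed at that moment.
class HotSubject<T> {
    private final List<Consumer<T>> listeners = new ArrayList<>();

    Runnable subscribe(Consumer<T> listener) {
        listeners.add(listener);
        return () -> listeners.remove(listener);   // the "unsubscribe" handle
    }

    void onNext(T value) {
        for (Consumer<T> l : new ArrayList<>(listeners)) l.accept(value);
    }
}

// Hypothetical model of A.map(a -> B.map(b -> [a, b])).switch()
class SwitchModel {
    static <A, B> void switchCombine(HotSubject<A> as, HotSubject<B> bs, List<String> sink) {
        Runnable[] inner = { null };
        as.subscribe(a -> {
            if (inner[0] != null) inner[0].run();      // switch: drop the previous inner
            inner[0] = bs.subscribe(b -> sink.add("[" + a + ", " + b + "]"));
        });
    }

    public static void main(String[] args) {
        HotSubject<Integer> as = new HotSubject<>();
        HotSubject<String> bs = new HotSubject<>();
        List<String> sink = new ArrayList<>();
        switchCombine(as, bs, sink);
        as.onNext(0);
        bs.onNext("A");
        as.onNext(1);
        as.onNext(2);
        bs.onNext("B");
        bs.onNext("C");
        System.out.println(sink); // [[0, A], [2, B], [2, C]]
    }
}
```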

benjchristensen commented 9 years ago

not the way @zorba128 and me would have expected

What was expected? It emits the last item in a given time window if something was emitted. If nothing was emitted then nothing is emitted at the end of the time window.

http://reactivex.io/RxJava/javadoc/rx/Observable.html#sample(long,%20java.util.concurrent.TimeUnit)

benjchristensen commented 9 years ago

we need to solve this one

Sure, let's get it solved.

Once https://github.com/ReactiveX/RxJava/pull/1905 is confirmed, let's add a new operator marked with @Beta and make sure it works well for everyone. Then, in 1.1 or 1.2, we can mark it as stable and remove the @Beta marker.

staltz commented 9 years ago

So we just need to find a proper name for it. I prefer shorter names, but enrich didn't ring a bell for me. Maybe sampleCombine?

benjchristensen commented 9 years ago

It's not quite sample though, as it's not sampling with a time interval, it really is just taking whatever the last value is, similar to BehaviorSubject, or BlockingObservable.latest().

The static Observable.combineLatest combines all permutations of all Observables it combines.

To confirm, here we want to combine every value from one Observable with the latest or most recent of another, correct?
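
A plain-Java comparison of the two behaviors on the timeline from the original diagram, as a sketch (no RxJava; names hypothetical): `combineLatest` emits whenever either side emits once both have a value, while the proposed operator emits only when the primary side emits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not RxJava) contrasting combineLatest with the proposed operator.
class CombineComparison {
    static final class Event {
        final boolean primary;
        final String value;
        Event(boolean primary, String value) { this.primary = primary; this.value = value; }
    }

    static Event p(String v) { return new Event(true, v); }
    static Event o(String v) { return new Event(false, v); }

    // onlyOnPrimary == false: combineLatest-style (emit on any event once both have values)
    // onlyOnPrimary == true:  proposed operator (emit only when the primary emits)
    static List<String> combine(List<Event> timeline, boolean onlyOnPrimary) {
        List<String> out = new ArrayList<>();
        String lastP = null, lastO = null;
        for (Event e : timeline) {
            if (e.primary) lastP = e.value; else lastO = e.value;
            if (lastP == null || lastO == null) continue;
            if (onlyOnPrimary && !e.primary) continue;
            out.add("(" + lastP + "," + lastO + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> t = List.of(o("0"), p("A"), o("1"), o("2"), p("B"), o("3"), o("4"), p("C"));
        System.out.println(combine(t, false)); // [(A,0), (A,1), (A,2), (B,2), (B,3), (B,4), (C,4)]
        System.out.println(combine(t, true));  // [(A,0), (B,2), (C,4)]
    }
}
```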

It feels like an instance method called combineWithMostRecent or combineWithLatest. Or is combine too confusing next to the static combineLatest that does all permutations?

It ends up being very similar to zipWith if we had a zipWithLatest variant?

Does this have a proper name in Haskell, Scala or some other functional language that I'm unaware of?

@headinthebox Your input on this would be helpful.

headinthebox commented 9 years ago

Let me think about the shortest way to implement this using the existing combinators; I can't believe it is very long, but I am jetlagged ;-)

akarnokd commented 9 years ago

This appears to be producing the expected results:

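import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;

public class CombineWhenOther {
    public static void main(String[] args) {
        PublishSubject<Integer> source = PublishSubject.create();
        BehaviorSubject<Integer> other = BehaviorSubject.create();

        // For each source item, take the current (or next) value of other and add it
        source.concatMap(e -> other.take(1).map(f -> e + f))
              .subscribe(System.out::println, Throwable::printStackTrace,
                         () -> System.out.println("Done"));

        source.onNext(1);     // other has no value yet: prints 11 once other emits 10
        other.onNext(10);
        other.onNext(20);
        other.onNext(30);
        source.onNext(2);     // prints 32
        source.onNext(3);     // prints 33
        other.onNext(40);
        source.onCompleted(); // prints Done
    }
}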

But both sources are hot, and since we don't have multicast(), I don't know how to turn an arbitrary other Observable into a BehaviorSubject with the stable API.
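
What the `concatMap` + `BehaviorSubject` combination does with early items can be seen in a plain-Java model of the example above (a sketch, not RxJava; names hypothetical). Assuming `other` has no initial value, a `source` item that arrives before `other` has emitted is held back rather than dropped, and `concatMap` releases it once `other` produces its first value.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model (not RxJava) of source.concatMap(e -> other.take(1).map(f -> e + f))
// where "other" is a BehaviorSubject created without an initial value.
class ConcatMapBehaviorModel {
    static final int SOURCE = 0, OTHER = 1;

    // Each event is {SOURCE or OTHER, value}, in arrival order.
    static List<Integer> run(int[][] events) {
        List<Integer> out = new ArrayList<>();
        Deque<Integer> waiting = new ArrayDeque<>(); // source items waiting for other's first value
        Integer current = null;                      // the BehaviorSubject's current value, if any
        for (int[] ev : events) {
            if (ev[0] == OTHER) {
                current = ev[1];
                // concatMap releases queued items in order; each take(1) sees this value
                while (!waiting.isEmpty()) out.add(waiting.poll() + current);
            } else if (current == null) {
                waiting.add(ev[1]);                  // buffered, not dropped
            } else {
                out.add(ev[1] + current);            // take(1) gets the current value at once
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[][] events = {
            {SOURCE, 1}, {OTHER, 10}, {OTHER, 20}, {OTHER, 30},
            {SOURCE, 2}, {SOURCE, 3}, {OTHER, 40}};
        System.out.println(run(events)); // [11, 32, 33]
    }
}
```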

staltz commented 9 years ago

Sorry, with this implementation

A.map({a -> B.map({b -> [a, b]})}).switch()

I forgot to mention that B must be hot.

It's not quite sample though, as it's not sampling with a time interval, it really is just taking whatever the last value is, similar to BehaviorSubject, or BlockingObservable.latest().

It is sample as in a.sample(b) combined with b. It is not sampling with a time interval, it is sampling with b as the sampler. See this jsfiddle (c is a.sample(b)).

The static Observable.combineLatest combines all permutations of all Observables it combines.

To confirm, here we want to combine every value from one Observable with the latest or most recent of another, correct?

It feels like an instance method called combineWithMostRecent or combineWithLatest. Or is combine too confusing next to the static combineLatest that does all permutations?

If we could afford renaming existing operators, one suggestion is combineSymmetric for combineLatest and combineAsymmetric for this new one. The problem with names such as combineWithMostRecent and combineWithLatest is that in English they mean basically the same as combineLatest, and this ambiguity can cause a lot of confusion.

Another thing to keep in mind is that this new operator is an instance method, and shouldn't have a static version. Because of the asymmetric behavior, there should be one source Observable that commands the emission of the resulting Observable.

If we take that into consideration, we could name it withLatest, since it will be always applied on some source observable a:

c = a.withLatest(b, combineFunction)

Another insight is that since the c observable emits at the same time a emits, we can take advantage of the map concept. mapWithLatest could work as a name.

My humble suggestions are then either sampleCombine or withLatest or mapWithLatest.

samuelgruetter commented 9 years ago

Would something like the following work:

observableA.combineWithMostRecentFrom(observableB)

This would be different from combineLatest, as it would not emit all permutations. It would only emit when A emits and take whatever the last value from B was.

That's exactly what I was looking for.

not the way @zorba128 and me would have expected

What was expected? It emits the last item in a given time window if something was emitted. If nothing was emitted then nothing is emitted at the end of the time window.

http://reactivex.io/RxJava/javadoc/rx/Observable.html#sample(long,%20java.util.concurrent.TimeUnit)

For example, if I sample an audio signal s at a frequency of 44100 Hz, I expect to get one sample every 1/44100 seconds, no matter what the shape of s is. Taking this analogy to observables, I'd expect that

myObservable.sample(50, TimeUnit.MILLISECONDS)

emits an element every 50 milliseconds, regardless of when, or how many, items myObservable emits. That is, I'd expect that if nothing was emitted in the time window, the last value is repeated.

But I agree that the way RxJava understands "sample" also makes sense, and it's well explained in the docs what happens, so I'm not saying we should change anything.
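
The two readings of "sample" differ only on a tick whose window saw no new item. A plain-Java model over fixed timestamps makes that concrete; it is a sketch, not RxJava, and the names, the "signal-style" label, and the rule that an item landing exactly on a tick belongs to that tick's window are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not RxJava) of sampling a stream on fixed ticks.
class SampleModel {
    // times[i] is when values[i] was emitted (ascending); ticks occur at period, 2*period, ... <= end.
    // repeat == false: RxJava-style, a tick emits only if something new arrived in its window.
    // repeat == true:  signal-style, a tick re-emits the latest value even if nothing new arrived.
    static List<String> sample(long[] times, String[] values, long period, long end, boolean repeat) {
        List<String> out = new ArrayList<>();
        int next = 0;
        String latest = null;
        for (long tick = period; tick <= end; tick += period) {
            boolean fresh = false;
            while (next < times.length && times[next] <= tick) { // assumption: an item on a tick counts for it
                latest = values[next++];
                fresh = true;
            }
            if (latest != null && (fresh || repeat)) out.add(latest);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] times = {10, 120};
        String[] values = {"x", "y"};
        System.out.println(sample(times, values, 50, 200, false)); // [x, y]
        System.out.println(sample(times, values, 50, 200, true));  // [x, x, y, y]
    }
}
```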

If we take that into consideration, we could name it withLatest, since it will be always applied on some source observable a:

c = a.withLatest(b, combineFunction)

I think withLatest would be a nice name.

dvtomas commented 9 years ago

I agree this is a very useful operator, and @staltz, I like your map/switch implementation. Just for cross-reference, I raised this issue (#912) some time ago.

Naming the new operator is indeed difficult now that sample is taken. Perhaps sampleEach could be considered, to express that it behaves differently from sample while still asserting that it is sampling in some sense?

dvtomas commented 9 years ago

Or, what about combineSampled? I like how similar it is to combineLatest: both do combining and both have the same type signature ((T, U) => (T, U)), but they differ in what they combine: either the latest values, or values taken at times defined by the sampler. They even sort close together alphabetically, so the user can see there is a choice and pick what suits best.

sampleEach could be used as an alternative to plain sample, with slightly different behavior (not filtering out unchanged values), if the need for its inclusion in the library ever arises (but it can be trivially replaced by combineSampled, so it probably won't).

staltz commented 9 years ago

@dvtomas if we would use a.combineSampled(b, combineFunction), it would sound like we are combining a with b.sample(something) while in reality we are combining a.sample(b) with b.

akarnokd commented 9 years ago

Or a.combineWithLatestOf(b, throughAFunction).

dvtomas commented 9 years ago

@staltz Sorry, I can't see it; my mind is already too attached to my own interpretation. Also, I work in Scala, where the combineFunction would probably be absent and it would just be a combineSampled b. I hadn't thought about Java.

@akarnokd That sounds reasonable with respect to combineLatest being widely understood and used. I first had to ponder what the Latest part of combineLatest really means to appreciate that...

staltz commented 9 years ago

it would be just a combineSampled b, I haven't thought of Java.

So to clarify, which of a or b do you think is being sampled by the other, in this a combineSampled b idiom? How are you "reading" this in plain english?

dvtomas commented 9 years ago

@staltz I see your point now. It reads roughly as "A combined with sampled B" (sampled with what?). That's not right.

I like @akarnokd's combineWithLatestOf the best so far.

staltz commented 9 years ago

@dvtomas precisely, when we say "sampled B", we think "we take samples of B" which to RxJava translates to b.sample(something), which does not happen in reality. I don't want this to become a gotcha.

staltz commented 9 years ago

Now implemented in RxJS as withLatestFrom. I would make a PR for RxJava as well, but I'm having a hard time navigating the core of the codebase. Maybe someone more familiar with it could implement it?

The implementation can be roughly

A.map({a -> hotB.map({b -> [a, b]})}).switch()

Or a state machine like I did in RxJS.
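
A state-machine version can be sketched in plain Java as follows. The class name `WithLatestFromStateMachine` is hypothetical and this is neither the RxJS nor the RxJava source: it only keeps the latest value of the other side and emits when the primary side emits, dropping primary items until the other side has produced something. Completion, errors, and unsubscription are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

// Hypothetical state machine for a withLatestFrom-style operator (not library source).
class WithLatestFromStateMachine<T, U, R> {
    private final BiFunction<T, U, R> combiner;
    private final Consumer<R> downstream;
    private U latest;
    private boolean hasLatest;            // distinguishes "nothing yet" from a null item

    WithLatestFromStateMachine(BiFunction<T, U, R> combiner, Consumer<R> downstream) {
        this.combiner = combiner;
        this.downstream = downstream;
    }

    // Items from the other source only update the state; they never emit.
    synchronized void onOther(U item) {
        latest = item;
        hasLatest = true;
    }

    // Items from the primary source emit, paired with the latest other item.
    void onPrimary(T item) {
        U snapshot;
        synchronized (this) {              // the two sources may call in from different threads
            if (!hasLatest) return;        // nothing to combine with yet: drop
            snapshot = latest;
        }
        // Only onPrimary calls downstream, so emission stays serialized as long as
        // the primary source itself is serialized (which the Rx contract requires).
        downstream.accept(combiner.apply(item, snapshot));
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        WithLatestFromStateMachine<String, Integer, String> op =
            new WithLatestFromStateMachine<>((a, b) -> a + b, out::add);
        op.onPrimary("Z");   // dropped: no other item yet
        op.onOther(0);
        op.onPrimary("A");
        op.onOther(1);
        op.onOther(2);
        op.onPrimary("B");
        System.out.println(out); // [A0, B2]
    }
}
```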

staltz commented 9 years ago

http://stackoverflow.com/questions/28580490/rxjava-how-to-emulate-withlatestfrom People are asking for this operator.

Could someone please implement it?

JakeWharton commented 9 years ago

:+1: I have had 4 separate instances of need of this in the last two weeks that I felt dirty working around!

akarnokd commented 9 years ago

I'll do this.

staltz commented 9 years ago

:+1: :+1:

akarnokd commented 9 years ago

See #2760 for the proposed name and behavior.

JakeWharton commented 9 years ago

This issue can be closed.

abersnaze commented 9 years ago

The window operator looks like an interesting option for building this. The initial drawing looks almost exactly like the marble diagram for the operator.

window marble diagram

abersnaze commented 9 years ago

Damn it, I couldn't stop thinking about this all night.

package asdf;

import static rx.Observable.zip;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

public class Main {
    private static class Tuple {
        public final String t;
        public final int i;

        Tuple(String t, int i) {
            this.t = t;
            this.i = i;
        }

        @Override
        public String toString() {
            return t + ":" + i;
        }
    }

    public static void main(String[] args) {
        Subject<String, String> trigger = PublishSubject.create();
        Subject<Integer, Integer> data = PublishSubject.create();

        trigger.publish(trigger_ ->
                // trigger_ both drives zip and closes each window of data
                zip(trigger_, data.window(trigger_).flatMap(window -> window.lastOrDefault(-1)), Tuple::new))
            // a window with no data yields -1; keep the previous value instead
            .scan((last, curr) -> curr.i == -1 ? new Tuple(curr.t, last.i) : curr)
            .subscribe(System.out::println);

        data.onNext(0);
        trigger.onNext("A");
        data.onNext(1);
        data.onNext(2);
        trigger.onNext("B");
        trigger.onNext("C");
        data.onNext(3);
        data.onNext(4);
        trigger.onNext("D");
    }
}

produces the output

A:0
B:2
C:2
D:4

It would be simpler if you don't need the trigger's value.

package asdf;

import rx.subjects.PublishSubject;
import rx.subjects.Subject;

public class Main {
    public static void main(String[] args) {
        Subject<String, String> trigger = PublishSubject.create();
        Subject<Integer, Integer> data = PublishSubject.create();

        data.window(trigger)
            .flatMap(window -> window.lastOrDefault(-1))     // -1 marks an empty window
            .scan((last, curr) -> curr == -1 ? last : curr)  // repeat the previous value instead
            .subscribe(System.out::println);

        data.onNext(0);
        trigger.onNext("A");
        data.onNext(1);
        data.onNext(2);
        trigger.onNext("B");
        trigger.onNext("C");
        data.onNext(3);
        data.onNext(4);
        trigger.onNext("D");
    }
}

produces the output

0
2
2
4
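
The window versions mark an empty window with -1 and let scan fill it in, which works as long as -1 never occurs as real data. A plain-Java model of the same "last item per window, then fill forward" step, using `Optional.empty()` instead of a sentinel, is sketched below; the names are hypothetical and this is not RxJava. Unlike the scan above, it also skips leading empty windows instead of emitting -1 for them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical model (not RxJava) of window -> last-or-nothing -> fill forward.
class WindowFillModel {
    // Last item of each window, or Optional.empty() if the window saw nothing
    // (the role lastOrDefault(-1) plays, without reserving -1).
    static List<Optional<Integer>> lastPerWindow(List<List<Integer>> windows) {
        List<Optional<Integer>> out = new ArrayList<>();
        for (List<Integer> w : windows) {
            out.add(w.isEmpty() ? Optional.empty() : Optional.of(w.get(w.size() - 1)));
        }
        return out;
    }

    // Forward-fill: an empty window repeats the previous value (the scan step).
    // Leading empty windows are skipped, since there is nothing to repeat yet.
    static List<Integer> fillForward(List<Optional<Integer>> lasts) {
        List<Integer> out = new ArrayList<>();
        Integer prev = null;
        for (Optional<Integer> o : lasts) {
            if (o.isPresent()) prev = o.get();
            if (prev != null) out.add(prev);
        }
        return out;
    }

    public static void main(String[] args) {
        // Windows closed by triggers A, B, C, D in the example above
        List<List<Integer>> windows = List.of(List.of(0), List.of(1, 2), List.<Integer>of(), List.of(3, 4));
        System.out.println(fillForward(lastPerWindow(windows))); // [0, 2, 2, 4]
    }
}
```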

staltz commented 9 years ago

@abersnaze or just

A.switchMap({a -> hotB.map({b -> [a, b]})})

abersnaze commented 9 years ago

I had to change the variable names to grok it. Much better than mine:

data.switchMap(i -> trigger.map(t -> new Tuple(t, i))).subscribe(System.out::println);