ReactiveX / RxJavaMath

Math operators for RxJava.
Apache License 2.0

Cannot seem to skip operators on empty observables #7

Closed dragantl closed 8 years ago

dragantl commented 8 years ago

I have a case where I'm using window to capture some data. Sometimes the window comes in empty. I wrote the following code with the non-empty case in mind:

sourceObservable.window(driverObservable).switchMap(window -> {
    Observable<Float> value = window.map(data -> data.value);

    Observable<Float> min = MathObservable.min(value);
    Observable<Float> max = MathObservable.max(value);

    return Observable.zip(min, max, Pair::new);
})...

The above code works fine if the window has any data. However, when the window is empty, map has nothing to transform and simply completes, while min and max, upon seeing completion without data, throw an error. What I really want is for min and max to also complete without raising an error if the window is empty.

Overall, I need the whole operation not to produce a value if the window is empty. The only quick fix I could find is this clumsy workaround:

sourceObservable.window(driverObservable)
                .flatMap(Observable::toList)
                .filter(list -> list.size() > 0)
                .map(Observable::from)
                .switchMap(window -> {
                    Observable<Float> value = window.map(data -> data.value);

                    Observable<Float> min = MathObservable.min(value);
                    Observable<Float> max = MathObservable.max(value);

                    return Observable.zip(min, max, Pair::new);
})...

This works appropriately but it's an extra hit on logic that I'd hope I didn't need to do. Any suggestions? Maybe I missed something in the documentation?
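The behaviour asked for above (a min/max pair for a non-empty window, nothing at all for an empty one) can be sketched outside Rx. This is only a plain-Java analogue using `java.util.Optional`, not RxJava or RxJavaMath code; the class `WindowStats` and the helper `minMax` are hypothetical names introduced for illustration, and each window is assumed to be already collected into a `List<Float>`.

```java
import java.util.List;
import java.util.Optional;

class WindowStats {
    // Hypothetical helper: returns {min, max} for a window,
    // or Optional.empty() when the window holds no data (no error is raised).
    static Optional<float[]> minMax(List<Float> window) {
        if (window.isEmpty()) {
            return Optional.empty();
        }
        float min = window.get(0);
        float max = window.get(0);
        for (float v : window) {
            min = Math.min(min, v);
            max = Math.max(max, v);
        }
        return Optional.of(new float[] {min, max});
    }

    public static void main(String[] args) {
        // Three windows; the middle one is empty and should be skipped silently.
        List<List<Float>> windows =
                List.of(List.of(3f, 1f, 2f), List.<Float>of(), List.of(5f));
        for (List<Float> w : windows) {
            minMax(w).ifPresent(p -> System.out.println(p[0] + " .. " + p[1]));
        }
    }
}
```

Here the empty window simply yields no output line, which is the same "complete without a value" semantics the question is after for the zipped min/max stream.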

akarnokd commented 8 years ago

I can't remember why min/max was defined to throw on empty (Rx.NET relic?). You could just simply ignore the error:

Observable<Float> min = MathObservable.min(value).onErrorResumeNext(Observable.empty());
Observable<Float> max = MathObservable.max(value).onErrorResumeNext(Observable.empty());

dragantl commented 8 years ago

Thanks, that works! I was just now trying to use sample but I guess it cannot be used if you sample using the same shared observable (shared since window observable cannot have multiple subscribers)... thought that was an elegant approach but didn't work:

Observable<float[]> windowObservable = sourceObservable
    .window(driverObservable)
    .map(Observable::share);

windowObservable
    .sample(windowObservable.isEmpty().filter(empty -> !empty))
    .switchMap(window -> {
        Observable<Float> value = window.map(data -> data.value);

        Observable<Float> min = MathObservable.min(value);
        Observable<Float> max = MathObservable.max(value);

        return Observable.zip(min, max, Pair::new);
})...