ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0

2.x: Surprising `startWith` scheduling behaviour #5812

Closed Rosomack closed 6 years ago

Rosomack commented 6 years ago

Android and RxJava 2.1.8:

I ran into a surprising behaviour of startWith that I'd like to verify. I've boiled it down to a simple example:

Observable.just(Unit)
                .observeOn(Schedulers.io())
                .startWith(Unit)
                .switchMap {
                    Observable.create<Unit> { emitter ->
                        Thread.sleep(5000)
                        emitter.onNext(Unit)
                        emitter.onComplete()
                    }
                }
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe()

In the above example, execution of the observable inside switchMap happens on the main scheduler and blocks my UI.

I would expect it to happen on the io scheduler, and indeed it does as soon as I remove startWith.

I realise I can just add another observeOn after startWith but it's puzzling. We're already on the io scheduler when the startWith gets hit, so why would it switch threads?

akarnokd commented 6 years ago

startWith subscribes to its argument (constants are turned into Observable.just()) on the current thread, and because observeOn is above it, observeOn can't have any effect on it. Let me rewrite your code to make it more apparent:

Observable.concat(
    Observable.just(Unit),
    Observable.just(Unit).observeOn(Schedulers.io())
)
    .switchMap {
        Observable.create<Unit> { emitter ->
            Thread.sleep(5000)
            emitter.onNext(Unit)
            emitter.onComplete()
        }
    }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe()

As the subscription travels up, the thread hasn't changed, so the first just will emit Unit on the caller thread, and that emission then goes straight into the blocking switchMap.
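
The explanation above can be checked off-device with a small sketch that records which thread each item is emitted on. It assumes RxJava 2.x on a plain JVM (no Android); the function name emissionThreads and the string values "prefix" and "source" are hypothetical and only there for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers

// Hypothetical helper: returns the name of the thread each item was emitted on.
fun emissionThreads(): List<String> =
    Observable.just("source")
        .observeOn(Schedulers.io())   // only affects items coming from "source"
        .startWith("prefix")          // "prefix" is emitted on the subscribing thread
        .map { Thread.currentThread().name }
        .toList()
        .blockingGet()                // subscribes on the calling thread

fun main() {
    val names = emissionThreads()
    // The first name should be the caller's thread; the second should be
    // an io scheduler thread.
    println("prefix emitted on: ${names[0]}")
    println("source emitted on: ${names[1]}")
}
```

The first entry should match the thread that called emissionThreads(), which is the same effect that puts the blocking switchMap on the main thread in the original example.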

You can apply subscribeOn instead and the switchMap will trigger on the background thread:

Observable.just(Unit)
                .startWith(Unit)
                .switchMap {
                    Observable.create<Unit> { emitter ->
                        Thread.sleep(5000)
                        emitter.onNext(Unit)
                        emitter.onComplete()
                    }
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe()

or move the observeOn below the startWith:

Observable.just(Unit)
                .startWith(Unit)
                .observeOn(Schedulers.io())
                .switchMap {
                    Observable.create<Unit> { emitter ->
                        Thread.sleep(5000)
                        emitter.onNext(Unit)
                        emitter.onComplete()
                    }
                }
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe()
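
As a rough check of this second variant, the sketch below records the emission threads when observeOn sits below startWith. It assumes RxJava 2.x on a plain JVM; the function name threadsWithObserveOnBelow and the string values are hypothetical.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers

// Hypothetical helper: observeOn is placed below startWith, so it
// reschedules both the prefix item and the source item.
fun threadsWithObserveOnBelow(): List<String> =
    Observable.just("source")
        .startWith("prefix")
        .observeOn(Schedulers.io())
        .map { Thread.currentThread().name }
        .toList()
        .blockingGet()

fun main() {
    // Neither name should be the caller's thread.
    threadsWithObserveOnBelow().forEach { println("emitted on: $it") }
}
```

Here neither item should be delivered on the calling thread, so anything downstream of observeOn, including a blocking switchMap, runs on the io scheduler.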

Rosomack commented 6 years ago

Ordinarily I would use subscribeOn, but sadly I'm using RxBinding which requires the subscription to happen on the main thread.

Thanks for your complete explanation! It makes sense.
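
For context on why subscribeOn is not an option with a source that must be subscribed on a particular thread, here is a minimal sketch of what subscribeOn changes. It assumes RxJava 2.x on a plain JVM; Observable.create is used as a hypothetical stand-in for such a source (it is not RxBinding's actual implementation), and the function name subscriptionThread is made up for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers

// Hypothetical helper: reports the thread on which the source's
// subscribe logic runs, with or without subscribeOn in the chain.
fun subscriptionThread(useSubscribeOn: Boolean): String {
    val source = Observable.create<String> { emitter ->
        // This lambda is the subscription side effect of the source.
        emitter.onNext(Thread.currentThread().name)
        emitter.onComplete()
    }
    val chain = if (useSubscribeOn) source.subscribeOn(Schedulers.io()) else source
    return chain.blockingFirst()
}

fun main() {
    println("without subscribeOn: ${subscriptionThread(false)}")
    println("with subscribeOn:    ${subscriptionThread(true)}")
}
```

With subscribeOn, the source's subscribe logic runs on an io thread rather than on the caller's thread, which is why moving observeOn below startWith is the variant that fits a source with a main-thread subscription requirement.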