ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0

ConnectableObservable.connect() causes NetworkOnMainThreadException #5588

Closed. DCRichards closed this issue 7 years ago

DCRichards commented 7 years ago

Versions

RxJava: 2.1.0
RxAndroid: 2.0.1

Issue

I'm using the following pattern to allow me to share the result of an expensive network call across two observables, which both transform the result in different ways:

ConnectableObservable<Model> connectableObservable = doExpensiveNetworkCall().publish();

// Transform in one way.
connectableObservable
    .filter(...)
    .subscribeOn(mDataManager.getScheduler())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(...);

// Transform in another.
connectableObservable
    .filter(...)
    .subscribeOn(mDataManager.getScheduler())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(...);

// And connect... Uh oh, NetworkOnMainThreadException!
connectableObservable.connect();

This is a very useful pattern. However, calling .connect() throws NetworkOnMainThreadException because the expensive network call then runs on the main thread, even though every downstream subscriber applies subscribeOn with a background scheduler.

Is there an equivalent of .connectOn() or another alternative which can be used to also let the ConnectableObservable know that it should be run on a different thread? Alternatively, if there's a better way of achieving the above, any guidance would be much appreciated!

akarnokd commented 7 years ago

publish turns the cold source into a hot one, similar to a PublishSubject, and subscribeOn has no practical effect on hot publish-like sources. In other words, applying subscribeOn after the ConnectableObservable does not change the thread on which the network call runs: connect() subscribes to the source on whichever thread calls it. You have to apply subscribeOn before publishing:

ConnectableObservable<Model> connectableObservable = doExpensiveNetworkCall()
    .subscribeOn(mDataManager.getScheduler())                  // <------------------------------
    .publish();

// Transform in one way.
connectableObservable
    .filter(...)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(...);
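
The difference can be checked on a plain JVM with a minimal sketch like the one below. It assumes RxJava 2.x on the classpath and no Android: Schedulers.io() stands in for mDataManager.getScheduler(), and ConnectThreadDemo, fakeNetworkCall and sourceThreadName are hypothetical names used only for illustration. The fake source just records the thread that runs it.

```java
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class ConnectThreadDemo {

    // Hypothetical stand-in for doExpensiveNetworkCall(): it only records
    // the name of the thread that executes the source.
    static Observable<String> fakeNetworkCall(AtomicReference<String> sourceThread) {
        return Observable.fromCallable(() -> {
            sourceThread.set(Thread.currentThread().getName());
            return "payload";
        });
    }

    // Shares one source between two subscribers and returns the name of the
    // thread on which the source actually ran.
    static String sourceThreadName(boolean subscribeOnBeforePublish) {
        AtomicReference<String> sourceThread = new AtomicReference<>();
        CountDownLatch bothDone = new CountDownLatch(2);

        Observable<String> source = fakeNetworkCall(sourceThread);
        ConnectableObservable<String> shared = subscribeOnBeforePublish
                ? source.subscribeOn(Schedulers.io()).publish() // scheduler applied upstream
                : source.publish();                             // no scheduler upstream

        // Two consumers that transform the shared result in different ways.
        shared.filter(s -> !s.isEmpty())
              .subscribe(s -> { }, e -> bothDone.countDown(), bothDone::countDown);
        shared.map(String::length)
              .subscribe(n -> { }, e -> bothDone.countDown(), bothDone::countDown);

        // connect() subscribes to the upstream chain on the calling thread.
        shared.connect();
        try {
            bothDone.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sourceThread.get();
    }

    public static void main(String[] args) {
        System.out.println("publish() only:           " + sourceThreadName(false));
        System.out.println("subscribeOn(), publish(): " + sourceThreadName(true));
    }
}
```

With publish() alone, the source should report the thread that called connect(), which on Android would be the main thread. With subscribeOn placed before publish(), it should report a scheduler thread instead. Each subscriber can still add its own observeOn to choose where results are delivered.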

akarnokd commented 7 years ago

Looks like this question has been answered. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.

DCRichards commented 7 years ago

Thank you @akarnokd, that was exactly what I needed. I had a bit of a "can't see the wood for the trees" moment. Sorted 👌