ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0
47.84k stars 7.61k forks source link

FATAL EXCEPTION: RxCachedThreadScheduler-4 or -1 or -2 or -3 ... #4807

Closed yubaokang closed 7 years ago

yubaokang commented 7 years ago

E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1 Process: goujiawang.gjstore, PID: 5960 java.io.InterruptedIOException: thread interrupted at okio.Timeout.throwIfReached(Timeout.java:146) at okio.Okio$2.read(Okio.java:135) at okio.Buffer.writeAll(Buffer.java:993) at okhttp3.RequestBody$3.writeTo(RequestBody.java:118) at okhttp3.MultipartBody.writeOrCountBytes(MultipartBody.java:171) at okhttp3.MultipartBody.writeTo(MultipartBody.java:113) at okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:189) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:170) at okhttp3.RealCall.execute(RealCall.java:60) at retrofit2.OkHttpCall.execute(OkHttpCall.java:174) at com.jakewharton.retrofit2.adapter.rxjava2.CallObservable.subscribeActual(CallObservable.java:41) at io.reactivex.Observable.subscribe(Observable.java:10151) at com.jakewharton.retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34) at io.reactivex.Observable.subscribe(Observable.java:10151) at io.reactivex.internal.operators.flowable.FlowableFromObservable.subscribeActual(FlowableFromObservable.java:31) at io.reactivex.Flowable.subscribe(Flowable.java:12172) at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest.subscribeActual(FlowableOnBackpressureLatest.java:31) at io.reactivex.Flowable.subscribe(Flowable.java:12172) at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:156) at io.reactivex.internal.operators.flowable.FlowableFilter$FilterSubscriber.tryOnNext(FlowableFilter.java:72) at io.reactivex.internal.operators.flowable.FlowableFromIterable$IteratorConditionalSubscription.slowPath(FlowableFromIterable.java:376) at io.reactivex.internal.operators.flowable.FlowableFromIterable$BaseRangeSubscription.request(FlowableFromIterable.java:123) at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:152) at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onSubscribe(FlowableFlatMap.java:110) at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:66) at io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribe(FlowableFromIterable.java:65) at io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribeActual(FlowableFromIterable.java:46) at io.reactivex.Flowable.subscribe(Flowable.java:12172) at io.reactivex.internal.operators.flowable.FlowableFilter.subscribeActual(FlowableFilter.java:35) at io.reactivex.Flowable.subscribe(Flowable.java:12172) at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:52) at io.reactivex.Flowable.subscribe(Flowable.java:12172) at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:52) at io.reactivex.Flowable.subscribe(Flowable.java:12172) at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82) at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59) at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51) at java.util.concurrent.FutureTask.run(FutureTask.java:237) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588) at java.lang.Thread.run(Thread.java:818)

  upload = Flowable.fromIterable(view.getUploadFiles())
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(String s) throws Exception {
                        return !TextUtils.isEmpty(s);
                    }
                })
                .flatMap(new Function<String, Publisher<BaseRes<UploadImagData>>>() {
                    @Override
                    public Publisher<BaseRes<UploadImagData>> apply(String s) throws Exception {
                        return RServices.get().uploadImg(view.getJSessionId(), UpLoadUtils.getRequestBody(s));
                    }
                })
                .compose(Transformer.<UploadImagData>retrofit())
                .subscribeWith(new RSubscriber<UploadImagData>(context) {
                    @Override
                    public void _onNext(UploadImagData uploadImagData) {
                        picPaths.add(uploadImagData.getId());
                    }

                    @Override
                    public void _onTEmpty() {
                        T.show(context, context.getString(R.string.submit_failed));
                    }

                    @Override
                    public void _onNetWorkError() {
                        T.show(context, context.getString(R.string.network_not_well));
                    }

                    @Override
                    public void _onReturnCodeError(String returnCode, String msg) {
                        T.show(context, msg);
                    }

                    @Override
                    public void _onComplete() {
                        submitMaterialQuality(picPaths);
                    }
                });

and when activity destroyed, I dispose it

 upload .dispose();
akarnokd commented 7 years ago

Did you hook RxJavaPlugins.onError by any chance?

yubaokang commented 7 years ago

@akarnokd no.

yubaokang commented 7 years ago

I had used retrofit2 to upload several image file. get the error ,how to deal with this problem? thanks.

JakeWharton commented 7 years ago

This is a timeout from the HTTP client. Set larger timeouts if your uploads will be taking longer. This exception is otherwise a completely normal behavior of a network: it's a transient thing that became flaky and stopped responding. Your application should be resilient to network failures and do automatic retrying or prompt the user to retry.

On Fri, Nov 4, 2016 at 11:35 AM yubaokang notifications@github.com wrote:

I had used retrofit2 to upload several image file. get the error ,how to deal with this problem? thanks.

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/ReactiveX/RxJava/issues/4807#issuecomment-258464712, or mute the thread https://github.com/notifications/unsubscribe-auth/AAEEETTVTfLRjupoPG9oze6b2lfUQzQ7ks5q61DTgaJpZM4Kpgwk .

yubaokang commented 7 years ago

oh,thanks. I will try it tomorrow at company.

yubaokang commented 7 years ago

This may be the exception to the server, but this should not cause the app to run, this exception RxJava can not be caught? Should not call the onError method?

michaldrabik commented 7 years ago

I'm having exactly the same issue. When online everything is good but crashes in airplane mode. Getting 'unknownHost' warnings etc. and then FATAL EXCEPTION: RxCachedThreadScheduler-1

Could we get an explanation why this exception crashes instead of going down to onError()?

Regards

ssseasonnn commented 7 years ago

I also met the same problem, what should I do?

ssseasonnn commented 7 years ago

Ok,I found the solution.

radzio commented 7 years ago

@ssseasonnn what solution have you found?

VincentMasselis commented 7 years ago

@radzio I've faced the problem, my mistake was to trying to emit a onError after the dispose. For now, I surround any of my emitter.onError(), onNext(), onComplete() like that

if (!emitter.isDisposed())
    emitter.onNext(value)

Note : RxJava2 only, that doesn't crash on RxJava1

dimsuz commented 7 years ago

Hmm I have this crash when I use retrofit's RxJava2 adapter to send 2 or more requests at once like this:

Observable
  .fromIterable(listOf(params1, params2, params3))
  // every api call below will throw a `ConnectException`
  .flatMapSingle { params -> myApi.getApiCallSingle(params) }
  .toList()
  .subscribe({ }, { e -> println("error is ${e}") })

In this case the first error gets delivered successfully to the onError while all others seem to result in an app crash with similar message as OPs from AndroidRuntime:

         AndroidRuntime  E  FATAL EXCEPTION: RxCachedThreadScheduler-3
                         E  Process: com.example.dev, PID: 2686

I found that if I use concatMap instead of flatMap above - the crash goes away.

I am not sure what could be the source of this bug - rxjava2 itself or rxjava2 retrofit's call adapter...

Any help/hint would be appreciated.

akarnokd commented 7 years ago

@dimsuz It might be a problem in the dispose order inside RxJava - shutting down the scheduler before the fact of cancellation reaches the code that gets woken up by an interrupted blocking.

I'll try to hunt down the locations but I'm not yet sure if switching the dispose order has further implications or not.

dimsuz commented 7 years ago

Thank you! I might try to help you check, because for me it's rather easy: I can flip concatMap to flatMapSingle to reproduce it. Ping if this will be needed.

By the way I forgot to mention that this happens on Android 6.0, but not on earlier version, but I am not sure why, because these are the core Java APIs, seemingly not touching Android SDK...

On Tue, Jan 31, 2017, 17:44 David Karnok notifications@github.com wrote:

@dimsuz https://github.com/dimsuz It might be a problem in the dispose order inside RxJava - shutting down the scheduler before the fact of cancellation reaches the code that gets woken up by an interrupted blocking.

I'll try to hunt down the locations but I'm not yet sure if switching the dispose order has further implications or not.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/ReactiveX/RxJava/issues/4807#issuecomment-276399453, or mute the thread https://github.com/notifications/unsubscribe-auth/AACnqM89urjhQEjK3wprqdaReJV2CL-Jks5rX1bFgaJpZM4Kpgwk .

akarnokd commented 7 years ago

@dimsuz If you could write an unit test that reproduces the crash on desktop without the need for going online, that would be helpful.

dimsuz commented 7 years ago

@akarnokd I will try. I have tried to write a naive piece of code without using any retrofit stuff, synchonously and that wouldn't reproduce the issue, seems like concurrency/scheduling stuff is what triggers this :)

akarnokd commented 7 years ago

Okay. For diagnosing the problem, I can work with a retrofit-based code in my own repo. Once the issue is located, I'll likely be able to write an offline unit test for it.

dimsuz commented 7 years ago

Jake Wharton replied to my description of this issue which I posted earlier in kotlin's Slack channel:

That's how Rx works The first error terminates the stream and the second undeliverable one goes to the plugins which goes to the thread uncaught handler.

After this explanation it seems like behavior I am getting is the correct one according to rx contract.

dimsuz commented 7 years ago

But in this case, this code should crash too, while it doesn't and prints "error!"...

Observable.fromIterable(listOf(1,2,3))
  .flatMapSingle { Single.fromCallable { throw RuntimeException() } }
  .toList()
  .subscribe({}, { println("error!") })
akarnokd commented 7 years ago

The problem is that we have to use thread interrupt to unblock blocking APIs. So when a blocking retrofit call is interrupted, it throws an InterruptedException but generally you can't know why and have to check some cancellation indication flag.

In RxJava, the interruption happens first and the set of the cancellation flag happens after. If the blocked thread is slow to wake up, it will see the cancellation flag and not emit the interrupt error. If the blocked thread wakes up fast or the cancel thread pauses between the interrupt and setting the flag, the woken up thread may find the flag not set and think it was a spurious interrupt and complain.

If the flag would be set before the interrupt is sent, a cancellation would never trigger the signal of the interrupted error.

ssseasonnn commented 7 years ago

@radzio Sorry to see it now, I used the following code to solve this problem.

static {
        RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                if (throwable instanceof InterruptedException) {
                    log("Thread interrupted");
                } else if (throwable instanceof InterruptedIOException) {
                    log("Io interrupted");
                } else if (throwable instanceof SocketException) {
                    log("Socket error");
                }
            }
        });
    }
DavidMihola commented 7 years ago

I seem to have a related error and am still confused about how to handle it...

When I disconnect the network, this code first correctly reports SocketTimeoutException in onError, but then immediately crashes with a ConnectException from somewhere within OkHttp.

compositeDisposable.add(
    Flowable.interval(10, TimeUnit.SECONDS)
        .subscribeOn(Schedulers.io())
        .flatMap(__ -> channelInfoManager.sendKeepAlive().toFlowable())
        .subscribe(
            Log::onNextWithTimber,
            Log::onErrorWithTimber
        )
);

If I replace the flatMap with either concatMap or switchMap it does not crash. Also, it didn't crash with RxJava 1.

Is this now the expected behavior and if so what exactly is the explanation and how should we deal with it?

akarnokd commented 7 years ago

@DavidMihola RxJava 1 by default dropped all excess exceptions and you didn't know about it. RxJava 2 doesn't lose them anymore. Could you try with the current RxJava 2 snapshot?

repositories {
    maven { url 'https://oss.jfrog.org/libs-snapshot' }
}

dependencies {
    compile 'io.reactivex.rxjava2:rxjava:2.0.0-DP0-SNAPSHOT'
}
DavidMihola commented 7 years ago

@akarnokd: Thanks for the reply!

OK, that required some fiddling - some other libraries kept "upgrading" RxJava from 2.0.0-DP0-SNAPSHOT to 2.0.1. But I added excludes everywhere and I am now reasonably certain that I am actually using the snapshot but that didn't solve the crashes...

akarnokd commented 7 years ago

You may suppress the extra exceptions by overriding the handler, similar to ssseasonnn's example.

DavidMihola commented 7 years ago

Yes, I saw that - but is it safe? That is, will I only "lose" any Exceptions that I wouldn't want to know about anyway (because they happened after a previous onError or after onCompleted) or is there any danger of missing valuable error information?

akarnokd commented 7 years ago

It depends on what you app does; RxJava can't do much to help you other than the hook itself. If it was me I'd let any RuntimeException crash the app as those are very likely due to programming errors and log/drop any checked exceptions because those are usually IO crashes in response to cancellations.

DavidMihola commented 7 years ago

OK, thanks again - we'll need to think that through some more!

FWIW, just "swallowing" the Exceptions with onErrorResumeNext also prevents the crashes, and may actually be closer to what we actually mean:

.flatMap(__ -> channelInfoManager.sendKeepAlive().toFlowable()
    .onErrorResumeNext(Flowable.empty())
)