square / retrofit

A type-safe HTTP client for Android and the JVM
https://square.github.io/retrofit/
Apache License 2.0
43.13k stars 7.3k forks source link

Async RxJava3 call adapter doesn't produce error event when request is canceled by timeout #3524

Open alapshin opened 3 years ago

alapshin commented 3 years ago

Test case/sample: https://gist.github.com/alapshin/a60540a1a128c0af042b7fe427b9de88

Description

  1. OkHttp client is configured to use arbitrary call timeout
  2. Retrofit's RxJava3 call adapter is created via RxJava3CallAdapterFactory.create()

Result When call time outs Rx stream doesn't receive any event and remains active

Expected result When call time outs Rx stream receives error event and completes

Additional info If RxJava3 call adapter is created via RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io) then call cancellation by timeout produces exception which is propagated to stream

java.io.InterruptedIOException: timeout
    at okhttp3.internal.connection.RealCall.timeoutExit(RealCall.kt:398)
    at okhttp3.internal.connection.RealCall.callDone(RealCall.kt:360)
    at okhttp3.internal.connection.RealCall.noMoreExchanges$okhttp(RealCall.kt:325)
    at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:209)
    at okhttp3.internal.connection.RealCall.execute(RealCall.kt:154)
    at retrofit2.OkHttpCall.execute(OkHttpCall.java:204)
    at retrofit2.adapter.rxjava3.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:46)
    at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13127)
    at retrofit2.adapter.rxjava3.BodyObservable.subscribeActual(BodyObservable.java:35)
    at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13127)
    at io.reactivex.rxjava3.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
    at io.reactivex.rxjava3.core.Scheduler$DisposeTask.run(Scheduler.java:644)
    at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:65)
    at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:56)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Canceled
    at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.kt:72)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
    at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:201)
    ... 16 more

There is also somewhat related discussion at https://github.com/square/retrofit/issues/3453 with comment with comment https://github.com/square/retrofit/issues/3453#issuecomment-682034904 which points to possible cause of this difference in behavior.

purnaPrasanth commented 3 years ago

Seems like the issue happens when thread switching happens while onFailure is being executed. To demonstrate the case deterministically I have created a sample snippet,

import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.Single
import io.reactivex.disposables.Disposable
import io.reactivex.exceptions.CompositeException
import io.reactivex.exceptions.Exceptions
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

fun main(args: Array<String>) {
    val countDownLatch = CountDownLatch(1)

    val disposable = SampleObservable()
        .subscribeOn(Schedulers.io())
        .subscribe(
            {
                println("Emitted")
                countDownLatch.countDown()
            },
            {
                println("Errored")
                countDownLatch.countDown()
            }
        )

    Single.just(Unit)
        .delay(3000, TimeUnit.MILLISECONDS, Schedulers.io())
        .subscribe({ disposable.dispose() }, {})

    countDownLatch.await()
}

class SampleObservable : Observable<Unit>() {
    override fun subscribeActual(observer: Observer<in Unit>?) {
        observer?.onSubscribe(SampleCallback(observer))
    }

    private inner class SampleCallback(private val observer: Observer<in Unit>) : Disposable {
        @Volatile
        private var isDisposed: Boolean = false

        init {
            sendError(IllegalStateException("Sample Exception"))
        }

        override fun dispose() {
            isDisposed = true
        }

        override fun isDisposed(): Boolean = isDisposed

        private fun sendError(exception: Exception) {
            if (!isDisposed) {
                val executorService = Executors.newSingleThreadScheduledExecutor()
                executorService.schedule(
                    {
                        try {
                            observer.onError(exception)
                        } catch (inner: Throwable) {
                            Exceptions.throwIfFatal(inner)
                            RxJavaPlugins.onError(CompositeException(exception, inner))
                        }
                    },
                    5,
                    TimeUnit.SECONDS
                )
            }
        }
    }
}

The fix would be to synchronise read and writes to isDisposed and observer.

This is the cause for one of the top trending crashes in our product. Is the fix already in pipeline? Please advice any alternatives if possible. Thanks.

dbriggsDFleet commented 2 years ago

Is there any negative side effect of just leveraging, RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io) instead of the default .create()? Seems important that timeouts would be sent downstream instead of hanging.

EDIT: It's also a significant regression from the rxjava2 call adapter, no?

optisamit commented 1 year ago

Apparently this is happening due to this line: https://github.com/square/retrofit/blob/b4eed3f82a969d0eb3c171449df04dc2b2824bbc/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/CallEnqueueObservable.java#L85

tomridder commented 1 year ago

in okhttp,when timeout,it will cancel all the call so in CallEnqueueObervale.java

if (call.isCanceled()) return;

then ,it will never catch the exception