ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0

timeout eats exceptions #3786

Closed. AAverin closed this issue 8 years ago.

AAverin commented 8 years ago

If I have a custom Observable that publishes some exceptions via onError, chaining it with timeout(time, units, Observable.error(MyCustomTimeoutException())) hides the exceptions sent by the custom Observable.

Here is an example. SecurityException and IllegalArgumentException never reach my subscriber.

fun getLocation(): Observable<Location> {
    return requestSingleLocation()
            .subscribeOn(schedulers.loopedIo)
            .timeout(LOCATION_REQUEST_TIMEOUT,
                     TimeUnit.SECONDS,
                     Observable.error(NetworkLocationTimeoutException()))
            .first()
}

private fun requestSingleLocation(): Observable<Location> {
    return Observable.create<Location> { subscriber ->
        try {
            val knownLocation = locationManager.getLastKnownLocation(LocationManager.NETWORK_PROVIDER)
            knownLocation?.apply {
                subscriber.onNext(this)
            }

            locationManager.requestSingleUpdate(LocationManager.NETWORK_PROVIDER, LocationChangedListener {
                subscriber.onNext(it)
                subscriber.onCompleted()
            }, null)
        } catch (securityException: SecurityException) {
            subscriber.onError(securityException)
        } catch (illegalArgumentException: IllegalArgumentException) {
            subscriber.onError(illegalArgumentException)
        }
    }
}

The following test fails:

@Test
fun getLocationProcessesSecurityException() {
    // given
    val securityException = SecurityException()
    given(locationManager.getLastKnownLocation(LocationManager.NETWORK_PROVIDER)).willReturn(location)
    given(locationManager.requestSingleUpdate(BDDMockito.anyString(), any(), any())).willThrow(securityException)
    val testSubscriber = TestSubscriber<Location>()

    // when
    classToTest.getLocation().subscribe(testSubscriber)

    // then
    testSubscriber.assertError(securityException)
}

akarnokd commented 8 years ago

This test works for me:

@Test
public void testErrorTimeout() {
    TestSubscriber<Object> ts = TestSubscriber.create();

    SecurityException se = new SecurityException();
    Observable.error(se)
            .subscribeOn(Schedulers.io())
            .timeout(1, TimeUnit.SECONDS, Observable.error(new TestException()))
            .subscribe(ts);

    ts.awaitTerminalEvent();
    ts.assertError(se);
}

It seems you forgot to await the terminal event, so the test thread completes before the error is propagated.

AAverin commented 8 years ago

I'm still getting the No Errors message.

akarnokd commented 8 years ago

Print out the contents of the ts.getOnNextEvents() list before asserting, to check whether the mocking of requestSingleUpdate actually worked (maybe it runs the body and only throws afterwards?).

AAverin commented 8 years ago

I have modified your test example and made it fail:

class TestClass

@Test
fun testErrorTimeout() {
    val ts: TestSubscriber<TestClass> = TestSubscriber.create()

    val se = SecurityException()

    // Reuse se so that assertError(se) compares against the same instance.
    Observable.merge(Observable.just(TestClass()), Observable.error(se)).first()
            .subscribeOn(Schedulers.io())
            .timeout(1, TimeUnit.SECONDS, Observable.error(NetworkLocationTimeoutException()))
            .subscribe(ts)

    ts.awaitTerminalEvent()
    ts.assertError(se)
}

AAverin commented 8 years ago

Sorry, it's in Kotlin though

akarnokd commented 8 years ago

first(): Returns an Observable that emits only the very first item emitted by the source Observable, or notifies of a NoSuchElementException if the source Observable is empty.

That also means it cuts off any subsequent events, including errors: once it has emitted the first item, it completes and unsubscribes from the source.
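
To make the effect of first() concrete, here is a minimal plain-Java sketch. It is only a model of the behaviour described above, not RxJava code: the class FirstSketch and its method first are hypothetical names, a Throwable in the list stands for onError, any other object stands for onNext, and the end of the list stands for onCompleted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of first(); hypothetical, not part of RxJava. */
public class FirstSketch {

    /**
     * Returns what a subscriber downstream of the modelled first() would see.
     * A Throwable in the list stands for onError, any other object for onNext,
     * and the end of the list for onCompleted.
     */
    public static List<String> first(List<Object> sourceEvents) {
        List<String> seen = new ArrayList<>();
        if (sourceEvents.isEmpty()) {
            // The source completed without emitting anything.
            seen.add("onError(NoSuchElementException)");
            return seen;
        }
        Object head = sourceEvents.get(0);
        if (head instanceof Throwable) {
            // An error that arrives before any item is passed through.
            seen.add("onError(" + head.getClass().getSimpleName() + ")");
            return seen;
        }
        // First item: emit it, complete, and stop listening to the source.
        // Whatever the source sends afterwards is never delivered.
        seen.add("onNext(" + head + ")");
        seen.add("onCompleted");
        return seen;
    }

    public static void main(String[] args) {
        // Mirrors merge(just(item), error(SecurityException)).first()
        System.out.println(first(Arrays.<Object>asList("item", new SecurityException())));
        // The error comes first, so it reaches the subscriber
        System.out.println(first(Arrays.<Object>asList(new SecurityException())));
    }
}
```

In this model the SecurityException that follows the item is dropped, which matches the failing test above: the subscriber sees an item and a completion, so assertError has nothing to check.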

AAverin commented 8 years ago

Hmm. What I need is either a successful result (a TestClass instance) or a custom error. Without .first() I get both even on success, because timeout keeps waiting for subsequent events until the timer runs out.

AAverin commented 8 years ago

OK, I can confirm that the sample test passes without first(). Can you suggest another way of achieving the desired result? Thanks.

akarnokd commented 8 years ago

I'm not sure what you want to achieve. Timeout should not even happen, unless getLastKnownLocation or requestSingleLocation really take a long time to return or throw.

AAverin commented 8 years ago

Well, they can. The user may be somewhere with a bad connection, and these calls can take a long time. If they really do take a long time, I need to get a custom exception so I can handle the case gracefully in the UI.

The implementation with first() works in the UI: I correctly get either a success or a custom exception when the response takes too long. But I also need to cover this with tests, and it looks like I may have a problem =) It also looks like first() might not be the correct solution: if getLastKnownLocation returns a valid result but requestSingleLocation throws an exception, my code will not receive it.

akarnokd commented 8 years ago

You mean the case when the timeout happens before requestSingleLocation throws?

AAverin commented 8 years ago

getLastKnownLocation is a relatively fast call and will just return null if there is no last known location. requestSingleLocation, on the other hand, can take a while.

The problem in my code is that I can't use first() at all: I will lose my updated location if there was a known location. So I need a timeout() operator that fires only if no items have been emitted, but does not wait for subsequent items.

Removing first() leads to a situation where I get onNext() and then onError() from the timeout, because there was no second onNext event with a new item. My observable isn't hot and is expected to emit only a single set of items, starting with some cached old result.

AAverin commented 8 years ago

Issue resolved, thanks a lot for your help! The problem was that I didn't call onCompleted() on my custom subscriber due to a small mistake in the code. onCompleted correctly unsubscribes the timeout.
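
The resolution can be pictured with a small virtual-time model in plain Java. This is only a sketch under stated assumptions, not RxJava's implementation: TimeoutSketch, Event and withTimeout are hypothetical names, the timer is assumed to start at subscription and restart after every item, and a source that goes silent without completing is modelled by the event list simply ending.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java, virtual-time model of a per-item timeout; hypothetical, not RxJava. */
public class TimeoutSketch {

    /** One source event: when it happens, and the item (null stands for onCompleted). */
    public static final class Event {
        final long atMillis;
        final String item;

        public Event(long atMillis, String item) {
            this.atMillis = atMillis;
            this.item = item;
        }
    }

    /** Returns what a subscriber downstream of the modelled timeout would see. */
    public static List<String> withTimeout(List<Event> events, long timeoutMillis) {
        List<String> seen = new ArrayList<>();
        long timerStart = 0; // the timer starts at subscription
        for (Event e : events) {
            if (e.atMillis - timerStart > timeoutMillis) {
                seen.add("onError(timeout)"); // the timer fired before this event
                return seen;
            }
            if (e.item == null) {
                seen.add("onCompleted"); // terminal event: the timer is cancelled
                return seen;
            }
            seen.add("onNext(" + e.item + ")");
            timerStart = e.atMillis; // restart the timer for the next item
        }
        // The source went silent without completing, so the timer eventually fires.
        seen.add("onError(timeout)");
        return seen;
    }

    public static void main(String[] args) {
        // Source completes after the fresh location: no timeout error.
        System.out.println(withTimeout(Arrays.asList(
                new Event(10, "cached"), new Event(200, "fresh"), new Event(200, null)), 1000));
        // Same items, but onCompleted is never sent: a timeout error follows them.
        System.out.println(withTimeout(Arrays.asList(
                new Event(10, "cached"), new Event(200, "fresh")), 1000));
    }
}
```

In this model the second run reproduces the symptom described earlier (items followed by a timeout error), while the first run shows that sending onCompleted after the last item ends the sequence cleanly.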

akarnokd commented 8 years ago

Great to hear it!