ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0

Stack trace is opaque for IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling. #2293

Closed: jondwillis closed this issue 9 years ago

jondwillis commented 9 years ago

This occurs when using RxAndroid 0.23.0/RxJava 1.0.0. Please redirect me if this is not the most appropriate place for this.

There seem to be several ways to cause the following exception, notably using observeOn() when a source overproduces or a consumer underconsumes, and there is no clear association with client code:

java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
       at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:50)
       at android.os.Handler.handleCallback(Handler.java:733)
       at android.os.Handler.dispatchMessage(Handler.java:95)
       at android.os.Looper.loop(Looper.java:146)
       at android.app.ActivityThread.main(ActivityThread.java:5598)
       at java.lang.reflect.Method.invokeNative(Method.java)
       at java.lang.reflect.Method.invoke(Method.java:515)
       at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1283)
       at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:1099)
       at dalvik.system.NativeStart.main(NativeStart.java)
Caused by: rx.exceptions.OnErrorNotImplementedException
       at rx.Observable$31.onError(Observable.java:7204)
       at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:127)
       at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:96)
       at rx.internal.operators.NotificationLite.accept(NotificationLite.java:147)
       at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:177)
       at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.access$000(OperatorObserveOn.java:65)
       at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:153)
       at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:45)
       at android.os.Handler.handleCallback(Handler.java:733)
       at android.os.Handler.dispatchMessage(Handler.java:95)
       at android.os.Looper.loop(Looper.java:146)
       at android.app.ActivityThread.main(ActivityThread.java:5598)
       at java.lang.reflect.Method.invokeNative(Method.java)
       at java.lang.reflect.Method.invoke(Method.java:515)
       at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1283)
       at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:1099)
       at dalvik.system.NativeStart.main(NativeStart.java)
Caused by: rx.exceptions.MissingBackpressureException
       at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:222)
       at rx.internal.operators.OnSubscribeCombineLatest$MultiSourceProducer.onNext(OnSubscribeCombineLatest.java:201)
       at rx.internal.operators.OnSubscribeCombineLatest$MultiSourceRequestableSubscriber.onNext(OnSubscribeCombineLatest.java:252)
       at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:51)
       at rx.Scheduler$Worker$1.call(Scheduler.java:118)
       at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:45)
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:422)
       at java.util.concurrent.FutureTask.run(FutureTask.java:237)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
       at java.lang.Thread.run(Thread.java:841)

I'd like to strike up a discussion highlighting common causes of these backpressure exceptions, how to determine the source of them in client code, and finally how to improve the stack trace.

jondwillis commented 9 years ago

May relate to https://github.com/ReactiveX/RxJava/issues/1821 https://github.com/ReactiveX/RxJava/issues/1804 https://github.com/ReactiveX/RxJava/issues/1682

zsxwing commented 9 years ago

You can simply add an onError handler so that RxJava can report the error to you.

jondwillis commented 9 years ago

Right, but given I have hundreds of places that I could add that onError handler to, and a decoupled stack trace, I have no obvious place to start.

benjchristensen commented 9 years ago

You can register a global error handler if you don't have specific error handlers. We use it at Netflix to track unhandled errors.

Take a look at http://reactivex.io/RxJava/javadoc/rx/plugins/RxJavaPlugins.html#registerErrorHandler(rx.plugins.RxJavaErrorHandler)

benjchristensen commented 9 years ago

Since you haven't defined an error handler all that can be done is register the uncaught exception. This has been discussed at length, particularly in #1682 and #1766 which added the uncaughtExceptionHandler behavior: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/schedulers/ScheduledAction.java#L58

Errors are propagated down, not up, when going across thread boundaries. When an exception is thrown up, as happens when there is no error handler defined, it will eventually hit the thread boundary and then we send it to the ErrorPlugin and register it with the uncaughtExceptionHandler: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/schedulers/ScheduledAction.java#L56

So you have 3 options:

1) add error handling in your application (generally the appropriate solution)
2) add an uncaughtExceptionHandler (the JDK solution for getting unhandled errors)
3) add a RxJavaErrorHandler (the RxJava solution for getting unhandled errors)
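
As a rough illustration of option 2, here is a minimal JDK-only sketch of an uncaught exception handler that unwraps nested exceptions like the ones in the trace above and reports the innermost cause. The class name `RootCauseReporter` and its methods are hypothetical and not part of RxJava; a real application would probably forward the result to its logging or analytics backend rather than `System.err`.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical helper, not part of RxJava or the JDK.
final class RootCauseReporter implements Thread.UncaughtExceptionHandler {

    // Walks the cause chain to the innermost exception, guarding against cycles.
    static Throwable rootCause(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    @Override
    public void uncaughtException(Thread thread, Throwable error) {
        // Report the innermost cause, which is usually the most informative one.
        Throwable root = rootCause(error);
        System.err.println("Unhandled on " + thread.getName() + ": " + root);
    }

    // Installs the handler for every thread that has no handler of its own.
    static void install() {
        Thread.setDefaultUncaughtExceptionHandler(new RootCauseReporter());
    }
}
```

Calling `RootCauseReporter.install()` early in startup would be enough for this sketch. On Android it is usually wise to keep a reference to the previously installed default handler and delegate to it, so that the platform's normal crash handling still runs.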

Now a few more specific questions:

given I have hundreds of places that I could add that onError handler to

This jumped out at me as that number doesn't sound right. If you have hundreds of places where you are subscribing that generally indicates a problem (unless your application is rather large or has many distinct use cases). The subscribe should only happen at the very end of an Observable sequence. In a request/response loop serving a user request for example, there would be one subscribe. Do you legitimately have hundreds of individual use cases with subscribes, or by chance are you doing nested subscribes inside map and other such functions?

I'd like to strike up a discussion highlighting common causes of these backpressure exceptions

What particular questions do you have about causes?

The operators involved are generally going to be merge, flatMap, zip, or observeOn.
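
To make the overproducing/underconsuming case concrete, here is a simplified JDK-only model, not RxJava's actual implementation: a fixed-capacity buffer sits between a fast producer and a consumer that has not drained anything yet. The first rejected item corresponds to the point where RxJava would signal a MissingBackpressureException. The class `BoundedHandoff` and the capacity of 16 are illustrative assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Simplified model of a bounded buffer between a producer and a consumer.
// Hypothetical class; RxJava's internal ring buffer differs in detail.
final class BoundedHandoff {

    // Offers items 0..count-1 while no consumer drains the buffer and returns
    // the index of the first rejected item, or -1 if every item fits.
    static int firstRejected(int capacity, int count) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < count; i++) {
            // offer() returns false instead of blocking when the buffer is full.
            if (!buffer.offer(i)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(firstRejected(16, 100)); // prints 16
        System.out.println(firstRejected(16, 10));  // prints -1
    }
}
```

In this model the overflow depends only on how far the producer gets ahead of the consumer, which is why operators that cross a thread boundary or merge several sources are the usual suspects.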

how to determine the source of them in client code

The stack trace you showed suggests it is probably a timer feeding a combineLatest sequence. See the next comments below for more on this.

and finally how to improve the stacktrace.

This is the biggest weakness of async code because JVM callstacks are intended for synchronous imperative code. I personally am researching solutions for stitching together artificial callstacks that show the actual user code instead of the JVM callstacks which show the Rx library, but to do it without killing performance requires digging into the native JVM code via native java agents. Thus it will not be an option for Android. We may be able to create one for debugging purposes that uses normal Java, but it would be far too slow to ever enable in production (as it involves capturing a stacktrace at every operator invocation).
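
The debugging-only idea of capturing a stack trace where work is set up can be sketched with plain JDK classes. This is an illustration under assumptions, not the mechanism RxJava or RxJavaDebug uses: the hypothetical `TracedRunnable` records where a task was created and attaches that trace as a suppressed exception if the task later fails, possibly on another thread.

```java
// Hypothetical wrapper, for debugging only: capturing a stack trace per task is slow.
final class TracedRunnable implements Runnable {
    private final Runnable delegate;
    // Captured eagerly at construction, i.e. in the code that scheduled the work.
    private final Throwable creationSite = new Throwable("task created here");

    TracedRunnable(Runnable delegate) {
        this.delegate = delegate;
    }

    @Override
    public void run() {
        try {
            delegate.run();
        } catch (RuntimeException e) {
            // Attach the scheduling call stack so it shows up in the crash report.
            e.addSuppressed(creationSite);
            throw e;
        }
    }
}
```

A task would be wrapped at the point where it is handed to an executor, for example `executor.execute(new TracedRunnable(task))`. The printed stack trace of the failure then includes a "Suppressed:" section pointing at the client code that scheduled it.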

Another route we are exploring is the use of the plugins for debugging and capturing the callgraph as a stream of events. We have succeeded in getting the graph of events, but not yet had time to create a UI to make it usable. You can see the effort so far at https://github.com/ReactiveX/RxJavaDebug Again, this would be too slow for production use except for cases where it is dynamically enabled just for tracing a particular request.

What suggestions do you have and what would you like to help improve?

jondwillis commented 9 years ago

@benjchristensen thank you for taking the time to answer. I think I understand the difficulty here. I've already got an uncaughtExceptionHandler hooked up to some analytics that made me aware of this issue in the first place.

This jumped out at me as that number doesn't sound right. If you have hundreds of places where you are subscribing that generally indicates a problem (unless your application is rather large or has many distinct use cases).

You're right. Performing a search for subscribe( in my application turned up 153 instances, where about 110 of those are in client/UI binding code, and 43 in service-level code. Admittedly, there is a lot of legacy code here that wholly misses the mark of Rx in general and abuses subscriptions where other more appropriate operators should be used. I've begun identifying and cleaning up instances of these abuses and hopefully that will improve the situation.

The stack trace you showed suggests it is probably a timer feeding a combineLatest sequence. See the next comments below for more on this.

I see where the stack trace references timer, but I don't see any obvious references to combineLatest without knowing more about the internals of the library. I don't use timer directly in my client code, but I do use Workers/SchedulePeriodically and Observable.interval()

For the time being, is there some way I can apply a global backpressure strategy, knowing that it will have negative side-effects, until I can properly address my problems?

benjchristensen commented 9 years ago

You can apply onBackpressureBuffer to your source and it will make it behave the same as before. If that isn't an option you can set a global property for rx.ring-buffer.size to a value higher than your max stream length.
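
A sketch of the second suggestion, assuming RxJava 1.x reads the `rx.ring-buffer.size` system property once, when its internal buffer class is first loaded. Under that assumption the property has to be set before any RxJava code runs (or passed as a `-D` flag on a JVM command line). The class name `RxBufferConfig` and the value 1024 are placeholders.

```java
// Hypothetical bootstrap helper; call it before any RxJava class is touched.
final class RxBufferConfig {
    static final String RING_BUFFER_SIZE_KEY = "rx.ring-buffer.size";

    // Sets the global buffer size unless something (e.g. a -D flag) already set it,
    // and returns the value that is in effect afterwards.
    static String configure(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive: " + size);
        }
        if (System.getProperty(RING_BUFFER_SIZE_KEY) == null) {
            System.setProperty(RING_BUFFER_SIZE_KEY, Integer.toString(size));
        }
        return System.getProperty(RING_BUFFER_SIZE_KEY);
    }
}
```

For example, `RxBufferConfig.configure(1024)` at the very start of the application. A larger buffer only postpones the overflow for a source that keeps outpacing its consumer, so this is a stopgap rather than a fix.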

benjchristensen commented 9 years ago

Another curiosity for me, to better understand how things get used: why do you end up using Workers/SchedulePeriodically directly instead of an operator like interval?

akarnokd commented 9 years ago

In my opinion, it happens because programmers come from the classical Executor training and find the Scheduler as the nearest thing to work with periodically.

The second cause, I think, is that not using a value from a source and mapping in something else is a strange way of doing async work. (I.e., using just(1).subscribeOn(io()).map(_ -> { doWork(); return 2;}).subscribe();).

Maybe the detailing of the scheduler API should be an advanced topic described way after the use of operators such as timer and interval. Besides, using the Worker correctly to avoid leaks in operators or in user code is IMO an advanced topic.

jondwillis commented 9 years ago

@benjchristensen Naivety. Like I said, I've begun re-writing the app to eliminate some of these novice mistakes. Something like Observable.interval(x).takeUntil(needsCleanup).flatMap(...).subscribe() for some of the cases where I'm using schedulePeriodically and accessing member observables within their subscribes.

edit) @akarnokd is fairly spot on. The API feels familiar and is accessible. Can you elaborate on:

The second cause, I think, is that not using a value from a source and mapping in something else is a strange way of doing async work. (I.e., using just(1).subscribeOn(io()).map(_ -> { doWork(); return 2;}).subscribe();).

benjchristensen commented 9 years ago

Naivety

That's a good enough reason :-) It took me many months before I was using this style of programming correctly.

What would you suggest we improve to provide education and examples for how to do things idiomatically and functionally? I was lucky to have an expert close by when I was learning so I could ask lots of questions. Without that it would have been far harder. As you have learned, what do you wish you could have been provided by the RxJava project early on to teach you?

jondwillis commented 9 years ago

@benjchristensen I'm under a fairly tight deadline right now, but I'll try to review the current education resources and get you some feedback soon.

I picked up RxJava-Android (when it was called that) back in March 2014; my interest was originally piqued having used ReactiveCocoa on iOS. Much of what I've picked up has come from a combination of the documents at https://github.com/ReactiveX/RxJava/wiki/The-RxJava-Android-Module and hard-won lessons from ReactiveCocoa's GitHub issues. Luckily most of the Rx contract is upheld between the two implementations ;)

Diolor commented 9 years ago

@benjchristensen Not sure if my suggestion is solid. I have seen the following above:

I personally am researching solutions for stitching together artificial callstacks that show the actual user code instead of the JVM callstacks which show the Rx library,

The problem I am facing right now is that an onError handler is missing somewhere, which causes the (Android) program to crash. It would be very helpful to see the line of code of the original Action0 (or something similar) so we can easily locate the problem. Of course I do not know the implications for the whole JVM, as you say above.

Update for anyone else landing here from Google: as a general solution I registered an error handler with registerErrorHandler and logged the error inside it:

 Log.w("Error",e);

benjchristensen commented 9 years ago

We are actively exploring this space, but nothing usable yet. Also, it is only for the Hotspot/OpenJDK/Oracle JVMs that we are pursuing anything right now. I don't know what the options are for Android.

Just the other day I saw some native C code via native plugins and also via JNI allowing a logical callstack across async boundaries. Performance is still an issue though. Beyond that I don't have much further to offer as a solution at this time.

We still use println logging (via doOnEach, doOnNext, doOnSubscribe, doOnUnsubscribe, doOnRequest, etc) for our debugging today at Netflix.

corlaez commented 8 years ago

I am toying with RxJava and I had this issue as well. I think ReactiveX should be very careful to explain the difference between Observable.doOnError (side effect, will still blow up) and Subscriber.onError (last consumer; if handled here your error won't pop up anymore). At least that was my problem. https://twitter.com/Corlaez/status/782023068793339904

zmin666 commented 8 years ago

Maybe the callback runs on the wrong thread. I just ran into this problem and solved it.

gubo commented 7 years ago

it occurs to me that Action1<> can be dangerous to use, given that Observable.subscribe will throw an unchecked exception in onError ... ex:

// requires: import java.util.concurrent.TimeUnit;

// Global plugin error handler that only logs the exception type.
final rx.plugins.RxJavaErrorHandler rxJavaErrorHandler = new rx.plugins.RxJavaErrorHandler() {
    @Override
    public void handleError( final Throwable x ) {
        System.out.println( "rxJavaErrorHandler.handleError: " + x.getClass().getSimpleName() );
    }
};
rx.plugins.RxJavaPlugins.getInstance().registerErrorHandler( rxJavaErrorHandler );

// Slow consumer: each tick takes 2500 ms while the source emits every 100 ms.
final rx.functions.Action1<Long> action = new rx.functions.Action1<Long>() {
    @Override
    public void call( final Long L ) {
        System.out.println( "tick" );
        try { Thread.sleep( 2500 ); } catch ( InterruptedException x ) {}
    }
};

// Subscribing with only an Action1 supplies no onError handler.
final rx.Subscription subscription = rx.Observable.interval( 100L,TimeUnit.MILLISECONDS )
        .subscribeOn( rx.schedulers.Schedulers.io() )
        .observeOn( rx.schedulers.Schedulers.newThread() )
        .subscribe( action );

this code generates a MissingBackpressureException, which propagates to an OnErrorNotImplementedException and is then wrapped in an IllegalStateException. ScheduledAction.run, however, calls RxJavaPlugins.getInstance().getErrorHandler().handleError(ie); and then, regardless, calls thread.getUncaughtExceptionHandler().uncaughtException(thread, ie); to me this means you don't really get to "handle" the error in your registered RxJavaPlugins error handler. on android, this will kill the process.
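
The control flow described here can be modeled in a few lines of plain Java. This is a simplified sketch based only on the description in this comment, not a copy of `ScheduledAction`; `WorkerTask` and its `pluginHandler` field are hypothetical stand-ins for the scheduled action and the RxJava plugin error handler.

```java
import java.util.function.Consumer;

// Simplified model of the flow described above; all names are hypothetical.
final class WorkerTask implements Runnable {
    private final Runnable action;
    private final Consumer<Throwable> pluginHandler; // stands in for RxJavaErrorHandler

    WorkerTask(Runnable action, Consumer<Throwable> pluginHandler) {
        this.action = action;
        this.pluginHandler = pluginHandler;
    }

    @Override
    public void run() {
        try {
            action.run();
        } catch (Throwable e) {
            IllegalStateException ie = new IllegalStateException(
                    "Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
            // Step 1: the plugin handler is notified, but it cannot consume the error.
            pluginHandler.accept(ie);
            // Step 2: the thread's uncaught exception handler is invoked regardless.
            Thread thread = Thread.currentThread();
            thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
        }
    }
}
```

In this model the plugin handler is purely an observer of the failure. The only way to keep the exception from reaching step 2 is for the action itself not to throw, which in Rx terms means supplying an onError handler at subscribe time.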

JakeWharton commented 7 years ago

That seems normal. If you don't supply what is the reactive equivalent of a catch block then the exception propagates resulting in app death. Supply an error handler and the problem goes away.

gubo commented 7 years ago

but rx.plugins.RxJavaPlugins.getInstance().registerErrorHandler( rxJavaErrorHandler ); was my error handler.

if i put .doOnError( Action1... ) after .observeOn, it gets called but the chain continues on to call the plugin errorhandler, then it calls the actual Subscriber that Observable creates to wrap the Action, and this then throws OnErrorNotImplementedException.

(if i subscribe with a SafeSubscriber, i.e. new rx.Observer<>... then there is no problem ... the problem is with subscribing with an Action1)

akarnokd commented 7 years ago

The problem is this:

.subscribe( action );

You have to handle the exception by consuming it in subscribe:

.subscribe( action, error -> errorHandler(error) );

DP0326 commented 5 years ago

I am trying out a program in Android Studio which was provided by ResearchStack. I am facing the same error and I know I should use an error handler to solve this issue. But RxJava is only available to me as a jar file. So where can I add the error handler?