ReactiveX / RxAndroid

RxJava bindings for Android
Apache License 2.0
19.89k stars 2.94k forks source link

RxAndroid ThreadPool issue #364

Closed cuihujun closed 7 years ago

cuihujun commented 7 years ago

not sure why I am getting these errors, I try to implement BleService and when I call the addNotification() after 30s I will get the threadpoll reject error.

public class BleService extends Service { //... mConnectionObservable = prepareConnectionObservable(); private Observable prepareConnectionObservable() { return mRxBleDevice .establishConnection(this, false) .takeUntil(disconnectTriggerSubject) .doOnUnsubscribe(this::clearSubscription) .compose(new ConnectionSharingAdapter()); } // ...

mConnectionObservable .doOnSubscribe(new Action0() { @Override public void call() { Log.w(TAG, "call: start connect"); } }) .doOnNext(rxBleConnection -> Log.w(TAG, "call: Connct Sucess")) .doOnNext(rxBleConnection -> Log.i(TAG, "call: start find service")) .compose(timeoutJustFirstEmit(20, TimeUnit.SECONDS)) .flatMap(RxBleConnection::discoverServices) .doOnNext(this::onDiscoverServices) .retryWhen(errors -> errors.flatMap((Func1<Throwable, Observable<?>>) error -> Observable.timer(5, TimeUnit.SECONDS))) .subscribe(rxBleConnection -> Log.i(TAG, "success"), throwable -> Log.e(TAG, "failed" + throwable.getMessage(), throwable)); //...

public void addNotification(String servUuid, String chaUuid) { if (isConnected()) { mConnectionObservable .flatMap(rxBleConnection -> rxBleConnection.setupNotification(charactoristicUuid)) .doOnNext(observable -> notificationHasBeenSetUp(charactoristicUuid, observable)) .flatMap(notificationObservable -> notificationObservable) .subscribe(bytes -> { onNotificationReceived(serviceUuid, charactoristicUuid, bytes); }, this::onNotificationSetupFailure); } } // ... }

rxandroidtest W/BluetoothGatt: Unhandled exception in callback rx.exceptions.OnErrorNotImplementedException: Task rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker@902dda3 rejected from java.util.concurrent.ThreadPoolExecutor@95f260f[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 261] at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386) at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383) at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44) at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:153) at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:60) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.subscribe(Observable.java:10236) at rx.Observable.subscribe(Observable.java:10203) at rx.Observable.subscribe(Observable.java:10008) at com.polidea.rxandroidble.internal.connection.RxBleGattCallback$1.onCharacteristicChanged(RxBleGattCallback.java:132) at android.bluetooth.BluetoothGatt$1.onNotify(BluetoothGatt.java:485) at android.bluetooth.IBluetoothGattCallback$Stub.onTransact(IBluetoothGattCallback.java:399) at android.os.Binder.execTransact(Binder.java:453) Caused by: java.util.concurrent.RejectedExecutionException: Task rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker@902dda3 rejected from java.util.concurrent.ThreadPoolExecutor@95f260f[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 261] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2014) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:794) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1340) at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:584) at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.schedule(ExecutorScheduler.java:79) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.schedule(OperatorObserveOn.java:188) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$1.request(OperatorObserveOn.java:145) at rx.Subscriber.setProducer(Subscriber.java:209) at rx.Subscriber.setProducer(Subscriber.java:205) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.init(OperatorObserveOn.java:139) at rx.internal.operators.OperatorObserveOn.call(OperatorObserveOn.java:75) at rx.internal.operators.OperatorObserveOn.call(OperatorObserveOn.java:40) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:44) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)  at rx.Observable.subscribe(Observable.java:10236)  at rx.Observable.subscribe(Observable.java:10203)  at rx.Observable.subscribe(Observable.java:10008)  at com.polidea.rxandroidble.internal.connection.RxBleGattCallback$1.onCharacteristicChanged(RxBleGattCallback.java:132)  at android.bluetooth.BluetoothGatt$1.onNotify(BluetoothGatt.java:485)  at android.bluetooth.IBluetoothGattCallback$Stub.onTransact(IBluetoothGattCallback.java:399)  at android.os.Binder.execTransact(Binder.java:453) 

JakeWharton commented 7 years ago

This isn't related to RxAndroid and your shown code doesn't contain observevOn which is the source of the problem. You'll have better luck posting on the RxJava mailing lists or StackOverflow withe the 'rxjava' tag and a complete code example that demonstrates the problem. If you think it's a bug in RxJava then you can file an issue on that project with a runnable code example that reproduces the problem or a failing test case.