NaikSoftware / StompProtocolAndroid

STOMP protocol via WebSocket for Android
MIT License
589 stars 265 forks source link

Not connected yet #123

Open bauer-bao opened 5 years ago

bauer-bao commented 5 years ago

io.reactivex.exceptions.OnErrorNotImplementedException: Not connected yet at io.reactivex.internal.observers.EmptyCompletableObserver.onError(EmptyCompletableObserver.java:51) at io.reactivex.internal.operators.completable.CompletableFromCallable.subscribeActual(CompletableFromCallable.java:40) at io.reactivex.Completable.subscribe(Completable.java:2171) at io.reactivex.Completable.subscribe(Completable.java:2159) at ua.naiksoftware.stomp.client.StompClient.lambda$connect$0$StompClient(StompClient.java:121) at ua.naiksoftware.stomp.client.StompClient$$Lambda$0.accept(Unknown Source) at io.reactivex.internal.observers.LambdaObserver.onNext(LambdaObserver.java:63) at io.reactivex.subjects.PublishSubject$PublishDisposable.onNext(PublishSubject.java:308) at io.reactivex.subjects.PublishSubject.onNext(PublishSubject.java:228) at ua.naiksoftware.stomp.AbstractConnectionProvider.emitLifecycleEvent(AbstractConnectionProvider.java:110) at ua.naiksoftware.stomp.OkHttpConnectionProvider$1.onOpen(OkHttpConnectionProvider.java:61) at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:209) at okhttp3.RealCall$AsyncCall.execute(RealCall.java:153) at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) at java.lang.Thread.run(Thread.java:776) Caused by: java.lang.IllegalStateException: Not connected yet at ua.naiksoftware.stomp.AbstractConnectionProvider.lambda$send$0$AbstractConnectionProvider(AbstractConnectionProvider.java:75) at ua.naiksoftware.stomp.AbstractConnectionProvider$$Lambda$2.call(Unknown Source) at io.reactivex.internal.operators.completable.CompletableFromCallable.subscribeActual(CompletableFromCallable.java:36) at io.reactivex.Completable.subscribe(Completable.java:2171)  at io.reactivex.Completable.subscribe(Completable.java:2159)  at ua.naiksoftware.stomp.client.StompClient.lambda$connect$0$StompClient(StompClient.java:121)  at ua.naiksoftware.stomp.client.StompClient$$Lambda$0.accept(Unknown Source)  at io.reactivex.internal.observers.LambdaObserver.onNext(LambdaObserver.java:63)  at io.reactivex.subjects.PublishSubject$PublishDisposable.onNext(PublishSubject.java:308)  at io.reactivex.subjects.PublishSubject.onNext(PublishSubject.java:228)  at ua.naiksoftware.stomp.AbstractConnectionProvider.emitLifecycleEvent(AbstractConnectionProvider.java:110)  at ua.naiksoftware.stomp.OkHttpConnectionProvider$1.onOpen(OkHttpConnectionProvider.java:61)  at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:209)  at okhttp3.RealCall$AsyncCall.execute(RealCall.java:153)  at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)  at java.lang.Thread.run(Thread.java:776)  java.lang.IllegalStateException: Not connected yet at ua.naiksoftware.stomp.AbstractConnectionProvider.lambda$send$0$AbstractConnectionProvider(AbstractConnectionProvider.java:75) at ua.naiksoftware.stomp.AbstractConnectionProvider$$Lambda$2.call(Unknown Source) at io.reactivex.internal.operators.completable.CompletableFromCallable.subscribeActual(CompletableFromCallable.java:36) at io.reactivex.Completable.subscribe(Completable.java:2171) at io.reactivex.Completable.subscribe(Completable.java:2159) at ua.naiksoftware.stomp.client.StompClient.lambda$connect$0$StompClient(StompClient.java:121) at ua.naiksoftware.stomp.client.StompClient$$Lambda$0.accept(Unknown Source) at io.reactivex.internal.observers.LambdaObserver.onNext(LambdaObserver.java:63) at io.reactivex.subjects.PublishSubject$PublishDisposable.onNext(PublishSubject.java:308) at io.reactivex.subjects.PublishSubject.onNext(PublishSubject.java:228) at ua.naiksoftware.stomp.AbstractConnectionProvider.emitLifecycleEvent(AbstractConnectionProvider.java:110) at ua.naiksoftware.stomp.OkHttpConnectionProvider$1.onOpen(OkHttpConnectionProvider.java:61) at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:209) at okhttp3.RealCall$AsyncCall.execute(RealCall.java:153) at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) at java.lang.Thread.run(Thread.java:776)

App crash when sending message after Emit lifecycle event: OPENED

BilalSiddiqui commented 5 years ago

Having same issue

forresthopkinsa commented 5 years ago

Do you have onError implemented on your websocket stream? That's not the problem, but that's why you're getting this ugly stack trace.

The problem is that you're sending messages before you have an open websocket.

NaikSoftware commented 5 years ago

Not necessary wait for connection, library will do this instead you:)

NaikSoftware commented 5 years ago

If you have same issue, please post snippet for reproduce or write more info..

BilalSiddiqui commented 5 years ago

For the time being this crash is occurring random because i am unable to find the reason behind. Sometime code is working perfectly and sometimes crashes repeatedly.

 mStompClient = Stomp.over(Stomp.ConnectionProvider.OKHTTP, "wss://api." + serverUrl + "/mp/ws/websocket");
        resetSubscriptions();
        Disposable dispLifecycle = mStompClient.lifecycle()
                .subscribeOn(Schedulers.io())
                .doOnError(error -> Log.d(TAG, "The error message is: " + error.getMessage()))
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(lifecycleEvent -> {
                    switch (lifecycleEvent.getType()) {
                        case OPENED:
                            Log.i(TAG, "Stomp connection opened");
                            break;
                        case ERROR:
                            Log.e(TAG, "Stomp connection error", lifecycleEvent.getException());
                            break;
                        case CLOSED:
                            Log.e(TAG, "Stomp connection closed");
                            resetSubscriptions();
                            connectScoket();
                            break;
                        case FAILED_SERVER_HEARTBEAT:
                            Log.e(TAG, "Stomp failed server heartbeat");
                            break;
                    }
                }, throwable -> {
                    Log.e(TAG, "Error on subscribe topic", throwable);
                });
        compositeDisposable.add(dispLifecycle);
        subscribeDirectMessages();
        mStompClient.connect();

    private void subscribeDirectMessages() {
        if (DataHolder.getInstance().getDirectChannels() != null && !DataHolder.getInstance().getDirectChannels().isEmpty()) {
            int size = DataHolder.getInstance().getDirectChannels().size();
            for (int i = 0; i < size; i++) {
//                if (DataHolder.getInstance().getDirectChannels().get(i) instanceof ParticipantsMeeting) {
                Disposable dispTopic = mStompClient.topic("/exchange/amq.direct/direct-discussionchannels." + DataHolder.getInstance().getDirectChannels().get(i).id)
                        .subscribeOn(Schedulers.io())
                        .doOnError(error -> Log.d(TAG, "The error message is: " + error.getMessage()))
                        .onErrorReturn(error ->
                                StompMessage.from(error.getMessage())
                        )
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(topicMessage -> {
                            Log.d(TAG, "Received " + topicMessage.getPayload());
                            handleDirectMessage(topicMessage);
                        }, error -> Log.d(TAG, "onError should not be printed!"));
                compositeDisposable.add(dispTopic);

            }
        }

    }
bauer-bao commented 5 years ago

@BilalSiddiqui so do I. My crash log is from third log statistics platform.

BilalSiddiqui commented 5 years ago

@NaikSoftware Is there anything we can do to avoid this error.

manfcas commented 5 years ago

Looks like a race condition: the openSocket field is nullified in OkHttpConnectionProvider.createWebSocketConnection() on another thread, so AbstractConnectionProvider.send(String stompMessage) finds it null (in my case when StompCommand.CONNECT is sent).

Gosunet commented 3 years ago

Hello i am facing the same issue on a small percentage of my client app.

Same stack trace : java.lang.IllegalStateException: Not connected yet at ua.naiksoftware.stomp.AbstractConnectionProvider.lambda$send$0$AbstractConnectionProvider()(AbstractConnectionProvider.java:77) at ua.naiksoftware.stomp.-$$Lambda$AbstractConnectionProvider$i_moq_Q2vigeLagkjZBHpwYcZXU.call()(undefined:4) at io.reactivex.internal.operators.completable.CompletableFromCallable.subscribeActual()(CompletableFromCallable.java:35) at io.reactivex.Completable.subscribe()(Completable.java:1517) at io.reactivex.Completable.subscribe()(Completable.java:1505) at ua.naiksoftware.stomp.client.StompClient.lambda$connect$0$StompClient()(StompClient.java:121) at ua.naiksoftware.stomp.client.-$$Lambda$StompClient$mOosLpglfnf-qRWLaT0yM_6Q4ME.accept()(undefined:6) at io.reactivex.internal.observers.LambdaObserver.onNext()(LambdaObserver.java:59) at io.reactivex.subjects.PublishSubject$PublishDisposable.onNext()(PublishSubject.java:263) at io.reactivex.subjects.PublishSubject.onNext()(PublishSubject.java:182) at ua.naiksoftware.stomp.AbstractConnectionProvider.emitLifecycleEvent()(AbstractConnectionProvider.java:112) at ua.naiksoftware.stomp.OkHttpConnectionProvider$1.onOpen()(OkHttpConnectionProvider.java:61) at okhttp3.internal.ws.RealWebSocket$connect$1.onResponse()(RealWebSocket.kt:194) at okhttp3.internal.connection.RealCall$AsyncCall.run()(RealCall.kt:519) at java.util.concurrent.ThreadPoolExecutor.runWorker()(ThreadPoolExecutor.java:1167) at java.util.concurrent.ThreadPoolExecutor$Worker.run()(ThreadPoolExecutor.java:641) at java.lang.Thread.run()(Thread.java:923)

Nielssg commented 1 year ago

The issue can be mitigated by replacing the following line in this library's source code, which I have copied from issue #144:

AbstractConnectionProvider.java, line 77

Replace throw new IllegalStateException("Not connected");

With return Completable.error(new IllegalStateException("Not connected"));

This solved the issue with my app crashing, but note that it does NOT solve the actual problem, there is of course a reason why your socket is null.