EventStore / EventStoreDB-Client-Java

Official Asynchronous Java 8+ Client Library for EventStoreDB 20.6+
https://eventstore.com
Apache License 2.0
61 stars 19 forks source link

UndeliverableException for StreamNotFoundException #244

Open dosomder opened 8 months ago

dosomder commented 8 months ago

We rarely observe the following exception on our servers which crashes the application:

From the stacktrace it looks like the StreamNotFoundException (or at least onError) is emitted twice which is not legal for a Publisher. The first time may be here: https://github.com/EventStore/EventStoreDB-Client-Java/blob/trunk/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRead.java#L108 and the second time here: https://github.com/EventStore/EventStoreDB-Client-Java/blob/trunk/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRead.java#L113 (this line is in the stacktrace).

Although the ReadSubscription tries to protect against this with an AtomicBoolean so I don't know how this exception can still happen. There was some exception handling changes here, not sure if they are related: https://github.com/EventStore/EventStoreDB-Client-Java/commit/3fb47014ce22d4cfae2ad209ccde08d393f64126

Forcing JVM exit because Thread[xxxx-xxxx-xxxxx-16,5,main] threw an uncaught exception
io.reactivex.rxjava3.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | com.eventstore.dbclient.StreamNotFoundException
    at io.reactivex.rxjava3.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:372)
    at io.reactivex.rxjava3.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:96)
    at io.reactivex.rxjava3.internal.operators.observable.ObservableFromPublisher$PublisherSubscriber.onError(ObservableFromPublisher.java:52)
    at com.eventstore.dbclient.ReadSubscription.onError(ReadSubscription.java:39)
    at com.eventstore.dbclient.AbstractRead.lambda$subscribe$1(AbstractRead.java:113)
    at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990)
    at java.base/java.util.concurrent.CompletableFuture.uniExceptionallyStage(CompletableFuture.java:1008)
    at java.base/java.util.concurrent.CompletableFuture.exceptionally(CompletableFuture.java:2364)
    at com.eventstore.dbclient.AbstractRead.subscribe(AbstractRead.java:112)
    at io.reactivex.rxjava3.internal.operators.observable.ObservableFromPublisher.subscribeActual(ObservableFromPublisher.java:32)
    at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
    at io.reactivex.rxjava3.internal.operators.observable.ObservableFilter.subscribeActual(ObservableFilter.java:30)
    at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
    at io.reactivex.rxjava3.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
    at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
    at io.reactivex.rxjava3.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
    at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
    at io.reactivex.rxjava3.internal.jdk8.ObservableMapOptional.subscribeActual(ObservableMapOptional.java:42)
    at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
    at io.reactivex.rxjava3.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
    at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
    at io.reactivex.rxjava3.internal.operators.observable.ObservableOnErrorComplete.subscribeActual(ObservableOnErrorComplete.java:41)
    at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13262)
    at io.reactivex.rxjava3.internal.operators.observable.ObservableElementAtMaybe.subscribeActual(ObservableElementAtMaybe.java:32)
    at io.reactivex.rxjava3.core.Maybe.subscribe(Maybe.java:5377)
    at io.reactivex.rxjava3.internal.operators.maybe.MaybeMap.subscribeActual(MaybeMap.java:41)
    at io.reactivex.rxjava3.core.Maybe.subscribe(Maybe.java:5377)
    at io.reactivex.rxjava3.internal.operators.maybe.MaybeToSingle.subscribeActual(MaybeToSingle.java:46)
    at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
    at io.reactivex.rxjava3.internal.operators.single.SingleFlatMap.subscribeActual(SingleFlatMap.java:37)
    at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
    at io.reactivex.rxjava3.internal.operators.single.SingleFlatMapMaybe.subscribeActual(SingleFlatMapMaybe.java:38)
    at io.reactivex.rxjava3.core.Maybe.subscribe(Maybe.java:5377)
    at io.reactivex.rxjava3.internal.operators.maybe.MaybeToSingle.subscribeActual(MaybeToSingle.java:46)
    at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
    at io.reactivex.rxjava3.internal.operators.single.SingleResumeNext.subscribeActual(SingleResumeNext.java:39)
    at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
    at io.reactivex.rxjava3.core.Single.blockingGet(Single.java:3644)
    at com.xxx.xxx.XXXXXX.lambda$xxxxxxxxxx$17(XXXXXXXXX.java:260)
    at io.reactivex.rxjava3.internal.operators.single.SingleFromCallable.subscribeActual(SingleFromCallable.java:43)
    at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4855)
    at io.reactivex.rxjava3.internal.operators.single.SingleSubscribeOn$SubscribeOnObserver.run(SingleSubscribeOn.java:89)
    at io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
    at io.reactivex.rxjava3.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:25)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: com.eventstore.dbclient.StreamNotFoundException: null
    at com.eventstore.dbclient.AbstractRead$1.onNext(AbstractRead.java:62)
    at com.eventstore.dbclient.AbstractRead$1.onNext(AbstractRead.java:48)
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:478)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:660)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:647)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    ... 3 common frames omitted

Client: 4.3.0

YoEight commented 8 months ago

Hey @dosomder,

Can you share some info on your runtime workload when it happens? Is there a situation where you delete a stream while another part of your code is reading from it?

dosomder commented 8 months ago

We don't delete a stream in particular but we set maxAge and maxCount.

In this case when the exception happens, we try to read the last event from a stream and expect it to be empty. This is how we read the stream. We handle the StreamNotFoundException in the rxjava chain.

Observable.fromPublisher(
        this.client.readStreamReactive(
            this.getStreamName(),
            ReadStreamOptions.get().fromEnd().backwards().maxCount(1)))
        // Only interested in the actual event
        .filter(ReadMessage::hasEvent)
        .map(ReadMessage::getEvent)
        // Decode event
        .mapOptional(this.translator::decodeEvent)
        // A none existing stream is to be treated as if no events exist
        .onErrorComplete(StreamNotFoundException.class::isInstance);
YoEight commented 8 months ago

@dosomder Sorry for taking time to respond. I think you have diagnosed the issue properly. I need to conduct some tests to see when an exception has been raised in a future, was it already processed by the observer of the subscription or if it's the other way around.

dosomder commented 7 months ago

@YoEight Meanwhile we are on client 5.x and since then (6 weeks now) we have not seen this exception on our environments.

YoEight commented 7 months ago

Unfortunately, I can assure you that the issue is still there, even if it's scarce. I never managed to reproduce it locally but per RX documentation, the way we just the library in this context is wrong.