vert-x3 / vertx-rx

Reactive Extensions for Vert.x
Apache License 2.0

ObservableReadStream: add pause before setting handlers and resume after onSubscribe #292

Closed. anguisa closed this 1 year ago

anguisa commented 1 year ago

Motivation:

Stream handlers are set in Subscription.set(), before onSubscribe is called on the observer. For JDBCSQLRowStream, setting the handler triggers readBatch() (https://github.com/vert-x3/vertx-jdbc-client/blob/1e9c8cb039735de6cc66400555da62093cfb7d7b/src/main/java/io/vertx/ext/jdbc/impl/actions/JDBCSQLRowStream.java#L133). Sometimes readBatch emits items before o.onSubscribe(sub) is called in subscribeActual. For example, the following exception can occur when the operator immediately after ObservableHelper.toObservable(readStream) is observeOn:

java.lang.NullPointerException: Cannot invoke "io.reactivex.internal.fuseable.SimpleQueue.offer(Object)" because "this.queue" is null
    at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.onNext(ObservableObserveOn.java:117)
    at io.vertx.reactivex.impl.ObservableReadStream$Subscription.lambda$set$2(ObservableReadStream.java:61)
    at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:239)
    at io.vertx.core.streams.impl.InboundBuffer.emitPending(InboundBuffer.java:187)
    at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:163)
    at io.vertx.ext.jdbc.impl.actions.JDBCSQLRowStream.lambda$readBatch$3(JDBCSQLRowStream.java:186)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
    at io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:196)
    at io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java:23)
    at io.vertx.core.impl.future.FutureImpl.onComplete(FutureImpl.java:164)
    at io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23)
    at io.vertx.core.impl.ContextBase.setResultHandler(ContextBase.java:37)
    at io.vertx.core.impl.ContextInternal.executeBlocking(ContextInternal.java:129)
    at io.vertx.ext.jdbc.impl.actions.JDBCSQLRowStream.readBatch(JDBCSQLRowStream.java:161)
    at io.vertx.ext.jdbc.impl.actions.JDBCSQLRowStream.handler(JDBCSQLRowStream.java:136)
    at io.vertx.ext.sql.impl.RowStreamWrapper.handler(RowStreamWrapper.java:58)
    at io.vertx.ext.sql.impl.RowStreamWrapper.handler(RowStreamWrapper.java:32)
    at io.vertx.reactivex.impl.ObservableReadStream$Subscription.set(ObservableReadStream.java:60)
    at io.vertx.reactivex.impl.ObservableReadStream$Subscription.access$300(ObservableReadStream.java:26)
    at io.vertx.reactivex.impl.ObservableReadStream.subscribeActual(ObservableReadStream.java:97)
    at io.reactivex.Observable.subscribe(Observable.java:12284)
    at io.reactivex.internal.operators.observable.ObservableObserveOn.subscribeActual(ObservableObserveOn.java:45)
    at io.reactivex.Observable.subscribe(Observable.java:12284)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:12284)
    at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
    at io.reactivex.Observable.subscribe(Observable.java:12284)
    at io.reactivex.internal.operators.observable.ObservableWindow.subscribeActual(ObservableWindow.java:39)
    at io.reactivex.Observable.subscribe(Observable.java:12284)
    at io.reactivex.internal.operators.observable.ObservableFlatMapSingle.subscribeActual(ObservableFlatMapSingle.java:48)
    at io.reactivex.Observable.subscribe(Observable.java:12284)
    at io.reactivex.internal.operators.mixed.SingleFlatMapObservable$FlatMapObserver.onSuccess(SingleFlatMapObservable.java:109)
    at io.reactivex.internal.operators.single.SingleDoFinally$DoFinallyObserver.onSuccess(SingleDoFinally.java:73)
    at io.reactivex.internal.operators.single.SingleDoOnSubscribe$DoOnSubscribeSingleObserver.onSuccess(SingleDoOnSubscribe.java:77)
    at io.reactivex.internal.operators.single.SingleFlatMap$SingleFlatMapCallback$FlatMapSingleObserver.onSuccess(SingleFlatMap.java:111)
    at io.reactivex.internal.operators.single.SingleDoOnError$DoOnError.onSuccess(SingleDoOnError.java:52)
    at io.reactivex.internal.operators.single.SingleMap$MapSingleObserver.onSuccess(SingleMap.java:64)
    at io.vertx.reactivex.impl.AsyncResultSingle.lambda$subscribeActual$0(AsyncResultSingle.java:49)
    at io.vertx.lang.rx.DelegatingHandler.handle(DelegatingHandler.java:20)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)

Changing the order of sub.set() and o.onSubscribe(sub) also fixes the problem, but that does not seem correct.

Using Flowable instead of Observable (FlowableHelper.toFlowable) works correctly, because FlowableReadStream uses pause().

Should I add stream.pause() in the constructor (as in FlowableReadStream), or is a different approach preferable?
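To make the ordering problem concrete, here is a minimal sketch that uses toy classes only. EagerStream and QueueingObserver are hypothetical stand-ins, not Vert.x or RxJava types; they assume a stream that emits synchronously inside handler() and an observer whose queue is created in onSubscribe, which is what the stack trace above suggests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class EagerHandlerRace {

    // Hypothetical stand-in for a stream that, like JDBCSQLRowStream in the
    // trace above, starts delivering items as soon as a handler is set.
    static class EagerStream {
        private final List<String> rows = List.of("row-1", "row-2");

        void handler(Consumer<String> h) {
            rows.forEach(h); // emits synchronously inside handler()
        }
    }

    // Hypothetical stand-in for an observer such as ObserveOnObserver,
    // whose queue only exists once onSubscribe has run.
    static class QueueingObserver {
        private List<String> queue; // null until onSubscribe

        void onSubscribe() {
            queue = new ArrayList<>();
        }

        void onNext(String item) {
            queue.add(item); // throws NullPointerException if queue is null
        }
    }

    // Mirrors the problematic order: handlers first, onSubscribe second.
    // Returns true only if subscription completes without an NPE.
    static boolean subscribeHandlersFirst() {
        EagerStream stream = new EagerStream();
        QueueingObserver observer = new QueueingObserver();
        try {
            stream.handler(observer::onNext); // analogous to sub.set()
            observer.onSubscribe();           // analogous to o.onSubscribe(sub)
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("completed without NPE: " + subscribeHandlersFirst());
    }
}
```

In this toy model the first item reaches onNext while the queue is still null, so subscribeHandlersFirst() returns false.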

tsegismont commented 1 year ago

Can you please file an issue with the details of the problem? Thank you.

Regarding the fix, I believe we should pause the stream immediately in the constructor. Then yes, we should resume once the subscription is effective.

anguisa commented 1 year ago

The issue: https://github.com/vert-x3/vertx-rx/issues/293

I added stream.pause() in the constructor and also in subscribeActual(), as is done in FlowableReadStream (https://github.com/vert-x3/vertx-rx/blob/a99c786fa55066bf1b105a87afc92e3f7c22f357/rx-java2-gen/src/main/java/io/vertx/reactivex/impl/FlowableReadStream.java#L75).
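The pause-then-resume ordering discussed in this thread can be sketched with toy classes. This is not the actual ObservableReadStream change: ToyStream and QueueingObserver are hypothetical, and the sketch assumes a stream that buffers items while paused and delivers them once it is resumed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class PauseThenResume {

    // Hypothetical toy stream: holds items while paused, delivers when flowing.
    static class ToyStream {
        private final Queue<String> pending = new ArrayDeque<>(List.of("row-1", "row-2"));
        private Consumer<String> handler;
        private boolean paused;

        void pause() {
            paused = true;
        }

        void resume() {
            paused = false;
            drain();
        }

        void handler(Consumer<String> h) {
            handler = h;
            drain(); // would emit right away if the stream were not paused
        }

        private void drain() {
            while (!paused && handler != null && !pending.isEmpty()) {
                handler.accept(pending.poll());
            }
        }
    }

    // Hypothetical observer whose queue is only created in onSubscribe.
    static class QueueingObserver {
        List<String> queue;

        void onSubscribe() {
            queue = new ArrayList<>();
        }

        void onNext(String item) {
            queue.add(item);
        }
    }

    static List<String> subscribe() {
        ToyStream stream = new ToyStream();
        stream.pause();                    // stands for the pause in the constructor
        QueueingObserver observer = new QueueingObserver();
        stream.pause();                    // stands for the pause in subscribeActual()
        stream.handler(observer::onNext);  // nothing is emitted while paused
        observer.onSubscribe();            // observer is now ready for items
        stream.resume();                   // items flow only after onSubscribe
        return observer.queue;
    }

    public static void main(String[] args) {
        System.out.println(subscribe());
    }
}
```

Because the stream is paused before the handler is set, both rows are delivered only after onSubscribe has created the queue, and subscribe() returns them in order.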