vert-x3 / vertx-rx

Reactive Extensions for Vert.x
Apache License 2.0

SQLRowStream toObservable can result in NullPointerException #293

Closed anguisa closed 1 year ago

anguisa commented 1 year ago

Version

This bug exists in the latest version, 5.0.0-SNAPSHOT.

Context

Stream handlers are set in Subscription.set(), before onSubscribe is called on the observer. For JDBCSQLRowStream, setting the handler triggers readBatch() (https://github.com/vert-x3/vertx-jdbc-client/blob/1e9c8cb039735de6cc66400555da62093cfb7d7b/src/main/java/io/vertx/ext/jdbc/impl/actions/JDBCSQLRowStream.java#L133). As a result, readBatch sometimes emits items before subscribeActual calls o.onSubscribe(sub). For example, the following exception can occur when the operator immediately after ObservableHelper.toObservable(readStream) is observeOn:

java.lang.NullPointerException: Cannot invoke "io.reactivex.internal.fuseable.SimpleQueue.offer(Object)" because "this.queue" is null
    at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.onNext(ObservableObserveOn.java:117)
    at io.vertx.reactivex.impl.ObservableReadStream$Subscription.lambda$set$2(ObservableReadStream.java:61)
    at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:239)
    at io.vertx.core.streams.impl.InboundBuffer.emitPending(InboundBuffer.java:187)
    at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:163)
    at io.vertx.ext.jdbc.impl.actions.JDBCSQLRowStream.lambda$readBatch$3(JDBCSQLRowStream.java:186)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
    at io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:196)
    at io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java:23)
    at io.vertx.core.impl.future.FutureImpl.onComplete(FutureImpl.java:164)
    at io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23)
    at io.vertx.core.impl.ContextBase.setResultHandler(ContextBase.java:37)
    at io.vertx.core.impl.ContextInternal.executeBlocking(ContextInternal.java:129)
    at io.vertx.ext.jdbc.impl.actions.JDBCSQLRowStream.readBatch(JDBCSQLRowStream.java:161)
    at io.vertx.ext.jdbc.impl.actions.JDBCSQLRowStream.handler(JDBCSQLRowStream.java:136)
    at io.vertx.ext.sql.impl.RowStreamWrapper.handler(RowStreamWrapper.java:58)
    at io.vertx.ext.sql.impl.RowStreamWrapper.handler(RowStreamWrapper.java:32)
    at io.vertx.reactivex.impl.ObservableReadStream$Subscription.set(ObservableReadStream.java:60)
    at io.vertx.reactivex.impl.ObservableReadStream$Subscription.access$300(ObservableReadStream.java:26)
    at io.vertx.reactivex.impl.ObservableReadStream.subscribeActual(ObservableReadStream.java:97)
    at io.reactivex.Observable.subscribe(Observable.java:12284)
    at io.reactivex.internal.operators.observable.ObservableObserveOn.subscribeActual(ObservableObserveOn.java:45)
    at io.reactivex.Observable.subscribe(Observable.java:12284)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:12284)
    at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
    at io.reactivex.Observable.subscribe(Observable.java:12284)
    at io.reactivex.internal.operators.observable.ObservableWindow.subscribeActual(ObservableWindow.java:39)
    at io.reactivex.Observable.subscribe(Observable.java:12284)
    at io.reactivex.internal.operators.observable.ObservableFlatMapSingle.subscribeActual(ObservableFlatMapSingle.java:48)
    at io.reactivex.Observable.subscribe(Observable.java:12284)
    at io.reactivex.internal.operators.mixed.SingleFlatMapObservable$FlatMapObserver.onSuccess(SingleFlatMapObservable.java:109)
    at io.reactivex.internal.operators.single.SingleDoFinally$DoFinallyObserver.onSuccess(SingleDoFinally.java:73)
    at io.reactivex.internal.operators.single.SingleDoOnSubscribe$DoOnSubscribeSingleObserver.onSuccess(SingleDoOnSubscribe.java:77)
    at io.reactivex.internal.operators.single.SingleFlatMap$SingleFlatMapCallback$FlatMapSingleObserver.onSuccess(SingleFlatMap.java:111)
    at io.reactivex.internal.operators.single.SingleDoOnError$DoOnError.onSuccess(SingleDoOnError.java:52)
    at io.reactivex.internal.operators.single.SingleMap$MapSingleObserver.onSuccess(SingleMap.java:64)
    at io.vertx.reactivex.impl.AsyncResultSingle.lambda$subscribeActual$0(AsyncResultSingle.java:49)
    at io.vertx.lang.rx.DelegatingHandler.handle(DelegatingHandler.java:20)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)
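
The ordering problem described under Context can be shown in miniature without Vert.x or RxJava. The sketch below is only an illustration: every type and method name in it (MiniObserver, EagerStream, QueueingObserver, subscribeHandlerFirst, subscribeOnSubscribeFirst) is hypothetical and stands in for the real classes. It assumes a stream that emits synchronously while its handler is being installed, and a downstream observer that, like observeOn, only allocates its queue in onSubscribe.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical miniature of the ordering problem; not the real vertx-rx or RxJava classes.
class SubscribeOrderingSketch {

  /** Stand-in for an RxJava Observer (reduced to the two callbacks that matter here). */
  interface MiniObserver<T> {
    void onSubscribe();
    void onNext(T item);
  }

  /** Stand-in for a stream that may emit as soon as a handler is set. */
  static class EagerStream {
    private final List<String> rows;

    EagerStream(List<String> rows) {
      this.rows = rows;
    }

    void handler(Consumer<String> h) {
      // Emits synchronously while the handler is being installed.
      for (String row : rows) {
        h.accept(row);
      }
    }
  }

  /** Stand-in for an operator that allocates its queue in onSubscribe. */
  static class QueueingObserver implements MiniObserver<String> {
    Queue<String> queue; // stays null until onSubscribe runs

    @Override
    public void onSubscribe() {
      queue = new ArrayDeque<>();
    }

    @Override
    public void onNext(String item) {
      queue.offer(item); // throws NullPointerException if onSubscribe has not run yet
    }
  }

  /** Ordering from the report: the handler is installed before onSubscribe is called. */
  static void subscribeHandlerFirst(EagerStream stream, MiniObserver<String> o) {
    stream.handler(o::onNext);
    o.onSubscribe();
  }

  /** One possible reordering (an assumption, not necessarily what the linked PR does). */
  static void subscribeOnSubscribeFirst(EagerStream stream, MiniObserver<String> o) {
    o.onSubscribe();
    stream.handler(o::onNext);
  }

  public static void main(String[] args) {
    EagerStream stream = new EagerStream(List.of("row1", "row2"));
    try {
      subscribeHandlerFirst(stream, new QueueingObserver());
    } catch (NullPointerException e) {
      System.out.println("handler-first ordering failed with NullPointerException");
    }
    QueueingObserver ok = new QueueingObserver();
    subscribeOnSubscribeFirst(stream, ok);
    System.out.println("onSubscribe-first ordering queued " + ok.queue.size() + " rows");
  }
}
```

In this model the handler-first ordering fails for the same reason as in the trace above: onNext reaches the observer before its queue exists. Whether the actual fix reorders the calls or defers emission in some other way is not something this sketch claims to know.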

Possible fix

https://github.com/vert-x3/vertx-rx/pull/292

tsegismont commented 1 year ago

Fixed in 66648352