alaisi / postgres-async-driver

Asynchronous PostgreSQL Java driver
Apache License 2.0

Consider not calling onNext(null) #33

Open cretz opened 7 years ago

cretz commented 7 years ago

Null values are difficult for some reactive streams implementations to handle (e.g. https://github.com/monix/monix/issues/252). I understand this raises compatibility concerns, but it may be cleaner to change com.github.pgasync.Transaction::commit to return Observable<?> or Observable<Object> instead of Observable<Void> and emit something like Optional.empty(), and to do the same in the other places where onNext(null) is invoked.

alexandru commented 7 years ago

To add some context, the problem is that interfaces like java.util.Queue use null to signal an empty queue in methods such as poll(). See: https://docs.oracle.com/javase/7/docs/api/java/util/Queue.html#poll()

This is then implemented by all concurrent queue implementations, like Java's own ConcurrentLinkedQueue, or the ones in JCTools.org.

Reactive streams implementations such as Monix use concurrent queues to buffer at asynchronous boundaries where needed, and RxJava also uses the queue implementations from JCTools. Even if pushing null values downstream has been allowed and has worked until now, these queue implementations make pushing nulls basically a minefield: a null returned by poll() cannot be told apart from an empty queue.
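As a rough illustration of the queue behaviour described above, here is a minimal sketch using only the JDK's ConcurrentLinkedQueue. The class and method names (NullQueueDemo, rejectsNull) are made up for this example and do not come from Monix, RxJava or JCTools.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not taken from any of the libraries discussed.
class NullQueueDemo {
    // Returns true when the given queue refuses a null element.
    static boolean rejectsNull(Queue<Object> queue) {
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        Queue<Object> queue = new ConcurrentLinkedQueue<>();
        // poll() on an empty queue returns null, so null doubles as the "empty" signal.
        System.out.println("poll on empty queue: " + queue.poll());
        // ConcurrentLinkedQueue does not permit null elements.
        System.out.println("rejects null: " + rejectsNull(queue));
    }
}
```

Since null is already taken as the "nothing here" answer from poll(), a buffer built on such a queue has no way to carry a null item through unchanged.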

alexandru commented 7 years ago

Here's another, clearer reason why null is not allowed: null parameters are forbidden by the Reactive Streams specification (see rule 2.13). If the Monix implementation didn't throw a NullPointerException, it wouldn't be compliant with that spec.
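To make the rule concrete, here is a small sketch of a subscriber that rejects null signals. It assumes JDK 9 or later and uses the java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones. NullCheckingSubscriber is a hypothetical class for illustration only, not code from Monix or RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;

// Hypothetical subscriber, shown only to illustrate the null check.
class NullCheckingSubscriber<T> implements Flow.Subscriber<T> {
    // Items accepted so far, kept so the behaviour can be inspected.
    final List<T> received = new ArrayList<>();

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        Objects.requireNonNull(subscription, "subscription");
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
        // A null item is answered with a NullPointerException to the caller.
        Objects.requireNonNull(item, "item");
        received.add(item);
    }

    @Override
    public void onError(Throwable throwable) {
        Objects.requireNonNull(throwable, "throwable");
    }

    @Override
    public void onComplete() {
        // Nothing to release in this sketch.
    }
}
```

With a subscriber like this downstream, a source that calls onNext(null) gets an exception thrown back at it instead of delivering a value.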

alaisi commented 7 years ago

Hi,

you're correct, but for backward compatibility I'm hesitant to change observables that emit a single null to Observable.empty(). Would returning something like Observable<Transaction.State>, with onNext(Transaction.State.ACTIVE) from Transaction::begin, be a good compromise? The same would go for the other null-emitting observables.

alexandru commented 7 years ago

@alaisi if that's some kind of enum, then it sounds good. IMO the problem is that Observables aren't really meant for single values. Single values are usually modeled by Future abstractions, though granted, in Java those suck, at least until version 8. In @Monix we have Task, and I believe RxJava has Single; you might want to check that out.
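A minimal sketch of the single-value idea, using the JDK's CompletableFuture as a stand-in for Task or Single so the example has no external dependencies. Everything here (TransactionSketch, the State constants other than ACTIVE, the method bodies) is hypothetical and is not the driver's actual API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical names throughout; this is not the driver's API.
class TransactionSketch {
    // Non-null markers that can be emitted instead of a null "Void" value.
    enum State { ACTIVE, COMMITTED, ROLLED_BACK }

    private State state = State.ACTIVE;

    // Completes with exactly one non-null value, which is what a single-value type models.
    CompletableFuture<State> commit() {
        state = State.COMMITTED;
        return CompletableFuture.completedFuture(state);
    }

    CompletableFuture<State> rollback() {
        state = State.ROLLED_BACK;
        return CompletableFuture.completedFuture(state);
    }

    public static void main(String[] args) {
        new TransactionSketch().commit()
            .thenAccept(s -> System.out.println("transaction finished: " + s));
    }
}
```

A real implementation would complete the future when the database responds; the already-completed futures here only keep the sketch short.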

cretz commented 7 years ago

@alaisi - I'd say that's fine. Just note that you still have a compat issue if people stored the Observable<Void> in a variable, but I agree that it is less of a compat concern than not emitting anything. I don't think you need to change begin, just commit and rollback. I agree with @alexandru that you should use Single if you can stand the compat break (here and everywhere else where a single job emits a single value), but otherwise an observable that emits a single non-null value should be fine.

alaisi commented 7 years ago

Ok, great. Single will probably be adopted when porting to RxJava 2.0.

cretz commented 6 years ago

Note for anyone else: this is causing production issues for me at https://github.com/alaisi/postgres-async-driver/blob/71e761c677db1ce1c2b7a57b96a528ace18d61b6/src/main/java/com/github/pgasync/impl/netty/NettyPgProtocolStream.java#L304. Here is the stack trace from my Play app:

play.api.http.HttpErrorHandlerExceptions$$anon$1: Execution exception[[MissingBackpressureException: null]]
        at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:255)
        at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:182)
        at play.core.server.AkkaHttpServer$$anonfun$2.applyOrElse(AkkaHttpServer.scala:310)
        at play.core.server.AkkaHttpServer$$anonfun$2.applyOrElse(AkkaHttpServer.scala:308)
        at scala.concurrent.Future.$anonfun$recoverWith$1(Future.scala:414)
        at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:37)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
        at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
Caused by: rx.exceptions.MissingBackpressureException: null
        at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:353)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.queueScalar(OperatorMerge.java:379)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:361)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
        at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101)
        at com.github.pgasync.impl.PgConnection$1.onNext(PgConnection.java:130)
        at com.github.pgasync.impl.PgConnection$1.onNext(PgConnection.java:121)
        at com.github.pgasync.impl.netty.NettyPgProtocolStream$1.onNext(NettyPgProtocolStream.java:204)
        at com.github.pgasync.impl.netty.NettyPgProtocolStream$5.channelRead(NettyPgProtocolStream.java:304)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)

I'm not really sure what to do about it (I'm working on moving to another lib), but I figured I'd post the stack trace to help anyone else Googling.