btlines / grpcakkastream

Use GRPC services with the Akka-stream API
MIT License
69 stars 12 forks source link

ReactiveStreams violation in Server streaming calls #4

Closed btlines closed 7 years ago

btlines commented 7 years ago

There is a ReactiveStreams specification violation in server streaming call.

The reason is that the publisher used in Source.fromPublisher doesn't respect the reactive streams specification (by ignoring the subscription).

It should call onSubscribe on the subscriber and provide a Subscription by which the subscriber can request more items.

See: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/README.md#specification

[error] (run-main-0) java.lang.IllegalStateException: Shutting down because of violation of the Reactive Streams specification.
java.lang.IllegalStateException: Shutting down because of violation of the Reactive Streams specification.
        at akka.stream.impl.fusing.GraphInterpreterShell.tryAbort(ActorGraphInterpreter.scala:600)
        at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:727)
        at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:740)
        at akka.actor.Actor.aroundReceive(Actor.scala:514)
        at akka.actor.Actor.aroundReceive$(Actor.scala:512)
        at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:650)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
        at akka.actor.ActorCell.invoke(ActorCell.scala:496)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.stream.impl.ReactiveStreamsCompliance$SignalThrewException: It is illegal to throw exceptions from request(), rule 3.16
        at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:113)
        at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.dequeue(ActorGraphInterpreter.scala:144)
        at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.onNext(ActorGraphInterpreter.scala:172)
        at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary$OnNext.execute(ActorGraphInterpreter.scala:78)
        at akka.stream.impl.fusing.ActorGraphInterpreter$SimpleBoundaryEvent.execute(ActorGraphInterpreter.scala:40)
        at akka.stream.impl.fusing.ActorGraphInterpreter$SimpleBoundaryEvent.execute$(ActorGraphInterpreter.scala:38)
        at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary$OnNext.execute(ActorGraphInterpreter.scala:75)
        at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:546)
        at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:725)
        at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:740)
        at akka.actor.Actor.aroundReceive(Actor.scala:514)
        at akka.actor.Actor.aroundReceive$(Actor.scala:512)
        at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:650)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
        at akka.actor.ActorCell.invoke(ActorCell.scala:496)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
        at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:112)
        at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.dequeue(ActorGraphInterpreter.scala:144)
        at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.onNext(ActorGraphInterpreter.scala:172)
        at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary$OnNext.execute(ActorGraphInterpreter.scala:78)
        at akka.stream.impl.fusing.ActorGraphInterpreter$SimpleBoundaryEvent.execute(ActorGraphInterpreter.scala:40)
        at akka.stream.impl.fusing.ActorGraphInterpreter$SimpleBoundaryEvent.execute$(ActorGraphInterpreter.scala:38)
        at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary$OnNext.execute(ActorGraphInterpreter.scala:75)
        at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:546)
        at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:725)
        at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:740)
        at akka.actor.Actor.aroundReceive(Actor.scala:514)
        at akka.actor.Actor.aroundReceive$(Actor.scala:512)
        at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:650)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
        at akka.actor.ActorCell.invoke(ActorCell.scala:496)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
btlines commented 7 years ago

Solved by https://github.com/btlines/grpcakkastream/pull/5