typelevel / fs2-grpc

gRPC implementation for FS2/cats-effect
MIT License
270 stars 56 forks source link

A closed client can still receive events #638

Open Jasper-M opened 1 year ago

Jasper-M commented 1 year ago

To be more precise, onClose can be called on the underlying Fs2StreamClientCallListener while the client (and its dispatcher) are already closed. I don't have a full end-to-end example, but it comes down to this:

val someServiceResource: Resource[IO, SomeServiceFs2Grpc[IO, Unit]] = ???

someServiceResource
  .map(
    _.someStream(SomeStreamRequest(), ()).interruptAfter(1.second).compile.drain
  )
  .useEval
  .unsafeRunSync()

Output:

Jun 07, 2023 1:52:23 PM io.grpc.internal.SerializingExecutor run
SEVERE: Exception while executing runnable io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed@13affa0e
java.lang.IllegalStateException: dispatcher already shutdown
        at cats.effect.std.Dispatcher$$anon$2.unsafeToFutureCancelable(Dispatcher.scala:422)
        at cats.effect.std.DispatcherPlatform.unsafeRunTimed(DispatcherPlatform.scala:59)
        at cats.effect.std.DispatcherPlatform.unsafeRunTimed$(DispatcherPlatform.scala:58)
        at cats.effect.std.Dispatcher$$anon$2.unsafeRunTimed(Dispatcher.scala:317)
        at cats.effect.std.DispatcherPlatform.unsafeRunSync(DispatcherPlatform.scala:51)
        at cats.effect.std.DispatcherPlatform.unsafeRunSync$(DispatcherPlatform.scala:50)
        at cats.effect.std.Dispatcher$$anon$2.unsafeRunSync(Dispatcher.scala:317)
        at fs2.grpc.client.Fs2StreamClientCallListener.onClose(Fs2StreamClientCallListener.scala:43)
        at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:468)
        at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:432)
        at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:465)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562)
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

So interruptAfter preemptively stops the stream after which the surrounding resource is immediately closed. But onClose still gets called afterwards and invokes the already closed dispatcher.

ahjohannessen commented 1 year ago

What happens if you provide a dispatcher that waits for the completion of active fibers?

Jasper-M commented 1 year ago

Same result. The error seems to happen when submitting a new task to the dispatcher. So the dispatcher is already closed even before the task is started.

ahjohannessen commented 1 year ago

I wonder how we best avoid that error. Gate submission through something?

Jasper-M commented 1 year ago

Perhaps ideally the cancellation triggered by interruptAfter should backpressure until onClose has been called? Assuming onClose will always be called at least once.

I guess the most sensible thing might be to make clients themselves "closable" resources and reject all events after the client has been closed. But then all methods that don't return a Resource yet (def client, def mkClient, def stub) would have to change incompatibly just for this.

ahjohannessen commented 1 year ago

@Jasper-M You are welcome to give it a go, I do not have the bandwidth atm. I suppose it is time to spring clean that client interface in a major bump that allows breaking changes.