grpc / grpc-java

The Java gRPC implementation. HTTP/2 based RPC
https://grpc.io/docs/languages/java/
Apache License 2.0

Tracking issue for StreamObservers #4694

Open carl-mastrangelo opened 6 years ago

micheljung commented 1 year ago

If you have a method like this (Kotlin):


  @Transactional
  override fun get(
    request: DummyRequest,
    responseObserver: StreamObserver<DummyResponse>,
  ) {
    myStreamingDatabaseQuery()
      .iterator()
      .forEach { responseObserver.onNext(it) }

    responseObserver.onCompleted()
  }

  fun myStreamingDatabaseQuery(): Stream<Dummy> = ...

This works fine: the transaction is opened, the stream is fully processed inside the method, and only then is the transaction closed.

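However, this version calls onNext() without regard to flow control, so responses may be buffered faster than the client reads them. You might then run into memory issues and try to use StreamObservers like this: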

  @Transactional
  override fun get(
    request: DummyRequest,
    responseObserver: StreamObserver<DummyResponse>,
  ) {
    val iterator = myStreamingDatabaseQuery().iterator()

    StreamObservers.copyWithFlowControl(
      iterator,
      responseObserver as CallStreamObserver<DummyResponse>,
    )
  }

  fun myStreamingDatabaseQuery(): Stream<Dummy> = ...

This won't work because StreamObservers only starts processing the stream after get() has returned. By then the transaction has been closed, so the stream can no longer be read.

Any idea how to solve this?
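
The timing problem can be reproduced without gRPC or Spring. The following is a minimal sketch under stated assumptions: FakeTransaction, pendingHandlers, and transactionalGet are hypothetical stand-ins (not Spring or grpc-java types) that mimic a method-scoped transaction and a framework that runs a registered handler only after the method has returned.

```kotlin
import java.io.Closeable

// Hypothetical stand-in for a transaction that is scoped to one method call.
class FakeTransaction : Closeable {
  var open = true
    private set

  override fun close() {
    open = false
  }
}

// Hypothetical stand-in for the framework: handlers are stored here and
// invoked later, after the service method has returned.
val pendingHandlers = mutableListOf<() -> Unit>()

// Mimics a @Transactional method that only registers a callback.
fun transactionalGet(results: MutableList<String>) {
  FakeTransaction().use { tx ->
    pendingHandlers.add {
      results.add(if (tx.open) "read rows" else "transaction already closed")
    }
  } // the "transaction" ends here, before any handler has run
}

fun main() {
  val results = mutableListOf<String>()
  transactionalGet(results)
  pendingHandlers.forEach { it() } // the framework invokes the handler later
  println(results) // prints [transaction already closed]
}
```

The handler captures the transaction object but runs after use { } has closed it, which is the same ordering as in the copyWithFlowControl version above.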

micheljung commented 1 year ago

The same problem applies to closing the stream, which my code above does not do.

ejona86 commented 11 months ago

@Transactional assumes all processing is done within the method. That requires the method to be blocking. But for streaming you have to implement the service method in an async fashion (in order to get onReady() callbacks). So @Transactional is simply incompatible with gRPC streaming until we have some blocking API on server-side. But that's not specific to the StreamObservers utility.

larry-safran commented 11 months ago

Agreed to mark StreamObservers deprecated because it has minimal usefulness and is rarely used (nothing in g3 or Sourcegraph). We would prefer to replace the grpc-java usages of it: instead of creating an iterator, just use onReady.
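
For illustration, using onReady directly might look roughly like the sketch below. It assumes grpc-stub is on the classpath and that the observer passed to the service method is a ServerCallStreamObserver. The names streamWithFlowControl and onFinished are hypothetical and not part of grpc-java. The sketch does not make @Transactional work, since the method still returns before the handler runs; onFinished is only a hook where a caller could close the stream or a manually managed resource.

```kotlin
import io.grpc.Status
import io.grpc.stub.ServerCallStreamObserver
import io.grpc.stub.StreamObserver

// Hypothetical helper (not a grpc-java API): sends items from `source`
// only while the transport reports that it is ready for more.
fun <T> streamWithFlowControl(
  source: Iterator<T>,
  responseObserver: StreamObserver<T>,
  onFinished: () -> Unit = {}, // hypothetical cleanup hook, runs at most once
) {
  val observer = responseObserver as ServerCallStreamObserver<T>
  var finished = false

  fun finish() {
    if (!finished) {
      finished = true
      onFinished()
    }
  }

  // Release resources if the client cancels before the stream is drained.
  observer.setOnCancelHandler { finish() }

  // Must be registered before the service method returns.
  observer.setOnReadyHandler {
    if (finished) return@setOnReadyHandler
    try {
      // Send until the transport pushes back or the source is exhausted.
      while (observer.isReady && source.hasNext()) {
        observer.onNext(source.next())
      }
      if (!source.hasNext()) {
        finish()
        observer.onCompleted()
      }
    } catch (e: Exception) {
      finish()
      observer.onError(Status.INTERNAL.withCause(e).asRuntimeException())
    }
  }
}
```

A service method would call streamWithFlowControl(iterator, responseObserver) { stream.close() } and then return; the handler is invoked later, each time the call becomes ready to accept more messages.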

ejona86 commented 10 months ago

We should close this when the class is actually removed, which is also when the reference to this issue will be removed.