scalapb / zio-grpc

ScalaPB meets ZIO: write purely functional gRPC services and clients using ZIO
Apache License 2.0

Python client receives out-of-order stream #611

Open luan-xiaokun opened 6 months ago

luan-xiaokun commented 6 months ago

Hi all, I have a server implemented in Scala using zio-grpc, and a client in Python.

I found that the stream received by the client is sometimes out of order. For example, the server sends [1, 2, 3, 4, 5], but the client receives [1, 2, 4, 3, 5]. Though I'm new to gRPC, I don't think this is expected behavior.

Here is a minimal example: https://drive.google.com/file/d/1Eew2sOhjSt2tCBEupE1glo6PALYkB0t1/view

import io.grpc.StatusException
import scalapb.zio_grpc.ServerMain
import scalapb.zio_grpc.ServiceList
import zio.{ZIO, stream}
import zio.stream.ZStream
import example.demo._

class DemoService extends ZioDemo.Demo {
  def foo(request: FooRequest): stream.Stream[StatusException, FooResponse] = {
    ZStream.fromIterable(getStream)
  }

  private def getStream: List[FooResponse] = List.range(0, 100).map(i => FooResponse(i.toString))
}

object Main extends ServerMain {
  override def port: Int = 8981

  override def services: ServiceList[Any] =
    ServiceList.add(new DemoService)
}

import grpc
import demo_pb2
import demo_pb2_grpc

def get_streaming_response(stub):
    responses = list(stub.Foo(demo_pb2.FooRequest(data="hello")))
    if any(r.data != str(i) for i, r in enumerate(responses)):
        print("Expected:", list(range(len(responses))))
        print("Got:     ", [int(r.data) for r in responses])
        print("Diff:    ", [i for i, r in enumerate(responses) if r.data != str(i)])
        raise ValueError("Out of order response")

if __name__ == "__main__":
    with grpc.insecure_channel("localhost:8981") as channel:
        demo_stub = demo_pb2_grpc.DemoStub(channel)

        n = 10000
        for _ in range(n):
            get_streaming_response(demo_stub)

#### output looks like:
# Expected: [0, 1, 2, ...]
# Got:      [0, 1, 2, ...]
# Diff:     [x, y, ...]

According to my testing, this happens rarely for any single call, but with n=10000 calls at least one out-of-order response almost always shows up. Usually only one or two pairs of adjacent items are swapped.
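To see why a rare per-call event still shows up in nearly every run of 10000 calls, here is a small arithmetic sketch. It assumes the calls are independent, and the per-call rate `p = 0.001` is a made-up value for illustration, not something measured in this issue.

```python
def prob_at_least_one(p: float, n: int) -> float:
    """Probability that an event with per-call probability p
    occurs at least once in n independent calls."""
    return 1.0 - (1.0 - p) ** n


if __name__ == "__main__":
    # p = 0.001 is a hypothetical per-call reordering rate, not a measurement.
    for n in (100, 1000, 10000):
        print(n, prob_at_least_one(0.001, n))
```

Under that assumed rate, a single call is almost always fine, while a loop of 10000 calls is very likely to hit at least one reordered stream.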

I'm not sure whether the problem is in zio-grpc or in Python's grpcio. Some relevant information:

OS: Ubuntu 22.04
Python version: 3.10.13
grpcio version: 1.51.1 (couldn't find 1.50.1)
grpcio-tools version: 1.51.1
grpc version: 4.25.3
Scala version: 2.13.13
grpc-netty version: 1.50.1

thesamet commented 6 months ago

I was able to reproduce - looks like it's fairly common for messages to go out of order when streaming. Testing this with ZIO_GRPC_BACKPRESSURE_QUEUE_SIZE=1000 makes the problem go away. Testing with ZIO_GRPC_BACKPRESSURE_QUEUE_SIZE=-1 results in the Python client receiving an error. This suggests the backpressure logic needs to be fixed. @regiskuckaertz @cipriansofronia @ghostdogpr - will one of you have time to look into this?

ghostdogpr commented 6 months ago

The main difference between ZIO_GRPC_BACKPRESSURE_QUEUE_SIZE being -1 and 1000 is that with 1000 we call stream.buffer, which loses the chunking. With -1, we keep the chunking, and the chunks are quite large here since ZStream.fromIterable uses DefaultChunkSize = 4096. When we keep the chunking, we're doing chunk.foreach(call.call.sendMessage) in a single IO, while when we lose it, we're doing an individual call.call.sendMessage per IO, which is much slower but I guess contributes to keeping the order. I am a little surprised that sendMessage doesn't guarantee the order if we call it very quickly 🤔
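The batching difference described above can be modelled in a few lines of plain Python. This is only a single-threaded sketch of "one IO per chunk" versus "one IO per element"; `chunked` and `send_batches` are hypothetical helpers, not zio-grpc or ZIO APIs, and the model deliberately does not reproduce the reordering itself.

```python
from itertools import islice
from typing import Iterable, Iterator, List

DEFAULT_CHUNK_SIZE = 4096  # chunk size quoted in the comment above


def chunked(items: Iterable[int], size: int) -> Iterator[List[int]]:
    """Split items into consecutive lists of at most `size` elements."""
    it = iter(items)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def send_batches(chunks: Iterable[List[int]], sent: List[int]) -> int:
    """Append every message to `sent`; each chunk counts as one 'IO'.

    Returns the number of IOs that were needed.
    """
    ios = 0
    for chunk in chunks:
        ios += 1
        for msg in chunk:  # stands in for chunk.foreach(sendMessage)
            sent.append(msg)
    return ios


if __name__ == "__main__":
    kept, lost = [], []
    # Chunking kept: all 100 messages fit in one chunk, so one IO.
    print(send_batches(chunked(range(100), DEFAULT_CHUNK_SIZE), kept))
    # Chunking lost: every message is its own chunk, so one IO per message.
    print(send_batches(chunked(range(100), 1), lost))
```

In this model both paths deliver the messages in the same order; the only difference is how many separate effects are executed, which is where the two configurations diverge.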

regiskuckaertz commented 6 months ago

I've been playing with it for a while and it is hard to reproduce on my end. But what I noticed is that this is definitely not a bug in this library. To check this I changed the handler to:

ZChannel.readWithCause(
          xs =>
            ZChannel.fromZIO(GIO.attempt {
              println("--")
              println(xs.mkString(","))
              xs.foreach(call.call.sendMessage)
            }) *> ...

and then the user code stops when finding a reproducer:

ZioTestservice.GreeterClient
                 .sayHelloStreaming(HelloRequest(request = Some(Hello(name = "Testing streaming"))))
                 .runCollect
                 .repeatUntilZIO { ys =>
                   if (ys.map(_.i) != (0 until 100)) {
                     println(ys.map(_.i).mkString(", "))
                     ZIO.succeed(true)
                   } else ref.updateAndGet(_ - 1).map(_ == 0)
                 }

What I observed is that the client prints:

0, 1, ..., 84, 83, ..., 99

and the handler prints:

--
HelloReply(84, UnknownFieldSet(Map()))
--
HelloReply(83, UnknownFieldSet(Map()))
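The reordering seen here (84 delivered before 83) is a swap of two adjacent items, which matches the original report. A client-side helper can pinpoint such swaps instead of only listing differing indices. This is a sketch: `adjacent_swaps` is a hypothetical name, and it assumes the expected payloads are 0, 1, 2, ... as in the demo.

```python
from typing import List, Sequence, Tuple


def adjacent_swaps(received: Sequence[int]) -> List[Tuple[int, int]]:
    """Return (i, i + 1) index pairs where two neighbouring items
    arrived swapped relative to the expected sequence 0, 1, 2, ..."""
    swaps = []
    i = 0
    while i < len(received) - 1:
        if received[i] == i + 1 and received[i + 1] == i:
            swaps.append((i, i + 1))
            i += 2  # skip past the swapped pair
        else:
            i += 1
    return swaps


if __name__ == "__main__":
    received = list(range(100))
    # Recreate the observation from this comment: 84 arrives before 83.
    received[83], received[84] = 84, 83
    print(adjacent_swaps(received))
```

In the Python client above, this could be called on `[int(r.data) for r in responses]` before raising the error.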

I also note that the bug only happens (at least for me) when using ZStream.fromIterable(0 until 100); I was unable to observe it with ZStream.iterate(0)(_ + 1).take(100), for instance. That doesn't mean it can't happen for someone else, though.

Anyway, this appears to be a bug in the stream runtime and it will be fun to find it 🤠

regiskuckaertz commented 6 months ago

xs always has size 1, which is surprising to me but fair enough

that's because map rechunks in ZStream.fromIterable(0 until 100).map(HelloReply(_))

ghostdogpr commented 6 months ago

> xs always has size 1, which is surprising to me but fair enough
>
> that's because map rechunks in ZStream.fromIterable(0 until 100).map(HelloReply(_))

Isn't it because of buffer that destroys chunking rather than map?

ghostdogpr commented 6 months ago

Small reproducer using zio only:

import zio.*
import zio.stream.*

object Test extends ZIOAppDefault {
  val expected = Chunk.fromIterable(0 until 100)
  val s        = ZStream.fromChunk(expected).buffer(16)

  def run = s.runCollect.map(_ == expected).debug.repeatWhile(identity)
}

It seems to work without buffer. Will open an issue on the zio repo.

ghostdogpr commented 6 months ago

https://github.com/zio/zio/issues/8699

regiskuckaertz commented 6 months ago

>> xs always has size 1, which is surprising to me but fair enough
>>
>> that's because map rechunks in ZStream.fromIterable(0 until 100).map(HelloReply(_))
>
> Isn't it because of buffer that destroys chunking rather than map?

I had added a rechunk and forgotten about it 😅

regiskuckaertz commented 6 months ago

Speaking of buffer, I thought about using bufferChunks instead; that would avoid the rechunking, though one would need to change the size of the queue. Thoughts?
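A rough way to see why the queue size would need to change: a queue that holds elements and a queue that holds chunks have very different upper bounds for the same capacity. The sketch below is plain arithmetic, not ZIO code; `max_buffered_elements` is a hypothetical helper, and the numbers 16 and 4096 are borrowed from the reproducer and the DefaultChunkSize mentioned earlier in the thread.

```python
def max_buffered_elements(capacity: int, chunk_size: int, per_chunk: bool) -> int:
    """Upper bound on elements held by a bounded queue with `capacity` slots.

    per_chunk=False: each slot holds one element.
    per_chunk=True:  each slot holds one chunk of up to `chunk_size` elements.
    """
    return capacity * chunk_size if per_chunk else capacity


if __name__ == "__main__":
    print(max_buffered_elements(16, 4096, per_chunk=False))
    print(max_buffered_elements(16, 4096, per_chunk=True))
```

With the same capacity, buffering by chunk can hold far more messages in memory, so a capacity tuned for per-element buffering would likely need to be lowered.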

ghostdogpr commented 6 months ago

I actually added https://github.com/scalapb/zio-grpc/pull/578 to not buffer at all 😆 But yeah bufferChunks sounds better than buffer.