capnproto / pycapnp

Cap'n Proto serialization/RPC system - Python bindings
BSD 2-Clause "Simplified" License

read() and read_multiple() block when consuming from python-managed pipes or sockets #354

Open fungs opened 5 months ago

fungs commented 5 months ago

Hi, I know that there is ongoing work to remove or reduce the dependency on raw file descriptors in #283 and #311. However, the issue I'm describing here is with the existing implementation (reference: 2.0.0b2) and how it works with pipe (os.pipe) and socket (socket.socket) objects in Python, which do expose usable raw file descriptors via fileno().

We can construct a pipe using

import os

pipe_read, pipe_write = os.pipe()

Data can be generated or read in one thread (or process) like

chunk_size = 1024**2  # 1 MiB
with os.fdopen(pipe_write, mode="wb") as write_file:
    while chunk := file_like_source.read(chunk_size):
        write_file.write(chunk)

and be consumed in another thread like

with os.fdopen(pipe_read, mode="rb") as read_file:
    for item in MyStruct.read_multiple(read_file):
        do_something(item)

What I observe is that both MyStruct.read_multiple(read_file) and write_file.write(chunk) block if chunk_size is smaller than the serialized struct item. My hypothesis is that this has to do with how the reader peeks into the data, which is in fact a stream, without actually consuming it, but I don't know.
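Wired together with threads, a minimal reproducer might look like this (a sketch, reusing file_like_source, MyStruct, and do_something from the snippets above):

import os
import threading

pipe_read, pipe_write = os.pipe()
chunk_size = 1024**2  # 1 MiB; assumed smaller than one serialized message

def produce():
    # Buffered writer, as in the snippet above; this is the side
    # that ends up blocking alongside the reader.
    with os.fdopen(pipe_write, mode="wb") as write_file:
        while chunk := file_like_source.read(chunk_size):
            write_file.write(chunk)

writer = threading.Thread(target=produce)
writer.start()

# The reader blocks here when chunk_size is smaller than one message.
with os.fdopen(pipe_read, mode="rb") as read_file:
    for item in MyStruct.read_multiple(read_file):
        do_something(item)

writer.join()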

Strangely, if a process outside Python generates the stream via process = subprocess.Popen() and writes it to a pipe via standard output (stdout=subprocess.PIPE), read_multiple() can consume it from process.stdout without issues.
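For reference, that working setup looks roughly like this (a sketch; the "producer" command is hypothetical, and MyStruct/do_something are as above):

import subprocess

# Hypothetical external command that writes serialized MyStruct
# messages to its standard output.
process = subprocess.Popen(["producer"], stdout=subprocess.PIPE)

# Reading directly from the child's stdout pipe does not block.
for item in MyStruct.read_multiple(process.stdout):
    do_something(item)

process.stdout.close()
process.wait()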

Maybe someone has an idea why this happens and how it could be circumvented or fixed? Happy to hear your thoughts.

LasseBlaauwbroek commented 5 months ago

My first guess would be that your second snippet is somehow blocking in the C++ code while not relinquishing the GIL. That would prevent the other thread from continuing, creating a deadlock. However, the following snippet shows that the GIL is being released since #308: https://github.com/capnproto/pycapnp/blob/1fb168796d622e1cd19b2353a43ef5bb0b15d5a1/capnp/lib/capnp.pyx#L3872-L3873 Are you sure you are using a pycapnp version that is new enough?
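One quick way to check the installed version (using importlib.metadata from the standard library, which avoids assuming pycapnp exposes a version attribute):

import importlib.metadata

# Print the installed pycapnp version to verify it includes #308.
print(importlib.metadata.version("pycapnp"))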

fungs commented 5 months ago

I finally figured out how to make this approach work using processes, not threads!

Most importantly, for the pipe version, buffering on the write side has to be disabled with os.fdopen(pipe_write, mode="wb", buffering=0):

chunk_size = 1024**2  # 1 MiB
with os.fdopen(pipe_write, mode="wb", buffering=0) as write_file:
    while chunk := file_like_source.read(chunk_size):
        write_file.write(chunk)

The socket version using socket.socket seems to work with buffering enabled on both the sender and the receiver side.
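For completeness, a sketch of the socket variant with a child process (assuming the fork start method on a Unix-like platform, and the same file_like_source/chunk_size/MyStruct/do_something as above):

import multiprocessing
import socket

sock_write, sock_read = socket.socketpair()

def produce(sock):
    # makefile() returns a buffered binary file object; unlike the
    # pipe variant, buffering did not have to be disabled here.
    with sock.makefile(mode="wb") as write_file:
        while chunk := file_like_source.read(chunk_size):
            write_file.write(chunk)
    sock.close()

proc = multiprocessing.Process(target=produce, args=(sock_write,))
proc.start()
sock_write.close()  # drop the parent's copy so the reader sees EOF

with sock_read.makefile(mode="rb") as read_file:
    for item in MyStruct.read_multiple(read_file):
        do_something(item)

proc.join()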

There are also pitfalls when using a process, in particular with os.pipe(): depending on the start method and platform (fork, spawn), you may have to transfer the file descriptors into the child process, or make sure to close the file descriptors in both the parent and the child process; otherwise the reader will wait for EOF and never terminate. A sketch of that descriptor handling follows below.
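Here is that sketch (again assuming the fork start method, where the child inherits the parent's file descriptors):

import multiprocessing
import os

pipe_read, pipe_write = os.pipe()

def produce(fd):
    os.close(pipe_read)  # the child does not read; close its copy
    # buffering=0 is the fix from above; the with-block closes the
    # child's write end on exit.
    with os.fdopen(fd, mode="wb", buffering=0) as write_file:
        while chunk := file_like_source.read(chunk_size):
            write_file.write(chunk)

proc = multiprocessing.Process(target=produce, args=(pipe_write,))
proc.start()
os.close(pipe_write)  # close the parent's copy, or EOF never arrives

with os.fdopen(pipe_read, mode="rb") as read_file:
    for item in MyStruct.read_multiple(read_file):
        do_something(item)

proc.join()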

However, when I run the exact same code using threading.Thread() instead of multiprocessing.Process, the data channel still blocks. For me, this is a strong indication that there is an issue involving the GIL. In my case, the parent is actually a thread spawned from the main process, which could also be an issue?