0rpc / zerorpc-python

zerorpc for python
http://www.zerorpc.io
Other
3.17k stars 378 forks source link

need a little help with streaming (python server) async/await #267

Open devzzzero opened 1 week ago

devzzzero commented 1 week ago

Hi!

I looked at [https://github.com/0rpc/zerorpc-node/issues/96] and #131

My python server code is all async/await code. I managed to get to limp along with zerorpc using nest_asyncio

Underneath the hood, the server will periodically generate items into a fifo queue, and I want the zerorpc server to return that to a node.js client.

I am sort of as loss as to how to engineer this.

Ideally, on the python server side, when the new blob is available, I'd like to have it sent back to the client. (I am currently saving it onto a deque for test purposes.)

Is there a pattern to adapt an infinite producer (which periodically produces blobs) and have it sent back to the client via the zerorpc server?

I'd like avoid having the server do its own zerorpc call, as its on a async/await world.

Here is the pseudocode

# deep inside the async world, I have a loop similar to below:
async def myserver(...):
  try:
    responseQueue = someCtx['responseQueue']
     async for response in someclient.doRequest():
      newItem = getItem(response)
      # currently, newItem is just saved for testing purposes
      responseQueue.append(newItem)
  except Exception as e:
    ...

In other words, on the SYNCHRONOUS zerorpc server code, how can I check whether len(responseQueue) > 0, and then send back each queued item one at a time? Is there a pattern to do this effectively?

Alternatively, is there a callback I can access from the ASYNC side, i.e. as soon as a newItem is generated, I can do something like

ZRPC =  getZeroRPCConnection(??)
await ZRPC.sendOneReponse(toJSONString(newItem))

from the async code blob?

This means that hopefully the 'sendOneResponse()' should not be a blocking call...

I suppose it's possible for the server to act as its own client and send one item at a time, but the same issue of blocking vs async API still applies, I guess.

Am I just S.O.L.? Or is there some magic pattern to adapt an async generator to a SYNC generator? (i.e. somehow adapt an async yield to a sync yield?)

Thank you.