dabeaz / curio

Good Curio!

How can a task and a process communicate #261

Closed skgbanga closed 5 years ago

skgbanga commented 6 years ago

In the docs section on how a task and a thread can communicate, it is suggested to use a UniversalQueue to communicate between them.

I was wondering if the same can be used for processes as well.
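
For reference, the pattern from that docs section pairs a UniversalQueue with a plain thread; roughly like this (reconstructed from memory, so details may differ slightly from the docs):

import curio
import threading

def producer(queue):
    # runs in an ordinary thread; put() on a UniversalQueue is safe from sync code
    for n in range(10):
        queue.put(n)
    queue.join()

async def consumer(queue):
    while True:
        item = await queue.get()
        print('Consumer: got', item)
        await queue.task_done()

async def main():
    q = curio.UniversalQueue()
    prod_thread = threading.Thread(target=producer, args=(q,))
    prod_thread.start()
    cons_task = await curio.spawn(consumer, q)
    await curio.run_in_thread(prod_thread.join)
    await cons_task.cancel()

if __name__ == '__main__':
    curio.run(main)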

The same pattern doesn't work for processes, though:

import curio
import multiprocessing

def producer(queue):
    for n in range(10):
        queue.put(n)
    queue.join()

async def consumer(queue):
    while True:
        item = await queue.get()
        print('Consumer: got', item)
        await queue.task_done()

async def main():
    q = curio.UniversalQueue()
    prod_task = multiprocessing.Process(target=producer, args=(q,))
    prod_task.start()
    cons_task = await curio.spawn(consumer, q)
    # wait for the process to finish
    await curio.run_in_thread(prod_task.join)
    await cons_task.cancel()

if __name__ == '__main__':
    curio.run(main, with_monitor=True)

The above program just hangs, with both the main and consumer tasks stuck in the FUTURE_WAIT state.

For normal synchronous code, the recommended (?) way is to use manager instances. What is your recommendation for async/curio code?

skgbanga commented 6 years ago

Oh I think this question has already been asked here: https://github.com/dabeaz/curio/issues/216

Please feel free to close this.

skgbanga commented 6 years ago

Ah, one can also use curio.abide to work with other synchronization primitives:

import curio
import multiprocessing

def producer(queue):
    for n in range(10):
        queue.put(n)
    queue.join()

async def consumer(queue):
    while True:
        item = await curio.abide(queue.get)
        print('Consumer: got', item)
        await curio.abide(queue.task_done)

async def main():
    m = multiprocessing.Manager()
    q = m.Queue()
    prod_task = multiprocessing.Process(target=producer, args=(q,))
    prod_task.start()

    cons_task = await curio.spawn(consumer, q)
    await curio.run_in_thread(prod_task.join)
    await cons_task.cancel()

if __name__ == '__main__':
    curio.run(main, with_monitor=True)

I think the above might be a good example to add to the "how to" section on communicating with the outside world (processes).

imrn commented 6 years ago

The multiprocessing documentation mentions that data is shared via shared memory between Python processes. But the same page also has this note: "When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe".
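
In plain multiprocessing terms, that note seems to amount to pickle-over-a-pipe; a minimal illustration (my own sketch, not taken from the docs):

import multiprocessing

def child(conn):
    # recv() reads the pickled bytes from the pipe and unpickles them
    print('child got', conn.recv())
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=child, args=(child_conn,))
    p.start()
    # send() pickles the object and writes it to the underlying pipe;
    # multiprocessing.Queue does the same thing via a background feeder thread
    parent_conn.send({'n': 42})
    p.join()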

What exactly does Python do in the background when using multiprocessing, and how does curio utilize it?

dabeaz commented 6 years ago

All things equal, use of multiprocessing with curio scares me a bit. Curio has a fairly complex runtime environment potentially involving an async event loop, threads, and signal handling. Internally, Curio uses multiprocessing.Process objects, but they are launched using the "spawn" method of multiprocessing which creates them in an entirely clean interpreter process.
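
For reference, this is what the "spawn" start method looks like in plain multiprocessing (just an illustration of the start method, not Curio's internal code):

import multiprocessing

def worker(x):
    print('worker got', x)

if __name__ == '__main__':
    # 'spawn' starts a brand-new interpreter for the child, so it does not
    # inherit the parent's event loop, threads, or signal handlers
    ctx = multiprocessing.get_context('spawn')
    p = ctx.Process(target=worker, args=(1,))
    p.start()
    p.join()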

My general thought on external processes is to rely on the Channel machinery described here: http://curio.readthedocs.io/en/latest/reference.html#module-curio.channel

It's not stated in the docs, but I'm pretty sure Curio Channel objects are wire-compatible with multiprocessing connection objects. So, one could have Curio on one side of the connection and normal synchronous Python code on the other (using multiprocessing.connection).
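
A rough sketch of what that could look like (untested; the address and authkey are made up, and the Channel usage follows the docs linked above):

# curio_side.py -- Curio task producing messages over a Channel
import curio
from curio import Channel

async def main():
    ch = Channel(('localhost', 30000))
    c = await ch.accept(authkey=b'peekaboo')
    for i in range(10):
        await c.send(i)
    await c.send(None)     # sentinel to tell the other side we're done

if __name__ == '__main__':
    curio.run(main)

# sync_side.py -- ordinary synchronous Python using multiprocessing.connection
from multiprocessing.connection import Client

c = Client(('localhost', 30000), authkey=b'peekaboo')
while True:
    msg = c.recv()
    if msg is None:
        break
    print('Got:', msg)
c.close()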