omnilib / aiomultiprocess

Take a modern Python codebase to the next level of performance.
https://aiomultiprocess.omnilib.dev
MIT License

[Question]: A usecase for numpy processing with frequent IO communication #8

Closed zuoxingdong closed 6 years ago

zuoxingdong commented 6 years ago

I am trying to use aiomultiprocess in my project. A simplified version of the current implementation is:

# pip install gym
import gym
import numpy as np
from multiprocessing import Process, Pipe

def worker(master_conn, worker_conn):
    master_conn.close()

    env = gym.make('Pendulum-v0')
    env.reset()

    while True:
        cmd, data = worker_conn.recv()

        if cmd == 'close':
            worker_conn.close()
            break
        elif cmd == 'step':
            results = [env.step(data) for _ in range(1000)]
            worker_conn.send(results)

class Master(object):
    def __init__(self):
        # One duplex pipe per worker: the master keeps one end, the worker gets the other.
        self.master_conns, self.worker_conns = zip(*[Pipe() for _ in range(10)])
        self.list_process = [Process(target=worker, args=[master_conn, worker_conn], daemon=True)
                             for master_conn, worker_conn in zip(self.master_conns, self.worker_conns)]
        for p in self.list_process:
            p.start()
        # The parent no longer needs its copies of the worker ends.
        for worker_conn in self.worker_conns:
            worker_conn.close()

    def go(self, actions):
        # Send one action to every worker first, then collect the results in order.
        for master_conn, action in zip(self.master_conns, actions):
            master_conn.send(['step', action])
        results = [master_conn.recv() for master_conn in self.master_conns]

        return results

    def close(self):
        for master_conn in self.master_conns:
            master_conn.send(['close', None])
        for p in self.list_process:
            p.join()

if __name__ == '__main__':
    # The guard keeps child processes from re-running this block under the spawn start method.
    master = Master()
    master.go(np.random.rand(10, 1))
    master.close()

It does a lot of IO communication through Pipes. I am wondering how I could speed it up with aiomultiprocess.

amyreese commented 6 years ago

Sorry I missed this. Given your example code, where the individual work items appear to be CPU bound, I'm not sure aiomultiprocess would help you much. aiomultiprocess depends on serializing and deserializing data between parent and child processes, which still takes the GIL in both processes. If your workload were itself IO bound, such as making or waiting on network requests, then aiomultiprocess might be worth considering.
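To make the serialization point concrete, here is a minimal sketch using only the standard library. It assumes the results cross the process boundary via pickle, which is what `multiprocessing` connections do on `send()` and `recv()`. The `fake_step_results` helper and its tuple layout are hypothetical stand-ins for the `env.step(...)` output in the question, not real gym data.

```python
import pickle


def fake_step_results(n_steps=1000):
    # Hypothetical stand-in for [env.step(action) for _ in range(n_steps)]:
    # each entry mimics an (observation, reward, done, info) tuple.
    return [([0.1, 0.2, 0.3], -1.0, False, {}) for _ in range(n_steps)]


def round_trip(obj):
    # Serialize in the "sending" process, then deserialize in the "receiving" one.
    # Both halves are ordinary Python-level work that runs while holding the GIL.
    payload = pickle.dumps(obj)
    return pickle.loads(payload), len(payload)


results = fake_step_results()
restored, payload_size = round_trip(results)
print(f"{len(restored)} step results took {payload_size} bytes on the wire")
```

Every batch of results pays this cost once in the worker and once in the master, whichever library manages the processes, so switching to an async process pool would not by itself remove it.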