laixintao / python-parallel-programming-cookbook-cn

📖《Python Parallel Programming Cookbook》中文版
http://python-parallel-programmning-cookbook.readthedocs.io/
1.49k stars 93 forks source link

第三章第7小节管道实现进程通信部分 #90

Open Bestporter opened 2 years ago

Bestporter commented 2 years ago

函数create_items、multiply_items部分无注释,看了好久才明白。 第一个费解的地方pipe_1 = multiprocessing.Pipe(True),不知道这个Pipe具体干了什么的,查看源码后发现就是弄了两个队列,然后建立连接,至于至于send,就是使用的Queue的put函数,recv就是使用的Queue的get函数,用两个队列实现双工。

# 源代码位于multiprocessing/dummy/connection.py
def Pipe(duplex=True):
    a, b = Queue(), Queue()
    return Connection(a, b), Connection(b, a)

class Connection(object):

    def __init__(self, _in, _out):
        self._out = _out
        self._in = _in
        self.send = self.send_bytes = _out.put
        self.recv = self.recv_bytes = _in.get

    def poll(self, timeout=0.0):
        if self._in.qsize() > 0:
            return True
        if timeout <= 0.0:
            return False
        with self._in.not_empty:
            self._in.not_empty.wait(timeout)
        return self._in.qsize() > 0

    def close(self):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()

自己写了一下文中的实现,p1_a与p1_b是一组能够通信的,p2_a与p2_b是一组互相能通信的

import multiprocessing

def create_items(pipe):
    p1_a, p1_b = pipe
    for item in range(10):
        p1_a.send(item)
    p1_a.close()

def multiply_items(pipe_1, pipe_2):
    p1_a, p1_b = pipe_1
    p1_a.close()
    p2_a, p2_b = pipe_2
    try:
        while True:
            item = p1_b.recv()
            p2_a.send(item * item)
    except EOFError:
        p2_a.close()

if __name__== '__main__':
    # 第一个进程管道发出数字
    pipe_1 = multiprocessing.Pipe(True)
    process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,))
    process_pipe_1.start()
    # 第二个进程管道接收数字并计算
    pipe_2 = multiprocessing.Pipe(True)
    process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,))
    process_pipe_2.start()
    pipe_1[0].close()
    pipe_2[0].close()
    try:
        while True:
            print(pipe_2[1].recv())
    except EOFError:
        print("End")