helios741 / myblog

觉得好请点小星星,欢迎有问题交流(issue/email)
109 stars 21 forks source link

浅谈python中的多进程 #62

Open helios741 opened 4 years ago

helios741 commented 4 years ago

一、为什么要实现多进程编程

浅谈python中的多线程编程中说到了,因为Python中的GIL机制,python的多线程不能充分利用多核CPU。

简单的说GIL就是对于一个进程来说,同时只能处理其中的一个线程

在python中,如果要充分利用计算机多核的特性或者说要进行高计算量的任务的话就要用的多进程。

多进程 多线程 备注
对资源要求 OS创建一个进程的耗费肯定要比创建一个线程浪费的多,具体可以参考我的两个笔记内核创建进程创建线程
高计算场景 不适用 适用 多线程在高计算的场景下容易阻塞,因为就用一个核
高I/O场景 适用 中度适用 虽然多进程也适用于高I/O,但是进程间通信慢类的原因,可是导致还不如多线程

等以后将完协程,会把这个表进一步完善。

二、多进程编程

1. 通过操作系统的fork函数

import os
print("not fork")
pid = os.fork()
print("already fork")

if pid == 0:
    print("my is child process id is: {}, father is: {}".format(os.getpid(), os.getppid()))
else:
    print("my is father, pid is :{}".format(pid))

output:

not fork
already fork
my is father, pid is :78348
already fork
my is child process id is: 78348, father is: 78347

在调用fork之后,操作系统就会产生一个新的进程,所有就会执行两次pid = os.fork()下面的逻辑:

2. 通过multiprocess模块

import multiprocessing
import time
def foo(p):
    print("current process name is {}".format(multiprocessing.current_process().name))
    time.sleep(1)
    print("this is p: {}".format(p))

multiprocessing.Process(target=foo, name="test_process", args=["params"]).start()

print("main end")

ouput:

main end
current process name is test_process
this is p: params

很明显这个要主进程等到子进程结束才会结束,那么如果我们想要把子进程变为后台运行,可以看下面这个例子:

import multiprocessing
import time
def foo(p):
    print("current process name is {}".format(multiprocessing.current_process().name))
    time.sleep(1)
    print("this is p: {}".format(p))

p = multiprocessing.Process(target=foo, name="test_process", args=["params"])

p.daemon = True
p.start()
# time.sleep(2)
print("main end")

主进程会根据子进程的daemon属性来决定这个子进程的命运:

3. 通过继承multiprocessing.Process

import multiprocessing

class MyProcess(multiprocessing.Process):
    def __init__(self, name):
        super().__init__(name=name)
    def run(self):
        print("child process running")

p = MyProcess("test_process_class")
p.start()
print("child process name is: {}".format(p.name))
print("main end")

三、进程间管理状态

python的多进程模块提供了在多进程中共享信息的对象(Manager),该对象能保证当一个进程修改了共享对象的时候,所有的进程都能拿到更新后的共享对象。

import multiprocessing
import time
def worker(d, k, v):
    time.sleep(1)
    d[k] = v
    print("key = {}, value = {}".format(k, v))

mgr = multiprocessing.Manager()
dict = mgr.dict()

[multiprocessing.Process(target=worker, args=(dict, i, i*2)).start() for i in range(10)]
time.sleep(2)
print(dict)

output:

key = 1, value = 2
key = 2, value = 4
key = 0, value = 0
key = 3, value = 6
key = 4, value = 8
key = 6, value = 12
key = 5, value = 10
key = 7, value = 14
key = 8, value = 16
key = 9, value = 18
{1: 2, 2: 4, 0: 0, 3: 6, 4: 8, 6: 12, 5: 10, 7: 14, 8: 16, 9: 18}

三、进程通信

 四、进程同步

1. 通过Lock

下面的程序多次执行的话,结果可能会不同:

import multiprocessing
import time
def worker(d):
    v = d.get("k", 0)
    d["k"] = v + 1
mgr = multiprocessing.Manager()
dict = mgr.dict()
[multiprocessing.Process(target=worker, args=(dict,)).start() for i in range(10)]
time.sleep(2)
print(dict)

通过加锁的方式能够解决:

import multiprocessing
import time

def worker(d, lock):
    with lock:
        v = d.get("k", 0)
        d["k"] = v + 1

lock = multiprocessing.Lock()
mgr = multiprocessing.Manager()
dict = mgr.dict()
[multiprocessing.Process(target=worker, args=(dict, lock, )).start() for i in range(10)]
time.sleep(2)
print(dict)

2. 通过信号(Semaphore)

表示最多能够多有多少个进程操作共享资源:

import multiprocessing
import time
def worker(sema, i):
    sema.acquire()
    print(multiprocessing.current_process().name + " acquire")
    time.sleep(i)
    print(multiprocessing.current_process().name + " release")
    sema.release()
s = multiprocessing.Semaphore(2)
[multiprocessing.Process(target=worker, args=(s, i * 1.5)).start() for i in range(5)]

time.sleep(10)

3. 通过Event

内部维护一个变量,当变量为true时候,wait将被唤醒或者调用wait不被阻塞。 一个进程发送事件信号,一个进程等待事件信号。主要有下面三个API:

import multiprocessing
import time

def event_set(e):
    time.sleep(2)
    print("start set")
    e.set()
    e.clear()
    print("end set")

def event_wait(e, i):
    time.sleep(i)
    print("start wait {}".format(i))
    e.wait()
    print("end wait {}".format(i))

eve = multiprocessing.Event()

multiprocessing.Process(target=event_wait, args=(eve, 1)).start()
multiprocessing.Process(target=event_set, args=(eve,)).start()
multiprocessing.Process(target=event_wait, args=(eve, 3)).start()

time.sleep(5)

结果是只有一个进程会被唤醒,其余的都继续等待。

4. 通过Condition

5. Barrier

栅栏类提供一个简单的同步原语,用于应对固定数量的进程需要彼此相互等待的情况。进程调用 wait() 方法后将阻塞,直到所有进程都调用了 wait() 方法。此时所有进程将被同时释放。

就是当一个任务需要多个进程同时合作完成的时候就派上了用场。

import multiprocessing
import time

def worker(bar, i):
    time.sleep(i)
    print("barrier {}  start".format(i))
    bar.wait()
    print("barrier  {} end".format(i))

bar = multiprocessing.Barrier(2)
multiprocessing.Process(target=worker, args=(bar, 1)).start()
multiprocessing.Process(target=worker, args=(bar, 2)).start()

time.sleep(3)

1. 通过queue

2.

总结

参考