zengbin93 / blog

17 stars 10 forks source link

实践 - Python - 高性能编程 #23

Open zengbin93 opened 6 years ago

zengbin93 commented 6 years ago

高性能编程重点关注提高并发数,降低响应时间。通常实现方法是使用多线程、多进程、分布式等。

参考资料

zengbin93 commented 6 years ago

多进程 - Multiprocessing

Multiprocessing是一个跨平台的多进程模块,同时提供本地和远程的并发支持。进程间通信可以通过Queue、Pipe等实现。

Process对象


创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动。join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

以下是使用Process创建多进程程序的案例:

import multiprocessing as mp
import time
import random

def worker(process_id, sent):
    print('I am worker {}, my sentence is {}'.format(process_id, sent))
    time.sleep(random.random() * 10)
    print('worker {} finished his job!'.format(process_id))

if __name__ == '__main__':
    words = ['what', 'a', 'beautiful', 'world', 'enjoy', 'it']
    args = [(i, s) for i, s in enumerate(words)]
    process = [mp.Process(target=worker, args=i) for i in args]
    for p in process:
        p.start()
    for p in process:
        p.join()

创建多进程的另外一种方法是继承Process类,改写run方法,我不推荐这种方式,因为这个不太符合python的编程习惯。

为了理解Process的工作原理,可以看看所有独立进程id:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Pool对象


from multiprocessing import Pool, cpu_count

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=cpu_count()) as p:
        print(p.map(f, [1, 2, 3]))

案例:使用Queue进行任务分发和结果收集

import time
import random
from multiprocessing import Process, Queue, current_process

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # 创建两个队列,分别作为任务队列和结果队列
    task_queue = Queue()
    done_queue = Queue()

    # 向任务队列中put任务
    for task in TASKS1:
        task_queue.put(task)

    # 多进程执行(启动后会一直存在,直到收到stop命令)
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # 从结果队列中获取任务执行结果
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # 使用 `put()` 向队列中添加新的任务,这些任务会立即执行
    for task in TASKS2:
        task_queue.put(task)

    # 从结果队列中获取新结果
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # 向所有子进程发送 STOP 指令
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')

if __name__ == '__main__':
    test()
zengbin93 commented 6 years ago

Queue

Queue是进程和线程安全的对象。

zengbin93 commented 6 years ago

多线程