spitfire-sidra / myapollo-issues

#utterances
0 stars 0 forks source link

blog/python-aiomultiprocess-study/ #18

Open utterances-bot opened 3 months ago

utterances-bot commented 3 months ago

Python aiomultiprocess 套件是如何運作的? - MyApollo

aiomultiprocess 是 1 個既實用又有趣的套件。 它實用的地方在於能將多個 coroutines 分散到多個 processes 執行(底層使用 multiprocessing 模組),藉此提升 asyncio 的效能。 而有趣的地方在於 multiprocessing 並不是 1 個 asynchronous 模組,而 aiomultiprocess 卻能將 multiprocessing 與 asyncio 整合在一起

https://myapollo.com.tw/blog/python-aiomultiprocess-study/

KuanJuChenHermanChen commented 3 months ago

試著自己寫,理解一遍,run 起來 ok ~

剛好最近在寫的專案有這個需求,感謝作者 ~

import asyncio
import os
import queue
import threading
import time
from multiprocessing import Process, Queue

from aiohttp import request

async def run_async(tx: Queue, rx: Queue) -> None:
    """Drain ``tx`` one job at a time and push each job's result onto ``rx``.

    Every job is a ``(coroutine_function, args)`` pair. Jobs are executed
    strictly one after another: the next job is fetched only once the
    current result has been queued. The loop ends when ``tx`` reports empty
    or when a ``TypeError`` is raised while unpacking or running a job.
    """
    running = True
    while running:
        try:
            job = tx.get_nowait()
            co, args = job
            # Wrap the coroutine in a task and wait for it right away.
            result = await asyncio.create_task(co(*args))
            rx.put(result)
        except (queue.Empty, TypeError):
            running = False

def run(tx: Queue, rx: Queue) -> None:
    """Synchronous entry point: run ``run_async(tx, rx)`` on a fresh event loop."""
    worker_coro = run_async(tx, rx)
    asyncio.run(worker_coro)

async def get(url):
    """Send a GET request to ``url`` and return a tuple of diagnostic strings.

    The tuple records the process id, thread id, event-loop id, the URL,
    the response status, and the start time, end time, and elapsed time
    measured with ``time.time()``.
    """
    start_time = time.time()
    pid = os.getpid()
    tid = threading.get_ident()
    evid = id(asyncio.get_event_loop())
    async with request("GET", url) as response:
        # The end timestamp is taken as soon as the response object is available.
        end_time = time.time()
        report = (
            f"Process ID: {pid}",
            f"Thread ID: {tid}",
            f"Event Loop ID: {evid}",
            f"URL: {url}",
            f"Status: {response.status}",
            f"Start Time: {start_time}",
            f"End Time: {end_time}",
            f"Duration: {end_time - start_time}",
        )
        return report

async def main(workers: int) -> None:
    """Queue four GET jobs, run them in ``workers`` processes, print the results.

    Args:
        workers: Number of child processes to start; each one executes
            ``run(tx, rx)``.
    """
    job_total = 4
    tx = Queue()
    rx = Queue()

    # All jobs are enqueued before any worker is started.
    for _ in range(job_total):
        tx.put((get, ("https://example.com", )))

    processes: list[Process] = []
    while len(processes) < workers:
        worker = Process(target=run, args=(tx, rx))
        worker.start()
        processes.append(worker)

    # Blocks until exactly one result per enqueued job has been printed.
    for _ in range(job_total):
        print(rx.get())

    for worker in processes:
        worker.join()

if __name__ == '__main__':
    # Script entry point: run the demo with four worker processes.
    demo = main(4)
    asyncio.run(demo)
spitfire-sidra commented 3 months ago

@KuanJuChenHermanChen 讚!以下這部分最好也改成 rx.get_nowait(),原本文章的寫法太偷懶,我也一併更新了 :)

    count = 0
    while True:
        print(rx.get())
        count += 1
        if count == 4:
            break
KuanJuChenHermanChen commented 3 months ago

昨天仔細看過一遍程式碼,目前模擬應該有 70% 程度了

1. 簡化輪詢機制

2. 簡化任務回收機制

python version: 3.11.9

import asyncio
import math
import os
import threading
import time
import queue
from multiprocessing import Process

from aiohttp import ClientSession
import multiprocessing

TASKS = 200  # number of tasks
CHILD_PROCESSES = 10  # number of child processes to spawn
TASK_PER_PROCESS = 50  # number of tasks each child process handles during its lifetime
CHILD_CONCURRENCY = 15  # upper limit of tasks running concurrently inside each child process
BATCH_SIZE = 20  # number of tasks taken from the rx Queue on each pass

def run(tx: queue.Queue, rx: queue.Queue):
    """Synchronous entry point: run ``run_async(tx, rx)`` on a fresh event loop."""
    worker_coro = run_async(tx, rx)
    asyncio.run(worker_coro)

async def run_async(tx: queue.Queue, rx: queue.Queue):
    """Take up to ``TASK_PER_PROCESS`` jobs from ``tx``, run them, and push results to ``rx``.

    Each job is a ``(coroutine_function, args)`` pair. Jobs are scheduled as
    asyncio tasks through ``limited_run`` so that at most
    ``CHILD_CONCURRENCY`` of them run at the same time. Fetching stops when
    the per-process budget is reached, when ``tx`` stays empty for one
    second, or when a ``TypeError`` is raised while unpacking a job. Results
    are then forwarded to ``rx`` in completion order.

    Args:
        tx: Queue that jobs are read from.
        rx: Queue that results are written to.
    """
    semaphore = asyncio.Semaphore(CHILD_CONCURRENCY)
    tasks: list[asyncio.Task] = []
    task_count = 0
    # task_count is incremented after every successful get, so a strict "<"
    # caps this process at exactly TASK_PER_PROCESS jobs ("<=" would take one
    # job more than the configured budget).
    while task_count < TASK_PER_PROCESS:
        try:
            # NOTE(review): tx.get() is a blocking call; already-scheduled
            # tasks only make progress during the sleep below.
            co, args = tx.get(timeout=1)
            await asyncio.sleep(0.05)
            task_count += 1
            task = asyncio.create_task(limited_run(semaphore, co, *args))
            tasks.append(task)
        except queue.Empty:
            print(f"PID: {os.getpid()} -> Queue is empty")
            break
        except TypeError:
            break

    print(f"PID: {os.getpid()} -> {time.time()} -> 該 Process 分配到的任務數量: {task_count}")

    # as_completed() yields nothing for an empty list, so no guard is needed.
    for task in asyncio.as_completed(tasks):
        result = await task
        rx.put_nowait(result)
    print(f"PID: {os.getpid()} -> 任務處理完畢")

async def limited_run(semaphore, co, *args):
    async with semaphore:  # use semaphore to limit process concurrency
        return await co(*args)

async def get(url):
    """Send a GET request to ``url`` and return a tuple of diagnostic strings.

    The tuple records the process id, thread id, event-loop id, the URL,
    the response status, and the start time, end time, and elapsed time
    measured with ``time.time()``. A one-second delay is inserted after the
    response arrives.

    Args:
        url: Address to request.

    Returns:
        A tuple of eight formatted strings describing the request.
    """
    start_time = time.time()
    pid = os.getpid()
    tid = threading.get_ident()
    # Inside a coroutine a loop is always running, so ask for it directly.
    evid = id(asyncio.get_running_loop())
    async with ClientSession() as session:
        async with session.get(url) as response:
            # Yield to the event loop during the delay; a blocking
            # time.sleep() here would stall every other task in this
            # process for the full second.
            await asyncio.sleep(1)
            end_time = time.time()
            return (
                f"Process ID: {pid}",
                f"Thread ID: {tid}",
                f"Event Loop ID: {evid}",
                f"URL: {url}",
                f"Status: {response.status}",
                f"Start Time: {start_time}",
                f"End Time: {end_time}",
                f"Duration: {end_time - start_time}",
            )

async def run_pocess(tx: queue.Queue, rx: queue.Queue):
    """Repeatedly spawn worker processes until ``tx`` is empty, then idle and re-check.

    While ``tx`` holds jobs, one round starts ``CHILD_PROCESSES`` processes
    running ``run(tx, rx)``, waits for all of them to exit, and then drains
    ``rx`` in batches of at most ``BATCH_SIZE`` results. Once ``tx`` is
    empty the function sleeps for 60 seconds and checks again. It never
    returns.

    Args:
        tx: Queue holding pending jobs for the workers.
        rx: Queue the workers write their results to.
    """
    while True:
        while not tx.empty():
            _start_time = time.time()
            processes: list[Process] = []
            for _ in range(CHILD_PROCESSES):
                p = Process(target=run, args=(tx, rx))
                p.start()
                processes.append(p)
            # NOTE(review): join() blocks the event loop until the child exits.
            for p in processes:
                p.join()
                print("Process ID: ", p.pid, "子進程已經結束")

            # Drain everything the workers produced this round. The loop ends
            # as soon as rx reports empty, so it cannot spin forever when fewer
            # results than expected arrive, and no results are left behind for
            # the next round.
            # NOTE(review): assumes results are visible in rx once the
            # producers have exited - confirm for the queue type in use.
            drained = False
            while not drained:
                batch_results = []
                for _ in range(BATCH_SIZE):
                    try:
                        batch_results.append(rx.get_nowait())
                    except queue.Empty:
                        drained = True
                        break
                # Every batch is consumed before the next one is fetched.
                for result in batch_results:
                    # Placeholder for per-result handling, e.g. print(result).
                    ...
            print(f"Duration: {time.time() - _start_time}")
            await asyncio.sleep(1)
            print("執行下一批任務")

        print("所有任務已經處理完畢,程式休息 60 秒,再去檢查是否有新任務")
        await asyncio.sleep(60)

async def main():
    """Fill the job queue with five rounds of ``TASKS`` GET jobs, then run ``run_pocess``.

    Both queues are created through a ``multiprocessing.Manager``. A short
    pause separates consecutive rounds of enqueueing; a
    ``KeyboardInterrupt`` during that phase stops enqueueing early and moves
    straight on to processing.
    """
    manager = multiprocessing.Manager()
    tx = manager.Queue()
    rx = manager.Queue()
    urls = ["https://example.com"] * TASKS

    rounds = 5
    try:
        for round_no in range(1, rounds + 1):
            for url in urls:
                tx.put((get, (url,)))
            # No pause is needed after the final round.
            if round_no != rounds:
                await asyncio.sleep(0.05)
    except KeyboardInterrupt:
        pass

    await asyncio.gather(asyncio.create_task(run_pocess(tx, rx)))

if __name__ == '__main__':
    # Script entry point: drive the whole demo from a single event loop.
    demo = main()
    asyncio.run(demo)