dano / aioprocessing

A Python 3.5+ library that integrates the multiprocessing module with asyncio
Other
652 stars 33 forks source link

AioPipe and AioQueue performance #27

Open kaotika opened 5 years ago

kaotika commented 5 years ago

Hi,

I wanted to measure the time it takes to send/receive some basic values (float) from a process to another. One test with a pipe and another with a queue.

The code in short:

On my dev machine (i7-3520M, 3.6GHz, 16 GB Ram) I got around 0.5-1.9ms for pipes and 0.9-1.1 ms for queues. I expected pipes and queues to be faster than ~1ms. Are my expectations or my testcode wrong?

#!/usr/bin/env python3
# -*- coding=utf-8 -*-

import aioprocessing
import asyncio
from collections import namedtuple
import os
import sys
import time

async def sec_to_ms(timediff):
    return timediff * 1000

async def start_pipe_listener(exit_event, pipe):
    pid = os.getpid()
    while not exit_event.is_set():
        send_time = await pipe.coro_recv()
        recv_time = time.time()
        timediff = await sec_to_ms(recv_time - send_time)
        print(f"Received from manager pipe, took {timediff} ms on pid {pid}")

async def start_queue_listener(exit_event, queue, pid=os.getpid()):
    pid = os.getpid()
    while not exit_event.is_set():
        send_time = await queue.coro_get()
        recv_time = time.time()
        timediff = await sec_to_ms(recv_time - send_time)
        print(f"Received from manager queue, took {timediff} ms on pid {pid}")

def worker(exit_event, pipe, queue):
    pid = os.getpid()

    # define new loop inside this worker
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    print(f"Worker started as {pid} with loop {hex(id(loop))}")

    pipe_task = asyncio.ensure_future(start_pipe_listener(exit_event, pipe))
    queue_task = asyncio.ensure_future(start_queue_listener(exit_event, queue))

    try:
        loop.run_until_complete(asyncio.wait([pipe_task, queue_task]))
    except KeyboardInterrupt:
        print(f"\nGracefully shutting down worker {pid}")
    finally:
        loop.close()

async def _pipe_sender(pipe):
    pipe.manager_pipe.coro_send(time.time())

async def _queue_sender(queue):
    queue.put(time.time())

async def process_manager(exit_event, worker_count=1):
    pid = os.getpid()
    print(f"Manager started as {pid}")

    DuplexPipe = namedtuple('DuplexPipe', ['manager_pipe', 'worker_pipe'])

    pipelines = [
        DuplexPipe(*aioprocessing.AioPipe())
        for w in range(worker_count)
    ]
    queues = [
        aioprocessing.AioQueue()
        for w in range(worker_count)
    ]
    processes = [
        aioprocessing.AioProcess(
            target=worker, args=(exit_event, pipelines[i].worker_pipe, queues[i]))
        for i in range(worker_count)
    ]
    [p.start() for p in processes]

    while not exit_event.is_set():
        print(f"Manager Ticker {pid}")
        asyncio.gather(*[_pipe_sender(pipe) for pipe in pipelines])
        asyncio.gather(*[_queue_sender(queue) for queue in queues])
        await asyncio.sleep(1)

    # print("shutting down processes and manager")
    await asyncio.wait([p.coro_join() for p in processes])

def main():
    pid = os.getpid()
    loop = asyncio.get_event_loop()

    print(f"Main started as {pid} with loop {hex(id(loop))}")

    worker_count = 1
    exit_event = aioprocessing.AioEvent()
    manager_task = asyncio.ensure_future(
        process_manager(exit_event, worker_count))

    try:
        done, pending = loop.run_until_complete(asyncio.wait([manager_task]))
    except KeyboardInterrupt:
        print("\nCtrl-C received")
        exit_event.set()
        manager_task.cancel()
        done, pending = loop.run_until_complete(asyncio.wait([manager_task]))
    finally:
        loop.close()

if __name__ == "__main__":
    main()
dmgolembiowski commented 5 years ago

I noticed the line recv_time = time.time() near the top and I'm wondering if that might be causing your problem (but I'm not 100%). I thought Raymond Hettinger said (at the 2017 San Francisco Bay python event), at one point, that most of the standard library didn't support asyncio because it's written entirely in blocking calls. Perhaps you can maneuver this by using a coroutine like: recv_time = asyncio.subprocess.subprocess.time.time() and maybe wrap in a async with aioprocessing.AioLock()

dmgolembiowski commented 5 years ago

I noticed the line recv_time = time.time() near the top and I'm wondering if that might be causing your problem (but I'm not 100%). I thought Raymond Hettinger said (at the 2017 San Francisco Bay python event), at one point, that most of the standard library didn't support asyncio because it's written entirely in blocking calls. Perhaps you can maneuver this by using a coroutine like: recv_time = asyncio.subprocess.subprocess.time.time() and maybe wrap in a async with aioprocessing.AioLock()

Alternatively, an (unlikely) source could be caused by the collections.namedtuple https://lwn.net/Articles/731423/

Joshuaalbert commented 2 years ago

@kaotika Any update on this? I'd like to understand the latency with aioprocessing before making the decision to use it for a project. Cheers

mahinoresearch commented 2 years ago

@kaotika, @Joshuaalbert To identify the performance of the queue mechanism, you may find it helpful to minimise the impact of your test process. Rather than doing some processing on each data item sent via the queues, try sending 1,000,000 items from a generator and time how long that takes overall. Compare your result to the time taken by the generator alone, sending the same data to your timing system without using queues. The difference will be the queue overhead. On a round trip from main() to an echo worker, via one queue in each direction, I have no problem getting 25k round trip messages sent. The rate limiting factor in my case is still the infrastructure and not the queues. The latency you obtain in practice will depend on how long the queue is (i.e. how many items in it). You can further accelerate things if you short-cut sending and receiving by presuming the queues are neither empty nor full. For example, you might insert items into the queue like this:

try:
    queue.put_nowait(item)
except QueueFull:
    await queue.coro_put(item)

In general, for a processing chain using queues, the framework of your application will have more impact on the throughput and latency than the performance of the queues themselves. Options for improving performance are fairly well covered in the literature - ensure that producer and consumer tasks use "pull" rather than "push" techniques, wait somewhere sensible, and handoff the asyncio loop cooperatively, even using await asyncio.sleep(0) if required, etc. If the queues really do become the rate limiting factor in your system, consider batching items rather than sending one by one, or consider using multiple queues, so the the data rate gets up to the pace you seek.