h2non / paco

Small utility library for coroutine-driven asynchronous generic programming in Python
http://paco.rtfd.io
MIT License

Current limitations of map and each #38

Open cgarciae opened 6 years ago

cgarciae commented 6 years ago

Hi! First off, I think paco is a very nice library and I would like to help improve it. That said, I have a particular problem: I need to download millions of images as fast as possible, and I looked into several existing resources on the topic.

Using paco, my initial code was:

import asyncio
import os
import aiohttp
import aiofiles
import paco

urls = [
    "https://static.pexels.com/photos/67843/splashing-splash-aqua-water-67843.jpeg",
    "https://cdn.pixabay.com/photo/2016/10/27/22/53/heart-1776746_960_720.jpg",
    "http://www.qygjxz.com/data/out/240/4321276-wallpaper-images-download.jpg"
] * 1000000

def download_file(path):
    async def do_download_file(url):

        filename = os.path.basename(url)
        filepath = os.path.join(path, filename)

        print(f"Downloading {url}")

        async with aiohttp.request("GET", url) as resp:
            content = await resp.read()

        print(f"Completed {filename}")

        async with aiofiles.open(filepath, "wb") as f:
            await f.write(content)

    return do_download_file

path = "/data/tmp/images"

coro = paco.each(download_file(path), urls)
loop = asyncio.get_event_loop()
loop.run_until_complete(coro)

I like the API of paco.each, but when testing it my computer froze as its memory blew up while trying to create millions of coroutines. The main problem is in these lines of code:

# paco/each.py

for index, value in enumerate(iterable):
    pool.add(collector(index, value))

I observe the following:

  1. It creates all the coroutine objects in memory before starting their tasks (illustrated below)
  2. It assumes that the collection fits in memory
  3. It assumes that the collection is fast to iterate over
  4. It preserves order (nice to have)
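
To make point 1 concrete: even a semaphore-based workaround caps concurrency but still materializes every coroutine object up front (a toy sketch, not paco code):

import asyncio

sem = asyncio.Semaphore(10)

async def bounded(coro):
    async with sem:
        return await coro

# Concurrency is capped at 10, but gather() still receives millions of
# coroutine objects at once, so memory blows up just like with paco.each.
loop = asyncio.get_event_loop()
loop.run_until_complete(
    asyncio.gather(*(bounded(download_file(path)(url)) for url in urls))
)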

Since my problem is about speed and memory, points 1 to 3 are the most relevant to me. I recreated map and each using asyncio.Queue, limiting the number of tasks that exist at the same time. This involved creating a structure I called Stream that just holds a coroutine and a Queue. My API enforces the limit on each so that no more than that number of objects live in memory at once.

urls = [
    "https://static.pexels.com/photos/67843/splashing-splash-aqua-water-67843.jpeg",
    "https://cdn.pixabay.com/photo/2016/10/27/22/53/heart-1776746_960_720.jpg",
    "http://www.qygjxz.com/data/out/240/4321276-wallpaper-images-download.jpg"
] * 1000000

path = "/data/tmp/images"

stream = from_iterable(urls)
coro = each(download_file(path), stream, limit = 10)

loop = asyncio.get_event_loop()
loop.set_exception_handler(handle_async_exception)  # user-defined error handler (not shown)
loop.run_until_complete(coro)

Both the new from_iterable and map functions take a queue_maxsize parameter that further limits how data flows through the pipeline and enforces a back-pressure mechanism (see the usage sketch below). The full code is at the end. I wanted to share the experiment and also open up the possibility of creating a paco.stream module to continue the life of this code.
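
For example, with hypothetical coroutines fetch (downloads a URL and returns a (filename, data) pair) and save (writes that pair to disk), the stages chain together with bounded queues; a minimal sketch using the functions defined below:

# At most 100 items sit between stages; producers block on queue.put()
# until the slower consumer catches up, so memory stays bounded.
stream = from_iterable(urls, queue_maxsize=100)
stream = map(fetch, stream, limit=10, queue_maxsize=100)  # <= 10 concurrent fetches
coro = each(save, stream, limit=10)                       # <= 10 concurrent writes

loop = asyncio.get_event_loop()
loop.run_until_complete(coro)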

from collections import namedtuple
import asyncio

# Sentinel signalling the end of a stream.
DONE = object()

# A stream is a producer coroutine plus the queue it feeds.
Stream = namedtuple("Stream", "coroutine queue")

def _active_tasks(tasks):
    return [ task for task in tasks if not task.done() ]

def _f_wrapper(f, queue = None):
    # Wrap f so it may be sync or async, optionally forwarding its result.
    async def __f_wrapper(x):

        y = f(x)

        if hasattr(y, "__await__"):
            y = await y

        if queue is not None:
            await queue.put(y)

    return __f_wrapper

async def _task_limit(tasks, limit):
    # Cooperatively yield until the number of active tasks drops below limit.

    tasks = _active_tasks(tasks)

    while len(tasks) >= limit:
        await asyncio.sleep(0)

        tasks = _active_tasks(tasks)

    return tasks

def map(f, stream, limit = 0, queue_maxsize = 0):
    coroin = stream.coroutine
    qin = stream.queue

    qout = asyncio.Queue(maxsize = queue_maxsize)

    async def _map(f):
        coroin_task = asyncio.ensure_future(coroin)

        tasks = []
        f = _f_wrapper(f, queue = qout)

        x = await qin.get()

        while x is not DONE:

            if limit:
                tasks = await _task_limit(tasks, limit)

            fcoro = f(x)
            ftask = asyncio.ensure_future(fcoro)
            tasks.append(ftask)

            x = await qin.get()

        # wait for any remaining tasks to finish
        tasks = _active_tasks(tasks)
        while len(tasks) > 0:
            await asyncio.sleep(0)
            tasks = _active_tasks(tasks)

        await qout.put(DONE)

        await coroin_task

    return Stream(_map(f), qout)

def from_iterable(iterable, queue_maxsize = 0):
    qout = asyncio.Queue(maxsize=queue_maxsize)

    async def _from_iterable():

        for x in iterable:
            await qout.put(x)

        await qout.put(DONE)

    return Stream(_from_iterable(), qout)

def each(f, stream, limit = 0):
    coroin = stream.coroutine
    qin = stream.queue

    async def _each(f):
        coroin_task = asyncio.ensure_future(coroin)

        tasks = []
        f = _f_wrapper(f)

        x = await qin.get()

        while x is not DONE:

            if limit:
                tasks = await _task_limit(tasks, limit)

            fcoro = f(x)
            ftask = asyncio.ensure_future(fcoro)
            tasks.append(ftask)

            x = await qin.get()

        # wait for any remaining tasks to finish
        tasks = _active_tasks(tasks)
        while len(tasks) > 0:
            await asyncio.sleep(0)
            tasks = _active_tasks(tasks)

        await coroin_task

    return _each(f)

def run(stream):
    return stream.coroutine

aparamon commented 5 years ago

@cgarciae Thanks for sharing your implementation!

What do you think of an alternative interface for the same task? It's basically a further generalization of paco.gather:

def igather(coros_or_futures, limit=0, loop=None, timeout=None,
        return_exceptions=False):
    """
    Arguments:
        coros_or_futures (iterable|async iterable): iterable yielding
            coroutines or futures.
        limit (int): max concurrency limit. Use ``0`` for no limit.
        loop (asyncio.BaseEventLoop): optional event loop to use.
        timeout (int|float): maximum number of seconds to wait before
            returning. timeout can be an int or float. If timeout is not
            specified or is None, there is no limit to the wait time.
        return_exceptions (bool): returns exceptions as valid results.

    Returns:
        async iterable: sequence of values yielded by the coroutines,
            as completed.
    """

Making the results ordered should also be possible, albeit a bit harder to implement and memory-hungry in the worst case: one slow early task forces every later result to be buffered until it completes. A sketch of that idea follows.
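
A minimal, lazy sketch of the ordered variant (igather_ordered is a hypothetical name, not paco code): coroutines are created on demand, at most limit run at once, and early completions are buffered until their turn comes.

import asyncio

async def igather_ordered(coros, limit=10):

    async def tagged(i, coro):
        # Tag each result with its submission index.
        return i, await coro

    it = enumerate(coros)
    pending = set()
    buffered = {}    # index -> result, for results that arrived early
    next_index = 0   # index of the next result we may yield
    exhausted = False

    while not exhausted or pending:
        # Top up to `limit` running tasks, creating coroutines lazily.
        while not exhausted and len(pending) < limit:
            try:
                i, coro = next(it)
            except StopIteration:
                exhausted = True
                break
            pending.add(asyncio.ensure_future(tagged(i, coro)))

        if not pending:
            break

        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            i, result = task.result()
            buffered[i] = result

        # Worst case for memory: a slow task at index 0 forces us to
        # buffer everything that finishes after it.
        while next_index in buffered:
            yield buffered.pop(next_index)
            next_index += 1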

aparamon commented 5 years ago

An implementation sketch inspired by https://bugs.python.org/issue30782#msg336237:

async def igather(tasks, limit=None):

    async def submit(tasks, buf):
        # TODO: additionally support async iterators
        for task in tasks:
            await buf.put(asyncio.create_task(task))
        await buf.put(None)  # sentinel: no more tasks

    async def consume(buf):
        while True:
            task = await buf.get()
            if task is not None:
                # Await tasks in submission order; the bounded queue means
                # at most `limit` tasks are in flight ahead of the consumer.
                yield await asyncio.wait_for(task, None)
            else:
                break

    buf = asyncio.Queue(limit or 0)
    asyncio.create_task(submit(tasks, buf))
    async for result in consume(buf):
        yield result

It preserves task submission order in an efficient way, but lacks proper exception handling; a possible extension is sketched below.
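
For instance, return_exceptions-style behaviour could be layered onto the consumer (a sketch only, reusing the submit/consume structure above; cancelling tasks still queued after a failure is left out for brevity):

async def consume(buf, return_exceptions=False):
    while True:
        task = await buf.get()
        if task is None:
            break
        try:
            yield await asyncio.wait_for(task, None)
        except Exception as exc:
            if return_exceptions:
                # Mirror asyncio.gather(return_exceptions=True): hand the
                # exception back to the caller as an ordinary result.
                yield exc
            else:
                raise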