vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io
GNU General Public License v3.0

concat, flatten, switch and their corresponding *map operators #3

Closed. dmzkrsk closed this issue 6 years ago.

dmzkrsk commented 7 years ago

import asyncio
from aiostream import stream

async def g1():
    for i in range(10):
        yield 2 * i
        await asyncio.sleep(.3)

async def g2(n):
    yield n
    await asyncio.sleep(.1)
    yield n + 1

async def main():
    # Create stream of numbers
    xs = stream.iterate(g1())
    # Map each number to a new stream
    ys = stream.map(xs, lambda n: stream.iterate(g2(n)))
    # Flatten a stream of streams into a stream of numbers
    zs = stream.flatten(ys)

    # Or just
    # zs = stream.flat_map(xs, g2)  # automatically wrapped into stream.iterate

    print(await stream.list(zs))
    # Expecting [0, 1, 2, 3, 4, ..., 19]

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

flat_map is quite a handy thing in the reactive world. Is it possible to implement it in aiostream?

It should work like stream.merge, except that merge expects all of its streams to be given as arguments up front, whereas in the example above the streams themselves arrive asynchronously.
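
To illustrate the difference in call shape (a sketch; source_a, source_b and stream_of_streams are placeholders, and flatten is the operator being requested here):

# merge: every source must exist before the call
zs = stream.merge(source_a, source_b)

# flatten: the sources themselves arrive over time, on a stream of streams
zs = stream.flatten(stream_of_streams)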

P.S. Also, switch_map (which drops the previous inner stream whenever a new value arrives on the outer one) should be easy to implement after flat_map.

dmzkrsk commented 7 years ago

Well, I took some inspiration from merge and implemented both flatten and switch:

import asyncio

from aiostream import stream
from aiostream.core import operator, streamcontext
from aiostream.aiter_utils import anext
# Note: AsyncExitStack is assumed here to be an exit stack whose
# enter_context can be awaited (contextlib.AsyncExitStack did not exist yet)

@operator(pipable=True)
async def base_merge(source, *, switch):
    async with AsyncExitStack() as stack:
        meta_streamer = await stack.enter_context(streamcontext(source))
        task = asyncio.ensure_future(anext(meta_streamer))
        streamers = {task: meta_streamer}

        while streamers:
            done, _pending = await asyncio.wait(
                list(streamers), return_when="FIRST_COMPLETED")

            for task in done:
                try:
                    result = task.result()
                except StopAsyncIteration:
                    # End of stream
                    streamers.pop(task)
                    continue

                streamer = streamers.pop(task)

                if streamer is meta_streamer:
                    if switch:
                        # Cancel all other streams
                        streamers.clear()

                    # Setup a new source
                    streamer = await stack.enter_context(streamcontext(result))
                    stream_task = asyncio.ensure_future(anext(streamer))
                    streamers[stream_task] = streamer

                    # Schedule next anext
                    meta_task = asyncio.ensure_future(anext(meta_streamer))
                    streamers[meta_task] = meta_streamer
                else:
                    # Simply yield a result
                    yield result

                    # Schedule next anext
                    task = asyncio.ensure_future(anext(streamer))
                    streamers[task] = streamer

@operator(pipable=True)
async def switch(source):
    async for item in base_merge.raw(source, switch=True):
        yield item

@operator(pipable=True)
async def flatten(source):
    async for item in base_merge.raw(source, switch=False):
        yield item

@operator(pipable=True)
async def merge(*sources):
    # Re-implemented merge
    async for item in base_merge.raw(stream.iterate.raw(sources), switch=False):
        yield item

Hoping for some review (or I can prepare a merge request and we can review it there).

A question about my switch implementation.

I call streamers.clear() when in switch mode, so the previous streams are simply ignored.

Are ignored streams "closed" correctly in this case?

vxgmichel commented 7 years ago

Very interesting! I'm now reading about flatMap vs concatMap, and a possible implementation for concatMap could be:

@operator(pipable=True)
async def flatten(source):
    async with streamcontext(source) as streamer:
        async for iterator in streamer:
            subsource = stream.iterate(iterator)
            async with streamcontext(subsource) as substreamer:
                async for item in substreamer:
                    yield item

@operator(pipable=True)
def concatmap(source, func):
    return flatten.raw(stream.map.raw(source, func))
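
Applied to the generators from the first comment, usage would look like this (a sketch, assuming the concatmap defined above):

async def demo():
    xs = stream.iterate(g1())     # 0, 2, 4, ... every 0.3 seconds
    zs = concatmap(xs, g2)        # each n expands to n, n + 1, in order
    print(await stream.list(zs))  # [0, 1, 2, ..., 19]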

vxgmichel commented 7 years ago

Are ignored streams "closed" correctly in this case?

Yes, as soon as the AsyncExitStack is exited. It might make sense to clean them up earlier though, using await streamer.__aexit__(None, None, None).

dmzkrsk commented 7 years ago

Found a bug in the switch version, when a new stream and a value appear at the same time.

This version works better:

async def add_source(streamers, stack, source, do_switch):
    if do_switch:
        for t, s in streamers.items():
            t.cancel()
            await s.__aexit__(None, None, None)

        streamers.clear()

    return await stack.enter_context(streamcontext(source))

def schedule(streamers, streamer):
    task = asyncio.ensure_future(anext(streamer))
    streamers[task] = streamer

async def base_merge(meta_source, *, do_switch):
    async with AsyncExitStack() as stack:
        streamers = {}

        meta_streamer = await stack.enter_context(streamcontext(meta_source))
        meta_task = asyncio.ensure_future(anext(meta_streamer))

        while streamers or meta_task:
            wait_tasks = list(streamers) + ([meta_task] if meta_task else [])
            done, _pending = await asyncio.wait(wait_tasks, return_when="FIRST_COMPLETED")

            # For "do_switch=True" to work properly, meta_streamer should be processed first
            if meta_task and meta_task in done:
                # Drop the meta task itself; in switch mode, also drop any
                # values that completed together with the new source
                done = [t for t in done if t != meta_task and not do_switch]

                try:
                    new_source = meta_task.result()
                except StopAsyncIteration:
                    meta_task = None
                else:
                    # Re-schedule meta_streamer
                    meta_task = asyncio.ensure_future(anext(meta_streamer))

                    # Setup a new source
                    streamer = await add_source(streamers, stack, new_source, do_switch)
                    schedule(streamers, streamer)

            for task in done:
                streamer = streamers.pop(task)

                try:
                    yield task.result()
                except StopAsyncIteration:
                    pass  # End of stream
                else:
                    # Re-schedule streamer
                    schedule(streamers, streamer)

@operator(pipable=True)
async def switch(source):
    async for item in base_merge(source, do_switch=True):
        yield item

@operator(pipable=True)
async def flatten(source):
    async for item in base_merge(source, do_switch=False):
        yield item

@operator(pipable=True)
async def switch_map(source, func):
    xs = stream.map.raw(source, lambda v: stream.iterate(func(v)))

    async for item in base_merge(xs, do_switch=True):
        yield item

@operator(pipable=True)
async def flat_map(source, func):
    xs = stream.map.raw(source, lambda v: stream.iterate(func(v)))

    async for item in base_merge(xs, do_switch=False):
        yield item

vxgmichel commented 7 years ago

Looks good! A few comments: since each wrapper just delegates to base_merge, it can be a plain function that returns the raw stream directly, instead of an async generator re-yielding every item:

@operator(pipable=True)
def switch(source):
    return base_merge.raw(source, do_switch=True)

@operator(pipable=True)
def flatten(source):
    return base_merge.raw(source, do_switch=False)

@operator(pipable=True)
def switch_map(source, func):
    xs = stream.map.raw(source, lambda v: stream.iterate(func(v)))
    return base_merge.raw(xs, do_switch=True)

@operator(pipable=True)
def flat_map(source, func):
    xs = stream.map.raw(source, lambda v: stream.iterate(func(v)))
    return base_merge.raw(xs, do_switch=False)

vxgmichel commented 7 years ago

I've been thinking about it, and there are three ways to deal with an async sequence of async subsequences:

- concat: iterate over each subsequence one after the other, in order
- flatten: iterate over all the subsequences concurrently, yielding the items as they arrive
- switch: iterate over the most recent subsequence only, dropping the previous one when a new subsequence arrives

They all have their corresponding map operator: concatmap, flatmap, switchmap.
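
A sketch of the intended semantics, using the operator names above (none of these exist in the library yet; xs stands for any stream of async sequences):

ys = concat(xs)   # exhaust each subsequence before moving on to the next one
ys = flatten(xs)  # consume all subsequences concurrently, first come first served
ys = switch(xs)   # consume only the latest subsequence, dropping the previous one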

Now, in the case of a sync sequence of async subsequences, concat corresponds to the existing chain operator and flatten corresponds to merge; switch makes little sense there, since all the sources are known up front.

Now two questions need to be answered:

- how should those operators be named?
- should they share a single base implementation like base_merge, or be implemented separately?

@dmzkrsk What do you think?

vxgmichel commented 7 years ago

Also, I think it'd be clearer to let switch and flat have separate implementations:

async def switch(source):
    async with AsyncExitStack() as stack:
        # Initialize
        streamer = await stack.enter_context(streamcontext(source))
        streamer_task = asyncio.ensure_future(anext(streamer))
        substreamer_task = None
        # Loop over events
        while streamer_task or substreamer_task:
            wait_tasks = filter(None, (streamer_task, substreamer_task))
            done, _ = await asyncio.wait(wait_tasks, return_when="FIRST_COMPLETED")
            # Substreamer event
            if substreamer_task in done:
                try:
                    yield substreamer_task.result()
                except StopAsyncIteration:
                    await substreamer.aclose()
                    substreamer_task = None
            # Streamer event
            if streamer_task in done:
                try:
                    subsource = streamer_task.result()
                except StopAsyncIteration:
                    streamer_task = None
                else:
                    # Clean up
                    await substreamer.aclose()
                    if substreamer_task:
                        substreamer_task.cancel()
                    # New substream
                    streamer_task = asyncio.ensure_future(anext(streamer))
                    substreamer = await stack.enter_context(streamcontext(subsource))
                    substreamer_task = asyncio.ensure_future(anext(substreamer))

dmzkrsk commented 7 years ago

The namings are ok, but I'd use snake_case versions for the *map operators: flat_map etc. And I'd keep switch and the others as separate operators.

I like your switch implementation; a separate version looks much cleaner. What about merge? The current variant, which expects all the sequences to be known up front, is also useful. Should it have a separate implementation, or use base_merge?

And a couple of fixes for your switch version:

async def switch(source):
    async with AsyncExitStack() as stack:
        # Initialize
        streamer = await stack.enter_context(streamcontext(source))
        streamer_task = asyncio.ensure_future(anext(streamer))
        substreamer = None
        substreamer_task = None
        # Loop over events
        while streamer_task or substreamer_task:
            wait_tasks = filter(None, (streamer_task, substreamer_task))
            done, _ = await asyncio.wait(wait_tasks, return_when="FIRST_COMPLETED")

            # Streamer event
            if streamer_task in done:
                try:
                    subsource = streamer_task.result()
                except StopAsyncIteration:
                    streamer_task = None
                else:
                    # Clean up
                    if substreamer:
                        # await substreamer.aclose()
                        await substreamer.__aexit__(None, None, None)
                    if substreamer_task:
                        substreamer_task.cancel()
                    # New substream
                    streamer_task = asyncio.ensure_future(anext(streamer))
                    substreamer = await stack.enter_context(streamcontext(subsource))
                    substreamer_task = asyncio.ensure_future(anext(substreamer))

            # Substreamer event
            elif substreamer_task in done:
                try:
                    yield substreamer_task.result()
                except StopAsyncIteration:
                    # await substreamer.aclose()
                    await substreamer.__aexit__(None, None, None)
                    substreamer_task = None
                else:
                    substreamer_task = asyncio.ensure_future(anext(substreamer))

vxgmichel commented 7 years ago

The namings are ok, but I'd use snake_case versions for the *map operators: flat_map etc.

Well, the other operators don't use underscores, e.g. takelast, skiplast, takewhile (just like the itertools functions starmap, groupby, etc.).

What about merge? The current variant, that expects all sequences to be known is also useful. Should it have a separate implementation? Or use base_merge?

I was thinking of calling your base_merge implementation flat or flatten, and make merge use it.

And a couple of fixes for your switch version

Thanks, that code was untested :sweat_smile:

You forgot to re-schedule substreamer_task

Indeed!

also await substreamer.aclose() doesn't work for me (Streamer has no aclose)

Yes, I was planning to add aclose to Streamer and make __aexit__ use it.
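
For reference, a minimal hypothetical sketch of what that could look like (this is not the actual aiostream code):

class Streamer:
    """Wrap an async iterable as both an async iterator and a context manager."""

    def __init__(self, aiterable):
        self._aiterator = aiterable.__aiter__()

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await self._aiterator.__anext__()

    async def aclose(self):
        # Delegate to the underlying async generator, if it supports closing
        aclose = getattr(self._aiterator, "aclose", None)
        if aclose is not None:
            await aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        # __aexit__ simply relies on aclose, as suggested above
        await self.aclose()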

I check streamer_task first, since if we have streamer_task and substreamer_task ready at the same time (done has two items), we should cancel the current substreamer_task and not yield the value.

Why not yield the value if it's already here?

dmzkrsk commented 7 years ago

Well, the other operators don't use underscores

I see. OK

Why not yield the value if it's already here?

Hmm, I need to check how other libs handle this then

SillyFreak commented 6 years ago

I would also be interested in this and could help with a PR.

Why not yield the value if it's already here?

Hmm, I need to check how other libs handle this then

I had a look at the switchMap implementations of RxJava and RxJS, but I think looking at Rx for reference is not too useful. The issue here is specific to how asyncio.wait can return multiple completed results at once, select-style. The Rx implementations use a next/onNext method that processes one item of one observable at a time; if some select lurks down there somewhere, the switch operator never gets to see that both items really arrived at the "same time".

I'd advocate for yielding; imagine that streamer's and substreamer's anext always completed together. Without yielding, the switch would never yield anything, waiting for the now-current new substreamer.
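
A minimal, self-contained demonstration of that asyncio.wait behaviour (the helper names here are made up for the example):

import asyncio

async def produce(value, delay):
    await asyncio.sleep(delay)
    return value

async def main():
    # Two tasks completing during the same event loop iteration
    t1 = asyncio.ensure_future(produce("outer item", 0.1))
    t2 = asyncio.ensure_future(produce("inner item", 0.1))
    done, pending = await asyncio.wait({t1, t2}, return_when="FIRST_COMPLETED")
    # done typically contains both tasks: they arrived at the "same time"
    print(len(done), [t.result() for t in done])

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())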

Is there anything else to be decided before implementing* this?

*i.e. copying your code, adding the same cleanup as in #8, adding tests

vxgmichel commented 6 years ago

@SillyFreak Thank you for investigating this and offering your help, it's much appreciated!

Is there anything else to be decided before implementing* this? i.e. copying your code, adding the same cleanup as in #8, adding tests

Yes, I think it's a good starting point. The docs need to be updated as well, but I can take care of that.

I'm looking forward to reviewing your PR, let me know if you need some help :)

vxgmichel commented 6 years ago

@dmzkrsk @SillyFreak Thanks for your help! Those operators will become available with version 0.2.5.
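
For reference, a usage sketch with the released operators, based on the example from the first comment (exact output timing is not guaranteed by this sketch):

import asyncio
from aiostream import stream

async def g1():
    for i in range(10):
        yield 2 * i
        await asyncio.sleep(.3)

async def g2(n):
    yield n
    await asyncio.sleep(.1)
    yield n + 1

async def main():
    xs = stream.iterate(g1())
    zs = stream.flatmap(xs, g2)   # or stream.switchmap / stream.concatmap
    print(await stream.list(zs))  # [0, 1, 2, ..., 19]

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())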