vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io
GNU General Public License v3.0

How to create a stream from a list of tasks. #30

Closed: JerzySpendel closed this issue 6 years ago.

JerzySpendel commented 6 years ago

What I really wanted to ask in the previous issue was something different :D

Let's say I have a list of tasks/futures and I want to create a stream of the values returned by those tasks. This is what I managed to do:

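import asyncio
import random

from aiostream import stream

async def value(x):
    # Simulate work taking 0.1 to 1.0 seconds, then return x
    await asyncio.sleep(random.randint(1, 10) / 10)
    return x

async def main():
    # One single-value stream per task; flatten merges them,
    # yielding each value as soon as its task completes
    tasks = stream.iterate([stream.just(asyncio.ensure_future(value(x))) for x in range(10)])

    async with stream.flatten(tasks).stream() as streamer:
        async for x in streamer:
            print(x)

asyncio.run(main())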

It looks overcomplicated, and I wonder whether the same can be achieved in a shorter form.

BTW, I'm so excited this library exists! Thank you!

JerzySpendel commented 6 years ago

OK, I can omit the asyncio.ensure_future call, but the code still feels complicated.
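For comparison, the completion-order behaviour of the stream.flatten version can be sketched with only the standard library using asyncio.as_completed. This is plain asyncio, not aiostream's API, and the shorter sleeps (at most 0.1 s) are an assumption made only to keep the sketch quick.

```python
import asyncio
import random


async def value(x):
    # Same shape as the coroutine above, with shorter sleeps (0.01 to 0.1 s)
    await asyncio.sleep(random.randint(1, 10) / 100)
    return x


async def main():
    # Schedule every coroutine as a task up front
    tasks = [asyncio.ensure_future(value(x)) for x in range(10)]
    results = []
    # as_completed yields awaitables in the order the tasks finish
    for next_done in asyncio.as_completed(tasks):
        x = await next_done
        print(x)
        results.append(x)
    return results


results = asyncio.run(main())
```

The printed order varies from run to run, since it depends on the random sleeps.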

vxgmichel commented 6 years ago

Hi @JerzySpendel,

What about using stream.map?

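from aiostream import stream, await_
[...]
# Schedule all the tasks up front
tasks = [asyncio.ensure_future(value(x)) for x in range(10)]
xs = stream.iterate(tasks)
# await_ awaits each task and passes its result downstream
ys = stream.map(xs, await_)
zs = stream.print(ys)
# Awaiting the stream (inside a coroutine) runs it to completion
await zs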
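From its use here, await_ behaves like an identity coroutine function: it awaits whatever it is given and returns the result. A rough standard-library sketch of what mapping such a function over already-scheduled tasks amounts to is below; identity_await is a hypothetical name, and the sequential loop is an approximation, not how stream.map actually schedules its calls.

```python
import asyncio
import random


async def value(x):
    await asyncio.sleep(random.randint(1, 10) / 100)
    return x


async def identity_await(obj):
    # Hypothetical stand-in for await_: await the argument, return its result
    return await obj


async def main():
    # ensure_future schedules all ten coroutines immediately
    tasks = [asyncio.ensure_future(value(x)) for x in range(10)]
    # Awaiting the tasks one by one keeps the input order; because they are
    # already running concurrently, the total time is about the longest sleep
    return [await identity_await(t) for t in tasks]


results = asyncio.run(main())
```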

Or, even simpler:

xs = stream.range(10)
ys = stream.map(xs, value)
zs = stream.print(ys)
await zs
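A rough plain-asyncio counterpart of mapping value over range(10) is asyncio.gather, sketched below with shortened sleeps (an assumption for speed). One difference worth noting: gather only returns once every coroutine has finished, whereas a stream hands values downstream during iteration.

```python
import asyncio
import random


async def value(x):
    await asyncio.sleep(random.randint(1, 10) / 100)
    return x


async def main():
    # gather runs value(0)..value(9) concurrently and returns results in input order
    results = await asyncio.gather(*(value(x) for x in range(10)))
    for x in results:
        print(x)
    return results


results = asyncio.run(main())
```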

> BTW. I'm so excited this library exists! Thank you!

Thank you for your kind words :)

JerzySpendel commented 6 years ago

The second solution is nice; it fits exactly the problem I wanted to solve. But let's say I have arbitrary tasks to schedule (so I can't use stream.range and have to use your first solution). Calling asyncio.ensure_future schedules those coroutines immediately, and that's not what I want. I want the tasks to run only when the stream is iterated asynchronously, just as stream.range doesn't create its values when it is created but produces them asynchronously during iteration. I can provide a better example later if you want (I will in a couple of hours if you don't respond earlier).
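The distinction this question hinges on can be observed directly: calling a coroutine function only creates a coroutine object, while asyncio.ensure_future wraps it in a task that the event loop starts on its own. The sketch below is plain asyncio with hypothetical names (work, demo); it records when each body actually begins running.

```python
import asyncio

started = []


async def work(label):
    # Record the moment this coroutine's body begins executing
    started.append(label)
    return label


async def demo():
    coro = work("coroutine")                    # only creates a coroutine object
    task = asyncio.ensure_future(work("task"))  # schedules it on the event loop
    await asyncio.sleep(0)                      # let the loop run ready callbacks once
    seen_before_await = list(started)           # the task has run; the coroutine has not
    await coro                                  # the coroutine body runs only now
    await task
    return seen_before_await


seen_before_await = asyncio.run(demo())
print(seen_before_await)  # ['task']
print(started)            # ['task', 'coroutine']
```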

vxgmichel commented 6 years ago

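In asyncio, a task is a coroutine that has already been scheduled on the event loop. However, it's perfectly fine to drop the ensure_future call and pass a stream of plain coroutines to stream.map; each coroutine then only starts running when await_ is called on it during iteration: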

coros = [value(x) for x in range(10)]
xs = stream.iterate(coros)
ys = stream.map(xs, await_, task_limit=5)
zs = stream.print(ys)
await zs

The task_limit argument caps the number of coroutines running concurrently (at most 5 here). This is useful when the list of coroutines is large.
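To make the effect of a limit like task_limit=5 concrete, here is a plain-asyncio sketch of the same idea built on asyncio.Semaphore. bounded_map is a hypothetical helper, not part of aiostream, and aiostream's own implementation of task_limit may well differ. As in the snippet above, the coroutine objects are created eagerly, but each body only starts once a slot is free.

```python
import asyncio

stats = {"running": 0, "peak": 0}


async def value(x):
    # Track how many of these coroutines are executing at the same time
    stats["running"] += 1
    stats["peak"] = max(stats["peak"], stats["running"])
    await asyncio.sleep(0.01)
    stats["running"] -= 1
    return x


async def bounded_map(coros, limit):
    # Hypothetical helper: await each coroutine with at most `limit` in flight
    sem = asyncio.Semaphore(limit)

    async def run(coro):
        async with sem:
            return await coro  # the coroutine body starts only after a slot frees up

    # gather returns the results in input order
    return await asyncio.gather(*(run(c) for c in coros))


coros = [value(x) for x in range(10)]  # nothing runs yet
results = asyncio.run(bounded_map(coros, limit=5))
print(results, "peak concurrency:", stats["peak"])
```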