vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io
GNU General Public License v3.0
793 stars 34 forks source link

question: how to use chain #116

Closed gnzsnz closed 4 months ago

gnzsnz commented 4 months ago

probably a stupid question, but here i go. how am i supposed to use chain?

example

import random
import asyncio
from aiostream import pipe, stream

async def process_async(i):
    await asyncio.sleep(random.random())
    return [i]

val = (i for i in range(10))

c = (stream.iterate(val) | 
    pipe.map(process_async,ordered=False)|
    pipe.list() |
    pipe.chain()
    )

I would expect this to work, but i get

[[7], [9], [0], [2], [4], [6], [5], [8], [1], [3]]

same result if i do

c = stream.chain(stream.iterate(val) | 
    pipe.map(process_async,ordered=False)|
    pipe.list()
    )

await c

[[5], [1], [2], [7], [8], [9], [0], [4], [3], [6]]

what i'm actually trying to do (the question behind the question, although i would like to understand both questions), is to generate a flat list (iterator), then chunck it and processes the chunks in parallel tasks

c = (stream.iterate(val) | 
    pipe.map(process_async,ordered=False)|
    pipe.list()|
    pipe.chain()| # this stops me 
    pipe.chunk(3) |
    pipe.iterate|
    pipe.map(process_one_in_chunk,ordered=False) # run batches of chunck size in parallel
    )
vxgmichel commented 4 months ago

how am i supposed to use chain?

chain is simply the concatenation of several asynchronous sequences.

So stream.chain(a, b, c) (or equivalently a | pipe.chain(b, c)) would produce all items from a then all items from b then all items from c.

In the code you wrote, | pipe.chain() is basically a no-op (as it will simply forwards the items produced by map).

I would expect this to work, but i get [...]

Could you explain more precisely the result you expected from this code? In any case what you get is the expected behavior:

So [[7], [9], [0], [2], [4], [6], [5], [8], [1], [3]] is the expected result.

gnzsnz commented 4 months ago

that was quick, thanks

I would like to get [3,6,1,8,0,...] basically i need to flatten the nested list (what itertools.chain does)

vxgmichel commented 4 months ago

Oh I see, I think you need the flatmap or flatten operator. Would this snippet help?

import random
import asyncio
from aiostream import pipe, stream

async def process_async(i):
    await asyncio.sleep(random.random())
    return [f"{i}a", f"{i}b"]

async def process_one_in_chunk(chunk):
    await asyncio.sleep(random.random())
    return chunk

async def main():
    values = (i for i in range(10))
    xs = (
        stream.iterate(values)
        | pipe.map(process_async)
        | pipe.flatmap(stream.iterate)
        | pipe.chunks(3)
        | pipe.map(process_one_in_chunk, ordered=False)
    )
    async with xs.stream() as streamer:
        async for item in streamer:
            print(item)

if __name__ == "__main__":
    asyncio.run(main())

It prints:

['1b', '2a', '2b']
['3a', '3b', '4a']
['6a', '6b', '7a']
['4b', '5a', '5b']
['7b', '8a', '8b']
['9a', '9b']
['0a', '0b', '1a']
vxgmichel commented 4 months ago

basically i need to flatten the nested list (what itertools.chain does)

By the way you would get the same result with itertools.chain:

>>> from itertools import chain
>>> list(chain(map(lambda i: [i], [1, 2, 3])))
[[1], [2], [3]]
gnzsnz commented 4 months ago

sorry my bad, this is what i'm looking for

from itertools import chain
list(chain.from_iterable(map(lambda i: [i], [1, 2, 3])))
[1, 2, 3]

thanks for your help, i need to go through your example

vxgmichel commented 4 months ago

Alright, then pipe.concatmap(stream.iterate) (rather than flatmap) is probably the best equivalent to chain.from_iterable.

gnzsnz commented 4 months ago

thanks a lot, i think that i'm starting to get it. i manage to build a working pipeline. and it ended up quite differently than what i originally thought.

I was thinking to manage the number of concurrent tasks with chunk, but i realized that map can do that on it's own. aiostream is amazing, i can rebuild a lot of thinks using it and the code will be so easy to understand.

thanks a lot for putting aiostream together.

PS: I would suggest you to enable discussions. I think it will be great if people could contribute their recipes there.

vxgmichel commented 4 months ago

I was thinking to manage the number of concurrent tasks with chunk, but i realized that map can do that on it's own. aiostream is amazing, i can rebuild a lot of thinks using it and the code will be so easy to understand.

I'm glad you found it useful :)

PS: I would suggest you to enable discussions. I think it will be great if people could contribute their recipes there.

Good idea, done! :tada: