vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io
GNU General Public License v3.0

Where should I create a stream context manager? #46

Closed blaiseli closed 4 years ago

blaiseli commented 5 years ago

I'm trying to convert some code using classical generators into code using async features. As far as I understand, aiostream looks like a helpful package to achieve this.

I came up with a small example mimicking the kind of things I'm trying to do: pairing pre-sorted numbers read from two files, using None when one stream of numbers has a gap until next matching pair.

Example input:

$ cat f1.txt
2
4
6
7
8
$ cat f2.txt
1
3
4
6
7
8
9

Desired output:

(None, 1)
(2, None)
(None, 3)
(4, 4)
(6, 6)
(7, 7)
(8, 8)
(None, 9)

And here is some code that does that using classical generators:

#!/usr/bin/env python3

import sys
from itertools import repeat

def get_nums_from_file(fname):
    with open(fname) as fh:
        for line in fh:
            yield int(line)

def try_take_from(generator):
    try:
        return next(generator)
    except StopIteration:
        return None

def pair_nums(fname1, fname2):
    f1_nums = get_nums_from_file(fname1)
    f2_nums = get_nums_from_file(fname2)
    num1 = try_take_from(f1_nums)
    if num1 is None:
        yield from zip(repeat(None), f2_nums)
        return
    num2 = try_take_from(f2_nums)
    if num2 is None:
        yield from zip(f1_nums, repeat(None))
        return
    while True:
        if num1 > num2:
            yield (None, num2)
            num2 = try_take_from(f2_nums)
            if num2 is None:
                yield from zip(f1_nums, repeat(None))
                return
        elif num1 < num2:
            yield (num1, None)
            num1 = try_take_from(f1_nums)
            if num1 is None:
                yield from zip(repeat(None), f2_nums)
                return
        else:
            yield (num1, num2)
            num1 = try_take_from(f1_nums)
            if num1 is None:
                yield from zip(repeat(None), f2_nums)
                return
            num2 = try_take_from(f2_nums)
            if num2 is None:
                yield from zip(f1_nums, repeat(None))
                return

def main():
    for pair in pair_nums("f1.txt", "f2.txt"):
        print(pair)
    return 0

sys.exit(main())

I managed to get something working using async features and aiostream, but I get warned that AsyncIteratorContext is iterated outside of its context, and I don't understand where I should change my code to avoid the warning. Here is my code:

#!/usr/bin/env python3

import sys
import asyncio
from itertools import repeat
from aiostream import stream

def get_nums_from_file(fname):
    with open(fname) as fh:
        for line in fh:
            yield int(line)

async def try_take_from(generator):
    try:
        return await generator[0]
    except IndexError:
        return None

async def pair_nums(fname1, fname2):
    f1_nums = stream.iterate(get_nums_from_file(fname1))
    f2_nums = stream.iterate(get_nums_from_file(fname2))
    num1 = await try_take_from(f1_nums)
    if num1 is None:
        async for pair in stream.zip(stream.repeat(None), f2_nums):
            yield pair
        return
    num2 = await try_take_from(f2_nums)
    if num2 is None:
        async for pair in stream.zip(f1_nums, stream.repeat(None)):
            yield pair
        return
    while True:
        if num1 > num2:
            yield (None, num2)
            num2 = await try_take_from(f2_nums)
            if num2 is None:
                async for pair in stream.zip(f1_nums, stream.repeat(None)):
                    yield pair
                return
        elif num1 < num2:
            yield (num1, None)
            num1 = await try_take_from(f1_nums)
            if num1 is None:
                async for pair in stream.zip(stream.repeat(None), f2_nums):
                    yield pair
                return
        else:
            yield (num1, num2)
            num1 = await try_take_from(f1_nums)
            if num1 is None:
                async for pair in stream.zip(stream.repeat(None), f2_nums):
                    yield pair
                return
            num2 = await try_take_from(f2_nums)
            if num2 is None:
                async for pair in stream.zip(f1_nums, stream.repeat(None)):
                    yield pair
                return

async def main():
    async for pair in pair_nums("f1.txt", "f2.txt"):
        print(pair)
    return 0

sys.exit(asyncio.run(main()))

And here is the output:

(None, 1)
(2, None)
(None, 3)
(4, 4)
(6, 6)
(7, 7)
(8, 8)
/pasteur/homes/bli/.local/lib/python3.7/site-packages/aiostream/aiter_utils.py:120: UserWarning: AsyncIteratorContext is iterated outside of its context
  "AsyncIteratorContext is iterated outside of its context")
(None, 9)

I don't exactly understand the meaning of the warning, so I tried two approaches to avoid it.

  1. I tried to change the main function as follows:
async def main():
    pair_stream = stream.iterate(pair_nums("f1.txt", "f2.txt"))
    async with pair_stream.stream() as pairs:
        async for pair in pairs:
            print(pair)
    return 0

But this doesn't seem to change anything.

  2. I also tried to create the context managers inside the pair_nums function, as follows:
async def pair_nums(fname1, fname2):
    f1_nums_stream = stream.iterate(get_nums_from_file(fname1))
    f2_nums_stream = stream.iterate(get_nums_from_file(fname2))
    async with f1_nums_stream.stream() as f1_nums, f2_nums_stream.stream() as f2_nums: 
        # [same as before, but indented]

But this won't run properly:

(None, 1)
Traceback (most recent call last):
  File "./test_pair_nums_async_fix2.py", line 70, in <module>
    sys.exit(asyncio.run(main()))
  File "/local/gensoft2/exe/Python/3.7.2/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/local/gensoft2/exe/Python/3.7.2/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "./test_pair_nums_async_fix2.py", line 65, in main
    async for pair in pair_nums("f1.txt", "f2.txt"):
  File "./test_pair_nums_async_fix2.py", line 38, in pair_nums
    num2 = await try_take_from(f2_nums)
  File "./test_pair_nums_async_fix2.py", line 16, in try_take_from
    return await generator[0]
  File "/pasteur/homes/bli/.local/lib/python3.7/site-packages/aiostream/core.py", line 29, in wait_stream
    async for item in streamer:
  File "/pasteur/homes/bli/.local/lib/python3.7/site-packages/aiostream/stream/select.py", line 147, in item
    result = await anext(streamer)
  File "/pasteur/homes/bli/.local/lib/python3.7/site-packages/aiostream/stream/select.py", line 56, in skip
    async for i, item in streamer:
  File "/pasteur/homes/bli/.local/lib/python3.7/site-packages/aiostream/stream/transform.py", line 27, in enumerate
    async with streamcontext(source) as streamer:
  File "/pasteur/homes/bli/.local/lib/python3.7/site-packages/aiostream/aiter_utils.py", line 126, in __aenter__
    "AsyncIteratorContext is closed and cannot be iterated")
RuntimeError: AsyncIteratorContext is closed and cannot be iterated

What exactly is expected from me with this warning?

vxgmichel commented 5 years ago

Hi @blaiseli, thanks for the report!

There's definitely something wrong with the way this warning is reported as it doesn't show which stream is iterated out of its context. This should get fixed by the following patch, that I'll include in a later PR:

diff --git a/aiostream/aiter_utils.py b/aiostream/aiter_utils.py
index b6ff375..0bbaa24 100644
--- a/aiostream/aiter_utils.py
+++ b/aiostream/aiter_utils.py
@@ -117,7 +117,8 @@ class AsyncIteratorContext(AsyncIterator):
                 "AsyncIteratorContext is closed and cannot be iterated")
         if self._state == self._STANDBY:
             warnings.warn(
-                "AsyncIteratorContext is iterated outside of its context")
+                "AsyncIteratorContext is iterated outside of its context",
+                stacklevel=2)
         return anext(self._aiterator)

     async def __aenter__(self):
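
As a rough illustration of what `stacklevel=2` changes, here is a small standard-library sketch. The function names are made up for the example and are not part of aiostream. With the default stack level the warning is attributed to the line that calls `warnings.warn`; with `stacklevel=2` it is attributed to the caller, which is usually the more useful location when hunting for the offending iteration.

```python
import warnings


def library_call_default():
    # Hypothetical library function: the warning points at this line.
    warnings.warn("iterated outside of its context")


def library_call_stacklevel():
    # Hypothetical library function: the warning points at the caller's line.
    warnings.warn("iterated outside of its context", stacklevel=2)


def user_code(func):
    func()  # with stacklevel=2, the warning is attributed to this line


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    user_code(library_call_default)
    user_code(library_call_stacklevel)

default_line, caller_line = (w.lineno for w in caught)
print("default stacklevel reports line", default_line)
print("stacklevel=2 reports line", caller_line)
```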

About the issue itself, you have to protect stream.zip(stream.repeat(None), f2_nums) and stream.zip(f1_nums, stream.repeat(None)) with a stream context in order to make sure that the streams are closed when you're done iterating on them:

async with stream.zip(stream.repeat(None), f2_nums).stream() as pairs:
    async for pair in pairs:
        yield pair

Here's a working version of your example with a bit of factorization:

async def pair_nums(fname1, fname2):
    f1_nums = stream.iterate(get_nums_from_file(fname1))
    f2_nums = stream.iterate(get_nums_from_file(fname2))
    num1 = await try_take_from(f1_nums)
    num2 = await try_take_from(f2_nums)

    while num1 is not None and num2 is not None:
        if num1 > num2:
            yield (None, num2)
            num2 = await try_take_from(f2_nums)
        elif num1 < num2:
            yield (num1, None)
            num1 = await try_take_from(f1_nums)
        else:
            yield (num1, num2)
            num1 = await try_take_from(f1_nums)
            num2 = await try_take_from(f2_nums)

    if num1 is None and num2 is not None:
        yield (None, num2)
        async with stream.zip(stream.repeat(None), f2_nums).stream() as pairs:
            async for pair in pairs:
                yield pair

    if num2 is None and num1 is not None:
        yield (num1, None)
        async with stream.zip(f1_nums, stream.repeat(None)).stream() as pairs:
            async for pair in pairs:
                yield pair

Note that you can get rid of try_take_from by chaining the number streams with stream.repeat(None):

async def pair_nums(fname1, fname2):
    f1_nums = stream.iterate(get_nums_from_file(fname1)) + stream.repeat(None)
    f2_nums = stream.iterate(get_nums_from_file(fname2)) + stream.repeat(None)
    num1 = await f1_nums[0]
    num2 = await f2_nums[0]

    while num1 is not None and num2 is not None:
        if num1 > num2:
            yield (None, num2)
            num2 = await f2_nums[0]
        elif num1 < num2:
            yield (num1, None)
            num1 = await f1_nums[0]
        else:
            yield (num1, num2)
            num1 = await f1_nums[0]
            num2 = await f2_nums[0]

    if num1 is not None or num2 is not None:
        yield (num1, num2)

    async with stream.zip(f1_nums, f2_nums).stream() as pairs:
        async for (num1, num2) in pairs:
            if num1 is None and num2 is None:
                return
            yield (num1, num2)

Or, even simpler:

async def pair_nums(fname1, fname2):
    f1_nums = stream.iterate(get_nums_from_file(fname1)) + stream.repeat(None)
    f2_nums = stream.iterate(get_nums_from_file(fname2)) + stream.repeat(None)
    num1 = await f1_nums[0]
    num2 = await f2_nums[0]

    while num1 is not None or num2 is not None:
        if num1 is None or (num2 is not None and num1 > num2):
            yield (None, num2)
            num2 = await f2_nums[0]
        elif num2 is None or num1 < num2:
            yield (num1, None)
            num1 = await f1_nums[0]
        else:
            yield (num1, num2)
            num1 = await f1_nums[0]
            num2 = await f2_nums[0]
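
To sanity-check the logic of this simplified loop independently of aiostream, here is a synchronous sketch on plain iterators. It is only an assumption-laden stand-in: `pair_sorted` is a hypothetical name, `next(it, None)` plays the role of chaining with `stream.repeat(None)`, and the first comparison is guarded so that an integer is never compared with `None` when the second input runs out first.

```python
def pair_sorted(nums1, nums2):
    """Pair two pre-sorted sequences, padding gaps with None."""
    it1, it2 = iter(nums1), iter(nums2)
    # None marks an exhausted input, like chaining with an endless None stream.
    num1, num2 = next(it1, None), next(it2, None)
    while num1 is not None or num2 is not None:
        if num1 is None or (num2 is not None and num1 > num2):
            yield (None, num2)
            num2 = next(it2, None)
        elif num2 is None or num1 < num2:
            yield (num1, None)
            num1 = next(it1, None)
        else:
            yield (num1, num2)
            num1, num2 = next(it1, None), next(it2, None)


# Same numbers as f1.txt and f2.txt in the example input.
pairs = list(pair_sorted([2, 4, 6, 7, 8], [1, 3, 4, 6, 7, 8, 9]))
for pair in pairs:
    print(pair)
```

Note that this sketch relies on `None` never being a legitimate value in the inputs, exactly like the versions above.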

Also, using await f2_nums[0] is not aiostream-friendly as it creates a new stream that returns its first element. For instance, it wouldn't work if get_nums_from_file was an aiostream operator:

import aiofiles
from aiostream import operator

@operator
async def integers_from_file(fname):
    async with aiofiles.open(fname) as fh:
        async for line in fh:
            yield int(line)

In this case, await f1_nums[0] would produce the first line of the f1.txt file every time it is called. Instead, I would write pair_nums this way:

from aiostream import stream
from aiostream.aiter_utils import anext

@operator(pipable=True)
async def pair_nums(source1, source2):
    f1_stream = stream.chain(source1, stream.repeat(None))
    f2_stream = stream.chain(source2, stream.repeat(None))

    async with f1_stream.stream() as f1_nums, f2_stream.stream() as f2_nums:

        num1 = await anext(f1_nums)
        num2 = await anext(f2_nums)

        while num1 is not None or num2 is not None:
            if num1 is None or (num2 is not None and num1 > num2):
                yield (None, num2)
                num2 = await anext(f2_nums)
            elif num2 is None or num1 < num2:
                yield (num1, None)
                num1 = await anext(f1_nums)
            else:
                yield (num1, num2)
                num1 = await anext(f1_nums)
                num2 = await anext(f2_nums)

And that's how it is used:

async def main():
    f1 = integers_from_file("f1.txt")
    f2 = integers_from_file("f2.txt")
    pair_stream = pair_nums(f1, f2)
    async with pair_stream.stream() as pairs:
        async for pair in pairs:
            print(pair)

Or, using the pipe syntax and the print operator:

from aiostream import pipe

async def main():
    f1 = integers_from_file("f1.txt")
    f2 = integers_from_file("f2.txt")
    await (f1 | pair_nums.pipe(f2) | pipe.print())

Hope that helps :)
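
For readers who want to see why asking a restartable source for its first element keeps returning the same value, here is a standard-library sketch. It does not use aiostream: `RestartableLines` and `first` are hypothetical stand-ins for a source that reopens on every iteration and for the `[0]` lookup, respectively. Holding on to a single iterator and advancing it, as the `anext`-based version above does, is what makes the values progress.

```python
import asyncio


class RestartableLines:
    """Hypothetical source that starts from the beginning on each iteration."""

    def __init__(self, lines):
        self._lines = lines
        self.opened = 0  # how many times iteration was (re)started

    def __aiter__(self):
        self.opened += 1
        return self._read()

    async def _read(self):
        for line in self._lines:
            yield int(line)


async def first(source):
    # Start a fresh iteration, take one item, then close the iterator.
    iterator = source.__aiter__()
    try:
        return await iterator.__anext__()
    finally:
        await iterator.aclose()


async def demo():
    source = RestartableLines(["2", "4", "6"])
    repeated_first = [await first(source) for _ in range(3)]
    iterator = source.__aiter__()  # one iterator, advanced step by step
    advancing = [await iterator.__anext__() for _ in range(3)]
    return repeated_first, advancing, source.opened


repeated_first, advancing, opened = asyncio.run(demo())
print(repeated_first, advancing, opened)
```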

blaiseli commented 5 years ago

Thanks for the patch and your detailed suggestions and advice.

I haven't tried the patch so far.

Just to be sure it works, I tried a factorized version of my still warning-emitting code. (Compared to your suggestion, there's a small difference: after the while loop, no need to test if one of the numbers is None when the other is not):

#!/usr/bin/env python3

import sys
import asyncio
from itertools import repeat
from aiostream import stream

def get_nums_from_file(fname):
    with open(fname) as fh:
        for line in fh:
            yield int(line)

async def try_take_from(generator):
    try:
        return await generator[0]
    except IndexError:
        return None

async def pair_nums(fname1, fname2):
    f1_nums = stream.iterate(get_nums_from_file(fname1))
    f2_nums = stream.iterate(get_nums_from_file(fname2))
    num1 = await try_take_from(f1_nums)
    num2 = await try_take_from(f2_nums)
    while num1 is not None and num2 is not None:
        if num1 > num2:
            yield (None, num2)
            num2 = await try_take_from(f2_nums)
        elif num1 < num2:
            yield (num1, None)
            num1 = await try_take_from(f1_nums)
        else:
            yield (num1, num2)
            num1 = await try_take_from(f1_nums)
            num2 = await try_take_from(f2_nums)

    if num2 is not None:
        yield (None, num2)
        async for pair in stream.zip(stream.repeat(None), f2_nums):
            yield pair

    if num1 is not None:
        yield (num1, None)
        async for pair in stream.zip(f1_nums, stream.repeat(None)):
            yield pair

async def main():
    async for pair in pair_nums("f1.txt", "f2.txt"):
        print(pair)
    return 0

sys.exit(asyncio.run(main()))

This still works, but the warning is now emitted at the end:

(None, 1)
(2, None)
(None, 3)
(4, 4)
(6, 6)
(7, 7)
(8, 8)
(None, 9)
/pasteur/homes/bli/.local/lib/python3.7/site-packages/aiostream/aiter_utils.py:120: UserWarning: AsyncIteratorContext is iterated outside of its context
  "AsyncIteratorContext is iterated outside of its context")

Then, I tried to protect the stream.zip parts:

#!/usr/bin/env python3

import sys
import asyncio
from itertools import repeat
from aiostream import stream

def get_nums_from_file(fname):
    with open(fname) as fh:
        for line in fh:
            yield int(line)

async def try_take_from(generator):
    try:
        return await generator[0]
    except IndexError:
        return None

async def pair_nums(fname1, fname2):
    f1_nums = stream.iterate(get_nums_from_file(fname1))
    f2_nums = stream.iterate(get_nums_from_file(fname2))
    num1 = await try_take_from(f1_nums)
    num2 = await try_take_from(f2_nums)
    while num1 is not None and num2 is not None:
        if num1 > num2:
            yield (None, num2)
            num2 = await try_take_from(f2_nums)
        elif num1 < num2:
            yield (num1, None)
            num1 = await try_take_from(f1_nums)
        else:
            yield (num1, num2)
            num1 = await try_take_from(f1_nums)
            num2 = await try_take_from(f2_nums)
    if num2 is not None:
        yield (None, num2)
        async with stream.zip(stream.repeat(None), f2_nums).stream() as pairs:
            for pair in pairs:
                yield pair
    if num1 is not None:
        yield (num1, None)
        async with stream.zip(f1_nums, stream.repeat(None)).stream as pairs:
            for pair in pairs:
                yield pair

async def main():
    async for pair in pair_nums("f1.txt", "f2.txt"):
        print(pair)
    return 0

sys.exit(asyncio.run(main()))

It seems to me that this is almost exactly what you suggested first. However, after generating the expected output, it seems to be trapped in an infinite loop:

(None, 1)
(2, None)
(None, 3)
(4, 4)
(6, 6)
(7, 7)
(8, 8)
(None, 9)
<aiostream.stream.select.getitem object at 0x7f759cba75f8>
<aiostream.stream.select.getitem object at 0x7f759cba75c0>
<aiostream.stream.select.getitem object at 0x7f759cba79e8>
<aiostream.stream.select.getitem object at 0x7f759cba7a20>
<aiostream.stream.select.getitem object at 0x7f759cba7a58>
<aiostream.stream.select.getitem object at 0x7f759cba7a90>
<aiostream.stream.select.getitem object at 0x7f759cba7ac8>
<aiostream.stream.select.getitem object at 0x7f759cc3abe0>
<aiostream.stream.select.getitem object at 0x7f759cba7a58>
<aiostream.stream.select.getitem object at 0x7f759cba7a20>
<aiostream.stream.select.getitem object at 0x7f759cba79e8>
<aiostream.stream.select.getitem object at 0x7f759cba75c0>
# etc.

Did I miss an important detail?

Ah, while then trying your second version I saw what was missing: the above happens when looping with for instead of async for. That was the important detail.

Sorry for using your issues for rubber duck debugging. I'll leave it like that in case that can be useful for someone else.

Now I should have a look at further features of aiostream to understand your advice regarding aiostream operators.

vxgmichel commented 5 years ago

Sorry for using your issues for rubber duck debugging. I'll leave it like that in case that can be useful for someone else.

That's definitely useful to me :) Using a regular for-loop on a stream or streamer should raise a TypeError, not go into an infinite loop. This is fixed in PR #47. Thanks!
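
A plausible explanation for the endless getitem objects, stated here as an assumption rather than a description of aiostream's internals: when a class defines `__getitem__` but no `__iter__`, a regular for-loop falls back to Python's legacy sequence protocol and calls `obj[0]`, `obj[1]`, and so on until an IndexError is raised. If indexing lazily returns a new object and never raises, the loop never ends. The sketch below uses made-up classes to show the fallback, and one way Python allows a class to opt out of it.

```python
from itertools import islice


class LazyIndexable:
    """Hypothetical class whose indexing never raises IndexError."""

    def __getitem__(self, index):
        return f"lazy item {index}"


class NotIterable(LazyIndexable):
    # Setting __iter__ to None disables the __getitem__ fallback.
    __iter__ = None


# A plain for-loop over LazyIndexable() would never stop, so take 3 items only.
first_three = list(islice(LazyIndexable(), 3))
print(first_three)

try:
    iter(NotIterable())
    blocked = False
except TypeError:
    blocked = True
print("iteration blocked:", blocked)
```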

blaiseli commented 5 years ago

I tried experimenting with your @operator based solution, and things are not yet very clear to me.

  1. What do you call an "aiostream operator"? Something that produces a stream?
  2. What is the difference between aiostream.stream.iterate and aiostream.operator? They look like two slightly different ways to turn generators into streams, but the second one requires an async generator.
  3. What's the difference between an async generator and a stream, by the way? Is the second one just an async generator with extra features? Can a stream be used in any context where an async generator is required?

I did the following experiment:

#!/usr/bin/env python3

import sys
import asyncio
from itertools import repeat
from aiofiles import open as aopen
from aiostream import stream, operator
from aiostream.aiter_utils import anext

def integers_from_file_1(fname):
    with open(fname) as fh:
        for line in fh:
            yield int(line)

async def integers_from_file_2(fname):
    with open(fname) as fh:
        for line in fh:
            yield int(line)

@operator
async def integers_from_file_3(fname):
    with open(fname) as fh:
        for line in fh:
            yield int(line)

source_makers = [integers_from_file_1, integers_from_file_2, integers_from_file_3]

def make_stream_1(int_source):
    return stream.chain(int_source, stream.repeat(None))

def make_stream_2(int_source):
    return stream.chain(stream.iterate(int_source), stream.repeat(None))

stream_makers = [make_stream_1, make_stream_2]

def make_pair_nums(source_maker, stream_maker):
    async def pair_nums(fname1, fname2):
        f1_stream = stream_maker(source_maker(fname1))
        f2_stream = stream_maker(source_maker(fname2))
        async with f1_stream.stream() as f1_nums, f2_stream.stream() as f2_nums:
            [num1, num2] = await asyncio.gather(anext(f1_nums), anext(f2_nums))
            while num1 is not None or num2 is not None:
                if num1 is None or (num2 is not None and num1 > num2):
                    yield (None, num2)
                    num2 = await anext(f2_nums)
                elif num2 is None or num1 < num2:
                    yield (num1, None)
                    num1 = await anext(f1_nums)
                else:
                    yield (num1, num2)
                    num1 = await anext(f1_nums)
                    num2 = await anext(f2_nums)
    return pair_nums

async def main():
    for (i, source_maker) in enumerate(source_makers):
        for (j, stream_maker) in enumerate(stream_makers):
            pair_nums = make_pair_nums(source_maker, stream_maker)
            print(i, j)
            try:
                async for pair in pair_nums("f1.txt", "f2.txt"):
                    print(pair)
            except TypeError as err:
                print(str(err))
    return 0

sys.exit(asyncio.run(main()))

Only the 0 0 combination fails ('generator' object is not async iterable), presumably due to the fact that stream.chain requires an async generator, not a plain one.

The other 5 all seem to behave correctly.
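
The failing combination comes down to a plain generator not supporting `async for`. Here is a standard-library sketch of that distinction, together with a hypothetical adapter (`as_async`, not an aiostream function) that accepts either kind of iterable, roughly in the spirit of what wrapping the source in stream.iterate achieves in the experiment above.

```python
import asyncio
from collections.abc import AsyncIterable


def plain_numbers():
    yield from (1, 2)


async def async_numbers():
    for n in (1, 2):
        yield n


async def as_async(source):
    """Hypothetical adapter: iterate a sync or an async iterable."""
    if isinstance(source, AsyncIterable):
        async for item in source:
            yield item
    else:
        for item in source:
            yield item


async def demo():
    try:
        async for _ in plain_numbers():  # plain generators lack __aiter__
            pass
        plain_error = None
    except TypeError as err:
        plain_error = type(err).__name__
    adapted = [n async for n in as_async(plain_numbers())]
    native = [n async for n in as_async(async_numbers())]
    return plain_error, adapted, native


plain_error, adapted, native = asyncio.run(demo())
print(plain_error, adapted, native)
```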

vxgmichel commented 5 years ago

I tried experimenting with your @operator based solution, and things are not yet very clear to me.

Out of curiosity, did you find the documentation for operator? I wonder if you simply missed it or if it is doing a poor job at explaining what it does :sweat_smile:

What do you call an "aiostream operator"? Something that produces a stream?

Yes, a stream operator is a class defined using the operator decorator. When instantiated, it produces a stream:

# stream.count is an operator, xs is a stream
xs = stream.count()

All the operators exposed by aiostream are available in this table, but users can also define their own operator using the operator decorator. Note that I consider user-defined operators to be an advanced usage of the aiostream library.

What is the difference between aiostream.stream.iterate and aiostream.operator? They look like two slightly different ways to turn generators into streams, but the second one requires an async generator

So, stream.iterate is an operator producing a stream from any iterable (sync or async). Again, aiostream.operator is simply the way operators are defined. Also note that operator requires either:

What's the difference between an async generator and a stream, by the way? Is the second one just an async generator with extra features?

You can definitely see a stream as an async generator on steroids. The main differences are:

Can a stream be used in any context where an async generator is required?

Yes, but you'd be missing the safe streaming context and you would get the warning we were talking about earlier.

Hope it's clearer now :)

Also please let me know if you have ideas about how the documentation could be improved, thanks!

blaiseli commented 5 years ago

Thanks again for your more detailed explanations.

Out of curiosity, did you find the documentation for operator? I wonder if you simply missed it or if it is doing a poor job at explaining what it does

I had seen this page, but had missed some details. I should also have read the earlier pages of the documentation in more detail for this to be clearer.

So, stream.iterate is an operator producing a stream from any iterable (sync or async). Again, aiostream.operator is simply the way operators are defined.

So the thing returned by stream.iterate(<something>) is directly a stream, whereas the thing returned by aiostream.operator(<something>) is a Stream class, so it still needs to be instantiated in order to have a stream.

Also please let me know if you have ideas about how the documentation could be improved, thanks!

Maybe more details about what you mean by "stream operator" would be useful either in the general presentation (https://aiostream.readthedocs.io/en/latest/presentation.html#module-aiostream.stream) or in the page detailing the stream operators (https://aiostream.readthedocs.io/en/latest/operators.html#module-aiostream.stream).

I made a pull request with some attempts at improving the documentation on these points: https://github.com/vxgmichel/aiostream/pull/50

streams are reusable, i.e they produce a new async iterator at each usage

I'm not sure what you mean by "each usage". Maybe adding some examples in the doc would help, with a comparison between an async generator and a stream, and also, in case this could be relevant, the relation between this and what aiostream.stream.preserve does.

Yet another point that is still not clear to me is what a "streamer" is with respect to a "stream".

Also, I'm still experimenting, and I might have more questions...

blaiseli commented 5 years ago

I'm not sure what you mean by "each usage". Maybe adding some examples in the doc would help, with a comparison between an async generator and a stream, and also, in case this could be relevant, the relation between this and what aiostream.stream.preserve does.

Yet another point that is still not clear to me is what a "streamer" is with respect to a "stream".

https://github.com/vxgmichel/aiostream/issues/40#issuecomment-505342198 somewhat makes this clearer:

streams can be iterated multiple times so a new streamer object has to be created for each usage

A streamer is the thing on which one can iterate "safely", and without "exhausting" the underlying stream.

Well, some experiments show me that the non-exhaustion is not due to the use of the streamer. I can iterate directly on the stream several times. What did you mean by "has to be created each time"?

vxgmichel commented 5 years ago

Thanks a lot for the PR, it's much appreciated! I'll try to review it during the week-end :)

Yet another point that is still not clear to me is what a "streamer" is with respect to a "stream".

It is very similar to the concept of iterable vs iterator. Take range for instance: it's an iterable, meaning it implements the method __iter__. When called, it returns a range iterator:

>>> iter(range(3))
<range_iterator at 0x7fd69d037de0>

The range iterator is also an iterable, but its __iter__ method returns the range iterator itself. On top of that, it implements the method __next__ that is called repeatedly to produce the values.

Now, what happens when you use range and the range iterator several times?

>>> x = range(3)
>>> list(x)
[0, 1, 2]
>>> list(x)
[0, 1, 2]
>>> y = iter(x)
>>> list(y)
[0, 1, 2]
>>> list(y)
[]

Notice how the range iterator can only be used once: after that it won't produce any more values. Well, it's exactly the same for streams and streamers:

>>> x = stream.range(3)
>>> await stream.list(x)
[0, 1, 2]
>>> await stream.list(x)
[0, 1, 2] 
>>> y = x.stream()
>>> await stream.list(stream.preserve(y))
[0, 1, 2]
>>> await stream.list(y)
[]
>>> await stream.list(y)
RuntimeError: AsyncIteratorContext is closed and cannot be iterated

Now what about generators (and async generators)? Well, they are (async) iterators so they're only meant to be iterated once. So the job of the operator decorator is to transform an async generator function (i.e. a function returning an async iterator) into a class of async iterables that can start over every time they're being iterated (again, similar to range).

In short:

operator --[instantiate]--> stream --[iterate (__aiter__)]--> streamer --[produce (__anext__)]--> items
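
The same iterable-versus-iterator split can be sketched with the standard library alone. This is only an illustration of the idea, not how the operator decorator is implemented: `Restartable` is a hypothetical class that stores an async generator function and calls it again on every `__aiter__`, so it starts over each time, while a bare async generator is exhausted after one pass.

```python
import asyncio


async def count_to(n):
    for i in range(n):
        yield i


class Restartable:
    """Hypothetical re-iterable wrapper around an async generator function."""

    def __init__(self, agen_func, *args):
        self._agen_func = agen_func
        self._args = args

    def __aiter__(self):
        # A fresh async generator (the "streamer" role) for each iteration.
        return self._agen_func(*self._args)


async def collect(aiterable):
    return [item async for item in aiterable]


async def demo():
    single_use = count_to(3)             # async iterator: one pass only
    reusable = Restartable(count_to, 3)  # async iterable: restarts each time
    return (
        await collect(single_use),
        await collect(single_use),
        await collect(reusable),
        await collect(reusable),
    )


results = asyncio.run(demo())
print(results)
```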

Does that make sense to you? You asked more questions but I think the stream vs streamer confusion was the biggest point, let me know if you need more clarification!

blaiseli commented 5 years ago

@vxgmichel Thanks again for the explanations. This makes perfect sense.

Using aioconsole, I could experiment with your examples.

I tried a more literal translation of the range example:

>>> from aiostream import stream
>>> x = stream.range(3)
>>> await stream.list(x)
[0, 1, 2]
>>> y = x.__aiter__()
>>> await stream.list(y)
[0, 1, 2]
>>> await stream.list(y)
RuntimeError: AsyncIteratorContext is closed and cannot be iterated

Then, plugging in stream.preserve helps to understand what it does. It makes it behave more like the non-async range. Why did you choose not to make this the default behaviour?

I was wondering what more the .stream method did, but I see in the source code that x.stream() is actually just returning x.__aiter__()

vxgmichel commented 5 years ago

I was wondering what more the .stream method did, but I see in the source code that x.stream() is actually just returning x.__aiter__()

Yea, I couldn't really find a better name for this method (I thought aiter was a bit cryptic).

Why did you choose not to make this the default behaviour?

I thought it'd be a safer default: the idea is that streamer objects run in a safe context that ensures that all the resources are cleared when the iteration is over. For instance, when you run await y[0], the context is entered (using y.__aenter__), an item is produced (using y.__anext__), a cleanup is performed (using y.__aexit__) and the item is returned. Because the cleanup has closed the inner async generators, it's not possible to resume the execution to ask for the next value.

The explanation is a bit technical but I hope it makes sense. In short, I thought async generators were a bit unsafe to use since it was hard to know when some operations would be performed. So I added this context to improve causality: when the stream has produced a result, the cleanup has already been done.
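
The enter / produce / clean up sequence described above can be mimicked with a plain async generator. This is a minimal sketch under the assumption that closing the generator stands in for the streamer's `__aexit__`; `numbers` and `first_item` are hypothetical names. It shows that once the first item has been returned, the cleanup has already run and the generator cannot be resumed.

```python
import asyncio


async def numbers(log):
    try:
        for i in range(3):
            yield i
    finally:
        # Runs when the generator is closed or exhausted.
        log.append("cleanup")


async def first_item(agen):
    # Produce one item, then always close the generator before returning.
    try:
        return await agen.__anext__()
    finally:
        await agen.aclose()


async def demo():
    log = []
    agen = numbers(log)
    first = await first_item(agen)
    cleaned_before_use = list(log)  # cleanup already happened at this point
    try:
        await agen.__anext__()
        resumed = True
    except StopAsyncIteration:
        resumed = False  # a closed generator yields nothing more
    return first, cleaned_before_use, resumed


first, cleaned_before_use, resumed = asyncio.run(demo())
print(first, cleaned_before_use, resumed)
```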