vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io
GNU General Public License v3.0

aiter-like creation operator #64

Closed hniksic closed 4 years ago

hniksic commented 4 years ago

A StackOverflow question pointed out that there is no async equivalent to the two-argument iter. Although in the OP's particular case the issue was easily resolved through the use of the walrus operator, it occurred to me that a function that creates a stream from a function might actually be useful in many situations, such as for iterating over a queue or other situations where the data comes from a possibly-async function.
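For context, here is a minimal sketch of the two idioms mentioned above: the synchronous two-argument iter, and the walrus-operator loop that can stand in for it in async code. The helper names (read_chunks, drain) and the use of None as the sentinel are illustrative assumptions, not taken from the StackOverflow question.

```python
import asyncio
import io


def read_chunks(fileobj, size):
    # Two-argument iter: call the lambda until it returns the sentinel b"".
    return list(iter(lambda: fileobj.read(size), b""))


async def drain(queue, sentinel):
    # There is no async two-argument iter, so the walrus operator
    # folds the "get, compare, stop" logic into the loop condition.
    items = []
    while (item := await queue.get()) is not sentinel:
        items.append(item)
    return items


async def main():
    queue = asyncio.Queue()
    for value in (1, 2, 3, None):  # None marks the end of the data
        queue.put_nowait(value)
    return await drain(queue, None)


print(read_chunks(io.BytesIO(b"abcdefghij"), 4))
print(asyncio.run(main()))
```

The walrus loop works well for a one-off, but it cannot be passed around as a value or combined with other operators, which is what motivates the proposal that follows.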

An operator that converts a callable into a stream would nicely fit among aiostream's creational operators and would allow combining such a source with other aiostream operators. Assuming the name from_func (in analogy to from_iterable), one could iterate over an asyncio queue with:

async for item in from_func(queue.get):
    ...

or from binary chunks of a file:

async for chunk in from_func(fileobj.read, 4096, sentinel=b''):
    ...

The implementation would be something like:

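import asyncio

from aiostream import operator

# Unique marker meaning "no sentinel was given".
_no_sentinel = object()

@operator
async def from_func(func, *args, sentinel=_no_sentinel):
    """Generate values by repeatedly calling func(*args).

    Await the result of the function if the function is asynchronous.
    If sentinel is given, terminate when the function returns the sentinel.
    """
    is_async = asyncio.iscoroutinefunction(func)
    while True:
        item = func(*args)
        if is_async:
            item = await item
        # Compare by equality, as two-argument iter() does, so that
        # sentinel=b'' does not rely on object identity.
        if sentinel is not _no_sentinel and item == sentinel:
            break
        yield item

vxgmichel commented 4 years ago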

Hi @hniksic, and thanks for the report!

I was about to merge this operator into the code base last year actually: #48

But then I realized the same logic can be written as:

# Equivalent to the confusing usage of `stream.iterate(callable, sentinel)`
xs = stream.call(callable) | pipe.cycle() | pipe.takewhile(lambda x: x != sentinel)

and I decided it wasn't worth adding it to the operator list. But I guess it's not exactly something one can come up with on their own? I'd be curious to hear what you think about PR #48 :)

Thanks!

hniksic commented 4 years ago

I probably wouldn't think of call(...) | pipe.cycle(), but I don't have much experience with aiostream, so I might not be the best person to assess the discoverability of the idiom.

I'm not sure how I feel about the name iterate. If I understand it right, it makes sense in a pure-CS way because it refers to iteration over a given lambda, but it might confuse people more than it helps. I think I'd prefer the name from_func, in which I also like that it automatically applies args to the function. (But not kwargs, because it needs a kwarg for the sentinel, and possibly other arguments to be added later - this is inconsistent with some other functions in the module, but consistent with asyncio.)
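To illustrate the args-but-not-kwargs point, here is a rough sketch of how a caller could still pass keyword arguments by binding them with functools.partial. It uses a plain async generator as a stand-in for the proposed operator rather than aiostream itself, and the fetch function and its default parameter are hypothetical. As a variation on the proposal, it checks the returned value with inspect.isawaitable instead of inspecting the function, and it compares against the sentinel by equality, as the builtin iter does.

```python
import asyncio
import functools
import inspect

_no_sentinel = object()


async def from_func(func, *args, sentinel=_no_sentinel):
    """Stand-in for the proposed operator (plain async generator)."""
    while True:
        item = func(*args)
        # Await only if the call produced an awaitable; this also
        # covers partial objects and lambdas wrapping coroutines.
        if inspect.isawaitable(item):
            item = await item
        if sentinel is not _no_sentinel and item == sentinel:
            break
        yield item


async def fetch(source, *, default=None):
    """Hypothetical async function with a keyword-only argument."""
    await asyncio.sleep(0)
    return next(source, default)


async def main():
    numbers = iter([1, 2, 3])
    # Keyword arguments are bound up front, leaving from_func's own
    # keyword namespace free for sentinel and any future options.
    bound = functools.partial(fetch, numbers, default=None)
    return [item async for item in from_func(bound, sentinel=None)]


print(asyncio.run(main()))
```

This is the same convention asyncio follows for loop.call_soon and run_in_executor, where keyword arguments for the callback are expected to go through functools.partial.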

Using pipe.takewhile() to stop at sentinel is very elegant for its composability, but having to actually spell out the lambda makes it unappealing.

I don't have a personal use case for the proposed feature. I just got the idea while considering a more elegant answer to the StackOverflow question (I go by user4815162342 on python-asyncio) and finding that aiostream has a number of creational operators, but not quite the one needed for the question. But as you've obviously considered it before, feel free to close this if the idea doesn't seem that useful after all.

vxgmichel commented 4 years ago

I'm not sure how I feel about the name iterate. If I understand it right, it makes sense in a pure-CS way because it refers to iteration over a given lambda, but it might confuse people more than it helps.

This decision mostly comes from the idea that stream.iterate is supposed to be the stream equivalent to the iter builtin, although I agree both the naming and the iterate-over-a-function feature are quite obscure.

I think I'd prefer the name from_func, in which I also like that it automatically applies args to the function.

I think from_func is OK, but it can easily be confused with the call operator. Maybe call_repeatedly? or call_until? Since stream operators usually don't include a _ that would make it calluntil, which seems acceptable.

But as you've obviously considered it before, feel free to close this if the idea doesn't seem that useful after all.

Honestly, I think adding this operator makes sense, but it's getting harder and harder for me to make decisions about this project (as seen with the 6-month-old PR #60, "release version 1.0"). I think I'll wait for someone to explicitly ask for it before re-opening #48.

Thanks a lot for your insight, and your work on the python-asyncio SO tag :wink: