vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io
GNU General Public License v3.0

composition of streams without initial value #51

Closed: uriva closed this issue 4 years ago

uriva commented 4 years ago

Is it possible to compose different streams without giving them initial value, sort of like function composition, resulting in an async callable ready for input or further composition?

vxgmichel commented 4 years ago

Interesting! Some of the existing operators are basically the composition of two other operators.

For instance, merge is the composition of flatten and iterate:

https://github.com/vxgmichel/aiostream/blob/f5e42780cc7db5b3c7226538b8ed96495ad83618/aiostream/stream/combine.py#L145-L153
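
For illustration, here is a small sketch of that composition spelled out by hand, using only the public aiostream API (the actual implementation behind the link differs in its details):

    import asyncio
    from aiostream import stream

    async def main():
        sources = [stream.range(3), stream.range(10, 13)]
        # merge(*sources) behaves like flattening a stream of streams:
        merged = stream.flatten(stream.iterate(sources))
        assert sorted(await stream.list(merged)) == [0, 1, 2, 10, 11, 12]

    asyncio.run(main())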

Do user-defined operators fit your use case, or is it more specific than that?

uriva commented 4 years ago

tldr; I want to compose async and sync functions seamlessly and reuse pipelines, or compose them with each other before they are used.

I'm using the toolz lib a lot, and it's great when writing regular Python, but completely useless when using async functions.

I have big pipelines composed of smaller ones, which I build programmatically and then use. I'm now in the process of introducing async functions.

e.g. this is a pipeline that is used immediately:

    gamla.apipe(
        text,
        func,
        gamla.ajuxt(
            toolz.identity,
            gamla.acompose_left(
                curried.map(lambda x: x.text),
                lambda mapped_vals: ([text], mapped_vals),
                gamla.star(itertools.product),
                tuple,
                gamla.amap(some_function),
            ),
        ),
        zip,
        curried.filter(toolz.second),
        curried.map(toolz.first),
        frozenset,
    )

and this is one that's returned to be used somewhere else:

    ...    
    return gamla.acompose_left(
        pipeline0,
        gamla.afirst(
            pipeline1,
            pipeline2,
            exception_type=LookupError,
        ),
    )

where all the a-something functions are my modifications of toolz functions to support async cases. Then I stumbled upon your lib; it looks similar but more sophisticated and better planned. What I lack, though, is access to the compose operation (and some other combinators such as first).

vxgmichel commented 4 years ago

I'm using the toolz lib a lot, and it's great when writing regular Python, but completely useless when using async functions.

Hmm, I had a look at the toolz API, it's quite interesting!

I have big pipelines composed of smaller ones, which I build programmatically and then use.

Creating the pipeline and then using it is very much like the aiostream way of doing things.

... but completely useless when using async functions.

That's the point I'd like to discuss. As far as I understand, toolz has several APIs, and for the purpose of this conversation we're interested in two of them: itertoolz, which deals with (synchronous) iterables, and functoolz, which provides higher-order functions such as pipe, compose, and juxt.

It is true that itertoolz doesn't support async functions, and one could argue that aiostream fills this gap. One difference I'd like to point out is that in the aiostream world, every operator returns a stream, even operators like getitem or reduce that produce a single value. Note, however, that it's usually not necessary to unpack this single element, since merely awaiting a stream returns its last element, e.g.:

    assert await stream.accumulate(stream.range(4)) == 6

Now, another point I'd like to make is that the functions you mentioned (pipe, compose, juxt) are part of the functoolz API. Since building a pipeline is synchronous, it is fine to use functoolz to create an aiostream pipeline, e.g.:

import asyncio
from aiostream import stream
from toolz import compose, flip, pipe

async def main():
    double_and_sum = compose(
        flip(stream.map)(lambda x: 2 * x),
        stream.accumulate,
    )
    xs = pipe(stream.range(10), double_and_sum)
    assert await xs == 90

if __name__ == "__main__":
    asyncio.run(main())

One issue is that the composed function double_and_sum can't be used with the aiostream piping syntax (|), since it is not an operator per se. You can still wrap it, though:

from aiostream import stream, operator
[....]
    double_and_sum = operator(
        compose(
            flip(stream.map)(lambda x: 2 * x),
            stream.accumulate,
        ),
        pipable=True,
    )
    xs = stream.range(10) | double_and_sum.pipe()

Or at least you could, if it weren't for an incompatibility between operator and toolz.functoolz.Compose :sweat_smile:

This should address your initial question about composing streams in particular, but I suspect from your examples that you might also need to compose coroutines. This would require an async alternative to toolz.functoolz, although I don't know if that already exists. This post is already too long, but let me know if you want to discuss this point too (maybe in another issue, since it seems different from your original question).
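
For what it's worth, here is a minimal sketch of what such an async-aware composition could look like. This is an assumption on my part rather than an existing toolz or aiostream API, and the name acompose_left simply echoes yours:

    import asyncio
    import inspect

    def acompose_left(*funcs):
        # Compose left-to-right; each step may be sync or async, and
        # awaitable results are awaited before being passed along.
        async def composed(value):
            for func in funcs:
                value = func(value)
                if inspect.isawaitable(value):
                    value = await value
            return value
        return composed

    async def main():
        async def double(x):
            return 2 * x

        pipeline = acompose_left(double, lambda x: x + 1)
        assert await pipeline(10) == 21

    asyncio.run(main())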

(and some other combinators such as first)

FYI, first can be written as:
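
Here is a rough sketch, assuming the same semantics as your gamla.afirst (try each pipeline in order and return the first result that doesn't raise the given exception type):

    def first(*pipelines, exception_type=Exception):
        # Build an async callable that feeds the value to each pipeline
        # in order and returns the first result that does not raise.
        async def composed(value):
            for pipeline in pipelines:
                try:
                    return await pipeline(value)
                except exception_type:
                    continue
            raise exception_type("all pipelines failed")
        return composed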

Hope that helps :)

uriva commented 4 years ago

I think my point is this: a programmer should not be bothered with these details and should be allowed to switch between async, sync, streams, or single values, as long as the combinators (higher-order functions like pipe or compose) know how to make sense of it all. Python has some way to go until it does that (e.g. the await keyword is an anti-functional programming pattern imho).

Your library does it for streams, mine does it for async/sync combinations (sort of an async agnostic toolz).

So maybe this issue is saying that someone should go in this direction and it seems like this lib is the most advanced one.

vxgmichel commented 4 years ago

A programmer should not be bothered with these details and should be allowed to switch between async, sync, streams, or single values, as long as the combinators (higher-order functions like pipe or compose) know how to make sense of it all.

Well, I think that's easier said than done, but I'd be very interested if you come up with a way to write combinators that work transparently regardless of async/sync or single-value/multiple-value considerations.

Python has some way to go until it does that (e.g. the await keyword is an anti-functional programming pattern imho).

I don't know about that; Rust and Haskell both have a similar concept :thinking:

someone should go in this direction and it seems like this lib is the most advanced one.

I'd rather have this lib focus on async iterators (since that's already tricky enough, especially if we want to support libs like trio and curio). However, I'd be happy to tweak the API to fit with other functional libs such as toolz. In fact, I'm quite happy with the example I came up with earlier (toolz.functoolz.* to compose operators), so please let me know if you face some kind of limitation while playing with those.

I would also like to make a note here about the different syntaxes for chaining combinators that I encountered recently.

I wonder if we should rethink the pipe | syntax for aiostream :thinking:

uriva commented 4 years ago

Well, I think that's easier said than done, but I'd be very interested if you come up with a way to write combinators that work transparently regardless of async/sync or single-value/multiple-value considerations.

Take a look at uriva/gamla; you'll find acompose, amap, mapa, etc. (the latter is similar to your stream combinators). They work well for me atm, but they're definitely less fancy and rely on constant materialization (I value syntax over efficiency :)).

wrt await: its functional version would be materialize (with function syntax, like print).

So the JS-style x.filter(...).map(...) pipes are inferior imo because they prevent composition without a value. I'm not a fan of the pipe because it feels too magical, but I haven't tried it (just a bit using Beam).

vxgmichel commented 4 years ago

So the JS-style x.filter(...).map(...) pipes are inferior imo because they prevent composition without a value.

I suppose this is why Rust usually provides both structs and methods (e.g. the core::iter::Map struct and the core::iter::Iterator.map method). It's also the reason why I ended up exposing two versions of the aiostream operators, in aiostream.stream and aiostream.pipe.
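
For example, here is a quick sketch showing the two equivalent spellings:

    import asyncio
    from aiostream import stream, pipe

    async def main():
        double = lambda x: 2 * x
        xs = stream.map(stream.range(3), double)  # prefix form, aiostream.stream
        ys = stream.range(3) | pipe.map(double)   # piped form, aiostream.pipe
        assert await stream.list(xs) == await stream.list(ys) == [0, 2, 4]

    asyncio.run(main())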

I'm not a fan of the pipe because it feels too magical, but I haven't tried it (just a bit using Beam).

I just remembered that the combinators exposed in aiostream.pipe are simply flipped and curried versions of the combinators in aiostream.stream. The pipe | operator is then simply syntactic sugar for the equivalent of toolz.pipe, i.e. a | b | c is equivalent to c(b(a)). This means there is great interoperability between toolz.functoolz and aiostream. In particular, the only missing brick in aiostream for composing operators is toolz.compose:

import asyncio
from toolz import compose
from aiostream import stream, pipe

async def main():

    # This combinator takes a stream as argument and returns a stream
    double_and_sum = compose(pipe.map(lambda x: 2 * x), pipe.accumulate())

    # Simple instantiation of a stream by providing the initial argument
    assert await double_and_sum(stream.range(10)) == 90

    # Integration within a pipeline to build more complex streams
    await (stream.range(10) | double_and_sum | pipe.print())

if __name__ == "__main__":
    asyncio.run(main())

This example seems to fit your initial question very well, right? Maybe aiostream should provide a compose implementation after all :thinking:

uriva commented 4 years ago

Yes, that looks good. I'd also like to be able to interleave regular non-async functions (e.g. curried.map) if that's possible.

vxgmichel commented 4 years ago

I'd also like to be able to interleave regular non-async functions (e.g. curried.map) if that's possible.

Well, pipe.map(fn) works exactly like curried.map(fn) if fn is synchronous. The only difference is that you can combine it with other async iterators. Consider the example above: every operation in the pipeline (range, map, accumulate, print) is synchronous, but every step is wrapped as an async iteration, so you can also plug actually asynchronous operations into the pipeline. Note that async iteration doesn't imply any kind of concurrent execution, so a stream behaves exactly like its sync equivalent as long as everything down the chain is synchronous.
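
For instance, here is a small sketch mixing a synchronous step and an actually asynchronous step in the same pipeline (fetch is just a hypothetical stand-in for a real async operation):

    import asyncio
    from aiostream import stream, pipe

    async def fetch(x):
        # Hypothetical stand-in for a real asynchronous operation
        await asyncio.sleep(0.01)
        return x * x

    async def main():
        xs = (
            stream.range(5)
            | pipe.map(lambda x: x + 1)  # synchronous step
            | pipe.map(fetch)            # asynchronous step
        )
        assert await stream.list(xs) == [1, 4, 9, 16, 25]

    asyncio.run(main())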