dbrattli / aioreactive

Async/await reactive tools for Python 3.11+
MIT License

Is there an operator to connect an Observable to an Observer, and if not, which one would you prefer? #8

Open ZmeiGorynych opened 7 years ago

ZmeiGorynych commented 7 years ago

Hi Dag,

I think the pipe syntax for chaining operators is really cool, and am missing something similar to complete the chain.

That is, I find myself writing code like xs = source | op.map(f1) | op.map(f2) | ... and then having to complete it with a clunky

async def f():
    subscription = await subscribe(xs, AnonymousAsyncObserver(sink_fun))

when I'd much rather just say something like

f = xs > sink_fun

where the > operator means we're coupling the source with an Observer that can consume it, returning a regular coroutine.
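For illustration only, here is a minimal sketch of how such a > operator could be wired up with plain asyncio. ToySource and its _drain method are hypothetical stand-ins invented for this example, not aioreactive classes, and the sink is assumed to be an async function taking one value. The only point is that __gt__ can return an un-awaited coroutine, which the caller then still has to await.

```python
import asyncio
from typing import Any, Awaitable, Callable, Coroutine, Iterable, List

Sink = Callable[[Any], Awaitable[None]]


class ToySource:
    """Hypothetical stand-in for an async source; NOT an aioreactive class."""

    def __init__(self, values: Iterable[Any]) -> None:
        self._values = list(values)

    async def _drain(self, sink: Sink) -> None:
        # Push every value into the sink, one at a time.
        for value in self._values:
            await sink(value)

    def __gt__(self, sink: Sink) -> Coroutine[Any, Any, None]:
        # `source > sink` only builds the coroutine; nothing runs until
        # somebody awaits it.
        return self._drain(sink)


async def demo() -> List[int]:
    received: List[int] = []

    async def sink_fun(value: int) -> None:
        received.append(value)

    f = ToySource([1, 2, 3]) > sink_fun  # a regular coroutine object
    await f                              # the await cannot be hidden here
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

One detail that works in favour of the proposal: in Python, | binds more tightly than comparison operators, so source | op.map(f1) > sink_fun would parse as (source | op.map(f1)) > sink_fun without extra parentheses.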

Is there something like that in aioreactive, and if not, do you have any immediate plans to write it?

If the answer to both of the above is 'no', I'll have a go at writing one this week and would be grateful for any hints :)

Thanks a lot!

dbrattli commented 7 years ago

This looks very interesting. Haven't done anything like that yet. You might have a problem hiding the await in there, so you will probably have to await the whole thing in the end: await f. Then f needs to be an AsyncSubscriptionFactory (called AsyncStreamFactory in master). Thus you might want to wait a week or so. The changes currently being worked on go all the way back to Rx and AsyncObservable, in the sense that subscriptions are AsyncDisposable only, but observers can be made AsyncIterable, so the way to use async-for will need to change. Pipe syntax will still be the same, so your proposal is very interesting if we can make it work.

ZmeiGorynych commented 7 years ago

Do you mean I won't be able to flip back and forth between an async source and an async iterator anymore once the latest changes are in? That would at a stroke remove a large part of the attraction of the library for me...

ZmeiGorynych commented 7 years ago

As for the proposed operator, why not have it go all the way: subscribe arg 2 to arg 1, get an event loop, and ask it to run that until finished? This way a casual user could use the whole library without seeing an await once ;)
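A rough sketch of what "going all the way" might look like, again with plain asyncio rather than aioreactive's own API. The names run_until_finished and ticks are made up for this example, and the source is assumed to be any async iterator. The helper subscribes the sink to the source and blocks until the source is exhausted, so the caller never writes an await for the run itself.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, List


async def ticks(n: int) -> AsyncIterator[int]:
    # Hypothetical async source: yields 0..n-1, giving up control each step.
    for i in range(n):
        await asyncio.sleep(0)
        yield i


def run_until_finished(
    source: AsyncIterator[Any],
    sink: Callable[[Any], Awaitable[None]],
) -> None:
    """Hypothetical helper: feed `source` into `sink` and block until done."""

    async def _consume() -> None:
        async for value in source:
            await sink(value)

    # asyncio.run creates an event loop, runs the coroutine to completion,
    # and closes the loop again.
    asyncio.run(_consume())


received: List[int] = []


async def sink_fun(value: int) -> None:
    received.append(value)


# No `await` at the call site.
run_until_finished(ticks(3), sink_fun)
```

The catch is that asyncio.run raises RuntimeError when called from code that is already running inside an event loop, so a helper like this would only suit top-level scripts, not code that is itself part of an async application.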