ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License
4.78k stars 362 forks source link

coroutine observer #138

Closed jhidding closed 5 years ago

jhidding commented 7 years ago

Hi, I'm rewriting (or planning to rewrite) a piece of coroutine based code into Rx. The first thing I find myself wanting to do is something like:

from rx import (Observer)
import functools

class CoroutineObserver(Observer):
    """Wraps a Python coroutine in an :py:class:`rx.Observer`."""
    def __init__(self, cr):
        self.cr = cr

    def on_next(self, x):
        self.cr.send(x)

    def on_error(self, e):
        raise e

    def on_completed(self):
        pass

def coroutine_observer(f):
    @functools.wraps(f)
    def g(*args, **kwargs):
        cr = f(*args, **kwargs)
        cr.send(None)
        return CoroutineObserver(cr)

    return g

Which alows to define an observer like so:

@coroutine_observer
def printer():
    while True:
        a = yield
        print("got: ", a)

This code works fine, but I'm wondering if I'm not duplicating stuf. Cheers!

dbrattli commented 7 years ago

Nice! Haven't interfaced with coroutines, but did various experiments to flip to async iterable and async observables. Your code should work, but observers should not raise exceptions, so you also need to propagate errors cr.throw() and completion cr.close(). Notice that the enhanced coroutine already is a kind of observer (send, throw, close). This will make the while-loop a bit more messy with try-excepts for StopIteration and Exception, so the crux is to find a way to flip it into something iterable instead, which is why I ended up with async iterables as with aioreactive.

jhidding commented 7 years ago

Ok! I've been thinking about 'flipping' to iterables in terms of generators/coroutines, using threads and queues as glue. I don't understand async/await at all (why do we have weird syntax features that only work with an obfuscated library feature like the loop manager?), which is why I was looking at RxPY to make things easier, but I'll certainly take a look at aioreactive.

lock[bot] commented 4 years ago

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.