vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io
GNU General Public License v3.0

Async class methods in pipeline #69

Open cje opened 3 years ago

cje commented 3 years ago

I have an existing project which I'd like to convert to using your library. I've modified one of your examples (listed at the foot of this message) to show the structure I'm aiming for, i.e. using async class methods as steps in a transformation pipeline.

This does not work due to the presence of the @operator decorator on class methods, and fails with the message:

An operator cannot be created from a method, since the decorated function becomes an operator class

Is there some alternative way in which I can have async class methods process objects in the pipeline? I'd prefer to keep all of the pipeline logic in the pipeline code if I can, and all of the existing logic in the existing classes.

import asyncio
import random as random_module
from aiostream import operator, pipe, streamcontext

class MyOperators:
    @operator
    async def random(self, offset=0.0, width=1.0, interval=0.1):
        """Generate a stream of random numbers."""
        while True:
            await asyncio.sleep(interval)
            yield offset + width * random_module.random()

    @operator(pipable=True)
    async def power(self, source, exponent):
        """Raise the elements of an asynchronous sequence to the given power."""
        async with streamcontext(source) as streamer:
            async for item in streamer:
                yield item ** exponent

    @operator(pipable=True)
    def square(self, source):
        """Square the elements of an asynchronous sequence."""
        return self.power.raw(source, 2)

    async def main(self):
        xs = (
            self.random()  # Stream random numbers
            | self.square.pipe()  # Square the values
            | pipe.accumulate()  # Sum the values
        )
        print(await xs)

# Run main coroutine
m = MyOperators()
loop = asyncio.get_event_loop()
loop.run_until_complete(m.main())
loop.close()

vxgmichel commented 3 years ago

Hi @cje, thanks for the report!

This limitation is here because it's quite tricky to bind the operator class (i.e. MyOperators.random) to the instance of the class it's defined in (i.e. m). It requires some metaclass black magic, although it's definitely possible:

diff --git a/aiostream/core.py b/aiostream/core.py
index bdaedd9..f46d892 100644
--- a/aiostream/core.py
+++ b/aiostream/core.py
@@ -279,9 +279,9 @@ def operator(func=None, *, pipable=False):
         signature = inspect.signature(func)
         parameters = list(signature.parameters.values())
         if parameters and parameters[0].name in ("self", "cls"):
-            raise ValueError(
-                "An operator cannot be created from a method, "
-                "since the decorated function becomes an operator class"
+            parameters[0] = inspect.Parameter(
+                "original_" + parameters[0].name,
+                inspect.Parameter.POSITIONAL_OR_KEYWORD
             )

         # Injected parameters
@@ -296,10 +296,6 @@ def operator(func=None, *, pipable=False):
         original = func
         original.__qualname__ = name + ".original"

-        # Raw static method
-        raw = func
-        raw.__qualname__ = name + ".raw"
-
         # Init method
         def init(self, *args, **kwargs):
             if pipable and args:
@@ -317,15 +313,27 @@ def operator(func=None, *, pipable=False):
         init.__module__ = module
         init.__doc__ = f"Initialize the {name} stream."

-        if pipable:
+        if not pipable:
+
+            # Raw static method
+            def raw(cls, *args, **kwargs):
+                return cls.original(*args, **kwargs)
+
+            # Customize raw method
+            raw.__signature__ = signature
+            raw.__qualname__ = name + ".raw"
+            raw.__module__ = module
+            raw.__doc__ = doc
+
+        else:

             # Raw static method
-            def raw(*args, **kwargs):
-                if args:
-                    assert_async_iterable(args[0])
-                return func(*args, **kwargs)
+            def raw(cls, *args, **kwargs):
+                assert args
+                assert_async_iterable(args[0])
+                return cls.original(*args, **kwargs)

-            # Custonize raw method
+            # Customize raw method
             raw.__signature__ = signature
             raw.__qualname__ = name + ".raw"
             raw.__module__ = module
@@ -352,17 +360,28 @@ def operator(func=None, *, pipable=False):
             if extra_doc:
                 pipe.__doc__ += "\n\n    " + extra_doc

+        # Metaclass
+        class MetaDescriptor(type(Stream)):
+
+            def __get__(self, instance, owner=None):
+                owner = owner or type(instance)
+                attrs = {
+                    "original": func.__get__(instance, owner),
+                    "__module__": f"{owner.__module__}.{owner.__name__}",
+                }
+                return MetaDescriptor(self.__name__, (self,), attrs)
+
         # Gather attributes
         attrs = {
             "__init__": init,
             "__module__": module,
             "__doc__": doc,
-            "raw": staticmethod(raw),
+            "raw": classmethod(raw),
             "original": staticmethod(original),
             "pipe": classmethod(pipe) if pipable else None,
         }

         # Create operator class
-        return type(name, bases, attrs)
+        return MetaDescriptor(name, bases, attrs)

     return decorator if func is None else decorator(func)
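
For context, the MetaDescriptor above turns each operator class into a descriptor: accessing m.random builds a subclass on the fly whose original attribute is the method bound to m. With a patch along these lines, the snippet from the original report should work as written.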

I'll let it sink in for a few days and let you know whether or not I think this fix is worth the trouble of maintaining this use case. In the meantime, could you elaborate a bit more on your use case? Do you use MyOperators as a simple namespace for your operators, or does it contain actual state that the operators rely on?

Jc2k commented 3 years ago

I hit this recently.

IMO, it's probably more likely to be a problem when you are refactoring existing code. In my case I had implemented the chainable/builder style of foo.q.filter1(foo=bar).filter2(baz=qux) and realised it would be nicer to use aiostream, which I'd just started using elsewhere. For me, foo contained some state, and I replaced the filter functions with pipable operators. Given the code I had, it felt natural (and least invasive) to add a foo.stream(), which of course didn't work.

If I were starting from scratch I might do this in a more aiostream "native" way. But I needed to be able to gradually replace the old API, so foo had to keep its existing API for the time being.

I ended up making a function that adapted foo - e.g. stream(foo) | filters.filter1.pipe(foo=bar). At the moment this doesn't need to poke at the "private" interface of foo so it feels ok.
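
A minimal sketch of that adapter pattern, using hypothetical stand-in names (Foo, items, threshold) and the operator(pipable=True) API used elsewhere in this thread:

import asyncio
from aiostream import operator, pipe, streamcontext

class Foo:
    """Stateful object that keeps its existing API."""
    def __init__(self, items):
        self.items = items

@operator
async def stream(foo):
    """Adapt a Foo instance into an asynchronous stream of its items."""
    for item in foo.items:
        yield item

@operator(pipable=True)
async def filter1(source, threshold):
    """A standalone pipable filter with no access to Foo internals."""
    async with streamcontext(source) as streamer:
        async for item in streamer:
            if item > threshold:
                yield item

async def main():
    foo = Foo([1, 5, 10])
    xs = stream(foo) | filter1.pipe(threshold=3) | pipe.list()
    print(await xs)  # [5, 10]

asyncio.run(main())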

I don't like the idea that the filters need access to self, though, and don't have a use case for that.

vxgmichel commented 3 years ago

In my case I had implemented the chainable/builder style of foo.q.filter1(foo=bar).filter2(baz=qux), [...]

The idea of providing a Rust-style method-based chaining mechanism has also been on my mind, although it's not obvious how this should work (pipable vs non-pipable, vanilla vs custom operators, etc.).
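
To make this concrete, a chaining wrapper over the vanilla pipable operators could look something like the sketch below (not an actual aiostream API, just an illustration of the shape, ignoring the typing questions entirely):

import asyncio
from aiostream import stream, pipe

class Chain:
    """Wrap a stream and expose pipable operators as chainable methods."""
    def __init__(self, source):
        self._source = source

    def __getattr__(self, name):
        op = getattr(pipe, name)  # Look up a vanilla pipable operator
        def method(*args, **kwargs):
            return Chain(self._source | op(*args, **kwargs))
        return method

    def __await__(self):
        return self._source.__await__()

async def main():
    # Equivalent to: stream.range(4) | pipe.map(...) | pipe.list()
    result = await Chain(stream.range(4)).map(lambda x: x ** 2).list()
    print(result)  # [0, 1, 4, 9]

asyncio.run(main())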

I don't like the idea that the filters need access to self, though, and don't have a use case for that.

I'm probably missing something, but couldn't you simply omit the self argument to bypass the limitation I was talking about? The following seems to work fine:

class MyOperators:
    @operator
    async def random(offset=0.0, width=1.0, interval=0.1):
        """Generate a stream of random numbers."""
        while True:
            await asyncio.sleep(interval)
            yield offset + width * random_module.random()

    @operator(pipable=True)
    async def power(source, exponent):
        """Raise the elements of an asynchronous sequence to the given power."""
        async with streamcontext(source) as streamer:
            async for item in streamer:
                yield item ** exponent

    @operator(pipable=True)
    def square(source):
        """Square the elements of an asynchronous sequence."""
        return MyOperators.power.raw(source, 2)

    async def main(self):
        xs = (
            self.random()  # Stream random numbers
            | self.square.pipe()  # Square the values
            | pipe.accumulate()  # Sum the values
            | pipe.take(3)  # Take the first three sums
            | pipe.list()  # Gather them into a list
        )
        print(await xs)
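
(This works because @operator turns the decorated function into a class, and a plain class stored as a class attribute is not a descriptor: self.random and self.square simply return the operator classes unchanged, with no instance binding involved.)
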
Jc2k commented 3 years ago

The chaining pattern I had going was pre-aiostream; I hadn't thought about applying it to aiostream. I actually like the pipe stuff (though I wish I could make the .pipe() constructor the default for custom operators). I think for aiostream, auto-generating a chaining interface would require some fairly awful-looking dynamic code that might not gel well with typing and probably isn't worth it?

I don't like the idea that the filters need access to self, though, and don't have a use case for that.

I'm probably missing something, but couldn't you simply omit the self argument to bypass the limitation I was talking about? The following seems to work fine:

Ah, I think I haven't been very clear here: I wasn't looking for a solution. I was just weighing in on the original problem and saying I didn't find the use case for pipable operators on a stateful class very compelling (as you seemed unsure as to whether this change was something you wanted to apply and then have to maintain). Personally, I prefer the idea of my pipable operators being "pure", standalone, and easy to test. I would try to avoid putting them on a class, and I'd do namespacing with modules.

vxgmichel commented 3 years ago

@Jc2k

I think for aiostreams auto-generating a chaining interface would require some fairly awful looking dynamic code that might not gel well with typing and probably isn't worth it?

My thoughts exactly :grin: (although the operator classes are already generated dynamically and are causing some linting issues).

Ah I think I haven't been very clear here, i wasn't looking for a solution, I was just weighing in on the original problem

Oh got it, thanks for your feedback!

I was saying personally i prefer the idea of my pipable operators being "pure", standalone and easy to test.

Hmm right, it's probably something the lib should promote too.

And i'd do namespacing with modules.

Well namespacing with classes is not a problem either as long as you don't use self/cls and don't expect the operators to behave like methods.

MaxwellWibert commented 11 months ago

@vxgmichel Hey, I'm pretty late to this conversation, and can't speak to the difficulty of implementing these pipable operators as methods. I'm a big fan of this library, but I can't help but wonder if perhaps async Python is the wrong domain to argue for functional purity, as @Jc2k was doing. I can definitely see the appeal in the argument, and I wholeheartedly agree that in many cases, pure functions are more testable, more maintainable, and just generally more elegant. With that in mind, I hope you'll forgive me if I offer a good-faith argument for why functional purity is an unreasonable expectation in asynchronous settings, and I hope you'll further forgive me for erring on the side of over-explaining my point.

Pure functions are, by definition, side-effect free. They usually have no internal state (except, perhaps, some value cache or similar). Indeed, this is usually what makes them so attractive to work with, since it makes their behavior very predictable. However, as the name would suggest, the asyncio package is principally designed with IO operations in mind. IO operations are included in a program for the sole purpose of allowing side effects. In many practical situations, we require data to flow in and out of our program through "side channels" (typically files, databases, network calls, etc.), with the goals of modifying the behavior of our program based on the in-channels and causing some persistent change in the world outside the program through the out-channels. Channels in and out of our program have side effects as a feature, not as a bug.

As for internal state, I often find during IO operations that some behavior-modifying internal state is absolutely unavoidable. For example, if I have to make a large number of HTTP requests to some API, I have to pipe these requests to a network call throttling proxy, which keeps me from exceeding that API's rate limits. This proxy needs to use internal state to keep track of how many requests it has made during a specified interval in time, when the current running request counter will reset, and a queue of all the calls loaded into the proxy during the wait time.
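
To make that throttling example concrete, here is a minimal sketch of such a step written as a pipable operator whose state lives in a small helper object rather than in self. All names here (RateLimiter, throttle, max_calls, period) are hypothetical, and it uses the operator(pipable=True) API from earlier in this thread:

import asyncio
import time
from aiostream import operator, streamcontext

class RateLimiter:
    """Sliding-window limiter: allow at most max_calls per period seconds."""
    def __init__(self, max_calls, period):
        self.max_calls = max_calls
        self.period = period
        self.calls = []

    async def acquire(self):
        now = time.monotonic()
        # Drop timestamps that have fallen out of the sliding window
        self.calls = [t for t in self.calls if now - t < self.period]
        if len(self.calls) >= self.max_calls:
            # Wait until the oldest call leaves the window
            await asyncio.sleep(self.period - (now - self.calls[0]))
        self.calls.append(time.monotonic())

@operator(pipable=True)
async def throttle(source, limiter):
    """Forward items from source, waiting on the limiter before each one."""
    async with streamcontext(source) as streamer:
        async for item in streamer:
            await limiter.acquire()
            yield item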

vxgmichel commented 11 months ago

Hi @MaxwellWibert, thanks for sharing your thoughts :)

I agree with pretty much everything you say; there's no real point in sticking to functional purity in the context of a lib like aiostream. So is your point that instance-bound operators would be a valuable addition to the library?

The problem, as I said in my first post, is that:

This limitation is here because it's quite tricky to bind the operator class (i.e. MyOperators.random) to the instance of the class it's defined in (i.e m). It requires some metaclass black magic.

So it's not trivial to implement, possibly painful to maintain, and very hard to type correctly. And there are alternatives to it, which are:

- omitting the self/cls argument and using the class as a plain namespace, as in the example above
- defining the operators at module level and passing the stateful object explicitly, as @Jc2k did with stream(foo) | filters.filter1.pipe(foo=bar)

What is your opinion about this?