aio-libs-abandoned / aioredis-py

asyncio (PEP 3156) Redis support
https://aioredis.readthedocs.io/
MIT License
2.3k stars 336 forks

Pipeline methods inconsistency #1028

Open stanislau-arkhipenka opened 3 years ago

stanislau-arkhipenka commented 3 years ago

Hey, here is an example:

async with r.pipeline() as pipe:
    await pipe.set("foo", "bar")  # <-- returns an awaitable
    await pipe.execute()

but

async with r.pipeline() as pipe:
    await pipe.watch("foo")
    pipe.multi()
    pipe.set("foo", "bar")  # <-- returns self
    await pipe.execute()

And it's very confusing... The reason behind this decision goes back to the original redis-py implementation: they wanted to be able to do something like pipe.set("foo", "bar").set("bar", "baz"). This happens because of:

    def execute_command(
        self, *args, **kwargs
    ) -> Union["Pipeline", Awaitable["Pipeline"]]:
        if (self.watching or args[0] == "WATCH") and not self.explicit_transaction:
            return self.immediate_execute_command(*args, **kwargs)
        return self.pipeline_execute_command(*args, **kwargs)

as immediate_execute_command returns an awaitable, while pipeline_execute_command returns self.

As far as I can see there are a few options (and I don't like any of them):

1) Make immediate_execute_command sync and return self, so we fully support the pipe.set("foo", "bar").set("bar", "baz") case, but lose the return value of an immediately executed command
2) Make pipeline_execute_command async, so we lose support for pipe.set("foo", "bar").set("bar", "baz") but are consistent in invocation
3) Add an __await__ to Pipeline which does nothing, but allows us to await the Pipeline class
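To make option 3 concrete, here is a minimal, self-contained sketch (toy class, not the actual aioredis implementation): giving Pipeline a no-op __await__ means `await pipe.set(...)` is harmless in the branch where the command was only queued, while method chaining keeps working:

```python
import asyncio

class Pipeline:
    """Toy stand-in for the real Pipeline; only shows the __await__ trick."""

    def __init__(self):
        self.command_stack = []

    def set(self, key, value):
        # Queueing branch: record the command and return self for chaining.
        self.command_stack.append(("SET", key, value))
        return self

    def __await__(self):
        # Option 3: a no-op __await__, so awaiting the Pipeline (or the self
        # returned by a queued command) completes immediately.
        return self._noop().__await__()

    async def _noop(self):
        return self

async def main():
    pipe = Pipeline()
    await pipe.set("foo", "bar")      # awaiting the queued command is now harmless
    pipe.set("a", "1").set("b", "2")  # chaining still works
    return pipe.command_stack

stack = asyncio.run(main())
print(stack)
```

The downside from option 3 remains: awaiting a queued command silently does nothing, which may hide bugs.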

What do you think?

ShadowLNC commented 3 years ago

Preface: I'm using version aioredis @ git+https://github.com/aio-libs/aioredis-py.git@6ab86b5f187b36f8af4c1aabf8f698fcd6bdf842. At the time of writing, it doesn't look like there have been any changes beyond a few type signatures on the master branch.

I think I'm seeing the same issue. From the recipes (I assume the docs are for 2.0 given they include the 2.0 migration link):

await pipe.incr("foo").incr("bar").execute()

The above works, but I tried a similar approach:

await p.watch("foo").get("foo").execute()

I get AttributeError: 'coroutine' object has no attribute 'get', and if I don't explicitly await the p.watch("foo") call (when running it separately), I get RuntimeWarning: coroutine 'Pipeline.watch' was never awaited.

My full example was to be either of these (ideally I'd like both to be supported):

Example A:

p = my_redis_obj.pipeline(transaction=False)
p.watch("key")
p.get("key")  # Actually hgetall
_, val = await p.execute()
...  # check-and-set pattern
p.multi()
p.set("key", val)
await p.execute()

Example B:

p = my_redis_obj.pipeline(transaction=False)
_, val = await p.watch("key").get("key").execute()
...  # check-and-set pattern
await p.multi().set("key", val).execute()

In the above examples, I'm treating execute() as both "push this Pipeline" and "execute the transaction (if active) - Redis EXEC". I am having a bit of difficulty understanding relevant docs, so this mightn't be how it's meant to be used.

Proposed solution:

If, instead, we make the Pipeline class awaitable and use that to mean "push the Pipeline" (instead of returning self like it currently does), then execute() can be left to explicitly perform the EXEC Redis command. All Redis commands could be added to the Pipeline's command_stack until the Pipeline object is awaited, at which point it's flushed and the result is returned. This would allow something like:

output = await p.watch("x").get("y").multi().set("z", "a").execute().get("b")

Or this (no difference from the perspective of the Pipeline object):

p.watch("x").get("y")  # Chaining some is fine since it just returns self
p.multi()
p.set("z", "a")
p.execute()
p.get("b")
output = await p
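To illustrate, here's a rough, self-contained sketch of that idea (hypothetical class and method bodies, not the real aioredis implementation): every command method just queues and returns self, and awaiting the Pipeline flushes the queue:

```python
import asyncio

class AwaitablePipeline:
    """Toy model of the proposal: queue everything, flush on await."""

    def __init__(self):
        self.command_stack = []

    def _queue(self, *cmd):
        # Every command method appends to the stack and returns self,
        # so all of them can be chained uniformly.
        self.command_stack.append(cmd)
        return self

    def watch(self, *keys):
        return self._queue("WATCH", *keys)

    def multi(self):
        return self._queue("MULTI")

    def get(self, key):
        return self._queue("GET", key)

    def set(self, key, value):
        return self._queue("SET", key, value)

    def execute(self):
        return self._queue("EXEC")

    def __await__(self):
        return self._flush().__await__()

    async def _flush(self):
        # A real client would send the stack to Redis and read the replies;
        # here we just return the queued commands to show the flow.
        stack, self.command_stack = self.command_stack, []
        return stack

async def main():
    p = AwaitablePipeline()
    # Both chained and statement-per-line styles behave identically.
    return await p.watch("x").get("y").multi().set("z", "a").execute().get("b")

print(asyncio.run(main()))
```

Under this model the type hints are trivial too: every command method returns "AwaitablePipeline", and only __await__ produces results.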

The type hints also seem to have limitations around the existing implementation: while get() and hgetall() (etc.) are inferred as Awaitable, using await doesn't seem to actually do anything (it appears to return self when in queueing mode). If all "command methods" return self and add to the command_stack (i.e. both the ones that already add, like get() when in queueing mode, and the ones that execute immediately, like watch(), or get() outside of queueing mode*), I think that also makes the type hinting a bit more straightforward.

*NB: I've made up the term "queueing mode" - basically defining when get() will add/queue to the command_stack vs. when it will return a coroutine to be immediately awaited. See below. It's probably related to explicit_transaction, and I suspect it depends directly on which branch is taken inside the execute_command method in the original description of this ticket (but I haven't stepped through the call stack).

With the proposed solution, if I await p where transaction=True when instantiating the Pipeline, but I don't call p.execute() before awaiting, then I'll get no result back, and I think that's fine. Reuse of the Pipeline would require calls to multi() (where a transaction is desired) and basically transaction=True becomes a shorthand for calling multi() immediately. This allows for the current strictness around preventing calls to watch() or multi() when already in a multi() block.
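The "transaction=True is shorthand for multi()" idea can be sketched in a few lines (hypothetical names, no real Redis I/O), including the existing strictness about nested multi() calls:

```python
class TransactionalPipeline:
    """Toy sketch: transaction=True just calls multi() immediately."""

    def __init__(self, transaction=False):
        self.command_stack = []
        self.in_multi = False
        if transaction:
            self.multi()

    def multi(self):
        # Preserve the current strictness: no multi() inside a multi() block.
        if self.in_multi:
            raise RuntimeError("already inside a MULTI block")
        self.in_multi = True
        self.command_stack.append(("MULTI",))
        return self

p = TransactionalPipeline(transaction=True)
print(p.command_stack)
```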


The current behaviour as outlined in the original comment is very hard to follow unless you're tracking the state of the Pipeline object:

p = my_redis_obj.pipeline(transaction=False)
await p.watch("a")  # Immediately-executing awaitable
await p.get("a")  # Immediately-executing awaitable
await p.execute()  # Empty list - note I never called multi() - puts it into queueing mode
await p.get("a")  # Queues it onto command_stack, awaiting does nothing as it returns self
# Here the queue still contains one `GET a`
await p.watch("a")  # Immediately executes, takes us out of queueing mode
await p.get("a")  # Immediately executes
await p.get("a").execute()  # AttributeError: 'coroutine' object has no attribute 'execute'
# Here the queue still contains one `GET a` from before
await p.execute()  # Returns the queued result and puts us back into queueing mode
await p.get("a")  # Queues it onto the command stack
await p.get("a").execute()  # Returns a list of two items, `a` from the line above and `a` from this line
await p.get("a").execute()  # Single item
await p.watch("a")  # Takes us back out of queueing mode

Basically we have interleaved queued items between direct calls to Redis. I'm of the opinion that the Pipeline class shouldn't support interleaved calls - you should use a separate client object to perform those interleaved calls, and everything within the Pipeline object should be sequential. As 2.0.0 is still alpha, I'd hope we're not overly concerned about backwards compatibility here, but I understand there might need to be some deprecation period.

abrookins commented 3 years ago

There may be a better abstraction around pipelines and transactions than what we've adopted from redis-py. For now, we'll retain compatibility with redis-py, so we're going to keep this behavior.

FWIW, we're also retaining, for now, the fact that when you call client.pipeline(), you actually get a Redis "transaction" rather than a true pipeline. This has proven confusing when I've introduced the library to people new to redis-py.

In the fullness of time, I'd like to see a better abstraction that also maps better conceptually to the Redis concepts of "pipeline" and "transaction."

Let's keep this issue open for now and continue to think about it. For people interested in this, I'd like to see proposals for a new helper API rather than changing the existing pipeline() semantics.

What we would even call it at this point will be interesting to figure out, as there is also a transaction() helper, but I'm sure we can figure it out.