redis / redis-py

Redis Python client
MIT License

Pipeline `execute` in async blocks as it packs the commands #3416

Open rad-pat opened 1 month ago

rad-pat commented 1 month ago

Version: redis-py: 5.11

Platform: Python 3.12/Ubuntu 24.04

Description: Executing a pipeline of many commands in asyncio blocks the event loop while it packs the commands: https://github.com/redis/redis-py/blob/d6ddb0d66bc4bd28c99d9cf148c9085117cc8ce9/redis/asyncio/client.py#L1365

I'm not really sure how to build a repro to demonstrate the issue, but we have a case where we send many (200k) commands to a pipeline as a transaction, and it blocks for several seconds.
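A rough way to observe this kind of stall without a Redis server is sketched below. It is not redis-py code: `pack_commands` here is a hypothetical stand-in that does similar CPU-bound work, and `heartbeat` and `measure_stall` are illustrative names. The idea is that a background task records how long it waits between wake-ups, so any synchronous work on the loop shows up as one large gap.

```python
import asyncio
import time


def pack_commands(commands):
    """Hypothetical stand-in for CPU-bound command packing (not redis-py's code)."""
    out = []
    for args in commands:
        parts = [b"*%d\r\n" % len(args)]
        for arg in args:
            data = str(arg).encode()
            parts.append(b"$%d\r\n%s\r\n" % (len(data), data))
        out.append(b"".join(parts))
    return b"".join(out)


async def heartbeat(gaps, interval=0.001):
    """Record the time between consecutive wake-ups of this task."""
    last = time.perf_counter()
    while True:
        await asyncio.sleep(interval)
        now = time.perf_counter()
        gaps.append(now - last)
        last = now


async def measure_stall(n_commands=200_000):
    """Return (time spent packing, longest heartbeat gap seen around it)."""
    commands = [("SET", f"key:{i}", i) for i in range(n_commands)]
    gaps = []
    task = asyncio.create_task(heartbeat(gaps))
    await asyncio.sleep(0.05)  # let the heartbeat run normally for a moment
    gaps.clear()

    start = time.perf_counter()
    pack_commands(commands)  # synchronous: nothing else can run meanwhile
    pack_time = time.perf_counter() - start

    await asyncio.sleep(0.01)  # give the heartbeat a chance to record the gap
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return pack_time, max(gaps)


if __name__ == "__main__":
    pack_time, stall = asyncio.run(measure_stall())
    print(f"packing took {pack_time:.3f}s, longest heartbeat gap {stall:.3f}s")
```

If the longest gap roughly matches the packing time, other coroutines were starved for that whole period, which is the behaviour described above.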

rad-pat commented 1 week ago

For anyone interested, I created a custom Pipeline class and overrode the `_execute_transaction` method, adding some releases back to the event loop. I would make a PR, but having commented on other PRs previously, I have not seen much progress on them.

import asyncio
import inspect

from redis.asyncio.client import Pipeline, Connection, CommandStackT, CommandT, ResponseError, EMPTY_RESPONSE, ExecAbortError, WatchError

class AsyncPipeline(Pipeline):
    async def _execute_transaction(  # noqa: C901
            self, connection: Connection, commands: CommandStackT, raise_on_error
    ):
        pre: CommandT = (("MULTI",), {})
        post: CommandT = (("EXEC",), {})
        cmds = (pre, *commands, post)
        # Change 1 - run pack_commands in a thread
        all_cmds = await asyncio.to_thread(
            connection.pack_commands,
            (args for args, options in cmds if EMPTY_RESPONSE not in options)
        )
        await connection.send_packed_command(all_cmds)
        errors = []

        # parse off the response for MULTI
        # NOTE: we need to handle ResponseErrors here and continue
        # so that we read all the additional command messages from
        # the socket
        try:
            await self.parse_response(connection, "_")
        except ResponseError as err:
            errors.append((0, err))

        # and all the other commands
        for i, command in enumerate(commands):
            if EMPTY_RESPONSE in command[1]:
                errors.append((i, command[1][EMPTY_RESPONSE]))
            else:
                try:
                    await self.parse_response(connection, "_")
                except ResponseError as err:
                    self.annotate_exception(err, i + 1, command[0])
                    errors.append((i, err))
            # Change 2 - Release back to event loop to prevent blocking
            if i % 100 == 0:
                await asyncio.sleep(0)

        # parse the EXEC.
        try:
            response = await self.parse_response(connection, "_")
        except ExecAbortError as err:
            if errors:
                raise errors[0][1] from err
            raise

        # EXEC clears any watched keys
        self.watching = False

        if response is None:
            raise WatchError("Watched variable changed.") from None

        # put any parse errors into the response
        for i, e in errors:
            response.insert(i, e)

        if len(response) != len(commands):
            if self.connection:
                await self.connection.disconnect()
            raise ResponseError(
                "Wrong number of response items from pipeline execution"
            ) from None

        # find any errors in the response and raise if necessary
        if raise_on_error:
            self.raise_first_error(commands, response)
        # We have to run response callbacks manually
        data = []
        # Change 3 - Enumerate and then Release back to event loop to prevent blocking
        for i, (r, cmd) in enumerate(zip(response, commands)):
            if not isinstance(r, Exception):
                args, options = cmd
                command_name = args[0]
                if command_name in self.response_callbacks:
                    r = self.response_callbacks[command_name](r, **options)
                    if inspect.isawaitable(r):
                        r = await r
            data.append(r)
            if i % 100 == 0:
                await asyncio.sleep(0)
        return data
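
The two techniques used in the class above (offloading one CPU-bound call with `asyncio.to_thread`, and yielding with `await asyncio.sleep(0)` every N iterations) can be tried in isolation. The following is only a minimal sketch with no Redis involved; `process_with_yields`, `demo`, and `ticker` are hypothetical names, and the batch size of 100 simply mirrors the value chosen above.

```python
import asyncio


async def process_with_yields(items, handle, every=100):
    """Apply handle() to each item, yielding to the event loop every `every` items."""
    results = []
    for i, item in enumerate(items, start=1):
        results.append(handle(item))
        if i % every == 0:
            await asyncio.sleep(0)  # let other ready tasks run
    return results


async def demo(n_items=1_000, every=100):
    """Return (thread result, loop results, ticker turns seen during the loop)."""
    ticks = 0

    async def ticker():
        # Stand-in for "other work" that wants regular turns on the loop.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0)

    task = asyncio.create_task(ticker())

    # Technique 1: run a single blocking call in a worker thread.
    total = await asyncio.to_thread(sum, range(n_items))

    # Technique 2: keep the loop in this task, but yield periodically.
    ticks_before = ticks
    results = await process_with_yields(range(n_items), lambda x: x * 2, every)
    ticks_during = ticks - ticks_before

    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return total, results, ticks_during


if __name__ == "__main__":
    total, results, ticks_during = asyncio.run(demo())
    print(total, len(results), ticks_during)
```

A smaller `every` gives other tasks more frequent turns at the cost of more scheduling overhead, so the right value likely depends on how expensive each iteration is.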