valkey-io / valkey-py

Valkey Python client based on a fork of redis-py
MIT License

NOSUB error in PubSub class when it should be a RuntimeError #115

Open amirreza8002 opened 1 week ago

amirreza8002 commented 1 week ago

Hi, we are trying to write a Valkey backend for Celery (see the PR). The integration tests (and some normal usage) fail with the following traceback:

self = <t.integration.test_canvas.test_chain object at 0x105e79310>, manager = <celery.contrib.testing.manager.Manager object at 0x119174260>

    def test_chain_child_replaced_with_chain_middle(self, manager):
        orig_sig = chain(
            identity.s(42), replace_with_chain.s(), identity.s()
        )
        res_obj = orig_sig.delay()
>       assert res_obj.get(timeout=TIMEOUT) == 42

t/integration/test_canvas.py:803:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
celery/result.py:251: in get
    return self.backend.wait_for_pending(
celery/backends/asynchronous.py:221: in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
celery/backends/asynchronous.py:287: in _wait_for_pending
    for _ in self.drain_events_until(
celery/backends/asynchronous.py:54: in drain_events_until
    yield self.wait_for(p, wait, timeout=interval)
celery/backends/asynchronous.py:63: in wait_for
    wait(timeout=timeout)
celery/backends/redis.py:161: in drain_events
    message = self._pubsub.get_message(timeout=timeout)
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/client.py:1072: in get_message
    response = self.parse_response(block=(timeout is None), timeout=timeout)
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/client.py:883: in parse_response
    response = self._execute(conn, try_read)
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/client.py:859: in _execute
    return conn.retry.call_with_retry(
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/retry.py:62: in call_with_retry
    return do()
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/client.py:860: in <lambda>
    lambda: command(*args, **kwargs),
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/client.py:881: in try_read
    return conn.read_response(disconnect_on_error=False, push_request=True)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <valkey.connection.Connection(host=localhost,port=6380,db=0)>, disable_decoding = False

    def read_response(
        self,
        disable_decoding=False,
        *,
        disconnect_on_error=True,
        push_request=False,
    ):
        """Read the response from a previously sent command"""

        host_error = self._host_error()

        try:
            if self.protocol in ["3", 3] and not LIBVALKEY_AVAILABLE:
                response = self._parser.read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
            else:
                response = self._parser.read_response(disable_decoding=disable_decoding)
        except socket.timeout:
            if disconnect_on_error:
                self.disconnect()
            raise TimeoutError(f"Timeout reading from {host_error}")
        except OSError as e:
            if disconnect_on_error:
                self.disconnect()
            raise ConnectionError(
                f"Error while reading from {host_error}" f" : {e.args}"
            )
        except BaseException:
            # Also by default close in case of BaseException.  A lot of code
            # relies on this behaviour when doing Command/Response pairs.
            # See #1128.
            if disconnect_on_error:
                self.disconnect()
            raise

        if self.health_check_interval:
            self.next_health_check = time() + self.health_check_interval

        if isinstance(response, ResponseError):
            try:
>               raise response
E               valkey.exceptions.ResponseError: NOSUB 'unsubscribe' command executed not in subscribed mode

There are a couple of problems here:

  1. The error shouldn't happen, at least not in this form. As the traceback shows, PubSub.get_message() is called, and it contains this check. Even if that check doesn't catch it, the method then calls PubSub.parse_response(), which should raise an error, as this shows.

  2. The error is not consistent, which leads me to believe some sort of race condition is happening. I'm not sure yet whether this is true, and if it is, what is causing it.

But the main issue is the first one: could you help me understand why Valkey itself is raising NOSUB, when valkey-py should have raised an error before Valkey even gets involved?
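To make this reading concrete, here is a minimal sketch of the two guards as described: `get_message()` returning `None` when nothing is subscribed, and `parse_response()` raising `RuntimeError` when no connection is set. The class name, attributes and error text are hypothetical; this is a simplified model of the behavior described in the code comments, not valkey-py's source.

```python
from typing import Any, Optional


class GuardSketch:
    """Hypothetical model of the two client-side PubSub guards (not valkey-py code)."""

    def __init__(self) -> None:
        self.connection: Optional[Any] = None  # set once a SUBSCRIBE has been sent
        self.subscribed = False                # local bookkeeping, not server state

    def get_message(self, timeout: float = 0.0) -> Optional[Any]:
        # Guard 1: no known subscriptions, so return None without reading anything.
        if not self.subscribed:
            return None
        return self.parse_response(timeout=timeout)

    def parse_response(self, timeout: float = 0.0) -> Optional[Any]:
        # Guard 2: no connection yet, so there is no socket to read from.
        if self.connection is None:
            raise RuntimeError("pubsub connection not set; call subscribe() first")
        # Past both guards, a real client reads whatever reply is already waiting
        # on the socket, including an error reply to a command sent earlier.
        return self.connection.read_response()


g = GuardSketch()
print(g.get_message())  # None: guard 1 short-circuits
```

In this sketch neither guard sends anything to the server; both only consult local attributes. Once `subscribed` is true and a connection exists, the next pending reply is simply read, so these checks on their own cannot intercept the server's answer to an UNSUBSCRIBE that has already been sent.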

Important note: this problem also occurs when using redis-py.

aiven-sal commented 1 week ago

Hi! Do you have a way to reproduce the issue?

BTW there isn't anything in valkey-py that tries to hide that kind of error from valkey. Getting ResponseError: NOSUB 'unsubscribe' command executed not in subscribed mode doesn't sound like a bug to me.
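This point can be illustrated with a toy model. It assumes, as in redis-py's design, that the client's own view of its subscriptions is updated when UNSUBSCRIBE confirmations are *read*, not when they are *sent*, and it takes the NOSUB reply in the traceback above as the server's answer to an UNSUBSCRIBE issued outside subscribed mode. The class `ToyPubSub` and the interleaving it runs are hypothetical, a sketch of one way the error could surface, not a diagnosis of Celery's code.

```python
from collections import deque


class ResponseError(Exception):
    """Stand-in for valkey.exceptions.ResponseError (illustrative only)."""


class ToyPubSub:
    """Hypothetical model: client state changes on READING unsubscribe replies."""

    def __init__(self) -> None:
        self.replies = deque()        # replies the server sent that we have not read
        self.channels = set()         # client-side view of subscriptions
        self.server_channels = set()  # the server's view of subscriptions

    @property
    def subscribed(self) -> bool:
        return bool(self.channels)

    def subscribe(self, channel: str) -> None:
        self.channels.add(channel)    # client view updated on send
        self.server_channels.add(channel)
        self.replies.append(["subscribe", channel, len(self.server_channels)])

    def unsubscribe(self, channel: str) -> None:
        # The client view is NOT updated here; only the server reacts right away.
        if not self.server_channels:
            # Server already left subscribed mode (assumed NOSUB, per the traceback).
            self.replies.append(ResponseError(
                "NOSUB 'unsubscribe' command executed not in subscribed mode"))
        else:
            self.server_channels.discard(channel)
            self.replies.append(["unsubscribe", channel, len(self.server_channels)])

    def get_message(self):
        if not self.subscribed:       # client-side guard: nothing is read
            return None
        reply = self.replies.popleft()
        if isinstance(reply, ResponseError):
            raise reply               # error replies are raised, not hidden
        if reply[0] == "unsubscribe":
            self.channels.discard(reply[1])
        return reply


if __name__ == "__main__":
    p = ToyPubSub()
    p.subscribe("a")
    p.unsubscribe("a")
    p.unsubscribe("a")        # sent twice before any reply is read
    print(p.get_message())    # ['subscribe', 'a', 1]
    print(p.get_message())    # ['unsubscribe', 'a', 0]
    print(p.get_message())    # None: the guard hides nothing, it just skips reading
    p.subscribe("b")          # a new subscription re-enables reading
    try:
        p.get_message()
    except ResponseError as exc:
        print("raised:", exc)  # the stale NOSUB reply surfaces here
```

In this model the second UNSUBSCRIBE is answered with NOSUB while the client still believes it is subscribed. The stale error reply then waits in the queue and is raised by a later `get_message()` once a new subscription re-enables reading, which would also look intermittent. Whether this matches the failure in the Celery tests is not established here.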

amirreza8002 commented 1 week ago

Hi, the bug is in Celery, no doubt about that.

The question in my mind is about the two links I shared. From what I understand, the first one should return None and not pass the command to Valkey; even if that doesn't work, the second one should raise a RuntimeError and not pass the command to Valkey.

Or am I misunderstanding this? I'm judging based on the comments in the code; perhaps there's more to it.

As for reproducing it, I'm not sure I know a way other than running Celery; I'll share information on that shortly.

amirreza8002 commented 1 week ago

This seems to raise the error.

This, from the issue I opened in the Valkey repository, explains how to run the Celery integration test.