redis / redis-py

Redis Python client
MIT License
12.45k stars 2.48k forks source link

PubSub error when running in threads with redis-py 5.0.3 #3184

Open spicy-sauce opened 4 months ago

spicy-sauce commented 4 months ago

pubsub.py:

import sys
import time

import redis

# Global variable to indicate if the event handler is invoked
event_handler_invoked = False

def event_handler(message):
    # Event handler function
    global event_handler_invoked
    print("Event handler invoked with message:", message)
    event_handler_invoked = True

def setup_pubsub():
    # Function to set up Redis pub/sub
    # Connect to Redis
    redis_client = redis.Redis(host="localhost", port=12000)

    # Subscribe to a channel pattern
    pubsub = redis_client.pubsub()
    pubsub.psubscribe(**{'test_channel*': event_handler})

    # Start a thread to handle messages
    pubsub.run_in_thread(sleep_time=0.1)

def main():
    setup_pubsub()

    # Publish a test message
    redis_client = redis.Redis(host="localhost", port=12000)
    redis_client.publish('test_channel', 'Hello, Redis!')

    # Wait for the event handler to be invoked
    timeout = 5  # Timeout in seconds
    start_time = time.time()
    while not event_handler_invoked:
        if time.time() - start_time > timeout:
            print("Timeout reached. Event handler not invoked.")
            sys.exit(1)
        time.sleep(0.1)

    print("Event handler successfully invoked.")

if __name__ == "__main__":
    main()

In redis-py 4.6.0 it works as expected:

$ python3 pubsub.py
Event handler invoked with message: {'type': 'pmessage', 'pattern': b'test_channel*', 'channel': b'test_channel', 'data': b'Hello, Redis!'}
Event handler successfully invoked.

With redis-py 5.0.3 it constantly fails with:

$ python3 pubsub.py
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/_parsers/socket.py", line 69, in _read_from_socket
    buf.write(data)
ValueError: I/O operation on closed file.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/client.py", line 1162, in run
    pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/client.py", line 1024, in get_message
    response = self.parse_response(block=(timeout is None), timeout=timeout)
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/client.py", line 835, in parse_response
    response = self._execute(conn, try_read)
  File "/home/ubuntu//venv3.8/lib/python3.8/site-packages/redis/client.py", line 811, in _execute
    return conn.retry.call_with_retry(
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/retry.py", line 46, in call_with_retry
    return do()
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/client.py", line 812, in <lambda>
    lambda: command(*args, **kwargs),
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/client.py", line 829, in try_read
    if not conn.can_read(timeout=timeout):
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/connection.py", line 490, in can_read
    return self._parser.can_read(timeout)
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/_parsers/base.py", line 128, in can_read
    return self._buffer and self._buffer.can_read(timeout)
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/_parsers/socket.py", line 95, in can_read
    return bool(self.unread_bytes()) or self._read_from_socket(
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/_parsers/socket.py", line 90, in _read_from_socket
    buf.seek(current_pos)
ValueError: I/O operation on closed file.
^CTraceback (most recent call last):
  File "pubsub.py", line 54, in <module>
    main()
  File "pubsub.py", line 48, in main
    time.sleep(0.1)

Tested in Python3.8 and Python3.11, same result.

kheyer commented 3 months ago

Running into a similar issue with Celery/Redis. Downgrading to 4.6.0 didn't help. For me, the issue triggers when I try to get the results of celery tasks

Stack trace when gathering celery task results.

     results = await asyncio.gather(*[asyncio.to_thread(task.get) for task in tasks])
   File "/usr/local/lib/python3.9/asyncio/threads.py", line 25, in to_thread
     return await loop.run_in_executor(None, func_call)
   File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 58, in run
     result = self.fn(*self.args, **self.kwargs)
   File "/usr/local/lib/python3.9/site-packages/celery/result.py", line 251, in get
     return self.backend.wait_for_pending(
   File "/usr/local/lib/python3.9/site-packages/celery/backends/asynchronous.py", line 221, in wait_for_pending
     for _ in self._wait_for_pending(result, **kwargs):
   File "/usr/local/lib/python3.9/site-packages/celery/backends/asynchronous.py", line 287, in _wait_for_pending
     for _ in self.drain_events_until(
   File "/usr/local/lib/python3.9/site-packages/celery/backends/asynchronous.py", line 54, in drain_events_until
     yield self.wait_for(p, wait, timeout=interval)
   File "/usr/local/lib/python3.9/site-packages/celery/backends/asynchronous.py", line 63, in wait_for
     wait(timeout=timeout)
   File "/usr/local/lib/python3.9/site-packages/celery/backends/redis.py", line 161, in drain_events
     message = self._pubsub.get_message(timeout=timeout)
   File "/usr/local/lib/python3.9/site-packages/redis/client.py", line 1690, in get_message
     response = self.parse_response(block=(timeout is None), timeout=timeout)
   File "/usr/local/lib/python3.9/site-packages/redis/client.py", line 1542, in parse_response
     response = self._execute(conn, try_read)
   File "/usr/local/lib/python3.9/site-packages/redis/client.py", line 1518, in _execute
     return conn.retry.call_with_retry(
  File "/usr/local/lib/python3.9/site-packages/redis/retry.py", line 46, in call_with_retry
    return do()
   File "/usr/local/lib/python3.9/site-packages/redis/client.py", line 1519, in <lambda>
     lambda: command(*args, **kwargs),
   File "/usr/local/lib/python3.9/site-packages/redis/client.py", line 1536, in try_read
    if not conn.can_read(timeout=timeout):
   File "/usr/local/lib/python3.9/site-packages/redis/connection.py", line 869, in can_read
    return self._parser.can_read(timeout)
   File "/usr/local/lib/python3.9/site-packages/redis/connection.py", line 344, in can_read
     return self._buffer and self._buffer.can_read(timeout)
   File "/usr/local/lib/python3.9/site-packages/redis/connection.py", line 242, in can_read
     return bool(self.unread_bytes()) or self._read_from_socket(
   File "/usr/local/lib/python3.9/site-packages/redis/connection.py", line 237, in _read_from_socket
     buf.seek(current_pos)
ValueError: I/O operation on closed file.