leporo / tornado-redis

Asynchronous Redis client that works within Tornado IO loop.

Error when subscribing to multiple channels #70

adam-benayoun closed this issue 10 years ago.

adam-benayoun commented 10 years ago

The following error is raised on the second subscription to multiple channels (it works the first time on a connection, but fails on the second subscription on the same connection):

    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/web.py", line 1115, in _stack_context_handle_exception
      raise_exc_info((type, value, traceback))
    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/stack_context.py", line 343, in _handle_exception
      if tail.exit(*exc):
    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/stack_context.py", line 186, in exit
      return self.exception_handler(type, value, traceback)
    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 136, in handle_exception
      return runner.handle_exception(typ, value, tb)
    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 556, in handle_exception
      self.run()
    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 505, in run
      yielded = self.gen.throw(*exc_info)
    File "/Users/adam/BUD/budlist/app/backend/handlers/realtime_updates/realtime_updates.py", line 23, in get
      yield gen.Task(self.controller.subscribe_and_listen, app, user)
    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/stack_context.py", line 343, in _handle_exception
      if tail.exit(*exc):
    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/stack_context.py", line 186, in exit
      return self.exception_handler(type, value, traceback)
    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 136, in handle_exception
      return runner.handle_exception(typ, value, tb)
    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 556, in handle_exception
      self.run()
    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 505, in run
      yielded = self.gen.throw(*exc_info)
    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornadoredis/client.py", line 1147, in listen
      response = yield gen.Task(response)
    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/stack_context.py", line 302, in wrapped
      ret = fn(*args, **kwargs)
    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 550, in inner
      self.set_result(key, result)
    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 476, in set_result
      self.run()
    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 507, in run
      yielded = self.gen.send(next)
    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornadoredis/client.py", line 504, in consume_multibulk
      token = self.process_data(data, cmd_line)
    File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornadoredis/client.py", line 491, in process_data
      cmd_line)
    ResponseError: ResponseError (on LISTEN [(), {}]): Unknown response type s

Here is some additional info (logging of "data" in process_data):

    [E 140815 15:50:48 realtime_updates:36] got a message: Message(kind=u'subscribe', channel=u'b', body=2, pattern=u'b')
    [E 140815 15:51:08 client:468] 3
    [E 140815 15:51:08 client:468] $9
    [E 140815 15:51:08 client:468] $1
    [E 140815 15:51:08 client:468] :2
    [E 140815 15:51:08 realtime_updates:36] got a message: Message(kind=u'subscribe', channel=u'a', body=2, pattern=u'a')
    [E 140815 15:51:08 client:468] 3
    [E 140815 15:51:08 client:468] $9
    [E 140815 15:51:08 client:468] subscribe
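For reference when reading the log: Redis answers SUBSCRIBE with one three-element multi-bulk reply per channel (the string subscribe, the channel name, and the number of channels the connection is now subscribed to). In the protocol, the `$9` and `$1` lines are the length headers of the two bulk strings and `:2` is the integer count. The encoder below is a hypothetical helper written only to show that wire format; it is not part of tornado-redis.

```python
def encode_resp(value):
    """Encode a str, int or list as a RESP reply string (hypothetical helper)."""
    if isinstance(value, int):
        return ':%d\r\n' % value                     # integer reply, e.g. ":2"
    if isinstance(value, str):
        length = len(value.encode('utf-8'))          # bulk length counts bytes
        return '$%d\r\n%s\r\n' % (length, value)     # length header, then body
    if isinstance(value, list):
        header = '*%d\r\n' % len(value)              # multi-bulk header, e.g. "*3"
        return header + ''.join(encode_resp(item) for item in value)
    raise TypeError('unsupported type: %r' % type(value))


if __name__ == '__main__':
    reply = encode_resp(['subscribe', 'a', 2])
    print(reply.split('\r\n'))  # ['*3', '$9', 'subscribe', '$1', 'a', ':2', '']
```

A client reads such a reply line by line: after a `$9` header it must read the next line as the 9-byte body, never as a new header.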

leporo commented 10 years ago

Sorry, I couldn't check it. Have you found a solution?

adam-benayoun commented 10 years ago

I think so, yes, but I'd like to hear your opinion. Basically, I think the problem was that I used the same tornado-redis client for 2 subscribers. Because it is asynchronous (as opposed to redis-py), the two got interleaved: a bulk wasn't consumed by _consume_bulk right after its response prefix ($, *, etc.) but was instead picked up by process_data, which then complained that 's' was a bad prefix (when "subscribe" got interleaved). What I did was create a new Client for each subscriber, all sharing the same connection_pool. So I was forced to either use a connection per subscriber or use a connection pool. Do you think that if I could pass the same connection into the Client every time, I could achieve the same result without the connection pool? Since this is an async client, its main use case, as opposed to redis-py, is pub/sub. What do you think?
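The interleaving described in this comment can be reproduced without Redis or Tornado. The sketch below is a hypothetical model, not tornado-redis code: `read_reply`, `run_alone` and `run_interleaved` are illustrative stand-ins for the client's reply parsing and for the IOLoop. Each bare `yield` marks a point where a real asynchronous client would wait on the socket and let other callbacks run. Driven alone, a reader parses each SUBSCRIBE confirmation correctly; when two readers share one stream in round-robin, one of them receives the bulk body `subscribe` where it expects a header and fails with the same message as the traceback.

```python
from collections import deque


class ResponseError(Exception):
    pass


def read_reply(stream):
    """Parse one RESP reply from a shared deque of protocol lines.

    Every bare `yield` hands control back to the scheduler, standing in for
    an asynchronous socket read during which other coroutines may run.
    """
    yield  # wait for the header line
    line = stream.popleft()
    prefix, rest = line[0], line[1:]
    if prefix == ':':
        return int(rest)
    if prefix == '$':
        yield  # wait for the bulk body; another reader may run meanwhile
        return stream.popleft()
    if prefix == '*':
        items = []
        for _ in range(int(rest)):
            items.append((yield from read_reply(stream)))
        return items
    raise ResponseError('Unknown response type %s' % prefix)


def run_alone(reader):
    """Drive a single reader to completion; nobody else touches the stream."""
    try:
        while True:
            next(reader)
    except StopIteration as done:
        return done.value


def run_interleaved(readers):
    """Round-robin several readers over the same stream, like one event loop."""
    results = [None] * len(readers)
    pending = list(enumerate(readers))
    while pending:
        for entry in list(pending):
            index, reader = entry
            try:
                next(reader)
            except StopIteration as done:
                results[index] = done.value
                pending.remove(entry)
    return results


def subscribe_replies():
    # Two SUBSCRIBE confirmations, one list entry per protocol line.
    return deque(['*3', '$9', 'subscribe', '$1', 'a', ':1',
                  '*3', '$9', 'subscribe', '$1', 'b', ':2'])


if __name__ == '__main__':
    stream = subscribe_replies()
    print(run_alone(read_reply(stream)))  # ['subscribe', 'a', 1]
    print(run_alone(read_reply(stream)))  # ['subscribe', 'b', 2]

    stream = subscribe_replies()
    try:
        run_interleaved([read_reply(stream), read_reply(stream)])
    except ResponseError as err:
        print(err)  # Unknown response type s
```

The failure needs no threads: two readers waiting on the same connection are enough, which is why one reader per connection (or one connection per reader) avoids it.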

leporo commented 10 years ago

I think implementing connection pooling for tornado-redis was my worst mistake. One never needs connection pooling for Pub/Sub operations.

And, frankly speaking, the connection pooling implementation is not as good as it could be, so I wouldn't recommend it for other cases either :)

Please check my answer on StackOverflow about querying a Redis server from a Tornado application (http://stackoverflow.com/questions/5953786/how-do-you-properly-query-redis-from-tornado/15596969#15596969). Hope it helps.

adam-benayoun commented 10 years ago

I think you did a great job there :) How would you recommend working around that interleaving issue? I don't think I have a way to inject a connection into tornadoredis, right?

(Sent by email in reply to Vlad Glushchuk's comment of Aug 16, 2014 4:50 PM.)

leporo commented 10 years ago

Can you please share the code you're having issues with?

Do you use one of these helper classes in your project? It doesn't support PSUBSCRIBE at the moment, but there is a feature request to implement it.

adam-benayoun commented 10 years ago

Sure. It will be kinda hard because it involves several modules, but let me try. Btw, I am not using PSUBSCRIBE or any other P.* command.

One important thing to note here is that self.handler.tornadoredis holds the same tornadoredis.Client instance for all RequestHandler instances (in a repository). That made me think that it's actually not "thread safe" and that I have to at least create a new tornadoredis.Client for each request (which worked for me, btw). I just did one more trial, monkey patching the Client to use the same connection every time, because my options seemed to be either using a connection pool or using the same Client instance AND connection. That actually worked.

Here is the controller code:

    from backend.controllers.base import Controller
    import json
    from tornado import gen
    import logging
    from tornado.ioloop import IOLoop
    import time
    from backend.settings import options
    from backend.models.realtime_updates.realtime_updates import RUDisconnect,\
        RealtimeUpdatesChannels
    from backend.utils import bson_json
    import functools


    class RealtimeUpdatesController(Controller):

        def __init__(self, handler):
            super(RealtimeUpdatesController, self).__init__(handler)
            self.disconnected = False
            self.polls_done = 0
            self.channels = []
            self.messages = []

        @gen.engine
        def subscribe_and_listen(self, app, user, callback):
            groups = json.loads(self.get_argument('groups'))
            channels = [
                RealtimeUpdatesChannels.GroupChannel.format(group_id=group) for group in groups
            ]
            channels.append(RealtimeUpdatesChannels.UserChannel.format(user_id=user['_id']))
            self.channels = channels
            yield gen.Task(self.handler.tornadoredis.subscribe, self.channels)
            self.handler.tornadoredis.listen(functools.partial(self._on_message, callback))
            self._poll_messages(callback)

        @gen.engine
        def unsubscribe(self, callback):
            yield gen.Task(self.handler.tornadoredis.unsubscribe, self.channels)
            callback(None)

        def _on_message(self, callback, msg):
            if msg.kind == 'message':
                message = self._create_message(msg)
                self.messages.append(message)
            elif msg.kind == 'disconnect':
                self.disconnected = True
            elif msg.kind in ('subscribe', 'unsubscribe'):
                pass
            else:
                logging.error('Unknown message: type:%s, body:%s' % (msg.kind, msg.body))

        def _poll_messages(self, callback):
            if self.messages:
                callback(dict(messages=self.messages))
            elif self.disconnected:
                callback(dict(messages={
                    'message': RUDisconnect.create({})
                }))
            else:
                poll_timeout = options.realtime_updates_poll_timeout
                if self.polls_done < poll_timeout:
                    response_ival = options.realtime_updates_min_response_interval
                    poll_messages = functools.partial(self._poll_messages, callback)
                    IOLoop.instance().add_timeout(time.time() + response_ival, poll_messages)
                    self.polls_done += 1
                else:
                    callback(self.messages)

        def _create_message(self, msg):
            return {
                'message': bson_json.loads(msg.body)
            }

        def publish(self, channel, msg):
            self.repo.redis.publish(channel, bson_json.dumps(msg))

adam-benayoun commented 10 years ago

Any idea how to minimize the number of Redis connections with tornado-redis and scale to more than 100k connected clients?
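One common way to keep the number of Redis connections independent of the number of HTTP clients is to open a single subscriber connection per Tornado process and fan messages out in-process. That also leaves exactly one reader on the connection, so the interleaving discussed earlier in this thread should not arise. The sketch below is an illustration under assumptions, not tornado-redis API: `ChannelFanout`, `FakeConnection` and the channel name `group:1` are hypothetical, and `FakeConnection` only records the SUBSCRIBE/UNSUBSCRIBE commands a real subscribed client would send. In an application, the callback passed to the client's `listen()` would call `dispatch(msg.channel, msg.body)` for messages whose kind is `'message'`, the same check the controller's `_on_message` makes.

```python
from collections import defaultdict


class FakeConnection:
    """Hypothetical stand-in for ONE subscribed Redis connection."""

    def __init__(self):
        self.sent = []  # commands that would go over the wire

    def send(self, *args):
        self.sent.append(args)


class ChannelFanout:
    """Multiplex many in-process listeners over a single subscriber connection."""

    def __init__(self, connection):
        self.connection = connection
        self.listeners = defaultdict(set)  # channel -> set of callbacks

    def add_listener(self, channel, callback):
        if not self.listeners[channel]:
            # First local listener for this channel: subscribe once in Redis.
            self.connection.send('SUBSCRIBE', channel)
        self.listeners[channel].add(callback)

    def remove_listener(self, channel, callback):
        listeners = self.listeners.get(channel)
        if not listeners:
            return
        listeners.discard(callback)
        if not listeners:
            # Last local listener left: drop the Redis subscription.
            del self.listeners[channel]
            self.connection.send('UNSUBSCRIBE', channel)

    def dispatch(self, channel, body):
        """Deliver one message read from the connection to every local listener."""
        for callback in list(self.listeners.get(channel, ())):
            callback(channel, body)


if __name__ == '__main__':
    conn = FakeConnection()
    fanout = ChannelFanout(conn)

    def on_message(channel, body):
        print('got', channel, body)

    fanout.add_listener('group:1', on_message)
    fanout.dispatch('group:1', 'hello')  # prints: got group:1 hello
    fanout.remove_listener('group:1', on_message)
    print(conn.sent)  # [('SUBSCRIBE', 'group:1'), ('UNSUBSCRIBE', 'group:1')]
```

Redis accepts further SUBSCRIBE and UNSUBSCRIBE commands on a connection that is already in subscribed mode, which is what lets a single connection track these per-channel listener counts.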