leporo / tornado-redis

Asynchronous Redis client that works within Tornado IO loop.

Unsubscribe from pub/sub returns before the Client finishes unsubscribing #60

Closed (apolloFER closed this issue 10 years ago)

apolloFER commented 10 years ago

Issue: My code is similar to the WebSocket pub/sub demo. The problem occurs when the websocket is disconnected and close() is called:

@tornado.gen.engine
def cleanup(self):

    print "-- Connection %s to client %s closing" % (self.identifier, self.client_identifier)

    if self.client_pubsub.subscribed:
        yield tornado.gen.Task(self.client_pubsub.unsubscribe, ["ngs:fc:chat:global", self.disconnect_channel])

    self.client_pubsub.disconnect()

The yield from the unsubscribe call returns when the channel is supposed to be unsubscribed, and disconnect is then called. The problem occurs when the pub/sub client receives a published message at the exact moment the unsubscribe is called: the yield returns, disconnect is called, and after that the server throws an exception:

ERROR:tornado.application:Exception in callback <functools.partial object at 0x1694e10>
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/tornado/ioloop.py", line 458, in _run_callback
    callback()
  File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 331, in wrapped
    raise_exc_info(exc)
  File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 302, in wrapped
    ret = fn(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/tornado/iostream.py", line 341, in wrapper
    callback(*args)
  File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 331, in wrapped
    raise_exc_info(exc)
  File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 302, in wrapped
    ret = fn(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/tornadoredis/connection.py", line 148, in read_callback
    callback(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 331, in wrapped
    raise_exc_info(exc)
  File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 302, in wrapped
    ret = fn(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 550, in inner
    self.set_result(key, result)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 476, in set_result
    self.run()
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 505, in run
    yielded = self.gen.throw(*exc_info)
  File "/usr/local/lib/python2.7/dist-packages/tornadoredis/client.py", line 1087, in listen
    response = yield gen.Task(response)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 533, in run
    self.yield_point.start(self)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 371, in start
    self.func(*self.args, **self.kwargs)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 153, in wrapper
    runner.run()
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 505, in run
    yielded = self.gen.throw(*exc_info)
  File "/usr/local/lib/python2.7/dist-packages/tornadoredis/client.py", line 479, in consume_multibulk
    data = yield gen.Task(self.connection.readline)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 533, in run
    self.yield_point.start(self)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 371, in start
    self.func(*self.args, **self.kwargs)
  File "/usr/local/lib/python2.7/dist-packages/tornadoredis/connection.py", line 154, in readline
    raise ConnectionError('Tried to read from '
ConnectionError: Tried to read from non-existent connection

When checking the subscribed attribute of the Client object, its value is still a non-empty set for a millisecond or so after the unsubscribe yield has returned. My workaround is the following code:

@tornado.gen.engine
def cleanup(self):

    print "-- Connection %s to client %s closing" % (self.identifier, self.client_identifier)

    if self.client_pubsub.subscribed:
        yield tornado.gen.Task(self.client_pubsub.unsubscribe, "ngs:fc:chat:global")
        yield tornado.gen.Task(self.client_pubsub.unsubscribe, self.disconnect_channel)

    def check():
        # Poll until the client has really finished unsubscribing;
        # only then is it safe to disconnect.
        if self.client_pubsub.subscribed:
            tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(milliseconds=1), check)
        else:
            self.client_pubsub.disconnect()

    tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(milliseconds=1), check)

The unsubscribe task should only yield back once the Client has really finished unsubscribing.

leporo commented 10 years ago

Thanks a lot.

The unsubscribe command receives no immediate response. The delay you've described is between sending the UNSUBSCRIBE command and receiving the UNSUBSCRIBE message for the channel (when the client exits the listen loop).

An obvious solution is to call the callback passed to the unsubscribe method only when the UNSUBSCRIBE message has been received. But it should be backed by a timeout to be sure there will be no infinite looping. I'll implement and test this when I have spare time.
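
For illustration only, here is a rough sketch of that deferred-callback-with-a-timeout idea. It is not the actual tornadoredis internals; the UnsubscribeWaiter class and its method names are made up for the example:

import datetime

from tornado.ioloop import IOLoop


class UnsubscribeWaiter(object):
    """Holds the user's callback until the UNSUBSCRIBE confirmation
    arrives from the server, or fires it anyway after a safety timeout
    so the caller can never wait forever."""

    def __init__(self, channel, callback, timeout_ms=1000):
        self.channel = channel
        self.callback = callback
        self.fired = False
        self._timeout = IOLoop.instance().add_timeout(
            datetime.timedelta(milliseconds=timeout_ms), self._fire)

    def confirm(self):
        # Would be called from the listen loop when the UNSUBSCRIBE
        # message for self.channel comes back from Redis.
        IOLoop.instance().remove_timeout(self._timeout)
        self._fire()

    def _fire(self):
        if not self.fired:
            self.fired = True
            if self.callback:
                self.callback(True)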

If you need a solution right now, please give the SockJS demo a try. It uses the generic channel subscription handlers I created to simplify Pub/Sub implementations. A single Redis connection handles all subscriptions, so there is no need for explicit disconnects.
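
If it helps, here is a small sketch of that single-connection pattern. This is not the SockJS demo's actual helper class, just the general idea assuming the plain tornadoredis.Client API: one shared subscriber fans messages out to local handlers, so individual websocket handlers never touch the Redis connection and there is nothing to disconnect per client.

import tornado.gen
import tornadoredis


class SharedSubscriber(object):
    """One Redis connection and one listen loop for all channels."""

    def __init__(self, channels):
        self.client = tornadoredis.Client()
        self.client.connect()
        self.handlers = dict((c, set()) for c in channels)

    @tornado.gen.engine
    def start(self):
        # Subscribe once for every channel, then enter the listen loop.
        yield tornado.gen.Task(self.client.subscribe, list(self.handlers))
        self.client.listen(self._on_message)

    def add(self, channel, handler):
        self.handlers[channel].add(handler)

    def remove(self, channel, handler):
        # Only the local handler is dropped; the connection stays up,
        # so there is no unsubscribe/disconnect race per websocket.
        self.handlers[channel].discard(handler)

    def _on_message(self, msg):
        if msg.kind == 'message':
            for handler in list(self.handlers.get(msg.channel, ())):
                handler(msg.body)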

apolloFER commented 10 years ago

EDIT2: I kinda misunderstood what you were trying to say. :) I thought you meant that calling unsubscribe with my own callback would fix the problem. Anyway, it looks like calling disconnect when the listen callback receives the last unsubscribe message fixes the problem for now. It's more of a workaround than a fix.

Actually, I tried that too. I know that unsubscribe can receive a callback. That callback is usually supplied by tornado.gen.Task (as in the sample I provided). The same thing happens if I provide the callback myself:

    @tornado.gen.engine
    def cleanup(self):

        print datetime.datetime.now(), "unsubscribing"

        if self.client_pubsub.subscribed:
            # yield tornado.gen.Task(self.client_pubsub.unsubscribe, ["darko:test:sub"]) - the way from previous post
            self.client_pubsub.unsubscribe("darko:test:sub", self.on_unsubscribe)

    def on_unsubscribe(self, state):
        print datetime.datetime.now(), "unsubscribed"
        self.client_pubsub.disconnect()

The way I test it is to add two add_timeout calls (to reproduce the bug we experience in production) in the init method:

            tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(seconds=2), self.test_message)
            tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(seconds=2), self.cleanup)

The first method is a plain publish with some random message (it also prints "pushing" to stdout).
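
(The real test_message is in the pastebin linked below; a hypothetical version, assuming a second non-subscribed client named client_pub for publishing, would look roughly like this:)

    def test_message(self):
        print datetime.datetime.now(), "pushing"
        # Publishing must go through a connection that is not in
        # subscribe mode, hence the separate client_pub client.
        self.client_pub.publish("darko:test:sub", "some random message")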

Here's the stdout of a sample:

2014-03-05 17:47:31.885451 pushing
2014-03-05 17:47:31.886115 unsubscribing
2014-03-05 17:47:31.886643 unsubscribed

And after that I get a crash.

If you want, I can upload the entire .py file for you to see what happens. I found a better fix, which is to disconnect when the method I pass as a callback to .listen() receives an unsubscribe message with body == 0 (meaning the number of subscribed channels is zero). But that fix only suits my use case.
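
A minimal sketch of that listen-callback approach, assuming the surrounding class is a tornado WebSocketHandler (the handler name on_redis_message is made up here; it stands for whatever method is passed to self.client_pubsub.listen()):

    def on_redis_message(self, msg):
        if msg.kind == 'message':
            self.write_message(msg.body)
        elif msg.kind == 'unsubscribe' and msg.body == 0:
            # body carries the number of remaining subscriptions, so
            # once it reaches zero it is safe to drop the connection.
            self.client_pubsub.disconnect()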

EDIT: Here is the entire code: http://pastebin.com/bxejSeLJ

leporo commented 10 years ago

Thanks.

Your code does the same thing I described in my first comment: the callback passed to the unsubscribe method should only be called when the UNSUBSCRIBE message comes back from the Redis server.

Right now it's called immediately after the UNSUBSCRIBE command is sent.

I pushed a quick fix to the task-60 branch. It would be great if you could try it. I can't merge it into master yet because there may be issues with the PUNSUBSCRIBE command.

apolloFER commented 10 years ago

I have just tried the task-60 branch and yes, it fixes the issue. The standard yield gen.Task way now works. Thanks!

leporo commented 10 years ago

Merged it into the master branch and updated the PyPI package. Sorry for the delay.