leporo / tornado-redis

Asynchronous Redis client that works within Tornado IO loop.

Can't subscribe to another channel after entering a listen loop #39

Closed FZambia closed 11 years ago

FZambia commented 11 years ago

I started to experiment with tornado-redis and stumbled on some strange behaviour. Look at this code:

from __future__ import print_function
import tornado.ioloop
import tornado.gen
import tornadoredis
import time

subscriber = tornadoredis.Client()
subscriber.connect()

publisher = tornadoredis.Client()
publisher.connect()

@tornado.gen.engine
def on_message(message):
    print(message)

    if message.body == 'subscribe extra channels':
        try:
            yield tornado.gen.Task(subscriber.subscribe, 'second')
        except Exception as e:
            print(e)

        try:
            print('yielding subscribing on third')
            yield tornado.gen.Task(subscriber.subscribe, 'third')
        except Exception as e:
            print(e)

@tornado.gen.engine
def subscribe():
    yield tornado.gen.Task(subscriber.subscribe, 'first')
    subscriber.listen(on_message)
    tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 2, publish)

@tornado.gen.engine
def publish():
    print('publishing message...')
    try:
        yield tornado.gen.Task(publisher.publish, 'first', 'subscribe extra channels')
    except Exception as e:
        print(e)

if __name__ == '__main__':
    tornado.ioloop.IOLoop.instance().add_callback(subscribe)
    tornado.ioloop.IOLoop.instance().start()

In this example I subscribe to channel 'first', then after 2 seconds publish a message to that channel. In the message handler, when that message is received, I try to subscribe to two new channels. Subscribing to channel 'second' seems to succeed, but the subscription to channel 'third' never happens.

I investigated a little. It seems that subscribing to channel 'third' stops at this line of the client's execute_command method:

yield gen.Task(self.connection.wait_until_ready)

The connection is not ready and read_callbacks contains one wrapped function. It seems that this yield never resumes.

leporo commented 11 years ago

UPD: There is nothing wrong with the code above, so please ignore this comment.

Please don't use the @tornado.gen.engine decorator for Pub/Sub message handlers. They should be as simple and as fast as possible. If you need some yielded steps there, create a separate function wrapped with @tornado.gen.engine and use the IOLoop's add_callback method to invoke it from the message handler:

def on_message(message):
    if message.body == 'subscribe extra channels':
        tornado.ioloop.IOLoop.current().add_callback(subscribe_to_extra_channels)

@tornado.gen.engine
def subscribe_to_extra_channels():
    try:
        yield tornado.gen.Task(subscriber.subscribe, 'second')
    except Exception as e:
        print(e)

    try:
        print('yielding subscribing on third')
        yield tornado.gen.Task(subscriber.subscribe, 'third')
    except Exception as e:
        print(e)

Pub/Sub message handlers are called directly from the message handling loop to avoid the overhead of setting up callbacks, so you have to schedule a callback yourself if you need one.

leporo commented 11 years ago

Sorry, there seems to be a bug.

leporo commented 11 years ago

Fixed.

I updated your code a bit:

from __future__ import print_function
import tornado.ioloop
import tornado.gen
import tornadoredis
import time

subscriber = tornadoredis.Client()
subscriber.connect()

publisher = tornadoredis.Client()
publisher.connect()

def on_message(message):
    print('on_message: ' + str(message))

    if message.body == 'subscribe to extra channels':
        subscribe_extras()
    elif message.body == 'subscribe to multiple channels':
        subscribe_multiple()

@tornado.gen.engine
def subscribe_extras():
    yield tornado.gen.Task(subscriber.subscribe, 'second')
    print('Subscribed to the second channel')

    yield tornado.gen.Task(subscriber.subscribe, 'third')
    print('Subscribed to the third channel')

@tornado.gen.engine
def subscribe_multiple():
    yield tornado.gen.Task(subscriber.subscribe, ['fourth', 'fifth'])
    print('Subscribed to five channels')

@tornado.gen.engine
def subscribe():
    yield tornado.gen.Task(subscriber.subscribe, 'first')
    print('Subscribed')
    subscriber.listen(on_message)
    tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 1, publish)

@tornado.gen.engine
def publish():
    yield tornado.gen.Task(publisher.publish, 'first', 'subscribe to extra channels')

    yield tornado.gen.Task(tornado.ioloop.IOLoop.instance().add_timeout, time.time() + 0.5)

    yield tornado.gen.Task(publisher.publish, 'second', 'message to the second channel')
    yield tornado.gen.Task(publisher.publish, 'third', 'message to the third channel')

    yield tornado.gen.Task(tornado.ioloop.IOLoop.instance().add_timeout, time.time() + 0.5)

    yield tornado.gen.Task(publisher.publish, 'first', 'subscribe to multiple channels')

    yield tornado.gen.Task(tornado.ioloop.IOLoop.instance().add_timeout, time.time() + 0.5)

    yield tornado.gen.Task(publisher.publish, 'fourth', 'message to the fourth channel')
    yield tornado.gen.Task(publisher.publish, 'fifth', 'message to the fifth channel')

    yield tornado.gen.Task(tornado.ioloop.IOLoop.instance().add_timeout, time.time() + 0.5)

if __name__ == '__main__':
    tornado.ioloop.IOLoop.instance().add_callback(subscribe)
    tornado.ioloop.IOLoop.instance().start()

I've added some add_timeout calls to give the subscriber time to receive the 'subscribe to extra channels' request before messages are posted to these extra channels.
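Why the pauses matter can be illustrated without Redis or Tornado. The following is a minimal sketch, not tornado-redis code: `ToyBroker`, `late_subscriber`, `run` and `demo` are hypothetical names, and asyncio stands in for the Tornado IO loop so the example runs on its own. It relies only on the fact that Redis Pub/Sub does not keep messages for channels that have no subscribers at publish time.

```python
import asyncio


class ToyBroker(object):
    """In-memory stand-in for Redis Pub/Sub (hypothetical, for illustration).

    Like Redis, it drops messages published to a channel nobody subscribed to.
    """

    def __init__(self):
        self.inboxes = {}  # channel name -> list of delivered message bodies

    def subscribe(self, channel):
        self.inboxes.setdefault(channel, [])

    def publish(self, channel, body):
        inbox = self.inboxes.get(channel)
        if inbox is None:
            return 0  # no subscriber yet: the message is lost
        inbox.append(body)
        return 1  # number of subscribers that received the message


async def late_subscriber(broker, channel, delay):
    # Models a subscription that only takes effect a little later.
    await asyncio.sleep(delay)
    broker.subscribe(channel)


async def run(pause):
    broker = ToyBroker()
    task = asyncio.create_task(late_subscriber(broker, 'second', 0.05))
    await asyncio.sleep(pause)  # the pause before publishing
    delivered = broker.publish('second', 'message to the second channel')
    await task
    return delivered


def demo():
    # Returns (receivers without a pause, receivers with a pause).
    return asyncio.run(run(0)), asyncio.run(run(0.2))
```

Without a pause the message is published before the subscription exists and reaches nobody; with a pause longer than the subscription delay it is delivered.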

leporo commented 11 years ago

Please check if these changes work for you.

FZambia commented 11 years ago

Thank you for such a quick response!

The problem is still there: when I run your code I don't receive the message that was published to channel 'third'.

I simplified it a bit:

from __future__ import print_function
import tornado.ioloop
import tornado.gen
import tornadoredis
import time

subscriber = tornadoredis.Client()
subscriber.connect()

publisher = tornadoredis.Client()
publisher.connect()

def on_message(message):
    print('received', message.kind, message.body)

    if message.body == 'subscribe to extra channels':
        subscribe_extras()

@tornado.gen.engine
def subscribe_extras():
    yield tornado.gen.Task(subscriber.subscribe, 'second')
    print('Subscribed to the second channel')

    yield tornado.gen.Task(subscriber.subscribe, 'third')
    print('Subscribed to the third channel')

@tornado.gen.engine
def subscribe():
    yield tornado.gen.Task(subscriber.subscribe, 'first')
    print('Subscribed to the first channel')
    subscriber.listen(on_message)
    tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 1, publish)

@tornado.gen.engine
def publish():
    yield tornado.gen.Task(publisher.publish, 'first', 'subscribe to extra channels')
    yield tornado.gen.Task(tornado.ioloop.IOLoop.instance().add_timeout, time.time() + 2)
    yield tornado.gen.Task(publisher.publish, 'second', 'message to the second channel')
    yield tornado.gen.Task(tornado.ioloop.IOLoop.instance().add_timeout, time.time() + 2)
    yield tornado.gen.Task(publisher.publish, 'third', 'message to the third channel')

if __name__ == '__main__':
    tornado.ioloop.IOLoop.instance().add_callback(subscribe)
    tornado.ioloop.IOLoop.instance().start()

On my machine, the output of running this is:

Subscribed to the first channel
received subscribe 1
received message subscribe to extra channels
Subscribed to the second channel
received subscribe 2
received message message to the second channel

But there are no messages for the third channel! There is no subscription message and no published message, because the subscription never happened. Look at the output of Redis's MONITOR command:

1375800431.532975 "SUBSCRIBE" "first"
1375800432.534694 "PUBLISH" "first" "subscribe to extra channels"
1375800432.537653 "SUBSCRIBE" "second"
1375800434.536302 "PUBLISH" "second" "message to the second channel"
1375800436.538984 "PUBLISH" "third" "message to the third channel"

There is no "SUBSCRIBE" "third".
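The same check can be done mechanically on a captured log. This is a small sketch that assumes the lines look exactly like the capture above (timestamp, then quoted command and arguments); other Redis versions may add more fields to each line, which this does not handle. The helper names are made up for the example.

```python
import shlex

# The MONITOR capture quoted above.
MONITOR_LOG = '''\
1375800431.532975 "SUBSCRIBE" "first"
1375800432.534694 "PUBLISH" "first" "subscribe to extra channels"
1375800432.537653 "SUBSCRIBE" "second"
1375800434.536302 "PUBLISH" "second" "message to the second channel"
1375800436.538984 "PUBLISH" "third" "message to the third channel"
'''


def channels_by_command(log):
    """Return (subscribed channels, published-to channels) found in the log."""
    subscribed, published = [], []
    for line in log.splitlines():
        parts = shlex.split(line)  # handles the double-quoted fields
        if len(parts) < 3:
            continue  # not a "timestamp command argument..." line
        command, args = parts[1].upper(), parts[2:]
        if command == 'SUBSCRIBE':
            subscribed.extend(args)  # SUBSCRIBE may list several channels
        elif command == 'PUBLISH':
            published.append(args[0])  # first argument is the channel
    return subscribed, published


def published_without_subscribe(log):
    """Channels that received a PUBLISH but never appear in a SUBSCRIBE."""
    subscribed, published = channels_by_command(log)
    return [channel for channel in published if channel not in subscribed]
```

For the capture above, `published_without_subscribe(MONITOR_LOG)` returns `['third']`.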

leporo commented 11 years ago

Please try to uninstall and install the tornado-redis package.

I uploaded the buggy revision 2.4.4 to PyPI and then (about an hour later) replaced it with the one that should work. I'm really sorry for this.

If this doesn't help, please check line 381 of the client.py file. It should look like this:

            if not self.subscribed and not self.connection.ready():
                yield gen.Task(self.connection.wait_until_ready)
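
The effect of the `self.subscribed` check can be shown with a toy model. This is only a sketch under assumptions, not the library's implementation: `ToyConnection` and `waits_for_ready` are hypothetical, and the model assumes a connection counts as ready only when no read callback is pending, which matches the observation earlier in the thread that read_callbacks held one function while the command was stuck.

```python
class ToyConnection(object):
    """Hypothetical stand-in for the client's connection object."""

    def __init__(self):
        # Pending read callbacks; a listen loop is assumed to keep one here.
        self.read_callbacks = []

    def ready(self):
        # Assumption: "ready" means nothing is waiting for data to be read.
        return not self.read_callbacks


def waits_for_ready(subscribed, connection, check_subscribed):
    """Return True if a command would wait for the connection to be ready.

    check_subscribed=True mirrors the condition quoted above;
    check_subscribed=False models a hypothetical version without the
    subscribed check.
    """
    if check_subscribed:
        return (not subscribed) and (not connection.ready())
    return not connection.ready()


# A client inside a listen loop: subscribed, with one pending read callback.
listening = ToyConnection()
listening.read_callbacks.append(lambda data: None)

without_check = waits_for_ready(True, listening, check_subscribed=False)
with_check = waits_for_ready(True, listening, check_subscribed=True)
```

In this model `without_check` is True: the command waits for a readiness that a listening connection never reaches, so a second SUBSCRIBE is never sent. `with_check` is False, so the command goes out immediately.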

FZambia commented 11 years ago

Yes, it fixed the issue! Thank you very much!!