evilkost / brukva

Asynchronous Redis client that works within Tornado IO loop.
Other
265 stars 33 forks source link

None is passed to Client.listen callback instead of brukva.client.Message instance #10

Open kmike opened 13 years ago

kmike commented 13 years ago

I got this problem again.

Stack trace from ipdb's 'bt' command:

/Users/kmike/svn/tornadio/tornadio/server.py(71)__init__()
     67             except Exception, ex:
     68                 logging.error('Failed to start Flash policy server: %s', ex)
     69 
     70         logging.info('Entering IOLoop...')
---> 71         io_loop.start()

  /Users/kmike/svn/tornado/tornado/ioloop.py(270)start()
    268                 fd, events = self._events.popitem()
    269                 try:
--> 270                     self._handlers[fd](fd, events)
    271                 except (KeyboardInterrupt, SystemExit):
    272                     raise

  /Users/kmike/svn/tornado/tornado/stack_context.py(173)wrapped()
    171                 callback(*args, **kwargs)
    172         else:
--> 173             callback(*args, **kwargs)
    174     if getattr(fn, 'stack_context_wrapped', False):
    175         return fn

  /Users/kmike/svn/tornado/tornado/iostream.py(199)_handle_events()
    197         try:
    198             if events & self.io_loop.READ:
--> 199                 self._handle_read()
    200             if not self.socket:
    201                 return

  /Users/kmike/svn/tornado/tornado/iostream.py(251)_handle_read()
    249                 # can't see it; the only way to find out if it's there is to

    250                 # try to read it.

--> 251                 result = self._read_to_buffer()
    252             except Exception:
    253                 self.close()

  /Users/kmike/svn/tornado/tornado/iostream.py(287)_read_to_buffer()
    285         """
    286         try:
--> 287             chunk = self._read_from_socket()
    288         except socket.error, e:
    289             # ssl.SSLError is a subclass of socket.error

  /Users/kmike/svn/tornado/tornado/iostream.py(275)_read_from_socket()
    273                 raise
    274         if not chunk:
--> 275             self.close()
    276             return None
    277         return chunk

  /Users/kmike/svn/tornado/tornado/iostream.py(180)close()
    178             self.socket = None
    179             if self._close_callback:
--> 180                 self._run_callback(self._close_callback)
    181 
    182     def reading(self):

  /Users/kmike/svn/tornado/tornado/iostream.py(230)_run_callback()
    228             # inside our blanket exception handler rather than outside.

    229             with stack_context.NullContext():
--> 230                 callback(*args, **kwargs)
    231         except:
    232             logging.error("Uncaught exception, closing connection.",

  /Users/kmike/svn/tornado/tornado/stack_context.py(173)wrapped()
    171                 callback(*args, **kwargs)
    172         else:
--> 173             callback(*args, **kwargs)
    174     if getattr(fn, 'stack_context_wrapped', False):
    175         return fn

  /Users/kmike/svn/tornado/tornado/websocket.py(204)on_connection_close()
    202     def on_connection_close(self):
    203         self.client_terminated = True
--> 204         self.on_close()
    205 
    206     def _not_supported(self, *args, **kwargs):

  /Users/kmike/svn/tornadio/tornadio/persistent.py(84)on_close()
     82     def on_close(self):
     83         try:
---> 84             self.connection.on_close()
     85         finally:
     86             self.connection.is_closed = True

  /Users/kmike/dev/planor/realtime/connection.py(73)on_close()
     71     def on_close(self):
     72         if self.authenticated:
---> 73             self.logout()
     74 
     75     def logout(self):

  /Users/kmike/dev/planor/realtime/connection.py(77)logout()
     75     def logout(self):
     76         self.authenticated = False
---> 77         self.pubsub_client.disconnect()
     78         for handler in self.handlers:
     79             handler.on_logout()

  /Users/kmike/svn/brukva/brukva/client.py(334)disconnect()
    332 
    333     def disconnect(self):
--> 334         self.connection.disconnect()
    335 
    336     def on_connect(self):

  /Users/kmike/svn/brukva/brukva/client.py(141)disconnect()
    139             except socket.error, e:
    140                 pass
--> 141             self._stream = None
    142 
    143     def write(self, data, try_left=None):

  /Users/kmike/svn/brukva/brukva/client.py(873)listen()
    871                     raise response
    872                 result = self.format_reply(cmd_listen, response)
--> 873                 ctx.ret_call(result)
    874 
    875     ### CAS

  /Users/kmike/svn/brukva/brukva/client.py(39)__exit__()
     37 
     38         if self.is_active:
---> 39             self._call_callbacks(value)
     40             return True
     41         else:

  /Users/kmike/svn/brukva/brukva/client.py(29)_call_callbacks()
     27                     cb(value)
     28             else:
---> 29                 self.callbacks(value)
     30 
     31     def __enter__(self):

  /Users/kmike/dev/planor/realtime/connection.py(83)on_pubsub_message()
     79             handler.on_logout()
     80 
     81     def on_pubsub_message(self, message):
     82         for handler in self.handlers:
---> 83             handler.on_pubsub_message(message)

None
> /Users/kmike/dev/planor/realtime/handlers/debug.py(14)on_pubsub_message()
     12         logging.debug(message)
     13         if message is None:
---> 14             import ipdb; ipdb.set_trace()
     15         #logging.debug('channel: %s, body: %s' % (message.channel, message.body))

     16 
kmike commented 13 years ago

Hmm, this may be because I don't unsubscribe before disconnecting.

The bad thing is that the exception that occurred in the Client.listen callback was only printed to stderr and then ignored, so I didn't notice it immediately:

Exception AttributeError: "'NoneType' object has no attribute 'channel'" in <generator object listen at 0x102a70aa0> ignored
kmike commented 13 years ago

I'm still getting None as a message sometimes (with latest master):

  /Users/kmike/svn/tornadio/tornadio/server.py(71)__init__()
     67             except Exception, ex:
     68                 logging.error('Failed to start Flash policy server: %s', ex)
     69 
     70         logging.info('Entering IOLoop...')
---> 71         io_loop.start()

  /Users/kmike/svn/tornado/tornado/ioloop.py(220)start()
    218             callbacks = self._callbacks
    219             self._callbacks = []
--> 220             for callback in callbacks:
    221                 self._run_callback(callback)
    222 

  /Users/kmike/svn/brukva/brukva/client.py(914)listen()
    912                         break
    913                 else:
--> 914                     ctx.ret_call(result)
    915 
    916     ### CAS

  /Users/kmike/svn/brukva/brukva/client.py(43)__exit__()
     41 
     42         if self.is_active:
---> 43             self.ret_call(value)
     44             return True
     45         else:

  /Users/kmike/svn/brukva/brukva/client.py(56)ret_call()
     54     def ret_call(self, value):
     55         self.is_active = False
---> 56         self._call_callbacks(self.callbacks, value)
     57         self.is_active = True
     58 

  /Users/kmike/svn/brukva/brukva/client.py(30)_call_callbacks()
     28                     cb(value)
     29             else:
---> 30                 callbacks(value)
     31 
     32     def __enter__(self):

> /Users/kmike/dev/planor/realtime/connection.py(84)on_pubsub_message()
     82         if message is None:
     83             import ipdb; ipdb.set_trace()
---> 84             logging.warning('on_pubsub_message is None')
     85             return
     86

Don't know if it is a bug: pub/sub seems to work otherwise. Can you share some knowledge about what the problem may be?

State (maybe it will help):

ipdb> self.pubsub_client.subscribed
True

ipdb> self.pubsub_client._waiting_callbacks
defaultdict(<type 'list'>, {})

ipdb> self.pubsub_client.queue
[]
kmike commented 13 years ago

Btw,

self.is_active = False
self._call_callbacks(self.callbacks, value)
self.is_active = True

always seems scary (as do ExecutionContext.enable and ExecutionContext.disable), because is_active can be left incorrect if an exception occurs in _call_callbacks, and it is not clear how this is handled. That said, I've checked the code when the 'enable' and 'disable' methods were introduced, and they were applied carefully, with all edge cases handled.

evilkost commented 13 years ago

Sorry, I can't reproduce this. Could you try 85c67d9fe and show the logs for additional info?

kmike commented 13 years ago

Yes, sure. Thanks for moving things forward! It now produces this:

ERROR:brukva.client:None
Traceback (most recent call last):
  File "/Users/kmike/svn/brukva/brukva/client.py", line 901, in listen
    data = yield async(self.connection.readline)()
GeneratorExit
evilkost commented 13 years ago

I'm confused by (type_=GeneratorExit, value==None) in ExecutionContext.__exit__. And why is None not replaced with an Exception?!:

    if self.error_wrapper:
        value = self.error_wrapper(value)
    else:
        value = value or Exception(
            'Strange exception with None value type: %s; tb: %s' %
            (type_, '\n'.join(traceback.format_tb(tb))
        ))

I can't come up with a test case to reproduce this. Could you extract an example from your project?

burakdede commented 12 years ago

[E 111211 10:54:35 client:49] None
Traceback (most recent call last):
  File "/usr/local/lib/python2.6/dist-packages/brukva-0.0.1-py2.6.egg/brukva/client.py", line 926, in listen
    data = yield async(self.connection.readline)()
GeneratorExit

It seems this issue still holds. Here is a scenario to reproduce it: keep a long-polling request open for a while; if the client moves to an idle state, brukva fires this exception.

class MessageUpdatesHandler(BaseHandler):
    """Long-polling handler used as the reproduction scenario for this issue.

    Each POST creates its own brukva client, subscribes it to the channel
    named by the ``listing_id`` request argument, and finishes the HTTP
    response from the ``listen`` callback.
    """

    @tornado.web.authenticated
    @tornado.web.asynchronous
    def post(self):
        """Subscribe to the requested channel and start listening on it."""
        # The request argument is used directly as the channel name; it is
        # None when the argument is missing.
        self.listing_id = self.get_argument("listing_id", None)
        # A new client is created and connected for every request.
        self.client = brukva.Client()
        self.client.connect()
        self.client.subscribe(self.listing_id)
        # The response is completed later, from on_new_messages.
        self.client.listen(self.on_new_messages)

    def on_new_messages(self, messages):
        """Finish the pending response with the body of the received message.

        NOTE(review): per this issue the listen callback can be invoked with
        None instead of a message; ``messages.body`` would then raise
        AttributeError, since there is no None check here.
        """
        # Closed client connection
        if self.request.connection.stream.closed():
            # Nothing to reply to; note that this path neither unsubscribes
            # nor disconnects the brukva client.
            return
        self.finish(dict(messages=str(messages.body)))
        # Only unsubscribes here; the client is not disconnected on this path.
        self.client.unsubscribe(self.listing_id)

    def on_connection_close(self):
        """Unsubscribe and disconnect the brukva client.

        Presumably invoked by Tornado when the HTTP client drops the
        connection -- TODO confirm against the Tornado version in use.
        """
        # unsubscribe user from channel
        self.client.unsubscribe(self.listing_id)
        # disconnect() is called immediately after unsubscribe(), while the
        # listen callback registered in post() is still in place.
        self.client.disconnect()