mrjoes / tornadio2

Python socket.io server implementation on top of Tornado framework
Other
523 stars 118 forks source link

Support custom async response to socket.io events. #61

Open tabouassaleh opened 11 years ago

tabouassaleh commented 11 years ago

Current implementation of socket.io is synchronous in that the raw_message function in session.py wait for the result from the event handler, and send that result as an ACK response to the event message.

This pull request changes that behaviour to optionally allow the user of the library to use celery+redis to dispatch processing, and then send it back to the client at a later time without blocking Tornado during processing.

For instance, using Brukva, the script starting the tornado service could include:

class Command(BaseCommand):
    def handle(self, *args, **options):
        # ...
        router = TornadioRouter(BaseSocket, {
            # ...
        })
        self._server = router.urls[0][2]['server']
        # ...
        c = brukva.Client()
        c.connect()
        c.subscribe('event_result')
        c.subscribe('broadcast_user')
        c.listen(self._event_result_router)
        SocketServer(application, ssl_options = ssl_options)

    def _event_result_router(self, result):
        '''
        Brukva Redis router for dispatching socket.io event results.

        Message body is a list containing:
          `session_id`: The session ID of the socket connection that sent the event.
          `msg_id`: The message sequence number for the event.
          `data`: The result data to be sent back to the client, a list consisting of error and response.
        '''
        err, message = result
        if err:
            logging.error('Event result error: %r', err)
        elif message.channel != 'event_result':
            return
        else:
            session_id, msg_id, error, response = json.loads(message.body)
            session = self._server._sessions._items.get(session_id)

            # The session may have disconnected.
            if not session: return

            # We don't currently use msg_endpoint, so ignore it for now.
            msg_endpoint = None
            if msg_id:
                if msg_id.endswith('+'):
                    msg_id = msg_id[:-1]

            session.send_message(proto.ack(msg_endpoint, msg_id, (error, response)))

And the celery task could look something like this:

@task
def process_event(session_id, message_id, user):
    ''' Simple celery task that returns the user's email address.'''
    c = brukva.Client()
    c.connect()
    result = json.dumps([
        session_id,
        message_id,
        None, # No error.
        user.email
    ])
    c.publish('event_result', result)