
[Translation] Implementing server-sent events with Django and WebSocket #4

Open cundi opened 9 years ago

cundi commented 9 years ago

Original article: http://curella.org/blog/django-push-using-server-sent-events-and-websocket/

Django push: Using Server-Sent Events and WebSocket with Django

The goal of this article is to explore and show how it's possible to implement Server-Sent Events and WebSocket with Django.

There are other implementations out there for frameworks that are designed specifically to work in event-based scenarios (tornado, node.js), and they are probably better suited for implementing these kinds of services.

The point of this article is not "you should use Django for that", but a more humble "here's how I made it work with Django".

The Scenario

Suppose you have a website where users can import their contacts from other services. The importing is handled off-band by some other means (most likely, a celery task), and you want to show your users a notification box when the job is done.

There are currently a few alternative technologies for pushing events to the browser: Server-Sent Events (SSE) and WebSocket.

SSEs are a simpler protocol and are easier to implement, but they provide communication only in one direction, from the server to the browser. WebSocket provides instead a bidirectional channel.

For a simple notification scenario like the one above, SSEs provide just what we want, at the expense of one long-running connection per user.

We will use redis and its PubSub functionality as a broker between the celery task and Django's web process.
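
In redis-py terms, the mechanism this relies on is roughly the following. This is a generic sketch of redis PubSub, not code from the article; django-sse and the socket.io namespace shown later wrap these same calls.

import redis

r = redis.Redis()            # generic illustration of redis PubSub with redis-py
p = r.pubsub()
p.subscribe('sse')           # the consumer (Django web process) listens on a channel...
r.publish('sse', 'hello')    # ...and the producer (celery task) publishes to it

for message in p.listen():   # blocks, yielding subscribe confirmations and messages
    if message['type'] == 'message':
        print(message['data'])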

The final code of this article is available as a repository on GitHub.

Architecture

Celery Task -> Redis -> Django -> Browser

Running gunicorn

Both technologies require the server to keep the connection open indefinitely.

If we ran Django under mod_wsgi or the regular dev server, the request-response cycle would be blocked by those always-open requests.

The solution is to use gevent. I found that the simplest way to use it is to run Django under gunicorn.

Install gunicorn:

$ pip install gunicorn

Add gunicorn to your INSTALLED_APPS:

INSTALLED_APPS = (
    ...,
    'myapp',
    'gunicorn',
)

Then, I created a config file for gunicorn at config/gunicorn.

#!python
from os import environ
from gevent import monkey
monkey.patch_all()

bind = "0.0.0.0:8000"
workers = 1 # fine for dev, you probably want to increase this number in production
worker_class = "gunicorn.workers.ggevent.GeventWorker"

You can start the server with:

$ gunicorn_django -c config/gunicorn

For more info on running Django under gunicorn, see Django's docs on How to use Django with Gunicorn.

Server-Sent Events

The browser will issue a GET request to the URL /sse/ (this path is completely arbitrary). The server will respond with a stream of data, without ever closing the connection.
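
To make the format of that stream concrete, here is a minimal hand-rolled sketch of such a view. It is an illustration only, with made-up names and payload, and it assumes a Django version that provides StreamingHttpResponse; the article itself uses the django-sse package below.

import time
from django.http import StreamingHttpResponse

def sse_stream(request):
    # Illustrative only: the article uses django-sse instead of a view like this.
    def event_stream():
        while True:
            # Each SSE message is one or more "field: value" lines followed by
            # a blank line; here we push a named event with a JSON payload.
            yield 'event: myevent\ndata: {"done": true}\n\n'
            time.sleep(5)
    return StreamingHttpResponse(event_stream(), content_type='text/event-stream')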

The easiest way to implement SSEs is to use the django-sse package, available on PyPI.

$ pip install sse django-sse

If you want to publish via redis, django-sse requires you to specify how to connect:

settings.py:

REDIS_SSEQUEUE_CONNECTION_SETTINGS = {
    'location': 'localhost:6379',
    'db': 0,
}

django_sse provides a ready-to-use view that uses redis as a message broker.

myapp/views.py:

from django.views.generic import TemplateView
from django_sse.redisqueue import RedisQueueView

class HomePage(TemplateView):
    template_name = 'index.html'

class SSE(RedisQueueView):
    pass

Hook the views up in your urls.py:

from django.conf.urls import patterns, include, url
from myapp import views

urlpatterns = patterns('',
    url(r'^sse/$', views.SSE.as_view(), name='sse'),  # this URL is arbitrary.
    url(r'^$', views.HomePage.as_view(), name='homepage'),
)

IE Polyfill

Not every browser supports SSEs (most notably, Internet Explorer).

For unsupported browsers, we can include a JavaScript polyfill in our page. There are many polyfills available out there, but I've chosen to use eventsource.js because it's close to the original API and it looks actively maintained.

After including the polyfill in our HTML we can set up our callback functions on DOMReady. Here I've also used jQuery for simplicity.

<!doctype html>
<html>
<head>
  <meta charset="utf-8">
  <title>My App</title>

</head>
<body>
  <script src="//ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
  <script>window.jQuery || document.write('<script src="{{ STATIC_URL }}js/libs/jquery-1.7.1.min.js"><\/script>')</script>
  <script src="{{ STATIC_URL }}js/libs/eventsource.js"></script>

<script>
$().ready(function() {
  var source = new EventSource('/sse/'); // of course this must match the endpoint in your urlconf

  function log() {
    console.log(arguments);
  }

  source.onopen = function() {
    console.log(arguments);
  };

  source.onerror = function () {
    console.log(arguments);
  };

  source.addEventListener('connections', log, false);
  source.addEventListener('requests', log, false);
  source.addEventListener('myevent', function(e) {
    var data = JSON.parse(e.data);
    // .. do something..
  }, false);
  source.addEventListener('uptime', log, false);

  source.onmessage = function() {
    console.log(arguments);
  };
});

</script>

</body>
</html>

Publishing events

django_sse provides a convenience method to publish messages to RedisQueueView subclasses:

import json
from django_sse.redisqueue import send_event

send_event('myevent', json.dumps(data))

Note that send_event allows only text values to be published. That's why we are serializing the data to JSON, and we unserialize it in the event handler with JSON.parse.

By default, django_sse publishes and listens to the redis channel sse. If we want to separate messages per user, we can define the get_redis_channel method on the view:

class SSE(RedisQueueView):
    def get_redis_channel(self):
        return "sse_%s" % self.request.user.username

When we want to publish some event to a specific user, all we have to do is specify the channel when calling send_event:

send_event('myevent', json.dumps(data), 'sse_%s' % user.username)
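
Tying this back to the contact-import scenario from the beginning, the publishing side could look roughly like the sketch below. This is hypothetical, not code from the article: the task name and payload are made up, and it assumes Celery's @task decorator is available.

import json

from celery import task
from django_sse.redisqueue import send_event

@task
def import_contacts(username, credentials):
    # Hypothetical task: the real contact import would happen here.
    contacts = []  # ... fetch contacts from the remote service ...
    data = {'type': 'contacts-imported', 'count': len(contacts)}
    # Publish on the per-user channel returned by SSE.get_redis_channel() above.
    send_event('myevent', json.dumps(data), 'sse_%s' % username)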

WebSocket

Now, suppose you want to notify user A when user B does some kind of action.

You could still use SSEs, but every time the scenario happens, you'll end up with three connections: two long-running ones opened by A and B listening for SSEs, and a short one fired by B when POSTing his action.

Since you already have long-running connections because you need to push events, you may as well switch to WebSockets and save that POST.

Since WebSocket is not yet supported by Internet Explorer, we'll have to use an abstraction layer, like socket.io or SockJS, that provides alternative message transports.

I chose to use socket.io mainly because of the gevent-socketio library, which integrates pretty easily with Django.

Using the socketio worker

In order to run gevent-socketio, we have to run gunicorn with a specialized worker class.

The GeventSocketIOWorker will take care of implementing the socket.io handshake and the new WebSocket protocol (ws://).

In order to use GeventSocketIOWorker, I modified the worker_class parameter in the config file for gunicorn:

#!python
from os import environ
from gevent import monkey
monkey.patch_all()

bind = "0.0.0.0:8000"
workers = 1
worker_class = "socketio.sgunicorn.GeventSocketIOWorker" # Note that we are now using gevent-socketio's worker

Note that the socketio.sgunicorn.GeventSocketIOWorker is also compatible with SSEs, so you can use this worker if you want both protocols running.

gevent-socketio allows you to define different Socket.io namespaces. This way you can implement different domain-specific logic. For example, you could implement a namespace for users' status (online, away, etc.) and a different one for chat messages.

Additionally, gevent-socketio ships with a couple of namespace mixins for common situations, like implementing separate chat rooms.
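
As a hypothetical illustration of the namespace idea (not from the article; the names and handlers are made up), two domain-specific namespaces could be registered like this:

from socketio.namespace import BaseNamespace
from socketio.sdjango import namespace

@namespace('/status')
class StatusNamespace(BaseNamespace):
    def on_set_status(self, state):
        # Echo the new status ('online', 'away', ...) back to this client.
        self.emit('status_changed', state)

@namespace('/chat')
class ChatNamespace(BaseNamespace):
    def on_message(self, message):
        # Echo chat messages back to this client.
        self.emit('message', message)

On the client, each namespace is then reached with its own io.connect('/status') or io.connect('/chat') call.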

Let's create a namespace. Our namespace will provide separate chat-rooms, and will process events from our redis queue.

I had to override the emit_to_room method because I had the messages delivered more than once when I had more clients connected than the available workers.

myapp/sockets.py:

from socketio.namespace import BaseNamespace
from socketio.sdjango import namespace
from socketio.mixins import RoomsMixin
from myapp.utils import redis_connection
import json

@namespace('')
class MyNamespace(BaseNamespace, RoomsMixin):
    def listener(self, room):
        # ``redis_connection()`` is a utility function that returns a redis connection from a pool
        r = redis_connection().pubsub()
        r.subscribe('socketio_%s' % room)

        for m in r.listen():
            if m['type'] == 'message':
                data = json.loads(m['data'])
                self.process_event(data)

    def on_subscribe(self, *args):
        for channel in args:
            self.join(channel)

    def join(self, room):
        super(MyNamespace, self).join(room)
        self.spawn(self.listener, room)
        self.emit('joined', room)

    def on_myevent(self, room, *args):
        self.emit_to_room(room, 'myevent', *args)

    def emit_to_room(self, room, event, *args):
        """
        This is almost the same as ``.emit_to_room()`` on the parent class,
        but it sends events only over the current socket.

        This is to avoid a problem when there are more clients than workers, and
        a single message can get delivered multiple times.
        """
        pkt = dict(type="event",
                   name=event,
                   args=args,
                   endpoint=self.ns_name)
        room_name = self._get_room_name(room)

        if 'rooms' not in self.socket.session:
            return
        if room_name in self.socket.session['rooms']:
            self.socket.send_packet(pkt)

Note that in the join method we spawn a listener (and thus, a new redis connection) for every room we join. That's the way it's implemented in the chat example in the gevent-socketio repository.

If you're worried about having to spawn one listener greenlet per client per channel, I've included an alternative subclass in the repo that restarts the listener when joining a channel. The catch is that there will be a few milliseconds during which the user won't receive messages.

I'm also using a pool to recycle Redis connections. The redis_connection function creates a new Redis object from our already existing connection pool:

utils.py:

from django.conf import settings
from redis import Redis
from redis import ConnectionPool as RedisConnectionPool
from redis.connection import Connection

WEBSOCKET_REDIS_BROKER_DEFAULT = {
    'HOST': 'localhost',
    'PORT': 6379,
    'DB': 0
}

CONNECTION_KWARGS = getattr(settings, 'WEBSOCKET_REDIS_BROKER', {})

class ConnectionPoolManager(object):
    """
    A singleton that contains and retrieves redis ``ConnectionPool``s according to the connection settings.
    """
    pools = {}

    @classmethod
    def key_for_kwargs(cls, kwargs):
        return ":".join([str(v) for v in kwargs.values()])

    @classmethod
    def connection_pool(cls, **kwargs):
        pool_key = cls.key_for_kwargs(kwargs)
        if pool_key in cls.pools:
            return cls.pools[pool_key]

        params = {
            'connection_class': Connection,
            'db': kwargs.get('DB', 0),
            'password': kwargs.get('PASSWORD', None),
            'host': kwargs.get('HOST', 'localhost'),
            'port': int(kwargs.get('PORT', 6379))
        }

        cls.pools[pool_key] = RedisConnectionPool(**params)
        return cls.pools[pool_key]

def redis_connection():
    """
    Returns a redis connection from one of our pools.
    """
    pool = ConnectionPoolManager.connection_pool(**CONNECTION_KWARGS)
    return Redis(connection_pool=pool)

For serving our namespaces, gevent-socketio gives us an autodiscovery feature similar to Django's admin:

urls.py:

from django.conf.urls import patterns, include, url
from myapp import views

import socketio.sdjango

socketio.sdjango.autodiscover()

urlpatterns = patterns('',
    url(r'^sse/$', views.SSE.as_view(), name='sse'),  # this URL is arbitrary.
    # socket.io uses the well-known URL `/socket.io/` for its protocol
    url(r"^socket\.io", include(socketio.sdjango.urls)),
    url(r'^$', views.HomePage.as_view(), name='homepage'),
)

On the client side, we need to include the socket.io JavaScript client (available at https://github.com/LearnBoost/socket.io-client/).

By default, the client will try to use flashsockets under Internet Explorer (because Internet Explorer doesn't support WebSocket).

The problem with flashsockets is that the Flash object shipped with socket.io-client requests a policy file, and you'd need to set up a Flash policy server. So I decided to disable this transport and have IE use xhr-polling.

var socket = io.connect('', {  // first argument is the namespace
  transports: ['websocket', 'xhr-multipart', 'xhr-polling', 'jsonp-polling']  // note ``flashsockets`` is missing
});

socket.on("myevent", function(e) {
  console.log("<myevent> event", arguments);
});
socket.on("message", function(e) {
  console.log("Message", e);
});

socket.on("joined", function(e) {
  console.log("joined", arguments);
});

socket.on("connect", function(e) {
  console.log("Connected", arguments);
  socket.emit('subscribe', 'default_room');
});

socket.on("disconnect", function(e) {
  console.log("Disconnected", arguments);
});

Publishing an event

All we have to do in order to emit an event to our client is push a message to the right redis channel.

utils.py:

# previous code here ...

import json

def emit_to_channel(channel, event, *data):
    r = redis_connection()
    args = [channel] + list(data)
    r.publish('socketio_%s' % channel, json.dumps({'name': event, 'args': args}))
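
For instance, for the "notify user A about user B's action" scenario, the server-side code handling B's action (a view or a celery task) could publish like this; the payload below is made up, and 'default_room' matches the room the client subscribes to above:

from myapp.utils import emit_to_channel

emit_to_channel('default_room', 'myevent', {'user': 'B', 'action': 'shared a document'})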

Links & Acknowledgements

I would like to thank Jeff Triplett for the initial feedback on this article, Cody Soyland for his initial article about socket.io and gevent, Andrei Antoukh for accepting my patches for django_sse, and Jeffrey Gelens for accepting my patch for gevent-websocket.

If you want to read more, here are some links:

http://codysoyland.com/2011/feb/6/evented-django-part-one-socketio-and-gevent/
http://eflorenzano.com/blog/2011/02/16/technology-behind-convore/
http://www.gevent.org/
http://gunicorn.org/
https://bitbucket.org/Jeffrey/gevent-websocket/src
http://gevent-socketio.readthedocs.org/en/latest/index.html
http://www.w3.org/TR/eventsource/

machinefixer commented 9 years ago

Actually, "message broker" means 消息代理, i.e. a broker or agent acting on your behalf; translating it as "中断器" (interrupter) is not appropriate.

cundi commented 9 years ago

@machinefixer Thanks for pointing out the mistake; I took the translation for granted.