nats-io / nats.py

Python3 client for NATS
https://nats-io.github.io/nats.py/
Apache License 2.0
868 stars 177 forks source link

Socketio asyncio NATS manage can't get subscribe message #53

Closed Negashev closed 6 years ago

Negashev commented 6 years ago

I wrote a client manager for python-socketio on top of NATS, following the example of the Redis manager. The manager publishes messages successfully, but it never receives any messages from its subscription. What am I doing wrong?

NATS (docker latest) Python (docker python:alpine)

import asyncio
import pickle

try:
    import nats
except ImportError:
    nats = None

from socketio.asyncio_pubsub_manager import AsyncPubSubManager

class AsyncNatsManager(AsyncPubSubManager):  # pragma: no cover
    """NATS based client manager for asyncio servers.

    This class implements a NATS backend for event sharing across multiple
    processes. Outgoing messages are pickled and published on the NATS
    subject named by ``channel``; incoming messages on that same subject are
    placed on an internal queue and handed back one at a time by
    :meth:`_listen`.

    To use a NATS backend, initialize the :class:`AsyncServer` instance as
    follows::

        server = socketio.AsyncServer(client_manager=AsyncNatsManager(
            servers=['nats://hostname:port']))

    :param servers: List of NATS server URLs to connect to. Defaults to
                    ``["nats://nats:4222"]`` when omitted.
    :param channel: The channel (NATS subject) on which the server sends and
                    receives notifications. Must be the same in all the
                    servers.
    :param write_only: If set to ``True``, only initialize to emit events. The
                       default of ``False`` initializes the class for emitting
                       and receiving.
    :raises RuntimeError: if the ``nats`` package could not be imported.
    """
    name = 'asyncionats'

    def __init__(self, servers=None, channel='socketio',
                 write_only=False):
        if servers is None:
            servers = ["nats://nats:4222"]
        if nats is None:
            raise RuntimeError('NATS package is not installed '
                               '(Run "pip install asyncio-nats-client" in your '
                               'virtualenv).')
        self.servers = servers
        # Buffer between the NATS subscription callback and _listen().
        self.queue = asyncio.Queue()
        # Connections are opened lazily on first publish / first listen.
        self.producer = None
        self.consumer = None
        # Whatever subscribe() returns; kept so the subscription can be
        # referenced later.
        self.sid = None
        super().__init__(channel=channel, write_only=write_only)

    async def _publish(self, data):
        """Pickle ``data`` and publish it on the configured channel."""
        if self.producer is None:
            self.producer = await nats.connect(servers=self.servers)
        return await self.producer.publish(self.channel, pickle.dumps(data))

    async def _listen(self):
        """Return the next raw payload received on the configured channel.

        The first call opens the consumer connection and registers the
        subscription; every call then waits for one queued payload.
        """
        if self.consumer is None:
            self.consumer = await nats.connect(servers=self.servers)
            # Subscribe to the same subject _publish() writes to, so a
            # non-default ``channel`` works as well.
            self.sid = await self.consumer.subscribe(
                self.channel, cb=self.message_handler)
        # NOTE(review): the payload is returned as pickled bytes; presumably
        # AsyncPubSubManager unpickles ``bytes`` messages itself -- confirm
        # against the installed python-socketio version. Only connect to a
        # trusted NATS server, since unpickling untrusted data is unsafe.
        return await self.queue.get()

    async def message_handler(self, msg):
        """Queue the payload of a message delivered by the subscription.

        The payload is forwarded untouched: it was produced by
        ``pickle.dumps`` and is binary, so it must not be decoded as text.
        """
        await self.queue.put(msg.data)
Negashev commented 6 years ago

@wallyqs Any news?

prateek2408 commented 6 years ago

@Negashev - Any progress on this? I was also thinking of replacing RabbitMQ with NATS; in my case as well, the data is serialized via pickle before it is finally sent to the messaging server.

Is it possible, or is it stuck because of some bug?

wallyqs commented 6 years ago

I will check this one today.

Negashev commented 6 years ago

@wallyqs Any news?

wallyqs commented 6 years ago

OK, some updates here: I now have a WIP example of python socket.io + sanic + asyncio working together, more or less: https://gist.github.com/wallyqs/c8d16cf2fa5aa2e4f4e49b714f96575d I am not very familiar with the python socket.io framework, so feedback is welcome, but so far it does not look like there are any blockers to using these tools together (it would be nice to have an implementation of the class AsyncNatsManager(AsyncPubSubManager): and a more complete example, though).

import asyncio

from sanic import Sanic
from sanic.response import html

import socketio

# --- NATS PubSub Manager ---

import nats
from nats.aio.client import Client as NATS
from socketio.asyncio_pubsub_manager import AsyncPubSubManager

import json

class AsyncNatsManager(AsyncPubSubManager):
    """Work-in-progress Socket.IO client manager backed by a NATS connection.

    A single NATS client is shared by publishing and listening. Events passed
    to :meth:`_publish` are forwarded to the subject ``socketio.<event>`` as
    JSON; JSON messages received on the ``channel`` subject are re-emitted as
    a ``nats`` event on the ``/test`` namespace and queued for
    :meth:`_listen`.

    :param servers: List of NATS server URLs. Defaults to
                    ``["nats://127.0.0.1:4222"]`` when omitted.
    :param channel: NATS subject to subscribe to.
    :param write_only: Passed through to ``AsyncPubSubManager``.
    :param loop: Unused; accepted only for backward compatibility. It
                 defaults to ``None`` so that no event loop is looked up
                 while the class is being defined.
    """
    name = 'asyncionats'

    def __init__(self,
                 servers=None,
                 channel='socketio',
                 write_only=False,
                 loop=None,
                 ):

        if servers is None:
            servers = ["nats://127.0.0.1:4222"]
        self.servers = servers
        self.queue = asyncio.Queue()

        # Establish single connection to NATS for the client.
        self.nc = None
        # Tracked separately from the connection: _publish() may connect
        # first, and _listen() must still register the subscription.
        self._subscribed = False
        super().__init__(channel=channel, write_only=write_only)

    async def _connect(self):
        """Return the shared NATS client, connecting it on first use."""
        if self.nc is None:
            client = NATS()
            await client.connect(servers=self.servers)
            # Another coroutine may have connected while this one was
            # awaiting; keep a single client and discard the spare one.
            if self.nc is None:
                self.nc = client
            else:
                await client.close()
        return self.nc

    async def _publish(self, data):
        """Forward a Socket.IO event to NATS on ``socketio.<event>``."""
        print("Socket.io <<- NATS     :", data)

        # Send the client events through NATS
        nc = await self._connect()

        # Skip broadcasted messages that were received from NATS.
        if data['event'] != 'event':
            payload = json.dumps(data['data']).encode()
            await nc.publish("socketio.{}".format(data['event']), payload)

    async def _listen(self):
        """Return the next decoded message received on ``channel``.

        The subscription is registered on the first call regardless of
        whether the connection was opened here or earlier by
        :meth:`_publish`.
        """
        nc = await self._connect()
        if not self._subscribed:
            # Flip the flag before awaiting so that a concurrent call cannot
            # register a second subscription.
            self._subscribed = True
            try:
                await nc.subscribe(self.channel, cb=self._on_message)
            except Exception:
                # Allow a later call to retry the subscription.
                self._subscribed = False
                raise
        return await self.queue.get()

    async def _on_message(self, msg):
        """Handle one NATS message: re-emit it and queue it for _listen()."""
        text = msg.data.decode()
        print("NATS      ->> Socket.io:", text)

        data = json.loads(text)

        # Broadcast the bare message received via NATS as a Socket.io event
        await self.emit('nats', data, namespace='/test')

        await self.queue.put(data)

# --- Sanic + Socket.io based Application with attached PubSub Manager ---

# Wire the pieces together: a Sanic application, a NATS manager created with
# its default arguments, and a Socket.IO server that uses that manager.
app = Sanic()
mgr = AsyncNatsManager()
sio = socketio.AsyncServer(client_manager=mgr, async_mode='sanic')
# Attach the Socket.IO server to the Sanic application.
sio.attach(app)

@app.route('/')
async def index(request):
    """Respond to ``/`` with the contents of the local ``app.html`` file."""
    with open('app.html') as page:
        markup = page.read()
    return html(markup)

@sio.on('event', namespace='/test')
async def test_message(sid, message):
    """Echo the ``data`` field of an ``event`` message back as ``response``.

    The reply is emitted on the ``/test`` namespace to the room ``sid``.
    """
    reply = {'data': message['data']}
    await sio.emit('response', reply, room=sid, namespace='/test')

@sio.on('nats', namespace='/test')
async def test_nats_message(sid, message):
    """Log a ``nats`` event and send its ``data`` field back as ``response``.

    The reply is emitted on the ``/test`` namespace to the room ``sid``.
    """
    print("NATS msg!!!!!!!!", message, sid)
    reply = {'data': message['data']}
    await sio.emit('response', reply, room=sid, namespace='/test')

@sio.on('connect', namespace='/test')
async def test_connect(sid, environ):
    """Log a new connection and greet it with a ``response`` event.

    The greeting is emitted on the ``/test`` namespace to the room ``sid``.
    """
    print("Client connected", sid)
    greeting = {'data': 'Connected', 'count': 0}
    await sio.emit('response', greeting, room=sid, namespace='/test')

@sio.on('disconnect', namespace='/test')
def test_disconnect(sid):
    """Log that a client on the ``/test`` namespace has disconnected."""
    notice = 'Client disconnected'
    print(notice)

# Register the local ./static path under the /static URL.
app.static('/static', './static')

# Start the Sanic application only when this file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    app.run()
<!DOCTYPE HTML>
<html>
<head>
    <title>NATS &plus; SocketIO </title>
    <script type="text/javascript" src="//code.jquery.com/jquery-2.1.4.min.js"></script>
    <script type="text/javascript" src="//cdnjs.cloudflare.com/ajax/libs/socket.io/1.3.5/socket.io.min.js"></script>
    <script type="text/javascript" charset="utf-8">
        $(document).ready(function(){
            // Socket.IO namespace used by this page. Declared with `var` so
            // it stays local instead of becoming an implicit global.
            var namespace = '/test';
            var socket = io.connect('http://' + document.domain + ':' + location.port + namespace);

            // Append one line to the #log element. The text is inserted as a
            // text node so that content received from the server is never
            // parsed as HTML.
            function appendLog(text) {
                $('#log').append('<br />', document.createTextNode(text));
            }

            socket.on('connect', function() {
               appendLog('Connected!');
               socket.emit('event', {data: 'Connection started...'});
            });
            socket.on('disconnect', function() {
               socket.emit('event', {data: 'Disconnecting...'});
               appendLog('Disconnected!');
            });
            socket.on('response', function(msg) {
                appendLog('Received: ' + msg.data);
            });
            socket.on('nats', function(msg) {
                console.log("NATS???");
                appendLog('NATS: ' + msg.data);
            });
            // Send the text typed into the form to the server as an 'event'
            // message; returning false cancels the normal form submission.
            $('form#emit').submit(function(event) {
                socket.emit('event', {data: $('#emit_data').val()});
                return false;
            });
        });
    </script>
</head>
<body>
    <h1>NATS &plus; SocketIO</h1>
    <h2>Publish to NATS</h2>
    <form id="emit" method="POST" action='#'>
        <input type="text" name="emit_data" id="emit_data" placeholder="Message">
        <input type="submit" value="Publish">
    </form>
    <h2>Receive:</h2>
    <div><p id="log"></p></div>
</body>
</html>
wallyqs commented 6 years ago

Closing for now since there are no blockers. It would be good to have an AsyncPubSubManager implementation for NATS, but that may be done in a new repo.