nanomsg / nnpy

cffi-based Python bindings for nanomsg
MIT License
117 stars 39 forks source link

Add support for asyncio #26

Closed wrobell closed 8 years ago

wrobell commented 8 years ago

It would be great to support asyncio.

Asyncio socket class implementation (depends on #25) with sample usage below. This is bit hacky implementation

Should nnpy support asyncio? Is API based on SocketAsync class OK?

import asyncio
import nnpy
from nnpy.constants import SNDFD, RCVFD, SOL_SOCKET
from nnpy import nanomsg, ffi

class SocketAsync(nnpy.Socket):
    def __init__(self, domain, protocol, loop=None):
        super().__init__(domain, protocol)
        if loop == None:
            loop = asyncio.get_event_loop()
        self._loop = loop

        self._reader = asyncio.Queue(loop=loop)
        self._fd_reader = None
        self._read_flags = 0

        self._writer = asyncio.Event(loop=loop)
        self._fd_writer = None
        self._write_flags = 0

    async def recv(self, flags=0):
        self._read_flags = flags
        self._enable_reader()
        value = await self._reader.get()
        return value

    async def send(self, data, flags=0):
        self._data = data
        self._write_flags = flags
        self._enable_writer()
        await self._writer.wait()

    def _notify_recv(self):
        self._loop.remove_reader(self._fd_reader)

        if self.poll(1):
            result = super().recv(self._read_flags)                                                                    
            self._reader.put_nowait(result)
        else:
            self._loop.call_later(0.5, self._enable_reader)                                                            

    def _notify_send(self):
        self._loop.remove_writer(self._fd_writer)

        if self.poll(2):
            super().send(self._data, self._write_flags)
            self._writer.set()                                                                                         
            self._writer.clear()
        else:
            self._loop.call_later(0.5, self._enable_writer)

    def _enable_reader(self):
        self._fd_reader = self.getsockopt(SOL_SOCKET, RCVFD)                                                           
        self._loop.add_reader(self._fd_reader, self._notify_recv) 

    def _enable_writer(self):                                                                                          
        self._fd_writer = self.getsockopt(SOL_SOCKET, SNDFD)                                                           
        self._loop.add_writer(self._fd_writer, self._notify_send)   

async def reader(sub):
    while True:                                                                                                        
        print('receiving...')
        value = await sub.recv()
        print('received:', value)
        await asyncio.sleep(1)

async def writer(pub):
    i = 0
    while True:
        print('sending...')
        value = '{:04}'.format(i)
        await pub.send(value)
        print('sent', value)
        i += 1
        await asyncio.sleep(0.5)

loop = asyncio.get_event_loop()

sub = SocketAsync(nnpy.AF_SP, nnpy.PULL)
sub.bind('tcp://*:5555')
pub = SocketAsync(nnpy.AF_SP, nnpy.PUSH)
pub.connect('tcp://localhost:5555')

loop.run_until_complete(asyncio.gather(writer(pub), reader(sub)))
djc commented 8 years ago

This is pretty much an entirely different Socket implementation, right? As such, maybe it makes more sense to put this in a separate package and rely on the CFFI bindings from nnpy if that helps. Since I don't have much expertise in asyncio, nor the interest to learn it right now, I don't really want to maintain code like this.

wrobell commented 8 years ago

I would not call it entirely different. :) It just adds asynchronous coroutines to the API. You could have SocketAsync class like above or alternatively Socket.recv_async and Socket.send_async coroutines.

Looking at pyzmq, they provide zmq.aiosync module, but there is also separate library aiozmq.

If you need concurrency, how would you use nnpy? Is the API thread-safe?

djc commented 8 years ago

I think the API is as thread-safe as the nanomsg one. For more details on that, you'd probably have to dive into nanomsg documentation, and/or ask their community.

Another problem with merging asyncio support is that nnpy supports both Python 2 and Python 3 for now, and as I understand it the code you propose only works on Python 3. The other question is, how hard is it to build asyncio support around a Socket, rather than into it. As Socket is a very thin layer, it doesn't seem so hard to build your async primitives on top of it.

wrobell commented 8 years ago

OK. See https://github.com/wrobell/aionn.

djc commented 8 years ago

Cool. Once you think it's in a good enough shape, maybe put up a pull request to link to it from nnpy's README?

wrobell commented 8 years ago

Should it be done with addition to README file like

Related Projects
----------------
- `nanomsg website <http://nanomsg.org/>`_
- `aionn - asyncio messaging library based on nanomsg and nnpy <https://github.com/wrobell/aionn>`_

?

djc commented 8 years ago

Yup, something like that seems good!