tonysimpson / nanomsg-python

nanomsg wrapper for python with multiple backends (CPython and ctypes) should support 2/3 and Pypy
MIT License
382 stars 85 forks source link

Support gevent and/or other event loops #40

Closed dhagrow closed 9 years ago

dhagrow commented 9 years ago

This is accomplished by pyzmq in the pyzmq.green module. Theoretically, an integration could be accomplished using nn_poll, however those docs make a reference to integrating with other event loops in the NOTE section, so a more efficient integration should be possible.

This is something I hope to look into if I can find the time, but perhaps someone else has a better lead on the subject.

dhagrow commented 9 years ago

A combination of nanomsg.Socket.send_fd and nanomsg.Socket.recv_fd, with gevent.socket.wait_write and gevent.socket.wait_read (respectively), may be all that's needed?

dhagrow commented 9 years ago

So, I wrongly assumed that because the pyzmq support for gevent was so involved, that it would be the case for nanomsg as well. The technique from the previous comment does appear to work well, however. Ideally there would be an example somewhere to show how this can be done, but this issue no longer seems necessary.

jvsteiner commented 8 years ago

any ideas on where I might be able to see an example of how this is done? I agree a simple example would be great... figured it out, simply put the following line (or similar) before the line with the call to read: gevent.socket.wait_read(sock.recv_fd)

jvsteiner commented 8 years ago

Actually, it ended up not working for me (yet). I get a condition where the wait_read call blocks a greenlet but then after it releases, the SUB socket.recv() still blocks. I always end up with an extra message in the recv queue. Same pattern works fine with os threads. simple example:

from gevent import monkey
import gevent
monkey.patch_all()
from nanomsg import Socket, PUB, SUB, SUB_SUBSCRIBE
def listen(sock):
    while True:
        print 'about to wait'
        gevent.socket.wait_read(sock.recv_fd)
        print 'done waiting'  # never executes
        print sock.recv()

pub = Socket(PUB)
pub.bind('ipc:///tmp/new.sock')
sub = Socket(SUB)
sub.connect('ipc:///tmp/new.sock')
sub.set_string_option(SUB, SUB_SUBSCRIBE, '')
gevent.spawn(listen, sub)
gevent.sleep(0)
pub.send('hi')  # greenlet doesn't receive
sub.recv()  # can get the msg, though

Calling gevent.socket.wait_read(sub.recv_fd) kind of "knocks it loose" and the greenlet will get the msg - could this be a bug? if so, in nanomsg or the python nanomsg wrapper?

dhagrow commented 8 years ago

I think your problem is just with the event loop. Try putting a gevent.sleep(1) after the send. That should ensure that the listen greenlet has time to run.

Otherwise, it should work as you're using it.

jvsteiner commented 8 years ago

I did this via interpreter prompt, and found that even if you leave off the last line sub.recv() and just wait, listen doesn't pick it up.

dhagrow commented 8 years ago

You can't just wait. The event loop is not just running in the background. You have to call gevent.sleep or gevent.wait.

jvsteiner commented 8 years ago

ok, seems like it's working in my toy example. back to the salt mines... Thanks for the help!

jvsteiner commented 8 years ago

for posterity, a working example of how one might implement this with a PUB/SUB pattern. The use of DONTWAIT is required to keep non-subscribed sock.recv() calls from blocking.

from gevent import monkey
import gevent
monkey.patch_all()
from nanomsg import Socket, PUB, SUB, SUB_SUBSCRIBE, DONTWAIT
def listen(sock):
    while True:
        gevent.socket.wait_read(sock.recv_fd)
        try:
            print sock.recv(flags=DONTWAIT)
        except Exception, e:
            pass
        gevent.sleep(0.1)

pub = Socket(PUB)
pub.bind('ipc:///tmp/new.sock')
sub = Socket(SUB)
sub.connect('ipc:///tmp/new.sock')
sub.set_string_option(SUB, SUB_SUBSCRIBE, 'h')
sub2 = Socket(SUB)
sub2.connect('ipc:///tmp/new.sock')
sub2.set_string_option(SUB, SUB_SUBSCRIBE, '')
gevent.spawn(listen, sub)
gevent.spawn(listen, sub2)
gevent.sleep()
pub.send('hi')
gevent.sleep(0.1)
pub.send('a different greeting for sub2')
gevent.sleep(0.1)