dabeaz / curio

Good Curio!
Other
4.02k stars 241 forks source link

ZMQ PUB/SUB Support ? #205

Closed crubier closed 7 years ago

crubier commented 7 years ago

Hi, and thank you for this amazing library !

I have a question though, is ZMQ PUB/SUB supported ? I tried to make a simple PUB/SUB system and it does not work. It might be caused by a rookie mistake on my side, but I can't find the problem in my code. Here is the simple yet problematic piece of code:

async def publisher(address):
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.bind(address)
    for n in range(100):
        print('Not waiting anyone')
        await sock.send(b'Message %d' % n)
        print('Sent {}'.format(n))
    await sock.send(b'exit')

async def subscriber(address):
    ctx = zmq.Context()
    sock = ctx.socket(zmq.SUB)
    sock.connect(address)
    while True:
        print('Waiting publisher')
        msg = await sock.recv()
        print('Got:', msg)
        if msg == b'exit':
            break

Then I start subscriber in a process, followed by publisher in another process. It seems that the subscriber never gets messages...

Subsbriber trace:

Waiting publisher

Publisher trace:

Not waiting anyone
Sent 0
Not waiting anyone
Sent 1
Not waiting anyone
Sent 2
Not waiting anyone
Sent 3
Not waiting anyone
Sent 4

I tried the PUSH/PULL example and it works perfectly

dabeaz commented 7 years ago

For a subscriber socket, I think there's an extra step involved in setting some sort of socket option. Let me look... yes, you do something like this (in the client) after you connect:

sock.setsockopt(zmq.SUBSCRIBE, topicfilter)

I think topicfilter is like a string prefix (might be a byte-string).

jkbbwr commented 7 years ago

Its a byte-string unless you use setstringsockopt

crubier commented 7 years ago

Thank you and sorry, I forgot to mention that I already tried the example from pyzmq, to no avail.

Here is the code, which yields the same result as above:

async def publisher(address):
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.bind(address)
    for n in range(100):
        print('Not waiting anyone')
        await sock.send(b'Message %d' % n)
        print('Sent {}'.format(n))
    await sock.send(b'Message Exit')

async def subscriber(address):
    ctx = zmq.Context()
    sock = ctx.socket(zmq.SUB)
    sock.connect(address)
    sock.setsockopt(zmq.SUBSCRIBE, b'Message')
    while True:
        print('Waiting publisher')
        msg = await sock.recv()
        print('Got:', msg)
        if msg == b'Exit':
            break

Also, pyzmq states that "The current version of zmq supports filtering of messages based on topics at subscriber side. This is usually set via socketoption.", which means It should work fine without it too. Right now, both don't work, in my examples...

dabeaz commented 7 years ago

The sample code, as written tends to publish messages pretty fast and then exit. If I slow it down by putting a sleep in it like this, it works:

from curio import *
from curio import zmq

async def publisher(address):
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.bind(address)
    for n in range(100):
        print('Not waiting anyone')
        await sock.send(b'Message %d' % n)
        print('Sent {}'.format(n))
        await sleep(1)
    await sock.send(b'Message Exit')

zmq.run(publisher, 'tcp://*:40000')

Seems likely to me that the subscriber never successfully hooks up with the publisher before it's done publishing the 100 messages (simply because it occurs so quickly).

crubier commented 7 years ago

Perfect, you are right, I also tried to port the basic PUB/SUB example from pyzmq, and it works !

pub_server.py:

import curio.zmq as zmq
import random
import sys
import time

async def main():
    port = "5556"
    if len(sys.argv) > 1:
        port = sys.argv[1]
        int(port)

    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:%s" % port)

    while True:
        topic = random.randrange(9999,10005)
        messagedata = random.randrange(1,215) - 80
        print("%d %d" % (topic, messagedata))
        await socket.send_string("%d %d" % (topic, messagedata))
        time.sleep(1)

zmq.run(main)

sub_client.py:

import sys
import curio.zmq as zmq

async def main():
    port = "5556"
    if len(sys.argv) > 1:
        port =  sys.argv[1]
        int(port)

    if len(sys.argv) > 2:
        port1 =  sys.argv[2]
        int(port1)

    # Socket to talk to server
    context = zmq.Context()
    socket = context.socket(zmq.SUB)

    print ("Collecting updates from weather server...")
    socket.connect ("tcp://localhost:%s" % port)

    if len(sys.argv) > 2:
        socket.connect ("tcp://localhost:%s" % port1)

    # Subscribe to zipcode, default is NYC, 10001
    topicfilter = "10001"
    socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter)

    # Process 5 updates
    total_value = 0
    for update_nbr in range (5):
        string = await socket.recv_string()
        topic, messagedata = string.split()
        total_value += int(messagedata)
        print (topic, messagedata)

    print ("Average messagedata value for topic '%s' was %dF" % (topicfilter, total_value / update_nbr))

zmq.run(main)