go-zeromq / zmq4

[WIP] Pure-Go implementation of ZeroMQ-4
BSD 3-Clause "New" or "Revised" License

No messages getting through XPUB/XSUB proxy #108

Open stitchinthyme opened 3 years ago

stitchinthyme commented 3 years ago

I wrote the following small test server, which creates XPUB and XSUB sockets, and a proxy to send all messages received on the XSUB port to the XPUB port:

package main

import (
    "context"
    "log"
    "time"

    zmq "github.com/go-zeromq/zmq4"
)

func main() {
    ctx := context.Background()
    pubAddr := "tcp://*:9002" // clients subscribe here
    subAddr := "tcp://*:9003" // clients publish here
    pubSock := zmq.NewXPub(ctx)
    subSock := zmq.NewXSub(ctx)
    defer pubSock.Close()
    defer subSock.Close()

    if err := pubSock.Listen(pubAddr); err != nil {
        log.Fatalf("Error binding pub to %s: %v", pubAddr, err)
    }
    if err := subSock.Listen(subAddr); err != nil {
        log.Fatalf("Error binding sub to %s: %v", subAddr, err)
    }
    log.Println("Sleeping 5s...")
    time.Sleep(5 * time.Second)
    log.Println("Starting proxy")
    // Forward anything received on subSock (frontend) to pubSock (backend); no capture socket.
    p := zmq.NewProxy(ctx, subSock, pubSock, nil)
    if err := p.Run(); err != nil {
        log.Printf("Proxy stopped: %v", err)
    }
}

And a test client which connects to both ports, sends a series of 10 messages to the pub port (which is the server's sub port), and listens for them on the sub port (the server's pub port):

package main

import (
    "context"
    "log"
    "strconv"
    "sync"
    "time"

    zmq "github.com/go-zeromq/zmq4"
)

func main() {
    var wg sync.WaitGroup
    ctx := context.Background()
    subAddr := "tcp://localhost:9002" // server's pub socket is our sub socket
    pubAddr := "tcp://localhost:9003" // ...and vice versa
    pubSock := zmq.NewPub(ctx)
    subSock := zmq.NewSub(ctx)
    if err := pubSock.Dial(pubAddr); err != nil {
        log.Fatalf("Error connecting pub to %s: %v", pubAddr, err)
    }
    if err := subSock.Dial(subAddr); err != nil {
        log.Fatalf("Error connecting sub to %s: %v", subAddr, err)
    }
    // Subscribe to all messages (an empty prefix matches every topic).
    if err := subSock.SetOption(zmq.OptionSubscribe, ""); err != nil {
        log.Fatalf("Subscribe error: %v", err)
    }
    log.Println("Sleeping 5s...")
    time.Sleep(5 * time.Second)

    wg.Add(1)
    go func() { // receiver goroutine: wait for incoming messages
        defer wg.Done()
        log.Println("Receiver starting")
        for {
            msg, err := subSock.Recv()
            if err != nil {
                // Stop instead of spinning on a persistent error.
                log.Printf("Recv error: %v", err)
                return
            }
            if len(msg.Frames) < 2 {
                log.Printf("Received msg with %d frame(s), expected 2", len(msg.Frames))
                continue
            }
            log.Printf("Received msg [%s] %s", msg.Frames[0], msg.Frames[1])
        }
    }()

    // Just send numbers 0-9, 1s apart
    for i := 0; i < 10; i++ {
        msg := zmq.NewMsgFrom([]byte("TEST"), []byte(strconv.Itoa(i)))
        log.Printf("Sending message %d", i)
        if err := pubSock.Send(msg); err != nil {
            log.Printf("Send error: %v", err)
        }
        time.Sleep(1 * time.Second)
    }
    // The receiver only exits on a Recv error, so this blocks until interrupted.
    wg.Wait()
}

When I run the server in one window and the client in another, the server binds the ports properly and the client seems to be connecting and sending the messages, but the receiver in the client never gets them and never prints anything. I'm expecting to see a series of "Sending message [0-9]" and "Received msg [TEST] [0-9]". I see the "Sending message" prints, but not the "Received" ones.

I'm using Go 1.13 on Ubuntu 18.04.

One more note: I have almost exactly this same code using https://github.com/pebbe/zmq4 (with adjustments for API differences like Bind() vs. Listen(), etc.) and it works fine.

junchuanwang commented 2 years ago

I actually think the bug is in XSUB.

.recv() will block until an EOF is sent.

stitchinthyme commented 2 years ago

Actually, it seems to be because there is no subscribe logic in the SUB/XSUB sockets. SUB does keep a map of subscribed topics, but it doesn't actually use it to filter incoming messages. Instead, that logic is in the PUB thread: it won't send a message unless the PUB socket is subscribed to it. That is counter to how ZMQ is supposed to work. You subscribe to topics with SUB/XSUB, and the publisher shouldn't have to know or care what you're subscribed to.
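To illustrate the kind of subscriber-side screening described here, below is a minimal sketch of prefix-based topic filtering. It uses only the standard library; the `subscriptions` type and its `subscribe` and `matches` methods are hypothetical names made up for this example and are not part of go-zeromq/zmq4, nor are they the fix that was eventually submitted. It assumes the topic is carried in the first frame, as in the test client above.

```go
package main

import (
	"bytes"
	"fmt"
)

// subscriptions is a hypothetical helper (not part of go-zeromq/zmq4)
// that records the topic prefixes a subscriber has asked for.
type subscriptions struct {
	topics [][]byte
}

// subscribe records a topic prefix; the empty string matches every message.
func (s *subscriptions) subscribe(topic string) {
	s.topics = append(s.topics, []byte(topic))
}

// matches reports whether the first frame of a message starts with
// any subscribed prefix. A message with no frames never matches.
func (s *subscriptions) matches(frames [][]byte) bool {
	if len(frames) == 0 {
		return false
	}
	for _, t := range s.topics {
		if bytes.HasPrefix(frames[0], t) {
			return true
		}
	}
	return false
}

func main() {
	var subs subscriptions
	subs.subscribe("TEST")

	wanted := [][]byte{[]byte("TEST"), []byte("0")}
	other := [][]byte{[]byte("OTHER"), []byte("0")}

	fmt.Println(subs.matches(wanted)) // true
	fmt.Println(subs.matches(other))  // false

	// Subscribing to "" (as the test client does) accepts everything.
	var all subscriptions
	all.subscribe("")
	fmt.Println(all.matches(other)) // true
}
```

In a receive loop, a check like `matches(msg.Frames)` would decide whether a message is delivered to the caller or silently dropped.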

sbinet commented 2 years ago

it's probably because I initially wrote this with a heavy inspiration from nanomsg/mango that has subscription logic (IIRC).

PRs welcomed.

stitchinthyme commented 2 years ago

I made a fix, but it hadn't finished building before it was time to knock off work for the day, so I'll test it fully on Monday.

woodyiorl commented 2 years ago

> I made a fix, but it hadn't finished building before it was time to knock off work for the day, so I'll test it fully on Monday.

Any update, sir? @stitchinthyme

stitchinthyme commented 2 years ago

> Any update, sir? @stitchinthyme

Just submitted the PR, which works with my test programs above.

P.S. Not a 'sir'. :-)

junchuanwang commented 2 years ago

@stitchinthyme My bad, lol! thank you!