go-zeromq / zmq4

[WIP] Pure-Go implementation of ZeroMQ-4
BSD 3-Clause "New" or "Revised" License

github.com/pebbe/zmq2 (2.2.0) sub socket cannot connect to github.com/go-zeromq/zmq4 pub socket #133

Open Kimbsen opened 2 years ago

Kimbsen commented 2 years ago

As soon as the subscriber connects (subscribes), the pub socket server starts logging:

zmq4: could not open a ZMTP connection with "tcp://127.0.0.1:5000": zmq4: could not initialize ZMTP connection: zmq4: could not exchange greetings: zmq4: could not recv greeting: could not read ZMTP greeting: unexpected EOF

The subscriber does not report any error, but it also receives no events.

I think 2.2.0 should be compatible with zmq4, but I can't get it to work. http://wiki.zeromq.org/area:faq#toc9

Or am I just incorrectly assuming this should work? :)

Test client for github.com/pebbe/zmq2:

package main

import (
    "log"
    "os"
    "time"

    zmq "github.com/pebbe/zmq2"
)

func main() {

    const addr = "tcp://localhost:5000"

    socket, err := zmq.NewSocket(zmq.SUB)
    if err != nil {
        log.Fatal(err)
    }
    if err := socket.Connect(addr); err != nil {
        log.Fatal(err)
    }

    if err := socket.SetSubscribe(os.Args[1]); err != nil {
        log.Fatal(err)
    }

    poller := zmq.NewPoller()
    poller.Add(socket, zmq.POLLIN)

    for {
        sockets, err := poller.Poll(2 * time.Second)
        if err != nil {
            log.Fatal(err)
        } else {
            for _, socket := range sockets {
                data, err := socket.Socket.RecvBytes(0)
                if err != nil {
                    log.Fatal(err)
                } else {
                    log.Println("GOT", string(data))
                }
            }
        }
    }
}

Test server for github.com/go-zeromq/zmq4:

package main

import (
    "context"
    "log"
    "os"
    "time"

    "github.com/go-zeromq/zmq4"
)

func main() {

    const addr = "tcp://localhost:5000"

    pub := zmq4.NewPub(context.Background())
    defer pub.Close()

    if err := pub.Listen(addr); err != nil {
        log.Fatal(err)
    }

    event := func() zmq4.Msg {
        return zmq4.NewMsgFrom([]byte(os.Args[1]), []byte("RANDOM_STRING"))
    }
    // Publish one event per second.
    for {
        time.Sleep(1 * time.Second)
        if err := pub.Send(event()); err != nil {
            log.Fatal(err)
        }
        log.Println("SENT event")
    }
}

By contrast, a slightly modified go-zeromq/zmq4 example subscriber can connect and receive events fine:

package main

import (
    "context"
    "log"
    "os"

    "github.com/go-zeromq/zmq4"
)

func main() {
    log.SetPrefix("psenvsub: ")

    //  Prepare our subscriber
    sub := zmq4.NewSub(context.Background())
    defer sub.Close()

    err := sub.Dial("tcp://localhost:5000")
    if err != nil {
        log.Fatalf("could not dial: %v", err)
    }

    err = sub.SetOption(zmq4.OptionSubscribe, os.Args[1])
    if err != nil {
        log.Fatalf("could not subscribe: %v", err)
    }

    for {
        // Read envelope
        msg, err := sub.Recv()
        if err != nil {
            log.Fatalf("could not receive message: %v", err)
        }
        log.Printf("[%s] %s\n", msg.Frames[0], msg.Frames[1])
    }
}

sbinet commented 2 years ago

Yeah, go-zeromq/zmq4 doesn't have handling of the old ZMTP protocol(s) baked in.

PRs welcome :)
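As a rough starting point for anyone attempting such a PR, the sketch below shows one way a listener might tell a legacy greeting from a versioned one by inspecting the first bytes a peer sends. It is not code from go-zeromq/zmq4. `classifyGreeting` is a hypothetical name, and the byte values (a leading `0xFF`, the low bit of the 10th byte, and a version byte of `0x01` or `0x03`) reflect my reading of the ZMTP 2.0 and 3.0 specifications, so please check them against the specs before relying on them.

```go
package main

import "fmt"

// classifyGreeting is a hypothetical helper that guesses which ZMTP
// generation a peer speaks from the first bytes it sent.
// Assumption: a versioned signature is 0xFF, 8 padding bytes, then a byte
// with its low bit set, followed by a version byte.
func classifyGreeting(b []byte) string {
	if len(b) == 0 {
		return "incomplete"
	}
	if b[0] != 0xFF {
		// A short length-prefixed identity frame, as older peers send.
		return "ZMTP 1.0"
	}
	if len(b) < 11 {
		return "incomplete"
	}
	if b[9]&0x01 == 0 {
		// 0xFF was the start of a long-length identity frame, not a signature.
		return "ZMTP 1.0"
	}
	switch b[10] {
	case 0x01:
		return "ZMTP 2.0"
	case 0x03:
		return "ZMTP 3.x"
	default:
		return "unknown version"
	}
}

func main() {
	// Assumed legacy greeting: anonymous identity frame (length 1, flags 0).
	legacy := []byte{0x01, 0x00}

	// Assumed modern greeting prefix: signature plus major version 3.
	modern := make([]byte, 64)
	modern[0], modern[9], modern[10] = 0xFF, 0x7F, 0x03

	fmt.Println(classifyGreeting(legacy))
	fmt.Println(classifyGreeting(modern))
}
```

Detecting the version is only the first step. Actually interoperating would also require speaking the older framing and subscription handling, which this sketch does not attempt.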

Kimbsen commented 2 years ago

Ah, I see. Thanks for the quick reply!