go-zeromq / zmq4

[WIP] Pure-Go implementation of ZeroMQ-4
BSD 3-Clause "New" or "Revised" License
333 stars 56 forks source link

SUB socket SetOption must come after Dial, goczmq/pebbe don't have such limitation #116

Closed cielavenir closed 2 years ago

cielavenir commented 2 years ago

Consider these six combinations:

./gozeromq.go --zmq=pebbe --dialfirst
./gozeromq.go --zmq=pebbe
./gozeromq.go --zmq=czmq --dialfirst
./gozeromq.go --zmq=czmq
./gozeromq.go --zmq=zmq --dialfirst
./gozeromq.go --zmq=zmq

Only `./gozeromq.go --zmq=zmq` (SetOption before Dial) blocks forever in Recv; the czmq and pebbe bindings do not have this limitation and behave the same in either order.

//usr/bin/env GO111MODULE=off go run $0 $@;exit

package main
import (
    "context"
    "flag"
    "log"

    "github.com/zeromq/goczmq"
    "github.com/go-zeromq/zmq4"
    pebbe "github.com/pebbe/zmq4"
)

// main connects a SUB socket to a fixed local endpoint using the ZeroMQ
// binding selected by -zmq (czmq, zmq, or pebbe), subscribes to all messages
// with an empty prefix, and logs the first message it receives.
//
// The -dialfirst flag chooses the order of the two setup steps: when set, the
// socket connects first and subscribes afterwards; otherwise it subscribes
// first and connects afterwards. Any setup or receive error terminates the
// program via log.Fatal.
func main() {
	endpoint := "tcp://127.0.0.1:55555"
	zmq := flag.String("zmq", "zmq", "zmq type (czmq / zmq / pebbe)")
	fdialfirst := flag.Bool("dialfirst", false, "dial first")
	flag.Parse()

	switch *zmq {
	case "czmq":
		s := goczmq.NewSock(goczmq.Sub)
		defer s.Destroy()
		if *fdialfirst {
			if err := s.Attach(endpoint, false); err != nil {
				log.Fatal(err)
			}
			log.Print("dial done")
			s.SetOption(goczmq.SockSetSubscribe(""))
			log.Print("set option done")
		} else {
			s.SetOption(goczmq.SockSetSubscribe(""))
			log.Print("set option done")
			if err := s.Attach(endpoint, false); err != nil {
				log.Fatal(err)
			}
			log.Print("dial done")
		}
		reply, err := s.RecvMessage()
		if err != nil {
			log.Fatal(err)
		}
		log.Print("recv done")
		log.Print(string(reply[0]))

	case "zmq":
		s := zmq4.NewSub(context.Background())
		defer s.Close()
		if *fdialfirst {
			if err := s.Dial(endpoint); err != nil {
				log.Fatal(err)
			}
			log.Print("dial done")
			if err := s.SetOption(zmq4.OptionSubscribe, ""); err != nil {
				log.Fatal(err)
			}
			log.Print("set option done")
		} else {
			// This is the code path reported as broken: subscribing before
			// dialing makes Recv block forever.
			if err := s.SetOption(zmq4.OptionSubscribe, ""); err != nil {
				log.Fatal(err)
			}
			log.Print("set option done")
			if err := s.Dial(endpoint); err != nil {
				log.Fatal(err)
			}
			log.Print("dial done")
		}
		raw, err := s.Recv()
		if err != nil {
			log.Fatal(err)
		}
		log.Print("recv done")
		log.Print(raw.String())

	case "pebbe":
		// pebbe may serve as a reference implementation, as it is mentioned
		// in https://zeromq.org/languages/go/
		s, err := pebbe.NewSocket(pebbe.SUB)
		if err != nil {
			// Check before deferring Close: on failure there is no usable
			// socket to close.
			log.Fatal(err)
		}
		defer s.Close()
		if *fdialfirst {
			if err := s.Connect(endpoint); err != nil {
				log.Fatal(err)
			}
			log.Print("dial done")
			if err := s.SetSubscribe(""); err != nil {
				log.Fatal(err)
			}
			log.Print("set option done")
		} else {
			if err := s.SetSubscribe(""); err != nil {
				log.Fatal(err)
			}
			log.Print("set option done")
			if err := s.Connect(endpoint); err != nil {
				log.Fatal(err)
			}
			log.Print("dial done")
		}
		reply, err := s.Recv(0)
		if err != nil {
			log.Fatal(err)
		}
		log.Print("recv done")
		log.Print(reply)

	default:
		// Previously an unrecognized value fell through silently and the
		// program exited successfully without doing anything.
		log.Fatalf("unknown zmq type %q (want czmq, zmq, or pebbe)", *zmq)
	}
}

/*
publisher:

#!/usr/bin/python
import zmq
import time
context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind("tcp://*:55555")

msgid = 0
while True:
    pub.send(str(msgid))
    msgid = (msgid+1)%100000
    time.sleep(0.1)
*/
thielepaul commented 2 years ago

fixed in #128