pebbe / zmq4

A Go interface to ZeroMQ version 4
BSD 2-Clause "Simplified" License

Handling multiple SUB sockets? #192

Closed iangregsondev closed 1 year ago

iangregsondev commented 1 year ago

Hi,

Can anybody lend a hand?

I have written a small test that receives messages from ZeroMQ. It's working great, but it currently assumes that all my subscriptions happen on the same connection (endpoint).

What is needed to listen to multiple sockets (endpoints)?

Here is my current setup. As you can see, I am using

"tcp://127.0.0.1:29335"

I would like to connect to more than one endpoint, as the messages may be coming from multiple endpoints, i.e.

"tcp://127.0.0.1:29335" "tcp://127.0.0.1:29336" "tcp://127.0.0.1:29337" "tcp://127.0.0.1:29338"

    subs := subscription.NewSubscription([]subscription.Event{subscription.HashBlock, subscription.HashTx, subscription.RawBlock, subscription.RawTx})

    // Prepare our SUB socket
    socket, err := zmq.NewSocket(zmq.SUB)
    if err != nil {
        log.Fatalln(err)
    }
    defer socket.Close()

    // Socket monitor, all events
    if err := socket.Monitor("inproc://monitor.rep", zmq.EVENT_ALL); err != nil {
        log.Fatalln(err)
    }
    go rep_socket_monitor("inproc://monitor.rep")

    if err := socket.Connect("tcp://127.0.0.1:29335"); err != nil {
        log.Fatalln(err)
    }

    // Subscribe to every event topic on this one socket
    for _, event := range subs.Events() {
        eventName := event.String()
        fmt.Println("Subscribing to:", eventName)
        if err := socket.SetSubscribe(eventName); err != nil {
            log.Fatalln(err)
        }
    }

And this is how I am listening. It works because everything is on one endpoint.

    for {
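        messages, err := socket.RecvMessageBytes(0)
        if err != nil {
            log.Println("receive failed:", err)
            break
        }

        // Every case below reads messages[1], so skip anything shorter
        if len(messages) < 2 {
            fmt.Println("I am continuing!!")
            continue
        }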
        switch event := subscription.EventFromString(string(messages[0])); event {
        case subscription.RawBlock:
            var msgBlock wire.MsgBlock
            err := msgBlock.Deserialize(bytes.NewReader(messages[1]))
            if err != nil {
                // Skip this message instead of printing a half-filled block
                fmt.Println("Something went wrong!", err)
                continue
            }

            fmt.Printf("msgblock %+v\n", msgBlock)

            fmt.Printf("sequence number %v\n\n", int32(binary.LittleEndian.Uint32(messages[2][:])))

        case subscription.RawTx:
            var msgTx wire.MsgTx
            err := msgTx.Deserialize(bytes.NewReader(messages[1]))
            if err != nil {
                // Skip this message instead of printing a half-filled transaction
                fmt.Println("Something went wrong!", err)
                continue
            }

            fmt.Printf("msgTx %+v\n", msgTx)

            fmt.Printf("sequence number %v\n\n", int32(binary.LittleEndian.Uint32(messages[2][:])))

        case subscription.HashBlock:
            hexValue := hex.EncodeToString(messages[1])
            fmt.Printf("hashblock hex %+v\n", hexValue)

        case subscription.HashTx:
            hexValue := hex.EncodeToString(messages[1])
            fmt.Printf("hashtx hex %+v\n", hexValue)

        default:
            fmt.Println("Not supported! ", event)

        }

    }

I am unsure of the best way to accomplish this. I found some info about using Poller, but it confused me a little, as I wasn't sure whether I would lose messages.

I don't need to publish anything; I am only listening.

The quick and dirty way I thought of was to create multiple sockets and connections, with multiple goroutines to process the messages, but I suspect that isn't the right solution?

Thanks in advance.

ahmetson commented 1 year ago

@iangregsondev Did you read the ZeroMQ Guide? zmq.Poller or zmq.Reactor is the best way. Another approach, which I am using, is one goroutine per zmq.SUB socket; that also works well.

iangregsondev commented 1 year ago

Thank you @ahmetson, I am going to take a look.