pebbe / zmq4

A Go interface to ZeroMQ version 4
BSD 2-Clause "Simplified" License

ZeroMQ socket not receiving all TCP data sent to it while in a goroutine #149

Closed mikerubbertoe closed 5 years ago

mikerubbertoe commented 5 years ago

I am setting up a socket in Go to send and receive data to and from an API which uses ZeroMQ for communication. I need to use goroutines. I understand that ZMQ sockets are not thread safe, and that to make sure they function properly, all sending and receiving should be done in the same thread.

I can receive all data from all API requests except for one piece of data from one specific API call, a heartbeat that we need to respond to in order to keep the connection alive.

In this API, there are multiple requests that can be made that can result in data being sent back as well as a heartbeat request. Every other API request will send back the data, wait around 5 seconds, then send the heartbeat request needed to keep the connection alive. When the code is run using a single thread, we can receive the heartbeat from every API call. However, when I run that same function through a goroutine, I get back the heartbeat request from every API function EXCEPT for one of them.

Setting up the socket

import(
    "fmt"
    "os"
    zmq "github.com/pebbe/zmq4"
)

func testCode() {

    context, err := zmq.NewContext()

    if err != nil {
        fmt.Println("context err: ", err)
        return
    }

    //create Request Socket to talk to Request Server
    fmt.Fprint(os.Stdout,"Attempting to connect to Request Server...")
    sock, err := context.NewSocket(zmq.DEALER)
    if err != nil {
        fmt.Println("socket err: ", err)
        return
    }

    connection.RequestSocket = sock

    err = connection.RequestSocket.Connect(REQUEST_URL)
    if err != nil {
        fmt.Println("connect err: ", err)
        return
    }
}

//build the data to be sent up here

    ...

    _, err := socket.SendBytes(bufToSend, 0)
    if err != nil {
        fmt.Println("send err: ", err)
        return
    }

//receive all necessary data from the server
    for mask & info == 0 { 
        reply, err := socket.RecvBytes(0)
        if err != nil {
            fmt.Println("err: ", err)
            return
        }
        //process the data
        ...
}

So when I run this main function in a different file, we receive all the correct data and heartbeat requests from every API request

func main() {
    testCode()
}

However, when I make testCode() a goroutine (as shown below), I fail to receive a heartbeat request for only one of the API calls

func main() {
    go testCode()
}

What is also interesting is that I monitored all TCP traffic going in and out of the port we talk to via Wireshark and have confirmed that the heartbeat requests are being sent by the API. So the heartbeat request is being sent, and Wireshark recognizes the data as a message coming back from the API, but I am not receiving it through the socket for one specific API call. Every other API call sends the heartbeat request and I can see in my program that I received it.

I have made sure that all creation, sending, and receiving for the socket is done within one thread. I am confused as to why this problem is happening and why it occurs for only one of the API calls and not all of them.

pebbe commented 5 years ago

Have you tried runtime.LockOSThread() at the start of testCode() ?
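
For reference, a minimal sketch of what this suggestion looks like in practice: one goroutine pins itself to an OS thread and is the only place where the socket is touched, while other goroutines talk to it over channels. The name `socketOwner` and the string messages are hypothetical stand-ins; no real ZeroMQ socket is used here so that the example runs on its own.

```go
package main

import (
	"fmt"
	"runtime"
)

// socketOwner is a hypothetical stand-in for the goroutine that owns the
// ZeroMQ socket. It pins itself to one OS thread so that every operation
// on the (imagined) socket happens on that same thread.
func socketOwner(requests <-chan string, replies chan<- string) {
	runtime.LockOSThread()
	defer runtime.UnlockOSThread()

	for req := range requests {
		// In real code, the send and receive calls on the socket would go here.
		replies <- "reply to " + req
	}
	close(replies)
}

func main() {
	requests := make(chan string)
	replies := make(chan string)

	go socketOwner(requests, replies)

	// Other goroutines never touch the socket; they only use the channels.
	go func() {
		for _, r := range []string{"a", "b"} {
			requests <- r
		}
		close(requests)
	}()

	for rep := range replies {
		fmt.Println(rep)
	}
}
```

As the next comment shows, locking the thread did not change the behaviour in this particular case.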

mikerubbertoe commented 5 years ago

Yes, I have and the same results occur. I should have updated my original post because I made an assumption which I found out later was false. When testCode() is run, it does not receive all the messages that it should when running on a goroutine. It receives about 18500 messages when it should be receiving 36000 messages.

When run on a single thread it receives all 36000 messages but when run in a goroutine, it only receives half the amount. I also put in a wrapper method to see if the goroutine crashes for some reason

func WrapperGo(myFunc func()) {
    go func() {
        defer func() {
            if recoverMe := recover(); recoverMe != nil {
                err, _ := recoverMe.(error)
                myErr := errors.Wrap(err, 2)
                fmt.Println("stack", myErr.StackFrames(),"Panic.")
            }
        }()
        myFunc()
    }()
}

The goroutine does not crash/panic, it just stops receiving messages.
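
One caveat about the wrapper above: the type assertion `recoverMe.(error)` yields a nil error when the panic value is not an `error` (for example `panic("some string")`). A standard-library-only sketch that reports any panic value could look like the following; `safeGo` is a hypothetical name, not part of zmq4 or any other library.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// safeGo runs fn in a new goroutine. The returned channel receives a
// description of the panic (value plus stack trace) if fn panicked, or an
// empty string if fn returned normally.
func safeGo(fn func()) <-chan string {
	done := make(chan string, 1)
	go func() {
		defer func() {
			if r := recover(); r != nil {
				// %v handles any panic value, not only errors.
				done <- fmt.Sprintf("panic: %v\n%s", r, debug.Stack())
				return
			}
			done <- ""
		}()
		fn()
	}()
	return done
}

func main() {
	if msg := <-safeGo(func() { panic("boom") }); msg != "" {
		fmt.Println(msg)
	}
	if msg := <-safeGo(func() {}); msg == "" {
		fmt.Println("finished without panic")
	}
}
```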

pebbe commented 5 years ago

It's not simply that main exits before testCode() is finished?

I can't really see what is going on based on some fragments of code. Can it be that the problem is caused by some flaw in programming logic, not by a bug in zmq4?

mikerubbertoe commented 5 years ago

I am sure that main doesn't exit before testCode() is finished. I have simplified the code even further, so that everything is in a single file, and the same problem occurs.

package main

import(
    "bytes"
    "encoding/binary"
    "fmt"
    zmq "github.com/pebbe/zmq4"
)

func main() {
    go test()
    for {
    }
}

func test() {   
    context, err := zmq.NewContext()
    if err != nil {
        fmt.Println("context err: ", err)
        return 
    }

    socket, err := context.NewSocket(zmq.DEALER)
    if err != nil {
        fmt.Println("socket err: ", err)
        return 
    }

    err = socket.Connect(REQUEST_URL)
    if err != nil {
        fmt.Println("connect err: ", err)
        return 
    }
    //build byte array to start request

    _, err = socket.SendBytes(bufToSend, 0)
    if err != nil {
        fmt.Println("send err: ", err)
        return
    }

    //receive all replies
    counter := 0
    for {
        reply, err := socket.RecvBytes(0)
        if err != nil {
            fmt.Println("recv err: ", err)
            return
        }
        counter++
        fmt.Println(counter, reply)
    }
}

This is as simple as I can make the code and still properly contact the server (I have left out the details about how to build the buffer to talk to the server). When run not as a goroutine, the program will receive all the messages, but when run as a goroutine, I receive usually half of the messages. Still no panic in the goroutine.

pebbe commented 5 years ago

In this example, the problem is in main. An empty for loop can prevent other goroutines from running. I don't remember the details, but it keeps your processor very busy.

Try this:

var (
    wait = make(chan bool)
)

func main() {
    go test()
    <-wait
}

mikerubbertoe commented 5 years ago

That looks like it solved it. Thanks for the help!
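
Some background on why the fix works: an empty `for {}` spins on a CPU core, and Go releases before 1.14 could not preempt a tight loop that contains no function calls, so such a loop could starve other goroutines. Blocking on a channel parks main instead. If main should also exit once the worker is done, a `sync.WaitGroup` is a common alternative. The sketch below assumes a hypothetical helper `runAndWait` and uses a counting loop as a stand-in for the receive loop in test(); no ZeroMQ is involved.

```go
package main

import (
	"fmt"
	"sync"
)

// runAndWait starts work in its own goroutine and blocks until it returns.
// Unlike an empty `for {}` loop, wg.Wait() parks the calling goroutine
// instead of spinning on a CPU core.
func runAndWait(work func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		work()
	}()
	wg.Wait()
}

func main() {
	received := 0
	runAndWait(func() {
		// Stand-in for the receive loop in test(); counts fake messages.
		for i := 0; i < 36000; i++ {
			received++
		}
	})
	fmt.Println("received:", received)
}
```

Reading `received` after `runAndWait` returns is safe because `wg.Wait()` only returns after the goroutine has called `wg.Done()`.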