pebbe / zmq4

A Go interface to ZeroMQ version 4
BSD 2-Clause "Simplified" License

Proxy error: "socket operation on non-socket" #145

Closed: kprimice closed this issue 4 years ago

kprimice commented 5 years ago

I am trying to set up a Pub-Sub network with a proxy (like in the doc). However, after a short period of time (about 1 minute), the proxy dies with:

socket operation on non-socket

    ctx, err := zmq.NewContext()
    // frontend
    s, err := ctx.NewSocket(zmq.XSUB)
    if err != nil {
        return fmt.Errorf("NewSocket: %s", err)
    }
    if err = s.Bind("inproc://input"); err != nil {
        return fmt.Errorf("Bind: %s", err)
    }
    // backend
    p, err := zmq.NewSocket(zmq.XPUB)
    if err != nil {
        return fmt.Errorf("NewSocket: %s", err)
    }
    if err = p.Bind("tcp://*:5555"); err != nil {
        return fmt.Errorf("Bind: %s", err)
    }
    // listen
    l, err := ctx.NewSocket(zmq.PAIR)
    if err != nil {
        return fmt.Errorf("NewSocket: %s", err)
    }
    if err = l.Connect("inproc://pipe"); err != nil {
        return fmt.Errorf("Connect: %s", err)
    }
    if err := zmq.Proxy(s, p, l); err != nil {
        log.Fatal("Proxy is dead: ", err)
    }

And then all my publishers (each in their own thread) share the same context (ctx) and publish on inproc.

kprimice commented 5 years ago

I found a way to reproduce the error with the espresso example.

https://github.com/pebbe/zmq4/blob/master/examples/espresso.go

If I reduce the time.Sleep() on line 47 to 10 milliseconds, I also get the error after some time:

socket operation on non-socket

kprimice commented 5 years ago

It seems to be a throughput problem: the longer the message sent, the longer the time.Sleep has to be to keep the proxy from shutting down.

What do you think @pebbe?

pebbe commented 5 years ago

I have no idea what goes wrong.

This is a subject best discussed on the mailing list. That's where the experts are. http://zeromq.org/docs:mailing-lists

kprimice commented 5 years ago

Thank you @pebbe

I posted on the mailing list but didn't get a reply. So I decided to investigate a bit further and tested another Go library for zmq: https://github.com/zeromq/goczmq

When I reproduce the espresso example with it, I don't get the socket error with the sleep time set to 1 millisecond, or even with the sleep removed altogether.

Indeed, ZMQ throughput is supposed to be on the order of MB/s.

pebbe commented 5 years ago

Can you post your implementation using goczmq?

kprimice commented 5 years ago

import (
    zmq "github.com/zeromq/goczmq"
    log "github.com/sirupsen/logrus"

    "fmt"
    "math/rand"
    "time"
)

func publisher_thread() {
    pub, _ := zmq.NewPub("inproc://frontend")
    pub.Connect("inproc://frontend")

    for {
        s := fmt.Sprintf("%c-%05d02", rand.Intn(10)+'A', rand.Intn(100000))
        err := pub.SendFrame([]byte(s), 0)
        if err != nil {
            break //  Interrupted
        }
        time.Sleep(10 * time.Millisecond)
    }
}

//  The listener receives all messages flowing through the proxy, on its
//  pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connects
//  attached child threads. In other languages your mileage may vary:

func listener_thread() {
    pipe, _ := zmq.NewPair("inproc://pipe")

    //  Print everything that arrives on pipe
    for {
        msg, _, err := pipe.RecvFrame()
        if err != nil {
            break //  Interrupted
        }
        fmt.Printf("%q\n", msg)
    }
}

//  The main task starts the subscriber and publisher, and then sets
//  itself up as a listening proxy. The listener runs as a child thread:

func main() {
    proxy := zmq.NewProxy()

    var err error

    if err = proxy.SetFrontend(zmq.XSub, "inproc://frontend"); err != nil {
        log.Fatal(err)
    }
    if err = proxy.SetBackend(zmq.XPub, "tcp://*:6001"); err != nil {
        log.Fatal(err)
    }
    /*
    if err = proxy.SetCapture("inproc://pipe"); err != nil {
        log.Fatal(err)
    }
    */

    go publisher_thread()
    // go listener_thread()

    //  Subscribe to "A" and "B"
    sub, _ := zmq.NewSub("tcp://localhost:6001", "A,B")
    for {
        msg, _, err := sub.RecvFrame()
        if err != nil {
            break //  Interrupted
        }
        log.Info(fmt.Sprintf("%s", msg))
    }
}

pebbe commented 5 years ago

What are your imports?

kprimice commented 5 years ago

I added the imports to my previous post

pebbe commented 5 years ago

Your example is very different from my version of espresso.go, so I can't really compare this.

Searching for "socket operation on non-socket" I find a lot of hits, with all kinds of wrapper languages, so my guess is this is a bug in ZeroMQ.

kprimice commented 5 years ago

How is it very different, @pebbe?

I mean, it is an XPUB/XSUB proxy with one subscriber and one publisher. I just removed the listener because it is not very useful for this issue.

kprimice commented 5 years ago

If you look in detail at the posts you found on Google, you will see that the error was usually fixed and was not a problem with ZMQ. Also, if you add "proxy" to your search, you won't get many matches.

kprimice commented 5 years ago

@pebbe do you think we can work together to find a fix, or should I use the other library? I absolutely need this feature for my project.

cypres commented 5 years ago

@kprimice I had this exact problem. The cause is that the sockets get garbage collected by Go.

Just set up a deferred call to socket.Close() and Go will keep a reference to the socket alive until your proxy is closed (by a signal):

// frontend
s, err := ctx.NewSocket(zmq.XSUB)
defer s.Close()
// backend
p, err := zmq.NewSocket(zmq.XPUB)
defer p.Close()
// listen
l, err := ctx.NewSocket(zmq.PAIR)
defer l.Close()
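
The mechanism described above can be illustrated without ZeroMQ at all. The following is a minimal sketch, assuming the socket wrapper registers a finalizer that closes the native socket once the Go value becomes unreachable. The `handle` type and its helpers are hypothetical stand-ins, not part of zmq4, and `runtime.KeepAlive` plays the role that the deferred `Close()` call plays in the fix: it keeps a reference live until the function returns.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// handle is a hypothetical stand-in for a Go wrapper around a native socket.
type handle struct {
	name string
}

// newHandle allocates a handle and registers a finalizer that reports when
// the garbage collector reclaims it (a real wrapper might close the socket).
func newHandle(name string, finalized chan<- string) *handle {
	h := &handle{name: name}
	runtime.SetFinalizer(h, func(h *handle) { finalized <- h.name })
	return h
}

// collectedWithin forces GC cycles and reports whether a finalizer ran
// before the timeout expired.
func collectedWithin(finalized <-chan string, timeout time.Duration) bool {
	deadline := time.After(timeout)
	for {
		runtime.GC()
		select {
		case <-finalized:
			return true
		case <-deadline:
			return false
		case <-time.After(10 * time.Millisecond):
			// Not finalized yet; trigger another GC cycle.
		}
	}
}

// droppedIsCollected keeps no reference to the handle, so the collector is
// free to finalize it while the "long-running call" is still in progress.
func droppedIsCollected() bool {
	finalized := make(chan string, 1)
	newHandle("dropped", finalized)
	return collectedWithin(finalized, 2*time.Second)
}

// keptIsCollected holds the handle until the function returns, the way a
// deferred Close() holds a socket, so its finalizer cannot run early.
func keptIsCollected() bool {
	finalized := make(chan string, 1)
	h := newHandle("kept", finalized)
	defer runtime.KeepAlive(h)
	return collectedWithin(finalized, 200*time.Millisecond)
}

func main() {
	fmt.Println("dropped handle finalized:", droppedIsCollected())
	fmt.Println("kept handle finalized:", keptIsCollected())
}
```

If this assumption about the wrapper holds, it would also fit the earlier observation that shorter sleeps and longer messages make the error appear sooner: more allocation means the collector runs earlier.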

gbandres98 commented 4 years ago

I had the same problem, and @duedal's fix worked.

kprimice commented 4 years ago

It did solve the problem. Thanks!

Closing