pebbe / zmq4

A Go interface to ZeroMQ version 4
BSD 2-Clause "Simplified" License

No buffer space available (tcp.cpp:69) when setting SNDBUF and RCVBUF #73

Closed nguyenxuantuong closed 8 years ago

nguyenxuantuong commented 8 years ago

Hi,

I have ZeroMQ 4.1.4 (stable) installed via brew on Mac OS X and wrote a simple PUB/SUB program to test it. When I run the program with --bufsize greater than 5, i.e. a buffer larger than 5 MB (for example, go run go_zmq_pubsub.go --bufsize=6), it aborts with the following error:

No buffer space available (tcp.cpp:69)
SIGABRT: abort
PC=0x7fff9911c286 m=0
signal arrived during cgo execution

Do you have any idea why this happens? Thanks.

Below is the program I used to test ZeroMQ 4.x:

package main

import (
    "fmt"
    "flag"
    "strconv"
    "sync"
    log "github.com/Sirupsen/logrus"
    zmq "github.com/pebbe/zmq4"
    "time"
)

var _ = fmt.Println

func main(){
    var port int
    var bufsize int
    flag.IntVar(&port, "port", 7676, "server's zmq tcp port")
    flag.IntVar(&bufsize, "bufsize", 0, "socket kernel buffer size in MB")

    flag.Parse()

    publisher, err := zmq.NewSocket(zmq.PUB)
    if err != nil {
        log.Fatal(err)
    }

    // set publisher kernel transmit buffer size
    // (--bufsize is given in MB, so convert it to bytes)
    if err := publisher.SetSndbuf(bufsize * 1000000); err != nil {
        log.Fatal(err)
    }

    defer publisher.Close()
    if err := publisher.Bind("tcp://*:" + strconv.Itoa(port)); err != nil {
        log.Fatal(err)
    }

    // set up subscriber
    subscriber, err := zmq.NewSocket(zmq.SUB)
    if err != nil {
        log.Fatal(err)
    }

    //set subscriber kernel receive buffer size
    if err := subscriber.SetRcvbuf(bufsize * 1000000); err != nil {
        log.Fatal(err)
    }

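    defer subscriber.Close()
    if err := subscriber.Connect("tcp://127.0.0.1:" + strconv.Itoa(port)); err != nil {
        log.Fatal(err)
    }
    // an empty prefix subscribes to every message
    if err := subscriber.SetSubscribe(""); err != nil {
        log.Fatal(err)
    }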

    var wg sync.WaitGroup
    wg.Add(2)

    // publisher: send one message per second
    go func() {
        defer wg.Done()
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()
        for idx := 0; ; idx++ {
            <-ticker.C
            if _, err := publisher.Send("PKG:"+strconv.Itoa(idx), 0); err != nil {
                log.Error(err)
            }
        }
    }()

    // receiver: log every message until Recv fails
    go func() {
        defer wg.Done()
        for {
            payload, err := subscriber.Recv(0)
            if err != nil {
                log.Error(err)
                return
            }

            // now sending into worker pool
            log.Info("RECEIVE:" + payload)
        }
    }()

    // blocks until both goroutines return; the publisher loop never does,
    // so the program runs until it is interrupted
    wg.Wait()
}

PS: I just tried it, and the above code works fine on CentOS 7 with libzmq built from source.

aletheia7 commented 8 years ago

In zmq4, Socket.SetSndbuf()/Socket.SetRcvbuf() just set ZMQ_SNDBUF/ZMQ_RCVBUF; libzmq then passes the value to the OS's underlying setsockopt(SO_SNDBUF / SO_RCVBUF, x) system call.

Every OS has different SO_SNDBUF/SO_RCVBUF limits and reacts differently when they are exceeded. Linux silently accepts a value that is too high and lowers it for you. OSX/FreeBSD fails fast: setsockopt returns an error ("No buffer space available").
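To see how a given OS reacts without involving ZeroMQ at all, something like the following could be used. It is only a minimal sketch, assuming a Unix-like system where Go's syscall package exposes these calls; trySndbuf is a hypothetical helper written for this illustration, not part of zmq4 or libzmq. It requests a send buffer on a throwaway TCP socket and reads back what the kernel reports.

```go
package main

import (
	"fmt"
	"syscall"
)

// trySndbuf is a hypothetical helper (not part of zmq4). It requests a send
// buffer of `requested` bytes on a throwaway TCP socket and returns the size
// the kernel reports afterwards, or the error from setsockopt.
func trySndbuf(requested int) (int, error) {
	fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
	if err != nil {
		return 0, fmt.Errorf("socket: %w", err)
	}
	defer syscall.Close(fd)

	// This is the call that can fail when the request exceeds the OS limit.
	if err := syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_SNDBUF, requested); err != nil {
		return 0, fmt.Errorf("setsockopt(SO_SNDBUF, %d): %w", requested, err)
	}

	// Read back the effective size; it may differ from the requested one.
	return syscall.GetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_SNDBUF)
}

func main() {
	// Same sizes as --bufsize=1, 5 and 6 in the program above.
	for _, mb := range []int{1, 5, 6} {
		got, err := trySndbuf(mb * 1000000)
		if err != nil {
			fmt.Printf("%d MB: rejected: %v\n", mb, err)
			continue
		}
		fmt.Printf("%d MB: kernel reports %d bytes\n", mb, got)
	}
}
```

The output depends on the OS and its sysctl settings, so it is worth running on both machines being compared. A size that is rejected here would presumably also be rejected when libzmq makes the same request.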

OSX's upper limit is related to kern.ipc.maxsockbuf. I am running El Capitan (10.11.3, the latest). Here is what I see:

# sysctl kern.ipc.maxsockbuf
kern.ipc.maxsockbuf: 6291456
#

This may help for OSX: http://slaptijack.com/system-administration/mac-os-x-tcp-performance-tuning/

pebbe commented 8 years ago

So, this isn't an issue with zmq4?