pebbe / zmq4

A Go interface to ZeroMQ version 4
BSD 2-Clause "Simplified" License
1.17k stars 163 forks source link

Lightweight inproc:// #120

Open gsiffert opened 6 years ago

gsiffert commented 6 years ago

import (
    "log"
    "time"

    zmq "github.com/pebbe/zmq4"
)

const (
    nbWorkers  = 100
    nbMessages = 1000
)

func onError(err error) {
    if err != nil {
        log.Fatal(err)
    }
}

func worker(ctx *zmq.Context) {
    fromProducer, err := ctx.NewSocket(zmq.PULL)
    onError(err)
    defer fromProducer.Close()
    toProducer, err := ctx.NewSocket(zmq.PUSH)
    onError(err)
    defer toProducer.Close()

    onError(fromProducer.Connect("inproc://to_workers"))
    onError(toProducer.Connect("inproc://from_workers"))

    for {
        msg, _ := fromProducer.Recv(0)
        time.Sleep(1 * time.Second)
        toProducer.Send(msg, 0)
    }
}

func main() {
    ctx, err := zmq.NewContext()
    onError(err)
    defer ctx.Term()

    toWorkers, err := ctx.NewSocket(zmq.PUSH)
    onError(err)
    defer toWorkers.Close()
    fromWorkers, err := ctx.NewSocket(zmq.PULL)
    onError(err)
    defer fromWorkers.Close()

    onError(toWorkers.Bind("inproc://to_workers"))
    onError(fromWorkers.Bind("inproc://from_workers"))

    for i := 0; i < nbWorkers; i++ {
        go worker(ctx)
    }

    time.Sleep(5 * time.Second)

    for i := 0; i < nbMessages; i++ {
        toWorkers.Send("Hello", 0)
    }

    for i := 0; i < nbMessages; i++ {
        fromWorkers.Recv(0)
    }

}

I was trying to see if it could be a good idea to abstract the usage of the Golang's channels by ZeroMQ. Sadly, the current version of this library open a new file descriptor for each socket, even if they are under the inproc protocol.

Which raise an error once my computer reach its limit of opened files, while the equivalent code in CPP doesn't.