pebbe / zmq4

A Go interface to ZeroMQ version 4
BSD 2-Clause "Simplified" License
1.17k stars 166 forks source link

Performance of Figure 13 - Pub-Sub Network with a Proxy #198

Closed DMoscicki closed 8 months ago

DMoscicki commented 8 months ago

Hello! @pebbe, first of all, thank you for the perfect instrument! I made pub-sub proxy. However, Router in Golang with zmq4-lib loses in message processing of the same on Python implementation. Can you tell me what I'm doing wrong or it may be due to the peculiarities of the language? On Python i used the pyzmq library.

Version of libzmq 4.3.5

I used a ticker for stopping program after 5 second in SUB.

The code of SUBs, Routers (Python, Golang) and PUBs below:

PUB

package main

import (
    "encoding/json"
    zmq "github.com/pebbe/zmq4"
    "log"
    "os"
    "os/signal"
    "sync"
    "time"
)

type Person struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
}

func main() {
    var err error
    var i int64

    var c = make(chan os.Signal, 1)
    var wg sync.WaitGroup

    ctx, _ := zmq.NewContext()
    publisher, _ := ctx.NewSocket(zmq.PUB)
    publisher.Bind("tcp://*:6000")
    defer publisher.Close()

    signal.Notify(c, os.Interrupt)

    wg.Add(1)

    go func() {
        defer wg.Done()
        for {
            select {
            case <-c:
                log.Println("Stopped")
                return
            case <-time.After(time.Nanosecond * 0):
                if i%2 == 0 {
                    x := new(Person)
                    x.Name = "Test1"
                    x.Age = 28
                    data, _ := json.Marshal(x)
                    _, err = publisher.SendMessage("front1", data)
                    if err != nil {
                        return
                    }
                    clear(data)
                    x = nil
                } else {
                    x := new(Person)
                    x.Name = "Test2"
                    x.Age = 24
                    data, _ := json.Marshal(x)
                    _, err = publisher.SendMessage("front2", data)
                    if err != nil {
                        return
                    }
                    clear(data)
                    x = nil
                }
                i++
            }
        }
    }()

    wg.Wait()

    log.Println("Ended")
}

Router in Go:

package main

import (
    zmq "github.com/pebbe/zmq4"
)

func main() {

    ctx, _ := zmq.NewContext()

    subscriber, _ := ctx.NewSocket(zmq.XSUB)
    subscriber.Connect("tcp://localhost:6000")

    publisher, _ := ctx.NewSocket(zmq.XPUB)
    publisher.Bind("tcp://*:6001")

    defer subscriber.Close()
    defer publisher.Close()

    defer ctx.Term()

    zmq.Proxy(subscriber, publisher, nil)

}

Router in Python:

import zmq

if __name__ == '__main__':
    context = zmq.Context()
    subscriber = context.socket(zmq.XSUB)
    subscriber.connect("tcp://localhost:6000")

    publisher = context.socket(zmq.XPUB)
    publisher.bind("tcp://*:6001")

    zmq.proxy(subscriber, publisher, None)

    print("interrupted")

    # context.term

    del subscriber
    del publisher

    context.term()

SUB:

package main

import (
    "encoding/json"
    "fmt"
    zmq "github.com/pebbe/zmq4"
    "log"
    "os"
    "sync"
    "time"
)

type Person struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
}

func main() {

    ticker := time.NewTicker(time.Second * 5)
    var i int64
    var c = make(chan os.Signal, 1)
    var wg sync.WaitGroup

    ctx, _ := zmq.NewContext()

    subscriber, _ := ctx.NewSocket(zmq.SUB)
    subscriber.Connect("tcp://localhost:6001")
    subscriber.SetSubscribe("front1")

    defer subscriber.Close() // cancel subscribe

    wg.Add(1)

    go func() {
        defer wg.Done()
        for {
            select {
            case <-c:
                log.Println("STOPPED")
                return
            case <-time.After(time.Nanosecond * 0):
                f, err := subscriber.RecvMessageBytes(0)
                if err != nil {
                    break //  Interrupted
                } else {
                    i++
                }
                var pers = &Person{}
                _ = json.Unmarshal(f[1], pers)
                //log.Println(pers)
                clear(f)
                pers = nil
            case <-ticker.C:
                log.Println("TICKER")
                return
            }
        }
    }()

    wg.Wait()

    ticker.Stop()

    fmt.Println("Received ", i)

}