nsqio / go-nsq

The official Go package for NSQ
MIT License
2.59k stars 444 forks source link

SIngleton Producer #331

Closed wilhart closed 9 months ago

wilhart commented 3 years ago

Is it possible to use singleton NSQ Producer? I'm running an API with NSQ Producer and it seems like i have to open and close connection everytime i hit endpoint.

ploxiln commented 3 years ago

You can create the Producer (using NewProducer()) during the initialization of your service, store it to a global variable, and re-use it from all request goroutines. It is "thread-safe".

hunterhug commented 3 years ago

Is it possible to use singleton NSQ Producer? I'm running an API with NSQ Producer and it seems like i have to open and close connection everytime i hit endpoint.

Here is a example you can use:

package nq

import (
    "encoding/json"
    "errors"
    "fmt"
    "github.com/nsqio/go-nsq"
    "math/rand"
    "sync"
    "time"
)

type Client struct {
    Producer       []*nsq.Producer
    PLen           int
    topics         map[string]struct{}
    mux            sync.Mutex
    Prefix         string
    NsqdAddress    []string
    LookupdAddress []string
}

func New(prefix string, nsqdAddress []string, lookupdAddress []string) (*Client, error) {
    if len(nsqdAddress) == 0 || len(lookupdAddress) == 0 {
        return nil, errors.New("config invalid")
    }

    rand.Seed(time.Now().Unix())

    c := &Client{
        Producer:       nil,
        NsqdAddress:    nsqdAddress,
        LookupdAddress: lookupdAddress,
        Prefix:         prefix,
        PLen:           len(nsqdAddress),
        topics:         map[string]struct{}{},
    }

    for _, v := range nsqdAddress {
        producer, err := nsq.NewProducer(v, nsq.NewConfig())
        if err != nil {
            return nil, err
        }
        c.Producer = append(c.Producer, producer)
    }
    return c, nil
}

func (c *Client) Pub(topic string, object interface{}) error {
    if c.Prefix != "" {
        topic = fmt.Sprintf("%s.%s", c.Prefix, topic)
    }
    body, err := json.Marshal(object)
    if err != nil {
        return err
    }
    return c.Producer[rand.Intn(c.PLen)].Publish(topic, body)
}

func (c *Client) DeferredPub(topic string, delay time.Duration, object interface{}) error {
    if c.Prefix != "" {
        topic = fmt.Sprintf("%s.%s", c.Prefix, topic)
    }
    body, err := json.Marshal(object)
    if err != nil {
        return err
    }
    return c.Producer[rand.Intn(c.PLen)].DeferredPublish(topic, delay, body)
}

func (c *Client) PubRaw(topic string, raw []byte) error {
    if c.Prefix != "" {
        topic = fmt.Sprintf("%s.%s", c.Prefix, topic)
    }
    return c.Producer[rand.Intn(c.PLen)].Publish(topic, raw)
}

func (c *Client) DeferredPubRaw(topic string, delay time.Duration, raw []byte) error {
    if c.Prefix != "" {
        topic = fmt.Sprintf("%s.%s", c.Prefix, topic)
    }
    return c.Producer[rand.Intn(c.PLen)].DeferredPublish(topic, delay, raw)
}

func (c *Client) Sub(topic, channel string, handler nsq.Handler) (err error) {
    c.mux.Lock()
    defer c.mux.Unlock()

    if _, ok := c.topics[topic]; ok {
        return nil
    } else {
        c.topics[topic] = struct{}{}
    }

    if c.Prefix != "" {
        topic = fmt.Sprintf("%s.%s", c.Prefix, topic)
    }

    consumer, err := nsq.NewConsumer(topic, channel, nsq.NewConfig())
    if err != nil {
        return err
    }

    consumer.AddHandler(handler)
    if err := consumer.ConnectToNSQLookupds(c.LookupdAddress); err != nil {
        return err
    }

    return nil
}

func (c *Client) SubMany(topic, channel string, handler nsq.Handler, concurrency int) (err error) {
    c.mux.Lock()
    defer c.mux.Unlock()

    if _, ok := c.topics[topic]; ok {
        return nil
    } else {
        c.topics[topic] = struct{}{}
    }

    if c.Prefix != "" {
        topic = fmt.Sprintf("%s.%s", c.Prefix, topic)
    }

    consumer, err := nsq.NewConsumer(topic, channel, nsq.NewConfig())
    if err != nil {
        return err
    }

    consumer.AddConcurrentHandlers(handler, concurrency)
    if err := consumer.ConnectToNSQLookupds(c.LookupdAddress); err != nil {
        return err
    }

    return nil
}

Test it:

func TestNew(t *testing.T) {
    a, err := New("En-Shop", []string{"10.0.5.2:5150","10.0.5.3:5150","10.0.5.4:5150"}, []string{"10.0.5.2:5161","10.0.5.3:5161","10.0.5.4:5161"})
    if err != nil {
        fmt.Println(err.Error())
        return
    }

    err = a.DeferredPubRaw("Test", time.Minute*10, []byte("ssss"))
    if err != nil {
        fmt.Println(err.Error())
        return
    }
}