nsqio / go-nsq

The official Go package for NSQ
MIT License
2.59k stars 444 forks source link

v1.2.1 use DeferredPublish publish delay message,delay invalidation #328

Closed cnbattle closed 3 years ago

cnbattle commented 3 years ago

i use v1.2.0, is ok.

    err := Producer.DeferredPublish("topic-demo1", time.Second*5, []byte(message))
    log.Println(err)
mreiferson commented 3 years ago

Can you share a full example and logs of the issue you’re seeing?

cnbattle commented 3 years ago

hamdle.go

package mq

import (
    "github.com/nsqio/go-nsq"
    "log"
)

type newHandler struct{}

func (m *newHandler) HandleMessage(msg *nsq.Message) (err error) {
    addr := msg.NSQDAddress
    message := string(msg.Body)
    log.Println(addr, message)
    return
}

func Handle() {
    topic := "topic-demo1"
    channel := "channel-demo1"
    addr := "192.168.33.10:4150"
    conf := nsq.NewConfig()
    newConsumer, err := nsq.NewConsumer(topic, channel, conf)
    if err != nil {
        log.Println(err)
        return
    }
    newHandler := &newHandler{}
    newConsumer.AddHandler(newHandler)
    err = newConsumer.ConnectToNSQD(addr)
    if err != nil {
        log.Println(err)
    }
}

producer.go

package mq

import (
    "fmt"
    "github.com/nsqio/go-nsq"
)

var Producer *nsq.Producer

func init() {
    var err error
    nsqAddr := "192.168.33.10:4150"
    conf := nsq.NewConfig()
    Producer, err = nsq.NewProducer(nsqAddr, conf)
    if err != nil {
        fmt.Println(err)
        return
    }
}

testing file. handle_test.go

package mq

import "testing"

func TestHandle(t *testing.T) {
    Handle()
    select {}
}

producer_test.go

package mq

import (
    "log"
    "testing"
    "time"
)

func TestPublish(t *testing.T) {
    message := "test message,publish time :" + time.Now().Format("2006-01-02 15:04:05")
    log.Println(message)
    err := Producer.DeferredPublish("topic-demo1", time.Second*10, []byte(message))
    if err != nil {
        t.Fatal(err)
    }
}

i run TestHandle func first. log is

=== RUN   TestHandle
2021/08/20 09:07:45 INF    2 [topic-demo1/channel-demo1] (192.168.33.10:4150) connecting to nsqd

run TestPublish func next, log is

=== RUN   TestPublish
2021/08/20 09:09:21 test message,publish time :2021-08-20 09:09:21
2021/08/20 09:09:21 INF    1 (192.168.33.10:4150) connecting to nsqd
--- PASS: TestPublish (0.01s)
PASS

Log of the first step is

=== RUN   TestHandle
2021/08/20 09:07:45 INF    2 [topic-demo1/channel-demo1] (192.168.33.10:4150) connecting to nsqd
2021/08/20 09:09:21 192.168.33.10:4150 test message,publish time :2021-08-20 09:09:21

i use docker-compose deploy nsq , file is : https://gist.github.com/cnbattle/977ef3b7617773fe7befcd213ef50522

if i specify version v1.2.0 , delay is ok.

cnbattle commented 3 years ago

my nsqio/nsq:latest local image hash is :

sha256:d3f340ae23ee28117dfc7629cebe9a2c42b74b85455e9a472d35e4f33cb2bc2c
mreiferson commented 3 years ago

I combined this into a single test:

package main

import (
    "fmt"
    "log"
    "testing"
    "time"

    "github.com/nsqio/go-nsq"
)

type newHandler struct{}

func (m *newHandler) HandleMessage(msg *nsq.Message) (err error) {
    addr := msg.NSQDAddress
    message := string(msg.Body)
    log.Println(addr, message)
    return
}

func TestPublish(t *testing.T) {
    topic := "topic-demo1"
    channel := "channel-demo1"
    nsqAddr := "127.0.0.1:4150"

    conf1 := nsq.NewConfig()
    Producer, err := nsq.NewProducer(nsqAddr, conf1)
    if err != nil {
        fmt.Println(err)
        return
    }

    message := "test message,publish time :" + time.Now().Format("2006-01-02 15:04:05")
    log.Println(message)
    err = Producer.DeferredPublish("topic-demo1", time.Second*10, []byte(message))
    if err != nil {
        t.Fatal(err)
    }

    conf2 := nsq.NewConfig()
    newConsumer, err := nsq.NewConsumer(topic, channel, conf2)
    if err != nil {
        log.Println(err)
        return
    }
    newHandler := &newHandler{}
    newConsumer.AddHandler(newHandler)
    err = newConsumer.ConnectToNSQD(nsqAddr)
    if err != nil {
        log.Println(err)
    }

    select {}
}

It works as expected, so the issue is likely something to do with your environment/configuration:

$ go test
2021/08/20 23:20:41 test message,publish time :2021-08-20 23:20:41
2021/08/20 23:20:41 INF    1 (127.0.0.1:4150) connecting to nsqd
2021/08/20 23:20:41 INF    2 [topic-demo1/channel-demo1] (127.0.0.1:4150) connecting to nsqd
2021/08/20 23:20:51 127.0.0.1:4150 test message,publish time :2021-08-20 23:20:41