nsqio / go-nsq

The official Go package for NSQ
MIT License
2.59k stars 444 forks source link

beginning close #323

Closed w896736588 closed 3 years ago

w896736588 commented 3 years ago

nsq version v1.0.8 golang version 1.12 First create a producer and product a msg,then stop it (Topic does not exist will report an error)

var err error
if nsqProducer, err = nsq.NewProducer(fmt.Sprintf(
    "%s:%s",
    nsqConfig["addr"],
    nsqConfig["producerPort"],
), config); err != nil {
    panic(err.Error())
}
if err := nsqProducer.Publish(topic, []byte(msg)); err != nil {
    panic(err.Error())
}
nsqProducer.Stop()

Then create a consumer About 30 seconds later,Output the following log

2021/06/03 10:09:09 INF    1 (121.40.109.241:4150) beginning close
2021/06/03 10:09:09 INF    1 (121.40.109.241:4150) readLoop exiting
2021/06/03 10:09:09 INF    1 (121.40.109.241:4150) breaking out of writeLoop
2021/06/03 10:09:09 INF    1 (121.40.109.241:4150) writeLoop exiting

I traced the error ,in conn.go 489 atomic.LoadInt32(&c.closeFlag)

func (c *Conn) readLoop() {
    delegate := &connMessageDelegate{c}
    for {
        if atomic.LoadInt32(&c.closeFlag) == 1 {  
            fmt.Print("112")
            goto exit
        }

But i can not find where set &c.closeFlag = 1

If i do not create a producer,it will not report this error.

mreiferson commented 3 years ago

Thanks for submitting the issue, but it's a little hard to follow. Can you possibly put together a complete reproducible test or example code?

w896736588 commented 3 years ago

Thanks for submitting the issue, but it's a little hard to follow. Can you possibly put together a complete reproducible test or example code?

package main

import (
    "github.com/nsqio/go-nsq"
    "github.com/rs/zerolog/log"
    "os"
    "os/signal"
    "syscall"
)

type myMessageHandler struct {
}

func main(){
    topic := "testTopic"

    //producer start
    producerConfig := nsq.NewConfig()
    producer, err1 := nsq.NewProducer("****:4150", producerConfig)
    if err1 != nil {
        panic(err1)
    }
    var err2 = producer.Publish(topic, []byte("0"))
    if err2 != nil {
        panic(err2)
    }
    producer.Stop()

    //consumer start
    consumerConfig := nsq.NewConfig()
    consumer, err := nsq.NewConsumer(topic, topic + "channel", consumerConfig)
    if err != nil  {
        panic(err)
    }
    consumer.ChangeMaxInFlight(1)
    consumer.SetLogger(
        &NoopNSQLogger{},
        nsq.LogLevelDebug,
    )
    messageHandle := new(myMessageHandler)
    consumer.AddHandler(messageHandle)
    err = consumer.ConnectToNSQLookupd("****:4161")
    if err != nil {
        panic(err)
    }
    consumerSignal(consumer)
}
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
    if len(m.Body) == 0 {
        // Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
        return nil
    }
    log.Debug().Msgf("message %s ",string(m.Body))
    // Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
    return nil
}

type NoopNSQLogger struct {
}

func (l *NoopNSQLogger) Output(calldepth int, s string) error {
    log.Debug().Msgf("nsq output: " + s)
    return nil
}

func consumerSignal(consumer *nsq.Consumer)  {
    shutdown := make(chan os.Signal,1)
    signal.Notify(shutdown,syscall.SIGINT)
    for{
        select {
        case <-consumer.StopChan:
            return
        case <-shutdown:
            consumer.Stop()
        }
    }
}

Please replace the nsq host .

2021/06/16 18:50:32 INF    1 (****:4150) connecting to nsqd
2021/06/16 18:50:32 INF    1 stopping
2021/06/16 18:50:32 INF    1 exiting router
{"level":"debug","time":"2021-06-16T18:50:32+08:00","message":"nsq output: DBG    2 [testTopic/testTopicchannel] starting Handler"}
{"level":"debug","time":"2021-06-16T18:50:32+08:00","message":"nsq output: INF    2 [testTopic/testTopicchannel] querying nsqlookupd http://****:4161/lookup?topic=testTopic"}

{"level":"debug","time":"2021-06-16T18:50:32+08:00","message":"nsq output: INF    2 [testTopic/testTopicchannel] (****:4150) connecting to nsqd"}
{"level":"debug","time":"2021-06-16T18:50:32+08:00","message":"nsq output: DBG    2 [testTopic/testTopicchannel] (****:4150) IDENTIFY response: &{MaxRdyCount:2500 TLSv1:false
 Deflate:false Snappy:false AuthRequired:false}"}
{"level":"debug","time":"2021-06-16T18:50:32+08:00","message":"nsq output: DBG    2 [testTopic/testTopicchannel] (****:4150) sending RDY 1"}
{"level":"debug","time":"2021-06-16T18:50:32+08:00","message":"message 0 "}
{"level":"debug","time":"2021-06-16T18:50:32+08:00","message":"nsq output: DBG    2 [testTopic/testTopicchannel] (****:4150) FIN 0f1f068e2a2e8000"}
**2021/06/16 18:51:02 INF    1 (****:4150) beginning close
2021/06/16 18:51:02 INF    1 (****:4150) breaking out of writeLoop
2021/06/16 18:51:02 INF    1 (****:4150) writeLoop exiting
2021/06/16 18:51:02 INF    1 (****:4150) readLoop exiting
2021/06/16 18:51:02 INF    1 (****:4150) finished draining, cleanup exiting
2021/06/16 18:51:02 INF    1 (****:4150) clean close complete**
{"level":"debug","time":"2021-06-16T18:51:02+08:00","message":"nsq output: DBG    2 [testTopic/testTopicchannel] (****:4150) heartbeat received"}
{"level":"debug","time":"2021-06-16T18:51:12+08:00","message":"nsq output: INF    2 [testTopic/testTopicchannel] stopping..."}
{"level":"debug","time":"2021-06-16T18:51:12+08:00","message":"nsq output: INF    2 [testTopic/testTopicchannel] (****:4150) received CLOSE_WAIT from nsqd"}
{"level":"debug","time":"2021-06-16T18:51:12+08:00","message":"nsq output: INF    2 [testTopic/testTopicchannel] (****:4150) beginning close"}
{"level":"debug","time":"2021-06-16T18:51:12+08:00","message":"nsq output: INF    2 [testTopic/testTopicchannel] (****:4150) breaking out of writeLoop"}
{"level":"debug","time":"2021-06-16T18:51:12+08:00","message":"nsq output: INF    2 [testTopic/testTopicchannel] (****:4150) writeLoop exiting"}
{"level":"debug","time":"2021-06-16T18:51:12+08:00","message":"nsq output: INF    2 [testTopic/testTopicchannel] (****:4150) readLoop exiting"}
{"level":"debug","time":"2021-06-16T18:51:12+08:00","message":"nsq output: INF    2 [testTopic/testTopicchannel] (****:4150) finished draining, cleanup exiting"}
{"level":"debug","time":"2021-06-16T18:51:12+08:00","message":"nsq output: INF    2 [testTopic/testTopicchannel] (****:4150) clean close complete"}
{"level":"debug","time":"2021-06-16T18:51:12+08:00","message":"nsq output: WRN    2 [testTopic/testTopicchannel] there are 0 connections left alive"}
{"level":"debug","time":"2021-06-16T18:51:12+08:00","message":"nsq output: INF    2 [testTopic/testTopicchannel] stopping handlers"}
{"level":"debug","time":"2021-06-16T18:51:12+08:00","message":"nsq output: DBG    2 [testTopic/testTopicchannel] stopping Handler"}
{"level":"debug","time":"2021-06-16T18:51:12+08:00","message":"nsq output: INF    2 [testTopic/testTopicchannel] exiting lookupdLoop"}
{"level":"debug","time":"2021-06-16T18:51:12+08:00","message":"nsq output: INF    2 [testTopic/testTopicchannel] rdyLoop exiting"}
mreiferson commented 3 years ago

I created the test:

func TestGH323(t *testing.T) {
    topic := "testTopic"

    //producer start
    producerConfig := NewConfig()
    producer, err1 := NewProducer("127.0.0.1:4150", producerConfig)
    if err1 != nil {
        panic(err1)
    }
    var err2 = producer.Publish(topic, []byte("0"))
    if err2 != nil {
        panic(err2)
    }
    producer.Stop()

    //consumer start
    consumerConfig := NewConfig()
    consumer, err := NewConsumer(topic, topic+"channel", consumerConfig)
    if err != nil {
        panic(err)
    }
    consumer.ChangeMaxInFlight(1)
    consumer.SetLogger(
        &NoopNSQLogger{},
        LogLevelDebug,
    )
    messageHandle := new(myMessageHandler)
    consumer.AddHandler(messageHandle)
    err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
    if err != nil {
        panic(err)
    }
    consumerSignal(consumer)
}

type myMessageHandler struct {
}

func (h *myMessageHandler) HandleMessage(m *Message) error {
    if len(m.Body) == 0 {
        // Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
        return nil
    }
    log.Printf("message %s", m.Body)
    // Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
    return nil
}

type NoopNSQLogger struct {
}

func (l *NoopNSQLogger) Output(calldepth int, s string) error {
    log.Printf("nsq output: %s", s)
    return nil
}

func consumerSignal(consumer *Consumer) {
    time.AfterFunc(20*time.Millisecond, func() {
        consumer.Stop()
    })
    for {
        select {
        case <-consumer.StopChan:
            return
        }
    }
}

and ran against latest master of go-nsq: go test -v -run TestGH323 and it exited as expected:

$ go test -v -run TestGH323
=== RUN   TestGH323
2021/07/05 14:33:53 INF    1 (127.0.0.1:4150) connecting to nsqd
2021/07/05 14:33:53 INF    1 stopping
2021/07/05 14:33:53 INF    1 exiting router
2021/07/05 14:33:53 INF    1 (127.0.0.1:4150) beginning close
2021/07/05 14:33:53 INF    1 (127.0.0.1:4150) readLoop exiting
2021/07/05 14:33:53 INF    1 (127.0.0.1:4150) breaking out of writeLoop
2021/07/05 14:33:53 INF    1 (127.0.0.1:4150) writeLoop exiting
2021/07/05 14:33:53 nsq output: INF    2 [testTopic/testTopicchannel] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=testTopic
2021/07/05 14:33:53 nsq output: DBG    2 [testTopic/testTopicchannel] starting Handler
2021/07/05 14:33:53 nsq output: INF    2 [testTopic/testTopicchannel] (SNAKES:4150) connecting to nsqd
2021/07/05 14:33:53 INF    1 (127.0.0.1:4150) finished draining, cleanup exiting
2021/07/05 14:33:53 INF    1 (127.0.0.1:4150) clean close complete
2021/07/05 14:33:53 nsq output: DBG    2 [testTopic/testTopicchannel] (SNAKES:4150) IDENTIFY response: &{MaxRdyCount:2500 TLSv1:false Deflate:false Snappy:false AuthRequired:false}
2021/07/05 14:33:53 nsq output: DBG    2 [testTopic/testTopicchannel] (SNAKES:4150) sending RDY 1
2021/07/05 14:33:53 message 0
2021/07/05 14:33:53 nsq output: DBG    2 [testTopic/testTopicchannel] (SNAKES:4150) FIN 0f36e7172f3dc000
2021/07/05 14:33:53 nsq output: INF    2 [testTopic/testTopicchannel] stopping...
2021/07/05 14:33:53 nsq output: INF    2 [testTopic/testTopicchannel] (SNAKES:4150) received CLOSE_WAIT from nsqd
2021/07/05 14:33:53 nsq output: INF    2 [testTopic/testTopicchannel] (SNAKES:4150) beginning close
2021/07/05 14:33:53 nsq output: INF    2 [testTopic/testTopicchannel] (SNAKES:4150) readLoop exiting
2021/07/05 14:33:53 nsq output: INF    2 [testTopic/testTopicchannel] (SNAKES:4150) breaking out of writeLoop
2021/07/05 14:33:53 nsq output: INF    2 [testTopic/testTopicchannel] (SNAKES:4150) writeLoop exiting
2021/07/05 14:33:53 nsq output: INF    2 [testTopic/testTopicchannel] (SNAKES:4150) finished draining, cleanup exiting
2021/07/05 14:33:53 nsq output: INF    2 [testTopic/testTopicchannel] (SNAKES:4150) clean close complete
2021/07/05 14:33:53 nsq output: WRN    2 [testTopic/testTopicchannel] there are 0 connections left alive
2021/07/05 14:33:53 nsq output: INF    2 [testTopic/testTopicchannel] stopping handlers
2021/07/05 14:33:53 nsq output: DBG    2 [testTopic/testTopicchannel] stopping Handler
2021/07/05 14:33:53 nsq output: INF    2 [testTopic/testTopicchannel] rdyLoop exiting
2021/07/05 14:33:53 nsq output: INF    2 [testTopic/testTopicchannel] exiting lookupdLoop
--- PASS: TestGH323 (0.25s)
PASS
ok      github.com/nsqio/go-nsq 0.403s