nsqio / go-nsq

The official Go package for NSQ
MIT License
2.59k stars 444 forks source link

consumer: sometimes got heartbeat but not message #206

Closed slayercat closed 7 years ago

slayercat commented 7 years ago

1. env

nsqd --version nsqd v1.0.0-compat (built w/go1.8)

go-nsq version commit b9762cdcb6d5cc5ac5287ca076354143d332cc97 Tue Feb 14 16:13:23 2017 -0800

# go version go version go1.8 linux/amd64

2. what I observed

# curl -XGET http://127.0.0.1:4151/stats; 
[root@18-190 ptd]# curl -XGET http://127.0.0.1:4151/stats; 
nsqd v1.0.0-compat (built w/go1.8)
start_time 2017-03-27T19:07:02+08:00
uptime 45m7.260003591s

Health: OK

   [.raw.raw#ephemeral] depth: 0     be-depth: 0     msgs: 6656     e2e%: 
      [processor#ephemeral      ] depth: 0     be-depth: 0     inflt: 0    def: 0    re-q: 0     timeout: 0     msgs: 6656     e2e%: 
        [V2 18-190               ] state: 3 inflt: 0    rdy: 1    fin: 6656     re-q: 0        msgs: 6656     connected: 8s

   [d.c2.cp#ephemeral] depth: 10000 be-depth: 0     msgs: 791227   e2e%: 

   [d.c2.cp.done#ephemeral] depth: 0     be-depth: 0     msgs: 0        e2e%: 
      [processor#ephemeral      ] depth: 0     be-depth: 0     inflt: 0    def: 0    re-q: 0     timeout: 0     msgs: 0        e2e%: 
        [V2 18-190               ] state: 3 inflt: 0    rdy: 1    fin: 0        re-q: 0        msgs: 0        connected: 8s

   [d.c2.domain#ephemeral] depth: 10000 be-depth: 0     msgs: 208218   e2e%: 

...

code snip

// nsqSubscribe creates a consumer for topic/channel, registers hdlr as its
// message handler, connects it to the nsqd at addr, and then blocks on the
// consumer's StopChan.
//
// The function never returns normally: it panics when the consumer cannot be
// created or connected, and it panics again as soon as the receive on
// StopChan completes. The error result exists only to keep the signature
// unchanged for existing callers.
func nsqSubscribe(addr string, topic string, channel string, hdlr nsq.HandlerFunc) error {
    consumer, err := nsq.NewConsumer(topic, channel, nsq.NewConfig())
    if err != nil {
        // The builtin print does not call Error() on an interface value (its
        // formatting is implementation-specific), so pass the message text.
        print("new consumer error: ", err.Error(), "\n")
        time.Sleep(1 * time.Second) //wait 1s
        panic(err)
    }
    consumer.AddHandler(hdlr)
    if err := consumer.ConnectToNSQD(addr); err != nil {
        print("connect nsqd error: ", err.Error(), "\n")
        time.Sleep(1 * time.Second) //wait 1s
        panic(err)
    }
    // Block until StopChan yields; the received value itself is not needed.
    <-consumer.StopChan
    panic("nsq conn dead topic=" + topic + " channel=" + channel)
}

// main creates one NSQ producer shared by all handlers and starts one
// goroutine per subscribed topic. Each goroutine builds its own detector,
// optionally initialises it (when detectorEnabled is set), and then blocks in
// nsqSubscribe. The rest of the function is elided in the original report
// (see the "....." marker below).
func main() {
    producer, err := nsq.NewProducer(nsqConf.Local.Addr, nsq.NewConfig())
    if err != nil {
        panic(err)
    }
    go func() {
        detector := new(c2.C2Sdk)

        if detectorEnabled {
            // Keep err local to this goroutine: assigning to main's err from
            // several goroutines concurrently is a data race.
            if err := detector.Init(); err != nil {
                fmt.Println("Failed to init c2")
                panic(err)
            }
        }
        nsqSubscribe(nsqConf.Local.Addr, "d.c2.cp#ephemeral", "detect#ephemeral",
            nsq.HandlerFunc(func(message *nsq.Message) error {
                return handler_scan(message, detector,
                    producer, unmarshal_url, scan_url,
                    "d.c2.cp.done#ephemeral")
            }))
    }()
    go func() {
        detector := new(c2.C2Sdk)

        if detectorEnabled {
            // Goroutine-local err, for the same reason as above.
            if err := detector.Init(); err != nil {
                fmt.Println("Failed to init c2")
                panic(err)
            }
        }
        nsqSubscribe(nsqConf.Local.Addr, "d.c2.url#ephemeral", "detect#ephemeral",
            nsq.HandlerFunc(func(message *nsq.Message) error {
                return handler_scan(message, detector,
                    producer, unmarshal_url, scan_url,
                    "d.c2.url.done#ephemeral")
            }))
    }()

.....

}

3. how to reproduce

slayercat commented 7 years ago

I did some digging and found that redistributeRDY() always returns at

if !atomic.CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) {
    return
}
slayercat commented 7 years ago
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] in func rdyLoop redistributeRDY
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: conns!=0
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 1111
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 2222
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) not true
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    4 [d.c2.cp#ephemeral/detect#ephemeral] rdyLoop redistributeRDY
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    4 [d.c2.cp#ephemeral/detect#ephemeral] in func rdyLoop redistributeRDY
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    4 [d.c2.cp#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: conns!=0
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    4 [d.c2.cp#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 1111
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    4 [d.c2.cp#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 2222
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    4 [d.c2.cp#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) not true
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    3 [d.c2.domain#ephemeral/detect#ephemeral] rdyLoop redistributeRDY
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    3 [d.c2.domain#ephemeral/detect#ephemeral] in func rdyLoop redistributeRDY
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    3 [d.c2.domain#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: conns!=0
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    3 [d.c2.domain#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 1111
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    3 [d.c2.domain#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 2222
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    3 [d.c2.domain#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) not true
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    5 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    5 [d.c2.url#ephemeral/detect#ephemeral] in func rdyLoop redistributeRDY
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    5 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: conns!=0
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    5 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 1111
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    5 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 2222
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    5 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) not true
Mar 27 20:35:48 18-190 c2detector[23104]: 2017/03/27 20:35:48 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY
Mar 27 20:35:48 18-190 c2detector[23104]: 2017/03/27 20:35:48 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] in func rdyLoop redistributeRDY
Mar 27 20:35:48 18-190 c2detector[23104]: 2017/03/27 20:35:48 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: conns!=0
Mar 27 20:35:48 18-190 c2detector[23104]: 2017/03/27 20:35:48 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 1111
Mar 27 20:35:48 18-190 c2detector[23104]: 2017/03/27 20:35:48 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 2222
Mar 27 20:35:48 18-190 c2detector[23104]: 2017/03/27 20:35:48 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) not true
func (r *Consumer) redistributeRDY() {
    r.log(LogLevelInfo, "in func rdyLoop redistributeRDY")
    if r.inBackoffTimeout() {
        r.log(LogLevelInfo, "rdyLoop redistributeRDY: inBackoffTimeout")
        return
    }    

    // if an external heuristic set needRDYRedistributed we want to wait 
    // until we can actually redistribute to proceed
    conns := r.conns()
    if len(conns) == 0 {
        r.log(LogLevelInfo, "rdyLoop redistributeRDY: conns=0")
        return
    }    
    r.log(LogLevelInfo, "rdyLoop redistributeRDY: conns!=0")

    maxInFlight := r.getMaxInFlight()
    if len(conns) > int(maxInFlight) {
        r.log(LogLevelDebug, "redistributing RDY state (%d conns > %d max_in_flight)",
            len(conns), maxInFlight)
        atomic.StoreInt32(&r.needRDYRedistributed, 1)
    }    
    r.log(LogLevelInfo, "rdyLoop redistributeRDY: 1111")

    if r.inBackoff() && len(conns) > 1 {
        r.log(LogLevelDebug, "redistributing RDY state (in backoff and %d conns > 1)", len(conns))
        atomic.StoreInt32(&r.needRDYRedistributed, 1)
    }    
    r.log(LogLevelInfo, "rdyLoop redistributeRDY: 2222")

    if !atomic.CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) { 
        r.log(LogLevelInfo, "rdyLoop redistributeRDY: CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) not true")
        return
    }    

    r.log(LogLevelInfo, "rdyLoop redistributeRDY: 3333")
slayercat commented 7 years ago

r.needRDYRedistributed is 0

slayercat commented 7 years ago
    maxInFlight := r.getMaxInFlight()
    r.log(LogLevelInfo, "count max in fight=%d   count conns= %d",int(maxInFlight), len(conns))
    if len(conns) > int(maxInFlight) {
        r.log(LogLevelDebug, "redistributing RDY state (%d conns > %d max_in_flight)",
            len(conns), maxInFlight)
        atomic.StoreInt32(&r.needRDYRedistributed, 1)
    }    
    r.log(LogLevelInfo, "rdyLoop redistributeRDY: 1111")

    r.log(LogLevelInfo, "in backoff? %s", r.inBackoff())
    if r.inBackoff() && len(conns) > 1 {
        r.log(LogLevelDebug, "redistributing RDY state (in backoff and %d conns > 1)", len(conns))
        atomic.StoreInt32(&r.needRDYRedistributed, 1)
    }    
    r.log(LogLevelInfo, "rdyLoop redistributeRDY: 2222")
Mar 27 20:57:58 18-190 c2detector[29336]: 2017/03/27 20:57:58 INF    4 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY
Mar 27 20:57:58 18-190 c2detector[29336]: 2017/03/27 20:57:58 INF    4 [d.c2.url#ephemeral/detect#ephemeral] in func rdyLoop redistributeRDY
Mar 27 20:57:58 18-190 c2detector[29336]: 2017/03/27 20:57:58 INF    4 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: conns!=0
Mar 27 20:57:58 18-190 c2detector[29336]: 2017/03/27 20:57:58 INF    4 [d.c2.url#ephemeral/detect#ephemeral] count max in fight=1   count conns= 1
Mar 27 20:57:58 18-190 c2detector[29336]: 2017/03/27 20:57:58 INF    4 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 1111
Mar 27 20:57:58 18-190 c2detector[29336]: 2017/03/27 20:57:58 INF    4 [d.c2.url#ephemeral/detect#ephemeral] in backoff? %!s(bool=false)
Mar 27 20:57:58 18-190 c2detector[29336]: 2017/03/27 20:57:58 INF    4 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 2222
Mar 27 20:57:58 18-190 c2detector[29336]: 2017/03/27 20:57:58 INF    4 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) not true --> r.needRDYRedistributed0
slayercat commented 7 years ago

seems any reproducted channel will not call updateRDY for any time. digging why.

ploxiln commented 7 years ago

What do you mean by "reproducted"?

slayercat commented 7 years ago

Sorry about that, @ploxiln. I mean that when this issue appears, the associated consumer never calls the function updateRDY. I'm still digging into it.

slayercat commented 7 years ago

When I made the nsqd log verbose, I saw the channel being created. But when I curl http://127.0.0.1:4151/stats, the target channel does not exist, and the go-nsq consumer does not get any messages either.

Mar 28 19:19:13 18-190 nsqd[935]: [nsqd] 2017/03/28 19:19:13.084582 TOPIC(d.c2.cp#ephemeral): new channel(detect#ephemeral)
Mar 28 19:19:13 18-190 nsqd[935]: [nsqd] 2017/03/28 19:19:13.189118 TOPIC(d.c2.cp#ephemeral): created
[root@18-190 ptd]# date;curl -s -XGET http://127.0.0.1:4151/stats
Tue Mar 28 19:25:18 CST 2017
nsqd v1.0.0-compat (built w/go1.8)
start_time 2017-03-27T19:07:02+08:00
uptime 24h18m16.376911107s

Health: OK

   [.raw.raw#ephemeral] depth: 0     be-depth: 0     msgs: 534      e2e%: 
      [processor#ephemeral      ] depth: 0     be-depth: 0     inflt: 0    def: 0    re-q: 0     timeout: 0     msgs: 534      e2e%: 
        [V2 18-190               ] state: 3 inflt: 0    rdy: 1    fin: 534      re-q: 0        msgs: 534      connected: 7s

   [d.c2.cp#ephemeral] depth: 10000 be-depth: 0     msgs: 41524    e2e%: 

   [d.c2.cp.done#ephemeral] depth: 0     be-depth: 0     msgs: 0        e2e%: 
      [processor#ephemeral      ] depth: 0     be-depth: 0     inflt: 0    def: 0    re-q: 0     timeout: 0     msgs: 0        e2e%: 
        [V2 18-190               ] state: 3 inflt: 0    rdy: 1    fin: 0        re-q: 0        msgs: 0        connected: 7s
slayercat commented 7 years ago

[root@18-190 ptd]# netstat -anop|grep c2
tcp 0 0 127.0.0.1:38372 127.0.0.3:4150 ESTABLISHED 21601/c2detector off (0.00/0/0)
tcp 0 0 127.0.0.1:38370 127.0.0.3:4150 ESTABLISHED 21601/c2detector off (0.00/0/0)
tcp 0 0 127.0.0.1:38368 127.0.0.3:4150 ESTABLISHED 21601/c2detector off (0.00/0/0)
tcp 0 0 127.0.0.1:38374 127.0.0.3:4150 ESTABLISHED 21601/c2detector off (0.00/0/0)
unix 3 [ ] STREAM CONNECTED 360495996 21601/c2detector
[root@18-190 ptd]# cat nsq.log |grep 38368
Mar 28 20:49:17 18-190 nsqd[16733]: [nsqd] 2017/03/28 20:49:17.838368 [127.0.0.1:53812] state rdy: 1 inflt: 0
Mar 28 20:51:47 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:51:47.325215 [127.0.0.1:38368] state rdy: 1 inflt: 0
Mar 28 20:51:47 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:51:47.325381 PROTOCOL(V2): [127.0.0.1:38368] [NOP]
Mar 28 20:51:47 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:51:47.325390 Exec client=127.0.0.1:38368
Mar 28 20:52:17 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:52:17.325252 [127.0.0.1:38368] state rdy: 1 inflt: 0
Mar 28 20:52:17 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:52:17.325391 PROTOCOL(V2): [127.0.0.1:38368] [NOP]
Mar 28 20:52:17 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:52:17.325399 Exec client=127.0.0.1:38368
Mar 28 20:52:47 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:52:47.325343 [127.0.0.1:38368] state rdy: 1 inflt: 0
Mar 28 20:52:47 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:52:47.326081 PROTOCOL(V2): [127.0.0.1:38368] [NOP]
Mar 28 20:52:47 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:52:47.326090 Exec client=127.0.0.1:38368
Mar 28 20:53:17 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:53:17.325381 [127.0.0.1:38368] state rdy: 1 inflt: 0
Mar 28 20:53:17 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:53:17.325625 PROTOCOL(V2): [127.0.0.1:38368] [NOP]
Mar 28 20:53:17 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:53:17.325634 Exec client=127.0.0.1:38368

when got command NOP, conn state is stateSubscribed in nsqd..

slayercat commented 7 years ago

It seems every connection registered itself in channel.clients using func (c *Channel) AddClient(clientID int64, client Consumer), even when the issue appears.

But when print channel.clients in nsq, the queue is gone.

[root@18-190 ]# netstat -anop|grep c2
tcp        0      0 127.0.0.1:50312         127.0.0.3:4150          ESTABLISHED 12338/c2detector     off (0.00/0/0)
tcp        0      0 127.0.0.1:50314         127.0.0.3:4150          ESTABLISHED 12338/c2detector     off (0.00/0/0)
tcp        0      0 127.0.0.1:50308         127.0.0.3:4150          ESTABLISHED 12338/c2detector     off (0.00/0/0)
tcp        0      0 127.0.0.1:50310         127.0.0.3:4150          ESTABLISHED 12338/c2detector     off (0.00/0/0)

[root@18-190 ]# cat shit |grep 312
CHANNEL(detect#ephemeral): topic d.c2.ip#ephemeral add client 127.0.0.1:50312 (id=37) not exists. done
[root@18-190 ]# cat shit |grep 314
CHANNEL(detect#ephemeral): topic d.c2.domain#ephemeral add client 127.0.0.1:50314 (id=38) not exists. done
[root@18-190 ]# cat shit |grep 308
CHANNEL(detect#ephemeral): topic d.c2.cp#ephemeral add client 127.0.0.1:50308 (id=35) not exists. done
[root@18-190 ]# cat shit |grep 310
CHANNEL(detect#ephemeral): topic d.c2.url#ephemeral add client 127.0.0.1:50310 (id=36) not exists. done

[root@18-190 ptd]# grep 50308 nsq.log  -C 4
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.344281 PROTOCOL(V2): [127.0.0.1:50362] [PUB d.c2.ip#ephemeral]
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.344418 PROTOCOL(V2): [127.0.0.1:50362] [PUB d.c2.cp#ephemeral]
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.344534 PROTOCOL(V2): [127.0.0.1:50362] [PUB d.c2.cp#ephemeral]
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.344651 PROTOCOL(V2): [127.0.0.1:50362] [PUB d.c2.domain#ephemeral]
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.458219 [127.0.0.1:50308] state rdy:    1 inflt:    0
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.458315 PROTOCOL(V2): [127.0.0.1:50308] [NOP]
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.458333 NOP: client=127.0.0.1:50308(id=%!d(string=18-190)), status=3
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.458344 NOP: topicname=d.c2.url#ephemeral. clients=127.0.0.1:50310

^^^^^^

Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.464984 [127.0.0.1:50310] state rdy:    1 inflt:    0
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.465069 PROTOCOL(V2): [127.0.0.1:50310] [NOP]
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.465079 NOP: client=127.0.0.1:50310(id=%!d(string=18-190)), status=3
ploxiln commented 7 years ago

This sounds like a race in creating ephemeral channels - like the channel is cleaned-up before the client is fully connected (because ephemeral channels with no connections are removed). (But I haven't really investigated myself yet.)

slayercat commented 7 years ago

thanks @ploxiln .

I've printed the address of Channel.clients; it seems to differ between Channel.AddClient and protocolV2.NOP.

It's really likely as you said.

CHANNEL(detect#ephemeral): topic d.c2.url#ephemeral add client 127.0.0.1:53128 (id=27) not exists. done. clientsaddr=0xc8200aa870 
CHANNEL(detect#ephemeral): topic d.c2.domain#ephemeral add client 127.0.0.1:53130 (id=28) not exists. done. clientsaddr=0xc82019a2d0 
CHANNEL(detect#ephemeral): topic d.c2.cp#ephemeral add client 127.0.0.1:53134 (id=30) not exists. done. clientsaddr=0xc8209c63f0 
CHANNEL(detect#ephemeral): topic d.c2.ip#ephemeral add client 127.0.0.1:53132 (id=29) not exists. done. clientsaddr=0xc8204fb290 
[root@18-190 ptd]# grep '0xc8204fb290' nsq.log|tail -3
Mar 29 10:47:00 18-190 nsqd[27330]: [nsqd] 2017/03/29 10:47:00.101482 NOP: topicname=d.c2.ip#ephemeral. clientsaddr=0xc8204fb290
Mar 29 10:47:00 18-190 nsqd[27330]: [nsqd] 2017/03/29 10:47:00.114756 NOP: topicname=d.c2.ip#ephemeral. clientsaddr=0xc8204fb290
Mar 29 10:47:00 18-190 nsqd[27330]: [nsqd] 2017/03/29 10:47:00.115416 NOP: topicname=d.c2.ip#ephemeral. clientsaddr=0xc8204fb290
[root@18-190 ptd]# grep '0xc8209c63f0' nsq.log|tail -3
[root@18-190 ptd]# grep '0xc82019a2d0' nsq.log|tail -3
[root@18-190 ptd]# grep '0xc8200aa870' nsq.log|tail -3
mreiferson commented 7 years ago

Seems like specifically a race in nsqd when an ephemeral channel's last client disconnects and the cleanup process is triggered and simultaneously a client begins connecting and subscribing.

mreiferson commented 7 years ago

@slayercat thanks for the report — at this point it seems like a bug in nsqd and we should migrate this issue over to that repo.

slayercat commented 7 years ago

thanks, @mreiferson .

any workaround here? like mask the cleanup process?

mreiferson commented 7 years ago

I suspect that the fix in nsqd will be to somehow lock around cleanup so that a new client cannot be added until completion.

slayercat commented 7 years ago

I masked the delete callbacks of the channel and the topic, and it seems to work for me.

// RemoveClient drops clientID from the channel's client map while holding
// the channel's lock. An ID that is not registered is ignored.
func (c *Channel) RemoveClient(clientID int64) {
    c.Lock()
    defer c.Unlock()

    if _, known := c.clients[clientID]; !known {
        return
    }
    delete(c.clients, clientID)

    // Workaround for https://github.com/nsqio/go-nsq/issues/206: the delete
    // callback for an ephemeral channel that just lost its last client is
    // intentionally masked, so this branch does nothing.
    if c.ephemeral && len(c.clients) == 0 {
        //go c.deleter.Do(func() { c.deleteCallback(c) })
    }
}
// DeleteExistingChannel removes the channel named channelName from the
// topic's channel map, deletes the channel, and then signals
// channelUpdateChan (or gives up if exitChan is ready). It returns an error
// when no channel with that name is registered, and nil otherwise.
func (t *Topic) DeleteExistingChannel(channelName string) error {
    t.Lock()
    ch, found := t.channelMap[channelName]
    if !found {
        t.Unlock()
        return errors.New("channel does not exist")
    }
    delete(t.channelMap, channelName)
    remaining := len(t.channelMap)
    // Unlock explicitly instead of deferring, so the topic lock is not held
    // while the channel is being deleted below.
    t.Unlock()

    t.ctx.nsqd.logf("TOPIC(%s): deleting channel %s", t.name, ch.name)

    // Per the original note, Delete empties the channel before closing it so
    // that no messages are left around.
    ch.Delete()

    // Tell messagePump about the change unless the topic is exiting.
    select {
    case t.channelUpdateChan <- 1:
    case <-t.exitChan:
    }

    // Workaround for https://github.com/nsqio/go-nsq/issues/206: the delete
    // callback for an ephemeral topic that just lost its last channel is
    // intentionally masked, so this branch does nothing.
    if t.ephemeral && remaining == 0 {
        //go t.deleter.Do(func() { t.deleteCallback(t) })
    }

    return nil
}
ploxiln commented 7 years ago

Your workaround is equivalent to using a normal non-ephemeral topic and channel. Usually that's what you really want anyway :)

slayercat commented 7 years ago

thanks, @ploxiln

An ephemeral channel won't write messages to disk, so there is some difference :)

I'll try to fix the issue when my schedule allows.

thank you again.

mreiferson commented 7 years ago

continue in https://github.com/nsqio/nsq/issues/883 please, thanks!