apache / rocketmq-client-go

Apache RocketMQ go client
https://rocketmq.apache.org/
Apache License 2.0

goroutine leak when calling rocketmq.NewPushConsumer frequently #1147

Status: Open. hoverseu opened this issue 1 month ago.

hoverseu commented 1 month ago

BUG REPORT

We once had a consumer group that had not been created, so the rocketmq.NewPushConsumer call in our application code returned an error. Because of our retry mechanism, rocketmq.NewPushConsumer was then called repeatedly, and we found that memory usage kept growing.

I suspected a goroutine leak. Printing the number of goroutines with runtime.NumGoroutine() confirmed this.

By investigating the code and adding debug logs, we found that after pushConsumer.Shutdown() is called, one goroutine started in pushConsumer.Start and several statistics-related goroutines do not exit promptly because they are blocked in time.Sleep. Some of the blocking points are listed below.

push_consumer.go

  1. The first goroutine sleeps for pc.option.ConsumeTimeout before it starts listening on pc.done, and pc.option.ConsumeTimeout is usually a long duration.
func (pc *pushConsumer) Start() error {
    ...
    // first goroutine
    go primitive.WithRecover(func() {
        if pc.consumeOrderly {
            return
        }
        // blocks for the full ConsumeTimeout; closing pc.done cannot interrupt this sleep
        time.Sleep(pc.option.ConsumeTimeout)
        pc.cleanExpiredMsg()

        ticker := time.NewTicker(pc.option.ConsumeTimeout)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                pc.cleanExpiredMsg()
            case <-pc.done:
                rlog.Info("push consumer close cleanExpiredMsg listener.", map[string]interface{}{
                    rlog.LogKeyConsumerGroup: pc.consumerGroup,
                })
                return
            }
        }
    })
    ...
}

consumer/statistics.go

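  1. The first goroutine sleeps until nextMinutesTime() (up to 1 minute) before it starts checking sis.closed, so it cannot return during that time even if sis.closed has already been closed.
  2. The second goroutine sleeps until nextHourTime() (up to 1 hour) before it starts checking sis.closed, so it cannot return during that time even if sis.closed has already been closed.
  3. The third goroutine sleeps until nextMonthTime() before it starts checking sis.closed, so it can stay blocked for a day or longer even if sis.closed has already been closed.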
func (sis *statsItemSet) init() {
    ...
    // first goroutine: sis.closed is not checked until this sleep ends
    go primitive.WithRecover(func() {
        time.Sleep(nextMinutesTime().Sub(time.Now()))
        ticker := time.NewTicker(time.Minute)
        defer ticker.Stop()
        for {
            select {
            case <-sis.closed:
                return
            case <-ticker.C:
                sis.printAtMinutes()
            }
        }
    })

    // second goroutine: sis.closed is not checked until this sleep ends
    go primitive.WithRecover(func() {
        time.Sleep(nextHourTime().Sub(time.Now()))
        ticker := time.NewTicker(time.Hour)
        defer ticker.Stop()
        for {
            select {
            case <-sis.closed:
                return
            case <-ticker.C:
                sis.printAtHour()
            }
        }
    })

    // third goroutine: sis.closed is not checked until this sleep ends
    go primitive.WithRecover(func() {
        time.Sleep(nextMonthTime().Sub(time.Now()))
        ticker := time.NewTicker(24 * time.Hour)
        defer ticker.Stop()
        for {
            select {
            case <-sis.closed:
                return
            case <-ticker.C:
                sis.printAtDay()
            }
        }
    })
}

My advice:

A ticker created with time.NewTicker does not fire immediately; its first tick arrives only after one full period. So I think the initial time.Sleep calls in these goroutines are unnecessary.

absolute8511 commented 1 month ago

Actually, nextMonthTime() should be nextDayTime(). However, I think the sleep is not necessary at all.