nsqio / go-nsq

The official Go package for NSQ
MIT License

fix(consumer/delegate): requeue when receive resend msg from nsqd. #348

Closed: laushunyu closed this 2 years ago

laushunyu commented 2 years ago

Steps to reproduce. First, publish a message:

curl -d 'okkkk' "http://127.0.0.1:4151/pub?topic=asd"

Then run this consumer:

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "time"
)

func main() {
    c, err := nsq.NewConsumer("asd", "aaa", nsq.NewConfig())
    if err != nil {
        panic(err)
    }

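    // A single handler that never finishes in time: it sleeps far longer
    // than nsqd's msg_timeout, so nsqd will time the message out.
    c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
        fmt.Println("get message: ", string(message.Body))
        time.Sleep(time.Hour)
        return nil
    }))

    if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil {
        panic(err)
    }

    // Block forever so the consumer keeps running.
    select {}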
}

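In this example the handler gets stuck on the message. When the message reaches its msg_timeout deadline, nsqd times it out and immediately redelivers it to the client. When the client connection receives the redelivered message, it invokes the delegate's OnMessage method, and the consumer then blocks trying to send the message into the incomingMessages channel, because the only handler goroutine is still busy with the first delivery. Since the connection's read loop is stuck in that call, it stops answering nsqd's heartbeats; after two unanswered heartbeats, nsqd concludes that the client has disconnected. Then run: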

curl "http://127.0.0.1:4151/stats?format=json&topic=asd&channel=aaa" | jq .

Response (channel aaa has 13 messages queued, a timeout_count of 52, and a client_count of 0):

{
  "version": "1.2.0",
  "health": "OK",
  "start_time": 1660103452,
  "topics": [
    {
      "topic_name": "asd",
      "channels": [
        {
          "channel_name": "aaa",
          "depth": 13,
          "backend_depth": 5,
          "in_flight_count": 0,
          "deferred_count": 0,
          "message_count": 13,
          "requeue_count": 6,
          "timeout_count": 52,
          "client_count": 0,
          "clients": [],
          "paused": false,
          "e2e_processing_latency": {
            "count": 0,
            "percentiles": null
          }
        }
      ],
      "depth": 0,
      "backend_depth": 0,
      "message_count": 13,
      "message_bytes": 91,
      "paused": false,
      "e2e_processing_latency": {
        "count": 0,
        "percentiles": null
      }
    }
  ],
  "memory": {
    "heap_objects": 4723,
    "heap_idle_bytes": 63791104,
    "heap_in_use_bytes": 1875968,
    "heap_released_bytes": 63373312,
    "gc_pause_usec_100": 827,
    "gc_pause_usec_99": 256,
    "gc_pause_usec_95": 180,
    "next_gc_bytes": 4194304,
    "gc_total_runs": 1607
  },
  "producers": []
}

mreiferson commented 2 years ago

This is working as intended: if your handlers block, we expect back pressure to propagate to nsqd.

laushunyu commented 2 years ago

There is no way to find out why the nsqd client count drops to 0. I think go-nsq should log something when a blocking handler causes the client to go offline, so developers know they have written a bug.