streamnative / pulsar-rs

Rust Client library for Apache Pulsar
Other
369 stars 121 forks source link

Negative unackedMessages in consumers #276

Open wexgjduv opened 1 year ago

wexgjduv commented 1 year ago

Hi,

I have a topic with 5 partitions:

persistent://public/default/mytopic
persistent://public/default/mytopic-partition-0
persistent://public/default/mytopic-partition-1
persistent://public/default/mytopic-partition-2
persistent://public/default/mytopic-partition-3
persistent://public/default/mytopic-partition-4
persistent://public/default/mytopic-partition-5

When messages were published in high rate, consumers had negative unackedMessages(i.e. "unackedMessages": -248941).

Below is the topic stats for the topic: pulsarctl topics stats public/default/mytopic-partition-1

{
  "msgRateIn": 141.90138436428614,
  "msgRateOut": 283.80284673707945,
  "msgThroughputIn": 412514.323391346,
  "msgThroughputOut": 825028.8755500212,
  "averageMsgSize": 2907.0493232986946,
  "storageSize": 1970164555,
  "publishers": [
    {
      "producerId": 11,
      "msgRateIn": 0,
      "msgThroughputIn": 0,
      "averageMsgSize": 0,
      "metadata": {}
    },
    {
      "producerId": 13,
      "msgRateIn": 0,
      "msgThroughputIn": 0,
      "averageMsgSize": 0,
      "metadata": {}
    },
    {
      "producerId": 13,
      "msgRateIn": 0,
      "msgThroughputIn": 0,
      "averageMsgSize": 0,
      "metadata": {}
    },
    {
      "producerId": 16,
      "msgRateIn": 0,
      "msgThroughputIn": 0,
      "averageMsgSize": 0,
      "metadata": {}
    },
    {
      "producerId": 17,
      "msgRateIn": 0,
      "msgThroughputIn": 0,
      "averageMsgSize": 0,
      "metadata": {}
    },
    {
      "producerId": 20,
      "msgRateIn": 0,
      "msgThroughputIn": 0,
      "averageMsgSize": 0,
      "metadata": {}
    },
    {
      "producerId": 11,
      "msgRateIn": 0,
      "msgThroughputIn": 0,
      "averageMsgSize": 0,
      "metadata": {}
    },
    {
      "producerId": 11,
      "msgRateIn": 0,
      "msgThroughputIn": 0,
      "averageMsgSize": 0,
      "metadata": {}
    },
    {
      "producerId": 19,
      "msgRateIn": 0,
      "msgThroughputIn": 0,
      "averageMsgSize": 0,
      "metadata": {}
    },
    {
      "producerId": 11,
      "msgRateIn": 0,
      "msgThroughputIn": 0,
      "averageMsgSize": 0,
      "metadata": {}
    },
    {
      "producerId": 14,
      "msgRateIn": 39.9503938482944,
      "msgThroughputIn": 110260.50366249132,
      "averageMsgSize": 2759.9353358364624,
      "metadata": {}
    },
    {
      "producerId": 19,
      "msgRateIn": 43.85042680789757,
      "msgThroughputIn": 129118.14007462219,
      "averageMsgSize": 2944.5127328012163,
      "metadata": {}
    },
    {
      "producerId": 19,
      "msgRateIn": 38.700377870489525,
      "msgThroughputIn": 114501.01798793963,
      "averageMsgSize": 2958.653746770026,
      "metadata": {}
    },
    {
      "producerId": 16,
      "msgRateIn": 0,
      "msgThroughputIn": 0,
      "averageMsgSize": 0,
      "metadata": {}
    },
    {
      "producerId": 22,
      "msgRateIn": 0.016666825998745412,
      "msgThroughputIn": 59.30056690353617,
      "averageMsgSize": 3558,
      "metadata": {}
    },
    {
      "producerId": 18,
      "msgRateIn": 0,
      "msgThroughputIn": 0,
      "averageMsgSize": 0,
      "metadata": {}
    },
    {
      "producerId": 21,
      "msgRateIn": 0,
      "msgThroughputIn": 0,
      "averageMsgSize": 0,
      "metadata": {}
    },
    {
      "producerId": 18,
      "msgRateIn": 0,
      "msgThroughputIn": 0,
      "averageMsgSize": 0,
      "metadata": {}
    },
    {
      "producerId": 14,
      "msgRateIn": 9.11675447529518,
      "msgThroughputIn": 27502.49822547849,
      "averageMsgSize": 3016.6983546617917,
      "metadata": {}
    },
    {
      "producerId": 17,
      "msgRateIn": 10.266764536310736,
      "msgThroughputIn": 31072.862873910824,
      "averageMsgSize": 3026.5487012987014,
      "metadata": {}
    }
  ],
  "subscriptions": {
    "log-parser": {
      "blockedSubscriptionOnUnackedMsgs": false,
      "isReplicated": true,
      "msgRateOut": 141.90140847878627,
      "msgThroughputOut": 412514.3945251693,
      "msgRateRedeliver": 0,
      "msgRateExpired": 0,
      "msgBacklog": 0,
      "msgDelayed": 0,
      "unackedMessages": 0,
      "type": "Shared",
      "activeConsumerName": "",
      "consumers": [
        {
          "blockedConsumerOnUnackedMsgs": false,
          "availablePermits": 956,
          "unackedMessages": 0,
          "msgRateOut": 54.933876404142076,
          "msgThroughputOut": 159345.54194075975,
          "msgRateRedeliver": 0,
          "consumerName": "log-parser-log-parser-controller-manager-789446b64-j6xhb",
          "metadata": {
            "hostname": "log-parser-controller-manager-789446b64-j6xhb"
          }
        },
        {
          "blockedConsumerOnUnackedMsgs": false,
          "availablePermits": 512,
          "unackedMessages": 0,
          "msgRateOut": 44.61710952047978,
          "msgThroughputOut": 130170.69202872935,
          "msgRateRedeliver": 0,
          "consumerName": "log-parser-log-parser-controller-manager-789446b64-7r4tv",
          "metadata": {
            "hostname": "log-parser-controller-manager-789446b64-7r4tv"
          }
        },
        {
          "blockedConsumerOnUnackedMsgs": false,
          "availablePermits": 504,
          "unackedMessages": 0,
          "msgRateOut": 42.350422554164396,
          "msgThroughputOut": 122998.16055568015,
          "msgRateRedeliver": 0,
          "consumerName": "log-parser-log-parser-controller-manager-789446b64-5m65w",
          "metadata": {
            "hostname": "log-parser-controller-manager-789446b64-5m65w"
          }
        }
      ]
    },
    "pulsar-elasticsearch-sync-rs": {
      "blockedSubscriptionOnUnackedMsgs": false,
      "isReplicated": false,
      "msgRateOut": 141.90143825829318,
      "msgThroughputOut": 412514.48102485185,
      "msgRateRedeliver": 0,
      "msgRateExpired": 0,
      "msgBacklog": 0,
      "msgDelayed": 0,
      "unackedMessages": -984611,
      "type": "Shared",
      "activeConsumerName": "",
      "consumers": [
        {
          "blockedConsumerOnUnackedMsgs": false,
          "availablePermits": 1715,
          "unackedMessages": -248941,
          "msgRateOut": 32.01698907134322,
          "msgThroughputOut": 93654.8264187044,
          "msgRateRedeliver": 0,
          "consumerName": "consumer-pulsar-elasticsearch-sync-rs",
          "metadata": {}
        },
        {
          "blockedConsumerOnUnackedMsgs": false,
          "availablePermits": 1954,
          "unackedMessages": -241391,
          "msgRateOut": 34.8336856998672,
          "msgThroughputOut": 101149.2898608523,
          "msgRateRedeliver": 0,
          "consumerName": "consumer-pulsar-elasticsearch-sync-rs-1",
          "metadata": {}
        },
        {
          "blockedConsumerOnUnackedMsgs": false,
          "availablePermits": 1501,
          "unackedMessages": -249914,
          "msgRateOut": 40.250408642944585,
          "msgThroughputOut": 116979.63763772078,
          "msgRateRedeliver": 0,
          "consumerName": "consumer-pulsar-elasticsearch-sync-rs-2",
          "metadata": {}
        },
        {
          "blockedConsumerOnUnackedMsgs": false,
          "availablePermits": 1194,
          "unackedMessages": -244365,
          "msgRateOut": 34.80035484413819,
          "msgThroughputOut": 100730.72710757433,
          "msgRateRedeliver": 0,
          "consumerName": "consumer-pulsar-elasticsearch-sync-rs-3",
          "metadata": {}
        }
      ]
    }
  },
  "replication": {},
  "deduplicationStatus": "Disabled"
}

What's the reason that causes negative unackedMessages and how to fix it?

Thanks.