apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[Transactions] Pulsar Admin: peekMessages is returning a wrong message, dealing with non empty list of individuallyDeletedMessages #11002

Closed eolivelli closed 3 years ago

eolivelli commented 3 years ago

Describe the bug

I have a topic 'MY_QUEUE' with one subscription, 'jms-queue', that ends up in the state shown below. peekMessages against 'jms-queue' returns 74:37:-1, but the next available message should be 74:62.

pulsar-admin topics stats-internal MY_QUEUE
{
  "entriesAddedCounter" : 64,
  "numberOfEntries" : 64,
  "totalSize" : 13634,
  "currentLedgerEntries" : 64,
  "currentLedgerSize" : 13634,
  "lastLedgerCreatedTimestamp" : "2021-06-21T11:14:38.402Z",
  "waitingCursorsCount" : 1,
  "pendingAddEntriesCount" : 0,
  "lastConfirmedEntry" : "74:63",
  "state" : "LedgerOpened",
  "ledgers" : [ ],
  "cursors" : {
    "jms-queue" : {
      "markDeletePosition" : "74:36",
      "readPosition" : "74:64",
      "waitingReadOp" : false,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 58,
      "cursorLedger" : 75,
      "cursorLedgerLastEntry" : 29,
      "individuallyDeletedMessages" : "[(74:37..74:38],(74:39..74:40],(74:41..74:42],(74:43..74:44],(74:45..74:46],(74:47..74:63]]",
      "lastLedgerSwitchTimestamp" : "2021-06-21T11:14:38.498Z",
      "state" : "Open",
      "numberOfEntriesSinceFirstNotAckedMessage" : 28,
      "totalNonContiguousDeletedMessagesRange" : 6,
      "subscriptionHavePendingRead" : false,
      "subscriptionHavePendingReplayRead" : false,
      "properties" : { }
    },
    "transaction-buffer-sub" : {
      "markDeletePosition" : "74:-1",
      "readPosition" : "74:0",
      "waitingReadOp" : false,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 0,
      "cursorLedger" : -1,
      "cursorLedgerLastEntry" : -1,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2021-06-21T11:14:40.444Z",
      "state" : "Uninitialized",
      "numberOfEntriesSinceFirstNotAckedMessage" : 1,
      "totalNonContiguousDeletedMessagesRange" : 0,
      "subscriptionHavePendingRead" : false,
      "subscriptionHavePendingReplayRead" : false,
      "properties" : { }
    }
  },
  "schemaLedgers" : [ ],
  "compactedLedger" : {
    "ledgerId" : -1,
    "entries" : -1,
    "size" : -1,
    "offloaded" : false,
    "underReplicated" : false
  }
}

After calling peekMessages, my code creates a Reader at that position, and the Reader (correctly) logs:

INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/MY_QUEUE][reader-f65efe6431] Successfully getLastMessageId 74:62

There must be a problem in how individuallyDeletedMessages is handled, although ManagedCursorImpl appears to handle this case correctly:

https://github.com/apache/pulsar/blob/76d3426b72a79ee9f8812fa66e45239830ff1c80/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L647

To Reproduce

I do not have a simple way to reproduce this problem. It happens while running the JMS TCK against the Pulsar JMS client with a Pulsar 2.8.0 server: the system ends up in this broken state after the transaction-related part of the TCK runs. Within that setup, however, I can reproduce the problem consistently.

Expected behavior

peekMessages should return the next message to be read by the subscription.

eolivelli commented 3 years ago

The problem is that there are holes in individuallyDeletedMessages. I will try to reproduce this with a simple test case. It may be related to transaction rollback, because I only see this state after running the transaction tests.
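To see why the holes decide what peekMessages returns, here is a minimal sketch that walks the cursor state from the stats-internal output: starting just after markDeletePosition, it skips every entry covered by an individually deleted range. It assumes a single ledger and the (a..b] notation printed above, where a range is open below and closed above, so (74:37..74:38] covers only 74:38. The class and method names are invented for illustration; this is not the ManagedCursorImpl code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified model: one ledger, ranges in the open-closed "(a..b]" form shown by stats-internal.
public class FirstUnacked {
    // Matches one range such as "(74:37..74:38]"; groups 2 and 4 are the entry ids.
    private static final Pattern RANGE =
            Pattern.compile("\\((\\d+):(-?\\d+)\\.\\.(\\d+):(-?\\d+)\\]");

    /** Returns the first entry id after markDeleteEntry that no deleted range covers. */
    static long firstUnacked(long markDeleteEntry, String individuallyDeleted) {
        List<long[]> ranges = new ArrayList<>();
        Matcher m = RANGE.matcher(individuallyDeleted);
        while (m.find()) {
            // (lower..upper] covers entries lower+1 .. upper
            ranges.add(new long[] {Long.parseLong(m.group(2)), Long.parseLong(m.group(4))});
        }
        long candidate = markDeleteEntry + 1;
        boolean moved = true;
        while (moved) {
            moved = false;
            for (long[] r : ranges) {
                if (candidate > r[0] && candidate <= r[1]) {
                    candidate = r[1] + 1;   // jump past the deleted range
                    moved = true;
                }
            }
        }
        return candidate;
    }

    public static void main(String[] args) {
        String deleted = "[(74:37..74:38],(74:39..74:40],(74:41..74:42],"
                + "(74:43..74:44],(74:45..74:46],(74:47..74:63]]";
        System.out.println("74:" + firstUnacked(36, deleted));   // the real markDeletePosition
        System.out.println("74:" + firstUnacked(37, deleted));   // if 74:37 were acknowledged
        System.out.println("74:" + firstUnacked(47, deleted));   // if every hole were acknowledged
    }
}
```

With markDeletePosition 74:36 the walk stops at 74:37, the same entry peekMessages reported, and acknowledging 74:37 alone only moves it to the next hole, 74:39. Only when all six holes are gone does it reach 74:64, the cursor's readPosition. The six holes also match totalNonContiguousDeletedMessagesRange : 6, and entries 74:37 through 74:64 give the 28 reported in numberOfEntriesSinceFirstNotAckedMessage.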

eolivelli commented 3 years ago

Continuing my investigation, it looks like when I send messages in a transaction, the 'commit' operation uses up a message ID of its own.

So:

So message 3:1:-1 does not exist, and when a consumer tries to acknowledge the messages, the subscription ends up in a state with systematic holes: individuallyDeletedMessages keeps growing, and peekMessages cannot report the correct value.
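The mechanism can be illustrated with a small model, assuming each transaction appends one data entry followed by one commit-marker entry and the consumer only ever acknowledges data entries (it never receives the markers). The Cursor class below is hypothetical and far simpler than Pulsar's ManagedCursorImpl; it only tracks a mark-delete position plus a set of individually acknowledged entries and prints them in the same (a..b] style as stats-internal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, simplified cursor on a single ledger; not Pulsar's implementation.
public class MarkerHoles {
    static final class Cursor {
        long markDelete = -1;                          // every entry <= markDelete is acknowledged
        final TreeSet<Long> acked = new TreeSet<>();   // acknowledged entries above markDelete

        void ack(long entryId) {
            if (entryId <= markDelete) return;
            acked.add(entryId);
            // Advance markDelete while the next entry has been acknowledged.
            while (acked.remove(markDelete + 1)) {
                markDelete++;
            }
        }

        // Render the individually acknowledged entries as "(a..b]" ranges.
        String individuallyDeleted(long ledgerId) {
            List<String> ranges = new ArrayList<>();
            Long start = null, prev = null;
            for (long id : acked) {
                if (prev != null && id == prev + 1) { prev = id; continue; }
                if (start != null) ranges.add(range(ledgerId, start, prev));
                start = id;
                prev = id;
            }
            if (start != null) ranges.add(range(ledgerId, start, prev));
            return "[" + String.join(",", ranges) + "]";
        }

        // (a..b] is open below and closed above: it covers entries a+1 .. b.
        private static String range(long ledgerId, long first, long last) {
            return "(" + ledgerId + ":" + (first - 1) + ".." + ledgerId + ":" + last + "]";
        }
    }

    public static void main(String[] args) {
        // Three transactions: data at 0, 2, 4 and commit markers at 1, 3, 5.
        boolean[] isMarker = {false, true, false, true, false, true};
        Cursor cursor = new Cursor();
        for (int id = 0; id < isMarker.length; id++) {
            if (!isMarker[id]) cursor.ack(id);   // the consumer only sees data entries
        }
        System.out.println("markDeletePosition: 74:" + cursor.markDelete);
        System.out.println("individuallyDeletedMessages: " + cursor.individuallyDeleted(74));
    }
}
```

Running it prints a markDeletePosition of 74:0 and individuallyDeletedMessages of [(74:1..74:2],(74:3..74:4]]: every data entry is acknowledged, yet each unacknowledged marker leaves a one-entry hole, the same alternating pattern as in the stats-internal output of this issue.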

I am trying to create a reproducer as a test case on the current Pulsar master branch.

codelipenghui commented 3 years ago

@congbobo184 Could you please take a look at this issue? It looks related to transactions.

congbobo184 commented 3 years ago

Currently, deleting the transaction marker is an asynchronous operation. As long as new messages keep being produced and acknowledged, this does not cause a problem. In a race condition, however, message1 is acknowledged while its marker deletion is still in progress, and the acknowledgement of message2 does not go through the marker-deletion process. If the producer then sends no new messages to the broker, no further ack processing is triggered, so the subscription may never delete message1's commit marker. We should therefore start a timer task in the broker that checks whether subscriptions have deleted their transaction markers.
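A rough sketch of what such a periodic check could do, under the assumption that the broker knows which entries are transaction markers: on each tick, treat markers sitting directly after the mark-delete position as acknowledged and fold in any individually acknowledged entries that become contiguous. Everything here (MarkerSweeper, sweepOnce, start) is a hypothetical name for illustration, not a Pulsar API and not the fix that was eventually merged.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the proposed broker-side timer task; none of these names are Pulsar APIs.
public class MarkerSweeper {
    private final Set<Long> markerEntries;   // entry ids that hold transaction markers
    private final Set<Long> ackedEntries;    // individually acknowledged entries above markDelete
    private long markDelete;                 // every entry <= markDelete is acknowledged

    MarkerSweeper(Set<Long> markerEntries, Set<Long> ackedEntries, long markDelete) {
        this.markerEntries = new TreeSet<>(markerEntries);
        this.ackedEntries = new TreeSet<>(ackedEntries);
        this.markDelete = markDelete;
    }

    // One pass: skip markers right after markDelete and absorb acked entries that become contiguous.
    synchronized long sweepOnce() {
        while (true) {
            long next = markDelete + 1;
            if (markerEntries.contains(next) || ackedEntries.remove(next)) {
                markDelete = next;
            } else {
                return markDelete;
            }
        }
    }

    // Runs sweepOnce periodically, so progress no longer depends on a later ack arriving.
    ScheduledExecutorService start(long periodMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(this::sweepOnce, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return timer;
    }

    synchronized long markDelete() {
        return markDelete;
    }

    public static void main(String[] args) {
        // Data at 0, 2, 4 and markers at 1, 3, 5. All data is acked, but the marker
        // after entry 0 was never deleted, so markDelete is stuck at 0.
        MarkerSweeper sweeper = new MarkerSweeper(Set.of(1L, 3L, 5L), Set.of(2L, 4L), 0L);
        System.out.println("before: 74:" + sweeper.markDelete());
        System.out.println("after:  74:" + sweeper.sweepOnce());
    }
}
```

A single sweep moves the stuck position from 74:0 to 74:5 without any new produce or ack traffic, which is the situation described above. In a real broker the timer returned by start would need to be shut down when the subscription or topic closes.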

eolivelli commented 3 years ago

Thank you @congbobo184 for the clarification.

This is the patch that introduced automatic acknowledgement of the delete marker: https://github.com/apache/pulsar/pull/8318