apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0
14.14k stars 3.57k forks source link

New KeyShared consumers will not get any messages until a consumer that did get messages disconnects or acks/nacks some messages #15705

Closed zbentley closed 1 year ago

zbentley commented 2 years ago

Describe the bug

If a KeyShared consumer has any unacknowledged messages, new KeyShared consumers on the same subscription after that point will not get any new messages (even messages with brand new keys) until the original consumer either disconnects or acks/nacks some indeterminate number of messages.

This is a really bad bug!

Bug scenario 1

  1. Create a non partitioned topic.
  2. Create a KeyShared subscription on that topic.
  3. Produce some number of messages on the topic with a given key, say key1, using a KeyBased batching strategy.
  4. Start a consumer on the topic with the below code and ensure it prints Press a key to acknowledge messages.
  5. Start a second consumer on the same topic; it may or may not print Press a key to acknowledge messages depending on whether key1 is assigned to it (but it will never print that if this bug is in effect).
  6. Produce some number (10 should be sufficient) of messages on the topic with unique keys not equal to key1; say key2, key3, and so on. The goal here is to get a key that hashes to the second consumer's range.
  7. Observe that the second consumer never gets a message.
  8. In the first consumer's terminal, press enter.
  9. As the first consumer acks messages, observe that only then does the second consumer get any messages.

Bug scenario 2

  1. Create a non partitioned topic.
  2. Create a KeyShared subscription on that topic.
  3. Produce 100 messages on that topic, each with a distinct partition key (e.g. key1, key2 through key100).
  4. Start a consumer on the topic with the below code and ensure it prints Press a key to acknowledge messages.
  5. Start a second consumer on the topic with the below code.
  6. Observe that the second consumer does not receive any messages (i.e. it does not print Press a key to acknowledge messages), even though hash range redistribution should have allocated at least some of the 100 keys to the new consumer.
  7. In the first consumer's terminal, press enter.
  8. As the first consumer acks messages, observe that only then does the second consumer get any messages.

Consumer code

import time
from pulsar import Client, ConsumerType, Timeout
import os

TOPIC = 'THETOPIC'
SUBSCRIPTION = 'MYSUBSCRIPTION'

def recv(sub):
    while True:
        try:
            msg = sub.receive(100)
            print("Got message", msg.data())
            return msg
        except Timeout:
            pass

def main():
    client = Client(service_url='pulsar://localhost:6650')
    sub = client.subscribe(
        topic=TOPIC,
        subscription_name=SUBSCRIPTION,
        consumer_type=ConsumerType.KeyShared,
        receiver_queue_size=2,
        consumer_name=f'testconsumer-{os.getpid()}'
    )
    msg = recv(sub)
    input("Press a key to acknowledge messages")
    while True:
        sub.acknowledge(msg)
        msg = recv(sub)

if __name__ == '__main__':
    main()

Expected behavior In scenario 1, the second consumer should receive at least some messages in step 7. In scenario 2, the second consumer should receive messages as soon as it starts in step 5.

In short, I think hash range redistribution is not working right, or is not triggering message re-routing: when new KeyShared consumers arrive, two things should happen:

  1. New consumers should be allocated part of the hash range of their subscription.
  2. Any backlog messages for keys in that range should be sent to the new consumer.

In scenario 1, neither part is working. In scenario 2, part 1 is working, but I think part 2 is not.

Environment:

Same environment as https://github.com/apache/pulsar-client-python/issues/190

codelipenghui commented 2 years ago

@zbentley

If a KeyShared consumer has any unacknowledged messages, new KeyShared consumers on the same subscription after that point will not get any new messages (even messages with brand new keys) until the original consumer either disconnects or acks/nacks some indeterminate number of messages.

It's expected behavior because the old consumer has unacked messages, the new messages after the unacked messages might break the message dispatch order by the key.

Here is more context about the key-shared subscription ordering guarantee

https://github.com/apache/pulsar/issues/6554 https://github.com/apache/pulsar/pull/7106

zbentley commented 2 years ago

@codelipenghui if I'm reading that correctly, that's really concerning behavior.

If a topic has no consumers, and a backlog of message index:key pairs 0:a, 1:a, 2:a, 3:b, 4:b, 5:b, and a KeyShared consumer c1 joins with a receiver queue size of 1 and gets message 0, why would we prevent a new consumer c2 from joining and getting messages 3-5? That doesn't compromise key ordering in any way.

Am I interpreting it correctly that: a new key shared consumer that connects to the topic when the newest message has position X will not receive any messages until the oldest unacked message in the subscription is newer than or equal to X?

If that's the current behavior, it should be really prominently documented (potentially in a warning/highlighted way).

codelipenghui commented 2 years ago

If a topic has no consumers, and a backlog of message index:key pairs 0:a, 1:a, 2:a, 3:b, 4:b, 5:b, and a KeyShared consumer c1 joins with a receiver queue size of 1 and gets message 0, why would we prevent a new consumer c2 from joining and getting messages 3-5? That doesn't compromise key ordering in any way.

Yes, it will not break the key shared semantics, but it's an implementation tradeoff, the current implementation doesn't need to maintain the state for each key since a topic might have a huge number of keys.

The behavior you described is expected for the current implementation(maybe not the best solution for now).

Am I interpreting it correctly that: a new key shared consumer that connects to the topic when the newest message has position X will not receive any messages until the oldest unacked message in the subscription is newer than or equal to X?

Yes, correct.

If that's the current behavior, it should be really prominently documented (potentially in a warning/highlighted way).

Yes, make sense, we will add the document.

Anonymitaet commented 2 years ago

Hi @momo-jun can you help add that note? Thanks

zbentley commented 2 years ago

@codelipenghui thank you for the explanation; that makes sense.

Two clarifications:

  1. How does this apply to nacked messages? if a new KeyShared consumer c2 is blocked due to markDeletePosition not being caught up to the point where c2 joined, if an existing consumer c1 negatively acknowledges a message that hashes to c2, will the nacked message go to c2 or c1?
  2. Address this limitation, do you really need to track state for every key in the topic? I may be naïve here, but it seems to me that you would only need to track state for each key which has messages that have been dispatched to a consumer. That's still an O(N) state where there's currently not one, but it's a much smaller N. This might be getting into feature request territory now though.
merlimat commented 2 years ago

@codelipenghui thank you for the explanation; that makes sense.

Two clarifications:

  1. How does this apply to nacked messages? if a new KeyShared consumer c2 is blocked due to markDeletePosition not being caught up to the point where c2 joined, if an existing consumer c1 negatively acknowledges a message that hashes to c2, will the nacked message go to c2 or c1?

C2 will joined and be marked that it can only receive messages dispatched from the moment it joins.

In this example, a message nacked by c1, will still get redelivered to c1 (unless c1 disconnects), because the keys are not switched until everything that c1 has already received is acked.

Otherwise, we could get a nack on one message and then on another and they could end up being out of order, eg: if c2 also goes away.

  1. Address this limitation, do you really need to track state for every key in the topic? I may be naïve here, but it seems to me that you would only need to track state for each key which has messages that have been dispatched to a consumer. That's still an O(N) state where there's currently not one, but it's a much smaller N. This might be getting into feature request territory now though.

"each key which has messages that have been dispatched to a consumer." ... for which the worst case scenario is to track every key :)

xuesongxs commented 2 years ago

erwise, we could get a nack on one message and then on another and they could end up being out of order, eg: if c2 al

Your broker.conf file changed to: subscriptionKeySharedUseConsistentHashing=true

zbentley commented 2 years ago

@merlimat that's not accurate; if I start a KeyShared consumer c1 per the test setup here, wait until it receives its first message, then start another consumer c2, then trigger c1 to nack all of its messages, then c2 starts getting (some, not all, hash-based) messages.

Is that not supposed to happen? The docs seem to indicate that this is expected:

Be aware that negative acknowledgments on ordered subscription types, such as Exclusive, Failover and Key_Shared, might cause failed messages being sent to consumers out of the original order.

Additionally, how does allowOutOfOrderDelivery work with the Python/C++ client? Is that on by default? Off by default?

Lastly, how does the setting that @xuesongxs mentioned (subscriptionKeySharedUseConsistentHashing) affect this behavior? I thought that setting only affected what keys new consumers assume ownership from when they arrive; does it also affect how those consumers get messages (nacked or backlogged) that were already in the topic when they joined?

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.

kuskmen commented 1 year ago

Can outOfOrderDelivery mitigate issues with consumers getting stuck, because obviously we don't care about ordering at this case?