apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[Bug] Key_Shared consumer doesn't work with Java client 2.6.0, but works with Java client 2.7.1 #21725

Closed yonyong closed 1 month ago

yonyong commented 9 months ago

Search before asking

Version

2.6.0

Minimal reproduce step

The same code is used with both client versions:

  1. producer config
    producer = client.newProducer(Schema.BYTES)
                .topic(PulsarProperty.topic)
                .enableBatching(true)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create();
  2. producer send Msg

    long seq = 0; // sequence counter; the declaration was not shown in the original snippet
    while (true) {
        producer.flush();

        // Pick one of three keys: KEY-1, KEY-2 or KEY-3
        int key = (int) (Math.random() * 10 % 3) + 1;
        String keyMsg = "KEY-" + key;
        String msg = "got message! key:" + keyMsg + "; seq:" + seq;
        producer.newMessage()
                .key(keyMsg)
                .orderingKey(keyMsg.getBytes())
                .value(msg.getBytes())
                .send(); // synchronous send
        seq++;
        Thread.sleep(1000);
    }
  3. consumer1 & consumer2 config

    // consumer 1
    consumer = client.newConsumer()
            .topic(PulsarProperty.topic)
            .subscriptionName("keyshared-subscription1")
            .subscriptionType(SubscriptionType.Key_Shared)
            .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
            .consumerName("keyshared-consumer-1")
            .subscribe();

    // consumer 2 (same subscription, different consumer name)
    consumer = client.newConsumer()
            .topic(PulsarProperty.topic)
            .subscriptionName("keyshared-subscription1")
            .subscriptionType(SubscriptionType.Key_Shared)
            .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
            .consumerName("keyshared-consumer-2")
            .subscribe();
  4. consumer receive Msg

    while (true) {
        Message<byte[]> msg = consumer.receive();
        try {
            // Do something with the message
            System.out.println("---------------------------------------------------------------------------");
            System.out.println("Message received: " + new String(msg.getData()));
            System.out.println(msg.getKey());
            System.out.println(String.format("Key:%s,order key:%s", msg.getKey(), msg.hasOrderingKey()));
            System.out.println("---------------------------------------------------------------------------");

            // Acknowledge the message so that it can be deleted by the message broker
            consumer.acknowledge(msg);
        } catch (Exception e) {
            // Message failed to process, redeliver later
            consumer.negativeAcknowledge(msg);
        }
    }

Use Version 2.6.0

        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>2.6.0</version>
        </dependency>

Use Version 2.7.1

        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>2.7.1</version>
        </dependency>

What did you expect to see?

With two consumers on a Key_Shared subscription, each key should be consumed by the consumer that owns it, so messages with the same key always go to the same consumer.

What did you see instead?

Result:

With client 2.6.0 and two consumers connected, all messages are consumed by a single consumer (A). The other consumer (B) never receives any message unless consumer A disconnects, which looks like Failover mode rather than Key_Shared. After upgrading to 2.7.1, each key is consumed only by its corresponding consumer, as expected in Key_Shared mode.

Analysis:

[
    {
        "msgRateOut": 1.5664867649048742,
        "msgThroughputOut": 242.91165105279143,
        "bytesOutCounter": 11361,
        "msgOutCounter": 73,
        "msgRateRedeliver": 0,
        "chunkedMessageRate": 0,
        "consumerName": "keyshared-consumer-2",
        "availablePermits": 927,
        "unackedMessages": 0,
        "avgMessagesPerEntry": 6,
        "blockedConsumerOnUnackedMsgs": false,
        "readPositionWhenJoining": "404032:3949",
        "lastAckedTimestamp": 1702548043440,
        "lastConsumedTimestamp": 1702548043339,
        "keyHashRanges": [
            "[32769, 65536]"
        ],
        "metadata": {},
        "address": "/192.168.75.44:60187",
        "clientVersion": "2.6.0",
        "connectedSince": "2023-12-14T17:59:51.518+08:00"
    },
    {
        "msgRateOut": 0,
        "msgThroughputOut": 0,
        "bytesOutCounter": 0,
        "msgOutCounter": 0,
        "msgRateRedeliver": 0,
        "chunkedMessageRate": 0,
        "consumerName": "keyshared-consumer-1",
        "availablePermits": 1000,
        "unackedMessages": 0,
        "avgMessagesPerEntry": 1000,
        "blockedConsumerOnUnackedMsgs": false,
        "readPositionWhenJoining": "404032:3977",
        "lastAckedTimestamp": 0,
        "lastConsumedTimestamp": 0,
        "keyHashRanges": [
            "[0, 32768]"
        ],
        "metadata": {},
        "address": "/192.168.75.44:60195",
        "clientVersion": "2.6.0",
        "connectedSince": "2023-12-14T17:59:57.268+08:00"
    }
]

The broker reports the same keyHashRanges with both client versions, so we can conclude that the problem is caused by a difference in producer-side logic between 2.6.0 and 2.7.1.
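To make the role of the keyHashRanges values in the stats above concrete, here is a minimal sketch of a range lookup. It assumes the bounds are inclusive, as printed in the stats. The class and method names are hypothetical, and `String.hashCode` is only a stand-in for the broker's real key hash, so the slots printed here will not match what a broker computes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; this is not Pulsar's broker code.
class HashRangeSketch {
    // Inclusive [start, end] ranges copied from keyHashRanges in the stats.
    static final Map<String, int[]> RANGES = new LinkedHashMap<>();
    static {
        RANGES.put("keyshared-consumer-1", new int[] {0, 32768});
        RANGES.put("keyshared-consumer-2", new int[] {32769, 65536});
    }

    // Returns the consumer whose range contains the slot, or null if none does.
    static String ownerOf(int slot) {
        for (Map.Entry<String, int[]> e : RANGES.entrySet()) {
            int[] r = e.getValue();
            if (slot >= r[0] && slot <= r[1]) {
                return e.getKey();
            }
        }
        return null;
    }

    // Stand-in hash (assumption): the broker applies its own hash function.
    static int slotOf(String key) {
        return Math.floorMod(key.hashCode(), 65536);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"KEY-1", "KEY-2", "KEY-3"}) {
            int slot = slotOf(key);
            System.out.println(key + " -> slot " + slot + " -> " + ownerOf(slot));
        }
    }
}
```

In this sketch a key's slot depends only on the key, so every message with that key resolves to the same consumer. Messages reach both consumers only if the keys in use hash into both ranges.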

Question

If my analysis is correct, please look into this issue. Why can't client 2.6.0 work in normal Key_Shared mode?

Anything else?

2.6.0 console log image

2.7.1 console log image

Are you willing to submit a PR?

chenhongSZ commented 9 months ago

I ran into this problem too. My guess is that the root cause is #7416, which is not compatible with earlier versions.
If we cannot get the key from the batch metadata, we need to read the key from the first single-message-metadata entry.
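The fallback suggested above could look roughly like the sketch below. It is not Pulsar's actual code: the class, the method, and the use of plain strings for keys are all hypothetical, and a null `batchKey` stands in for "no key in the batch metadata".

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration of the proposed fallback; not Pulsar's broker code.
class BatchKeyFallbackSketch {
    /**
     * Picks the key used to route a batch.
     *
     * @param batchKey  key from the batch-level metadata, or null if absent
     * @param entryKeys keys from the single-message metadata entries, in batch order
     */
    static Optional<String> routingKey(String batchKey, List<String> entryKeys) {
        if (batchKey != null) {
            return Optional.of(batchKey); // preferred: key carried by the batch itself
        }
        if (!entryKeys.isEmpty() && entryKeys.get(0) != null) {
            return Optional.of(entryKeys.get(0)); // fallback: key of the first entry
        }
        return Optional.empty(); // no key available at either level
    }

    public static void main(String[] args) {
        System.out.println(routingKey("KEY-1", Arrays.asList("KEY-2", "KEY-3")));
        System.out.println(routingKey(null, Arrays.asList("KEY-2", "KEY-3")));
    }
}
```

Note that this fallback routes the whole batch by its first entry's key, so it only preserves per-key routing when every message in a batch shares the same key.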

@codelipenghui

Technoboy- commented 8 months ago

We have fixed several issues in Key_Shared subscriptions, but the fixes are only applied to versions >= 2.9, so we suggest upgrading.

lhotari commented 1 month ago

Closing this issue since Pulsar 2.6 isn't maintained. Please upgrade to a supported version of Pulsar broker and client.