apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0
[Bug] Cannot compact topic containing compressed messages with properties. #23299

Open michalcukierman opened 1 month ago

michalcukierman commented 1 month ago

Version

3.3.1

Minimal reproduce step

I have not been able to create a reproducible example yet. The situation happened in one of our test environments, on:

The error message:

Screenshot 2024-09-12 at 17 46 32

Queries showing the activity of __subscription consumers and the failed compaction:

Screenshot 2024-09-12 at 17 47 20

A constant backlog of messages:

Screenshot 2024-09-12 at 17 48 36

The stack trace points to the recent update of [RawBatchConverter.java](https://github.com/apache/pulsar/pull/21917/files#diff-91cd32cbdf605810629e9890fb5dd1ee8d7bbee85038aaeb7919ce52a3649f58)

What did you expect to see?

The topics are compacted.

What did you see instead?

Compaction fails on every topic, which causes the server to try to compact again.

Anything else?

No response

michalcukierman commented 1 month ago

Cc @thetumbled @Technoboy- as contributors to #21917.

michalcukierman commented 1 month ago

It looks like #21917 was not the cause, as the exception comes from another place:

    } else if (filter.test(singleMessageMetadata.getPartitionKey(), id)
               && singleMessagePayload.readableBytes() > 0) {
        messagesRetained++;
        Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
                                                          singleMessagePayload, batchBuffer);
    } else {
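
To make the quoted branch easier to follow, here is a toy model in plain Java. It is not the actual `RawBatchConverter` code and it does not reproduce the reported exception. `Entry`, `latestIndexPerKey`, and `retain` are hypothetical names, and an integer batch index stands in for the message id. The sketch only mirrors the shape of the condition: a message in a batch is retained when the filter accepts its key and position and its payload is non-empty (in Pulsar topic compaction, an empty payload marks the key as deleted).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

public class BatchRetentionSketch {

    /** Hypothetical stand-in for one message inside a batch. */
    static final class Entry {
        final String key;
        final byte[] payload;
        final int batchIndex; // assumption: stands in for the message id

        Entry(String key, String value, int batchIndex) {
            this.key = key;
            this.payload = value.getBytes(StandardCharsets.UTF_8);
            this.batchIndex = batchIndex;
        }
    }

    /** Records, for each key, the index of the last entry seen with that key. */
    static Map<String, Integer> latestIndexPerKey(List<Entry> batch) {
        Map<String, Integer> latest = new HashMap<>();
        for (Entry e : batch) {
            latest.put(e.key, e.batchIndex);
        }
        return latest;
    }

    /** Keeps an entry only if the filter accepts it and its payload is non-empty. */
    static List<Entry> retain(List<Entry> batch, BiPredicate<String, Integer> filter) {
        List<Entry> retained = new ArrayList<>();
        for (Entry e : batch) {
            if (filter.test(e.key, e.batchIndex) && e.payload.length > 0) {
                retained.add(e);
            }
        }
        return retained;
    }

    public static void main(String[] args) {
        List<Entry> batch = Arrays.asList(
            new Entry("K1", "V1", 0),  // superseded by the next K1 entry
            new Entry("K1", "V2", 1),  // latest value for K1
            new Entry("K2", "", 2));   // empty payload, treated as a delete marker

        Map<String, Integer> latest = latestIndexPerKey(batch);
        List<Entry> retained = retain(batch, (key, idx) -> idx.equals(latest.get(key)));

        for (Entry e : retained) {
            System.out.println(e.key + " @ index " + e.batchIndex);
        }
    }
}
```

In this model only the second `K1` entry survives: the first is superseded, and the `K2` entry is dropped because its payload is empty.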

michalcukierman commented 1 month ago

Added a draft PR that reproduces the issue with code:

    @Test
    public void testCompactionOfCompressedMessagesWithProperties() throws Exception {
        String topicName = BrokerTestUtil.newUniqueName(
            "persistent://my-property/use/my-ns/testCompactionOfCompressedMessagesWithProperties");

        // LZ4 compression with at most two messages per batch.
        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .compressionType(CompressionType.LZ4)
            .topic(topicName)
            .batchingMaxMessages(2)
            .create();

        // Two keyed messages; only the second one carries a property (a 100-character value).
        producer.newMessage().key("K1").value("V1").sendAsync();
        producer.newMessage().key("K2").value("V2")
            .properties(Map.of("p1", new String(new byte[100]))).sendAsync();

        // Trigger compaction and wait for it to report success.
        admin.topics().triggerCompaction(topicName);
        Awaitility.await().until(() -> admin.topics().compactionStatus(topicName).status == Status.SUCCESS);
    }

michalcukierman commented 4 weeks ago

Attaching the failing test: https://github.com/apache/pulsar/compare/master...michalcukierman:pulsar:pulsar-issue-23299