apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[2.8.2] error while create opSendMsg by batch message container - zstd overflow detected #13712

Open e-marchand-exensa opened 2 years ago

e-marchand-exensa commented 2 years ago

Describe the bug: While upgrading from Pulsar 2.6.4 to 2.8.2, I run into this exception very quickly in a standalone test (started from scratch, so there is no backlog or anything).

WAR|11/145138.238 o.a.p.c.i.ProducerImpl@-client-io-68-10 [persistent://<tenant>/<namespace>/<topic>] [<producerName>] error while create opSendMsg by batch message container
java.lang.IllegalStateException: Overflow detected
        at org.apache.pulsar.shade.io.airlift.compress.zstd.Util.checkState(Util.java:59)
        at org.apache.pulsar.shade.io.airlift.compress.zstd.BitOutputStream.close(BitOutputStream.java:85)
        at org.apache.pulsar.shade.io.airlift.compress.zstd.HuffmanCompressor.compressSingleStream(HuffmanCompressor.java:130)
        at org.apache.pulsar.shade.io.airlift.compress.zstd.HuffmanCompressor.compress4streams(HuffmanCompressor.java:75)
        at org.apache.pulsar.shade.io.airlift.compress.zstd.ZstdFrameCompressor.encodeLiterals(ZstdFrameCompressor.java:333)
        at org.apache.pulsar.shade.io.airlift.compress.zstd.ZstdFrameCompressor.compressBlock(ZstdFrameCompressor.java:224)
        at org.apache.pulsar.shade.io.airlift.compress.zstd.ZstdFrameCompressor.compressFrame(ZstdFrameCompressor.java:172)
        at org.apache.pulsar.shade.io.airlift.compress.zstd.ZstdFrameCompressor.compress(ZstdFrameCompressor.java:145)
        at org.apache.pulsar.shade.io.airlift.compress.zstd.ZStdRawCompressor.compress(ZStdRawCompressor.java:27)
        at org.apache.pulsar.common.compression.CompressionCodecZstd.encode(CompressionCodecZstd.java:65)
        at org.apache.pulsar.client.impl.BatchMessageContainerImpl.getCompressedBatchMetadataAndPayload(BatchMessageContainerImpl.java:135)
        at org.apache.pulsar.client.impl.BatchMessageContainerImpl.createOpSendMsg(BatchMessageContainerImpl.java:189)
        at org.apache.pulsar.client.impl.ProducerImpl.batchMessageAndSend(ProducerImpl.java:1796)
        at org.apache.pulsar.client.impl.ProducerImpl.lambda$connectionOpened$13(ProducerImpl.java:1457)
        at org.apache.pulsar.common.util.Runnables$CatchingAndLoggingRunnable.run(Runnables.java:53)
        at org.apache.pulsar.shade.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
        at org.apache.pulsar.shade.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:176)
        at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
        at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
        at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)

The consumer also received an erroneous message. I changed nothing in the code or the configuration, so I was already using batching and ZSTD compression with version 2.6.4.

To Reproduce: I'm not sure how to reproduce it, but here is the configuration for the producer:

builder
      .producerName( producerName )
      .topic( topic )
      .enableBatching( true )
      .batchingMaxMessages( 256 )
      .batchingMaxPublishDelay( 150, TimeUnit.MILLISECONDS )
      .blockIfQueueFull( true )
      .maxPendingMessages( 1024 )
      .maxPendingMessagesAcrossPartitions( 2 * 1024 )
      .sendTimeout( 60000, TimeUnit.MILLISECONDS )
      .compressionType( CompressionType.ZSTD );

The producer may be used by up to 16 threads concurrently. Individual messages are small. I will try a different compression type if possible.
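For anyone trying to reproduce this, the load pattern described above (16 threads sharing one producer, small messages) can be approximated with a small harness like the one below. This is only a sketch: `ConcurrentSendHarness`, its `run` method, and the `sink` callback are hypothetical names, not part of Pulsar, and the random payload content is an assumption since the real message bodies are not shown in this issue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical load harness (not part of Pulsar): many threads push small
// payloads into one shared sink, mimicking threads sharing one producer.
final class ConcurrentSendHarness {

    // Calls `sink` from `threads` worker threads, `perThread` times each,
    // with a fresh random payload of `payloadSize` bytes per call.
    // Returns the number of sink calls that completed without throwing.
    static long run(int threads, int perThread, int payloadSize, Consumer<byte[]> sink) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        AtomicLong sent = new AtomicLong();

        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                try {
                    start.await(); // hold every worker until all are queued
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int i = 0; i < perThread; i++) {
                    // Assumption: random bytes stand in for the real messages.
                    byte[] payload = new byte[payloadSize];
                    ThreadLocalRandom.current().nextBytes(payload);
                    sink.accept(payload);
                    sent.incrementAndGet();
                }
            });
        }

        start.countDown(); // release all workers at once
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.MINUTES)) {
                pool.shutdownNow();
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return sent.get();
    }
}
```

In a real test the sink would presumably be something like `payload -> producer.sendAsync(payload)` on a `Producer<byte[]>` built with the configuration above. Whether random payloads trigger the overflow is untested here, so payloads captured from the failing environment would be a better input.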

Expected behavior: Compressing the message should not fail.

e-marchand-exensa commented 2 years ago

I tried all the other compression types without any problem, at least in my testing environment. After reverting to ZSTD compression, I get the error in less than 2 minutes.

e-marchand-exensa commented 2 years ago

@sijie It seems OK with v2.7.4, but the same issue occurs with v2.9.1.

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.

e-marchand-exensa commented 2 years ago

The issue still exists with version 2.10.0. Still no problem with 2.6.4.