apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0

[Bug] Timed messages cannot be consumed #7641

Open zyhui98 opened 7 months ago

zyhui98 commented 7 months ago


Runtime platform environment

CentOS 7.6

RocketMQ version

5.1.4

JDK Version

1.8.0_66

Describe the Bug

    2023-12-12 00:25:46 ERROR TimerEnqueueGetService - Unknown exception in enqueuing
    java.lang.IllegalArgumentException: null
        at java.nio.Buffer.limit(Buffer.java:275)
        at org.apache.rocketmq.store.timer.TimerMessageStore.getMessageByCommitOffset(TimerMessageStore.java:1013)
        at org.apache.rocketmq.store.timer.TimerMessageStore.enqueue(TimerMessageStore.java:652)
        at org.apache.rocketmq.store.timer.TimerMessageStore$TimerEnqueueGetService.run(TimerMessageStore.java:1269)
        at java.lang.Thread.run(Thread.java:745)

Steps to Reproduce

It reproduces every time a message is sent. There are two brokers, and one of them works normally.

What Did You Expect to See?

The message is consumed normally.

What Did You See Instead?

The message is not stored.


GenerousMan commented 7 months ago

Do both brokers send and receive normal messages correctly? Is it only timed messages that have the problem?

zyhui98 commented 7 months ago

The message was too large: it exceeded our configured maxMessageSize of 64kb, so the timer could not roll it.

daigoopautoy commented 7 months ago

    public MessageExtEncoder(final int maxMessageBodySize) {
        ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
        //Reserve 64kb for encoding buffer outside body
        int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024 ?
            maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE;
        byteBuf = alloc.directBuffer(maxMessageSize);
        this.maxMessageBodySize = maxMessageBodySize;
        this.maxMessageSize = maxMessageSize;
    }

My config sets maxMessageSize=64kb, so in the encoder maxMessageBodySize=64kb and maxMessageSize=128kb. A delay message with bodySize=63kb and properties=3kb can be written to the commitLog, because 63 < 64 and 63 + 3 + the other fields' length < 128.
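The write-side arithmetic can be checked with a small standalone sketch. The class and method names below are illustrative and not part of RocketMQ; only the capacity expression is copied from the MessageExtEncoder constructor quoted above.

```java
// Illustrative sketch of the write-side limits; names here are hypothetical.
final class WriteSideLimits {
    // Headroom the encoder reserves for everything outside the body.
    static final int RESERVED = 64 * 1024;

    // Same overflow-safe expression as in the quoted MessageExtEncoder constructor.
    static int encoderCapacity(int maxMessageBodySize) {
        return Integer.MAX_VALUE - maxMessageBodySize >= RESERVED
            ? maxMessageBodySize + RESERVED
            : Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        int configured = 64 * 1024;   // maxMessageSize=64kb from the broker config
        int capacity = encoderCapacity(configured);
        int body = 63 * 1024;         // example body size from the comment
        int properties = 3 * 1024;    // example properties size from the comment

        System.out.println("encoder capacity: " + capacity); // 131072 (128kb)
        System.out.println("body within body limit: " + (body <= configured));
        System.out.println("body + properties within capacity: "
            + (body + properties <= capacity));
    }
}
```

Both checks pass for the 63kb + 3kb example, which is why the write succeeds even though the whole message is larger than the configured 64kb.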

        bufferLocal = new ThreadLocal<ByteBuffer>() {
            @Override
            protected ByteBuffer initialValue() {
                // 64kb+100
                return ByteBuffer.allocateDirect(storeConfig.getMaxMessageSize() + 100);
            }
        };

    private MessageExt getMessageByCommitOffset(long offsetPy, int sizePy) {
        for (int i = 0; i < 3; i++) {
            MessageExt msgExt = null;
            bufferLocal.get().position(0);
            // throw IllegalArgumentException because sizePy>capacity
            bufferLocal.get().limit(sizePy);
            boolean res = messageStore.getData(offsetPy, sizePy, bufferLocal.get());
            if (res) {
                bufferLocal.get().flip();
                msgExt = MessageDecoder.decode(bufferLocal.get(), true, false, false);
            }
            if (null == msgExt) {
                LOGGER.warn("Fail to read msg from commitLog offsetPy:{} sizePy:{}", offsetPy, sizePy);
            } else {
                return msgExt;
            }
        }
        return null;
    }

In org.apache.rocketmq.store.timer.TimerMessageStore#getMessageByCommitOffset, this delay message's sizePy = 63kb + 3kb + the other fields' length, which is larger than bufferLocal's capacity of 64kb + 100. The call to limit(sizePy) therefore throws IllegalArgumentException, so currQueueOffset can never be updated.
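The read-side failure can be reproduced without RocketMQ, since it comes from java.nio.Buffer.limit rejecting a limit larger than the buffer's capacity. The sketch below uses a heap buffer and a hypothetical helper name; the sizes are the ones from this comment, and a real message would add header bytes on top.

```java
import java.nio.ByteBuffer;

// Standalone reproduction of the IllegalArgumentException; names are hypothetical.
final class ReadSideLimitDemo {
    // Returns true when limit(size) is rejected for a buffer of the given capacity.
    static boolean limitRejected(int capacity, int size) {
        ByteBuffer buffer = ByteBuffer.allocate(capacity); // heap buffer is enough here
        buffer.position(0);
        try {
            buffer.limit(size); // throws if size > capacity or size < 0
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        int capacity = 64 * 1024 + 100; // storeConfig.getMaxMessageSize() + 100
        int sizePy = (63 + 3) * 1024;   // body + properties, before any header bytes
        System.out.println("limit rejected: " + limitRejected(capacity, sizePy));
    }
}
```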

If my understanding is right, I think this is a bug: a message that cannot be consumed should not be writable to the commitLog in the first place. The size limit on the write path and the size limit on the read path should not differ.
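One possible direction, offered only as a sketch and not as the fix the project adopted, is to make the read side tolerate any size the write side accepted by growing the buffer on demand. GrowableReadBuffer and prepare are hypothetical names, and the sketch uses a heap buffer where the quoted code uses a direct one.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: a read buffer that grows instead of failing on limit().
final class GrowableReadBuffer {
    private ByteBuffer buffer;

    GrowableReadBuffer(int initialCapacity) {
        this.buffer = ByteBuffer.allocate(initialCapacity);
    }

    // Returns a buffer positioned at 0 and limited to exactly `size` bytes,
    // reallocating first when the current buffer is too small.
    ByteBuffer prepare(int size) {
        if (size < 0) {
            throw new IllegalArgumentException("negative size: " + size);
        }
        if (size > buffer.capacity()) {
            buffer = ByteBuffer.allocate(size);
        }
        buffer.clear();     // position = 0, limit = capacity
        buffer.limit(size); // safe now: size <= capacity
        return buffer;
    }
}
```

The alternative is to tighten the write path so that the whole encoded message, not just the body, must fit within the read buffer. Which side to change is a design decision for the maintainers.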

Waiting for your reply, thanks. @RongtongJin @GenerousMan