apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0
21.13k stars 11.64k forks

[Bug] PopMessageProcessor may not release lock #8704

Closed zonghaishang closed 1 day ago

zonghaishang commented 1 week ago

Before Creating the Bug Report

Runtime platform environment

macOS

RocketMQ version

5.3+

JDK Version

JDK 11

Describe the Bug

When enablePopMessageThreshold is enabled and popInflightMessageThreshold is reached, the queue lock may never be released.

    private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId, boolean isRetry, GetMessageResult getMessageResult,
        PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,
        Channel channel, long popTime, ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,
        StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
        String lockKey =
            topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;
        boolean isOrder = requestHeader.isOrder();
        long offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(),
            false, lockKey, false);
        CompletableFuture<Long> future = new CompletableFuture<>();
        if (!queueLockManager.tryLock(lockKey)) {
            restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
            future.complete(restNum);
            return future;
        }

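        // At this point the lock has already been acquired, so a `return future;` here
        // leaves the lock unreleased.
        // TODO:
        // future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey)); should be registered earlier,
        // or be placed on the line after ....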

        if (isPopShouldStop(topic, requestHeader.getConsumerGroup(), queueId)) {
            POP_LOGGER.warn("Too much msgs unacked, then stop poping. topic={}, group={}, queueId={}", topic, requestHeader.getConsumerGroup(), queueId);
            restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
            future.complete(restNum);
            return future;
        }

        try {
            future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey));
        //  ....
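
One way to picture the proposed change is the minimal sketch below. It is not the RocketMQ implementation: `PopLockSketch`, `SketchLockManager`, the `shouldStop` flag, and the lock key are hypothetical stand-ins for `PopMessageProcessor`, `queueLockManager`, `isPopShouldStop(...)`, and the real `lockKey`. It only shows that registering the `whenComplete` callback right after `tryLock` succeeds makes the early return release the lock.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

class PopLockSketch {

    // Hypothetical stand-in for the broker's queue lock manager (not RocketMQ's class).
    static class SketchLockManager {
        private final ConcurrentHashMap<String, AtomicBoolean> locks = new ConcurrentHashMap<>();

        boolean tryLock(String key) {
            return locks.computeIfAbsent(key, k -> new AtomicBoolean(false)).compareAndSet(false, true);
        }

        void unLock(String key) {
            AtomicBoolean state = locks.get(key);
            if (state != null) {
                state.set(false);
            }
        }

        boolean isLocked(String key) {
            AtomicBoolean state = locks.get(key);
            return state != null && state.get();
        }
    }

    // Simplified pop path: shouldStop models isPopShouldStop(...), restNum models the backlog count.
    static CompletableFuture<Long> popMsgFromQueue(SketchLockManager lockManager, String lockKey,
        boolean shouldStop, long restNum) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        if (!lockManager.tryLock(lockKey)) {
            // Lock not acquired, so there is nothing to release.
            future.complete(restNum);
            return future;
        }

        // Register the release immediately after acquiring the lock, so that every
        // later completion of the future, including the early return below, unlocks the queue.
        future.whenComplete((result, throwable) -> lockManager.unLock(lockKey));

        if (shouldStop) {
            // Early return: completing the future runs the callback registered above.
            future.complete(restNum);
            return future;
        }

        // The real message pop would happen here; the sketch simply completes the future.
        future.complete(restNum);
        return future;
    }

    public static void main(String[] args) {
        SketchLockManager lockManager = new SketchLockManager();
        String key = "topic-group-0"; // arbitrary key, for illustration only
        popMsgFromQueue(lockManager, key, true, 5L).join();
        System.out.println("locked after early return: " + lockManager.isLocked(key));
    }
}
```

In this sketch the callback is attached before the future can complete, so it runs on whichever path calls `complete`. The path where `tryLock` fails is left untouched, because that caller never held the lock.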

Steps to Reproduce

Found while reading through the code.

What Did You Expect to See?

Found while reading through the code.

What Did You See Instead?

Found while reading through the code.

Additional Context

Found while reading through the code.

yuz10 commented 1 day ago

https://github.com/apache/rocketmq/issues/8601