apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0
21.31k stars 11.71k forks source link

[Bug] Unable to consume messages orderly for ordered message while using RocketMQ 5 client #7999

Open CodingOX opened 8 months ago

CodingOX commented 8 months ago

Before Creating the Bug Report

Runtime platform environment

Unbuntu 20.04

RocketMQ version

RocketMQ 5.2

JDK Version

JDK 21

Describe the Bug

When attempting to consume messages in order using the RocketMQ 5 client with a 2 master no any slaves, it is observed that the consumer is unable to consume messages sequentially.

Steps to Reproduce

Create FIFO topic

./mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -n 192.168.10.233:9876 -a +message.type=FIFO

Producer Code, Write in Kotlin


import cn.hutool.core.lang.generator.SnowflakeGenerator
import org.apache.rocketmq.client.apis.ClientConfiguration
import org.apache.rocketmq.client.apis.ClientServiceProvider
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicLong

fun main() {

    val log: Logger = LoggerFactory.getLogger("OCP")
    val provider = ClientServiceProvider.loadService()

    val endpoints = "rmq5-broker-1.middle:8081;rmq5-broker-2.middle:8081"
    val clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        .build()

    val topic = "FIFOTopic"
    val tag = "tag-2"

    val producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(clientConfiguration).build()

    val atomicLong = AtomicLong()
    val snowflakeGenerator = SnowflakeGenerator()
    val messageGroup = "OrderedGroup-1"
    try {
        val ts = System.currentTimeMillis()
        // 
        repeat(30) {
            val id = snowflakeGenerator.next()
            val curIndex = atomicLong.incrementAndGet()
            val message = provider
                .newMessageBuilder()
                .setTag(tag)
                .setKeys(id.toString())
                .setTopic(topic)
                .setMessageGroup(messageGroup) // Set MessageGroup For FIFO
                .setBody(("$ts - $curIndex").toByteArray())
                .build()

            val receipt = producer.send(message)

            log.info("Send message: $curIndex - $id - ${receipt.messageId}")
        }
    } catch (ex: Exception) {
        log.error("Send message error", ex)
    }
}

Consumer Code


import cn.hutool.core.thread.ThreadUtil
import org.apache.rocketmq.client.apis.ClientConfiguration
import org.apache.rocketmq.client.apis.ClientServiceProvider
import org.apache.rocketmq.client.apis.consumer.ConsumeResult
import org.apache.rocketmq.client.apis.consumer.FilterExpression
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.nio.charset.StandardCharsets
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.TimeUnit

fun main() {
    val log: Logger = LoggerFactory.getLogger("ORC")

    val provider = ClientServiceProvider.loadService()
    val endpoints = "rmq5-broker-1.middle:8081;rmq5-broker-2.middle:8081"
    val clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        .build()

    val topic = "FIFOTopic"
    val tag = "tag-2"
    val topicToTag = mapOf(topic to FilterExpression(tag, FilterExpressionType.TAG))

    // 
    val cq = ConcurrentLinkedQueue<String>()

    provider.newPushConsumerBuilder()
        .setClientConfiguration(clientConfiguration)
        .setSubscriptionExpressions(topicToTag)
        .setConsumerGroup("OrderConsumerGroupV3")
        .setMessageListener {
            val msg = StandardCharsets.UTF_8.decode(it!!.body)
            cq.offer(msg.toString())
            ConsumeResult.SUCCESS
        }.build()

    ThreadUtil.sleep(10, TimeUnit.SECONDS)

    log.info(cq.joinToString("\n"))

    ThreadUtil.sleep(1, TimeUnit.HOURS)
}

What Did You Expect to See?

Te messages should be consumed in the order they were sent. However, the consumer is not able to achieve this expected behavior.

What Did You See Instead?

Messages are not consumed in the order in which they were sent?

16:55:02.686 [main] INFO ORC -- 1712134499102 - 1
1712134499102 - 2
1712134499102 - 3
1712134499102 - 4
1712134499102 - 6
1712134499102 - 5
1712134499102 - 7
1712134499102 - 10
1712134499102 - 11
1712134499102 - 12
1712134499102 - 9
1712134499102 - 8
1712134499102 - 13
1712134499102 - 14
1712134499102 - 15
1712134499102 - 16
1712134499102 - 17
1712134499102 - 21
1712134499102 - 18
1712134499102 - 19
1712134499102 - 22
1712134499102 - 23
1712134499102 - 20
1712134499102 - 24
1712134499102 - 25
1712134499102 - 26
1712134499102 - 27
1712134499102 - 28
1712134499102 - 29
1712134499102 - 30

Additional Context

From the dashboard, we can see the message store in the same message queue. image

And From the http api /message/queryMessageByTopicAndKey.query?key=xxx&topic=FIFOTopic, we can get the message details. The message with index 6 arrived at the broker later than the message with index 5. but 6 is consume before than 5.

3424672656 commented 8 months ago

rocketmq currently only supports sequential consumption for a single queue

lizhimins commented 7 months ago

Need to set consumer group as order consumption mode

CodingOX commented 7 months ago

Need to set consumer group as order consumption mode

From the doc : https://rocketmq.apache.org/zh/docs/featureBehavior/03fifomessage and souce code , I can't find the api for order consumption mode? Can you give me more help?

CodingOX commented 7 months ago

Need to set consumer group as order consumption mode

Need to set consumer group as order consumption mode

You mean that if using RocketMQ version 5 and requiring a first-in-first-out (FIFO) ordering queue, I must configure the topic with only one message queue?

redlsz commented 7 months ago

Configure your consumer group as order with "-o" option.

./mqadmin updateSubGroup -c DefaultCluster -n 192.168.10.233:9876 -g OrderConsumerGroupV3 -o true

CodingOX commented 7 months ago

Configure your consumer group as order with "-o" option.

./mqadmin updateSubGroup -c DefaultCluster -n 192.168.10.233:9876 -g OrderConsumerGroupV3 -o true

This is my original shell that from official document. I have used -o true for the topic creation.

./mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -n 192.168.10.233:9876 -a +message.type=FIFO

So this cause of the problem can be ruled out.

mtfcd commented 5 months ago

I found the same problem. I use the Rust client. First I thought it was a client bug. But later I use grpcurl to send the same messages and I got the same result. The Rust client surpport batch send. I only found this problem in batch send api. When use batch send. not only the order is wrong, also the keys are wrong, all the messages in the same batch will have the key of the first message of the batch. and in the batch send scenario, the message are put in different queue randomly.