apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0

How to judge whether a topic is orderly or unordered? #5420

Open PEIYANGXINQU opened 1 year ago

PEIYANGXINQU commented 1 year ago

How to judge whether a topic is orderly or unordered? Can I view it from the RocketMQ-Console? I did not find the relevant documentation.

mxsm commented 1 year ago

Hi @PEIYANGXINQU, there is no such thing as a sequential or non-sequential topic. We use a MessageQueueSelector to choose the queue each message is sent to; messages in the same queue can be consumed in order.
Doc for 4.x: https://rocketmq.apache.org/docs/4.x/producer/06message2
Doc for 5.0: https://rocketmq.apache.org/docs/featureBehavior/03fifomessage

PEIYANGXINQU commented 1 year ago

Got it, thanks!

kaori-seasons commented 1 year ago

Hello @PEIYANGXINQU

A RocketMQ topic does not by itself guarantee ordered production and consumption. Currently only partial ordering (ordering within a single queue) is supported.

If you want to ensure local order, it is recommended to refer to the following example:

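Producer.java

import java.util.ArrayList;
import java.util.List;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class Producer {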
    private static List<ProductOrder> orderList = null;
    private static String producerGroup = "test_producer";
    /**
     * Simulation data
     */
    static {
        orderList = new ArrayList<>();
        orderList.add(new ProductOrder("XXX001", "Order Creation"));
        orderList.add(new ProductOrder("XXX001", "Order payment"));
        orderList.add(new ProductOrder("XXX001", "Order Completed"));
        orderList.add(new ProductOrder("XXX002", "Order Creation"));
        orderList.add(new ProductOrder("XXX002", "Order payment"));
        orderList.add(new ProductOrder("XXX002", "Order Completed"));
        orderList.add(new ProductOrder("XXX003", "Order Creation"));
        orderList.add(new ProductOrder("XXX003", "Order payment"));
        orderList.add(new ProductOrder("XXX003", "Order Completed"));
    }

    @GetMapping("message")
    public void sendMessage() throws Exception {
        // sample producer
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        //Disable the VIP channel; when it is enabled, the client connects to the broker's listen port minus 2
        producer.setVipChannelEnabled(false);
        //Bind name server
        producer.setNamesrvAddr("IP:9876");
        producer.start();
        for (ProductOrder order : orderList) {
            //1. Build the message, using the order ID as the message key
            Message message = new Message(JmsConfig.TOPIC, "", order.getOrderId(), order.toString().getBytes());
            //2. Send the message, choosing its target queue with a selector
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    //3. arg is the orderId passed as the third argument to send() below
                    String orderId = (String) arg;
                    //4. Map the String order ID to a queue index via its hashCode.
                    //   Math.floorMod keeps the index non-negative even for negative hash codes
                    //   (Math.abs(Integer.MIN_VALUE) is still negative, so abs-then-modulo is unsafe).
                    int index = Math.floorMod(orderId.hashCode(), mqs.size());
                    //5. The same order ID always maps to the same queue while the queue count is unchanged
                    return mqs.get(index);
                }
            }, order.getOrderId(), 50000);

            System.out.printf("Product: send status=%s, storage queue=%s, orderid=%s, type=%s\n", sendResult.getSendStatus(),
                                      sendResult.getMessageQueue().getQueueId(), order.getOrderId(), order.getType());
        }
        producer.shutdown();
    }
}
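
One caveat when hashing an order ID to a queue index: Math.abs(Integer.MIN_VALUE) is still negative, so taking the absolute value and then the remainder can yield a negative index. The following is a minimal sketch, independent of RocketMQ, that contrasts the two approaches. The class and method names (QueueIndexDemo, absIndex, floorModIndex) are hypothetical and only for illustration.

```java
// Hypothetical helper for illustration; not part of the RocketMQ API.
class QueueIndexDemo {

    // Naive mapping: Math.abs(Integer.MIN_VALUE) overflows and stays negative,
    // so the resulting index can be negative.
    static int absIndex(int hash, int queueCount) {
        return Math.abs(hash) % queueCount;
    }

    // Math.floorMod returns a value in [0, queueCount) for any int hash
    // when queueCount is positive.
    static int floorModIndex(int hash, int queueCount) {
        return Math.floorMod(hash, queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 3;
        int worstCase = Integer.MIN_VALUE; // a hash code that some strings do produce
        System.out.println(absIndex(worstCase, queueCount));      // prints -2
        System.out.println(floorModIndex(worstCase, queueCount)); // prints 1
    }
}
```

With the floorMod variant, the same order ID still always maps to the same index, which is the property the selector relies on.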

RocketMQ locks at queue granularity: instead of locking the entire Broker, it locks a single Queue, because locking one Queue is enough to guarantee ordered consumption within that Queue. So when order matters, do not register a MessageListenerConcurrently, even in a concurrent consumption scenario; register a MessageListenerOrderly instead. Here is an example of a consumer:

@Slf4j
@Component
public class Consumer {

    /**
     * Consumer entity object
     */
    private DefaultMQPushConsumer consumer;
    /**
     * Consumer group
     */
    public static final String CONSUMER_GROUP = "consumer_group";
    /**
     * instantiate the object through the constructor
     */
    public Consumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr("IP:9876");
        //It is not recommended to enable the VIP channel. When enabled, the client uses the broker's listen port minus 2.
        consumer.setVipChannelEnabled(false);
        //Subscribe information under topics and tags (* represents all tags)
        consumer.subscribe(JmsConfig.TOPIC, "*");
        //Register the message listener. Note that ordered consumption uses MessageListenerOrderly, while concurrent consumption uses MessageListenerConcurrently
        consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
            // Log every message in the batch; no further business logic is performed here
            for (MessageExt msg : msgs) {
                log.info("Consumer-Thread Name={}, Message={}", Thread.currentThread().getName(), new String(msg.getBody()));
            }
            return ConsumeOrderlyStatus.SUCCESS;
        });
        consumer.start();
    }
}

kaori-seasons commented 1 year ago

It looks like the documentation is not good enough; I need to add to it.

PEIYANGXINQU commented 1 year ago

@mxsm @complone, the documentation says to create a topic with the -o (--order) parameter set to true, which marks it as carrying ordered messages. Someone has already created the topics in our test environment, and now I want to create them in the prod environment following the same practice. But I don't know where to start, because the RocketMQ console does not show the order=true property.

PEIYANGXINQU commented 1 year ago

.\mqadmin updateTopic
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
usage: mqadmin updateTopic [-b ] [-c ] [-h] [-n ] [-o ] [-p ] [-r ] [-s ] -t [-u ] [-w ]
 -b,--brokerAddr       create topic to which broker
 -c,--clusterName      create topic to which cluster
 -h,--help             Print help
 -n,--namesrvAddr      Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
 -o,--order            set topic's order(true|false)
 -p,--perm             set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]
 -r,--readQueueNums    set read queue nums
 -s,--hasUnitSub       has unit sub (true|false)
 -t,--topic            topic name
 -u,--unit             is unit topic (true|false)
 -w,--writeQueueNums   set write queue nums

caigy commented 1 year ago

Currently the order field does not take effect. Ordering is a behavior of how messages are produced, stored, and consumed, not a property of the topic.
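
To picture why ordering is a matter of routing rather than a topic flag, here is a minimal in-memory sketch that does not use RocketMQ at all. It routes interleaved messages to a few plain lists by key hash and shows that events for one key keep their send order, because they all land in the same list. The class PartitionOrderDemo, its methods, and the sample events are hypothetical and serve only as an illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; plain lists stand in for message queues.
class PartitionOrderDemo {

    // Route each {key, event} pair to one of queueCount lists by key hash.
    static List<List<String>> route(String[][] messages, int queueCount) {
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
        for (String[] m : messages) {
            int index = Math.floorMod(m[0].hashCode(), queueCount);
            queues.get(index).add(m[0] + ":" + m[1]);
        }
        return queues;
    }

    // Collect the events stored for one key, in the order they sit in the queues.
    static List<String> eventsFor(String key, List<List<String>> queues) {
        List<String> events = new ArrayList<>();
        for (List<String> queue : queues) {
            for (String entry : queue) {
                if (entry.startsWith(key + ":")) {
                    events.add(entry.substring(key.length() + 1));
                }
            }
        }
        return events;
    }

    public static void main(String[] args) {
        // Messages for two orders, sent interleaved.
        String[][] messages = {
            {"XXX001", "create"}, {"XXX002", "create"},
            {"XXX001", "pay"}, {"XXX002", "pay"},
            {"XXX001", "complete"}, {"XXX002", "complete"}
        };
        List<List<String>> queues = route(messages, 4);
        System.out.println(eventsFor("XXX001", queues)); // prints [create, pay, complete]
    }
}
```

Order across different keys is not guaranteed in this model, which matches the partial ordering described earlier in the thread.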

github-actions[bot] commented 11 months ago

This issue is stale because it has been open for 365 days with no activity. It will be closed in 3 days if no further activity occurs.

PEIYANGXINQU commented 11 months ago

Got it, thanks!