apache / rocketmq-client-go

Apache RocketMQ go client
https://rocketmq.apache.org/
Apache License 2.0

Orderly message does not work #946

Closed yangjian102621 closed 2 years ago

yangjian102621 commented 2 years ago

BUG REPORT
When I send orderly messages with the producer, they are not in order when the consumer pulls them.

ENV

Producer code

p, err := rocketmq.NewProducer(
        producer.WithNameServer(config.NameSrvAddr),
        producer.WithGroupName(config.ProducerGroup),
        producer.WithRetry(2),
    )
    if err != nil {
        panic(err)
    }

    err = p.Start()
    if err != nil {
        panic(err)
    }

    for i := 0; i < 3; i++ {
        orderId := strconv.Itoa(i)
        for j := 0; j < 5; j++ {
            msg := &primitive.Message{
                Topic: config.Topic,
                Body:  []byte("Ordered Message Step -> " + strconv.Itoa(j)),
            }
            msg.WithShardingKey(orderId)
            res, err := p.SendSync(context.Background(), msg)
            if err != nil {
                // res may be nil on failure, so report the error instead of the result
                fmt.Printf("send message error: %s\n", err.Error())
                continue
            }

            fmt.Printf("send message success: result=%s\n", res.String())
        }
    }

    // close producer
    err = p.Shutdown()
    if err != nil {
        fmt.Printf("shutdown producer error: %s\n", err.Error())
    }

Consumer code

c, err := rocketmq.NewPushConsumer(
        consumer.WithGroupName(config.ConsumerGroup),
        consumer.WithNameServer(config.NameSrvAddr),
        consumer.WithConsumerModel(consumer.Clustering),
        consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
        consumer.WithConsumerOrder(true),
    )
    if err != nil {
        panic(err)
    }

    err = c.Subscribe(config.Topic, consumer.MessageSelector{}, func(ctx context.Context,
        msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
        for i := range msgs {
            logrus.Infof("Receive message: %s, orderId: %s", msgs[i].Body, msgs[i].GetShardingKey())
        }
        return consumer.ConsumeSuccess, nil
    })
    if err != nil {
        panic(err)
    }

    // Note: start after subscribe
    err = c.Start()
    if err != nil {
        panic(err)
    }

    logrus.Info("Consumer Started.")

    // signal.Notify needs a buffered channel; os.Kill cannot be trapped, so it is not registered
    stop := make(chan os.Signal, 1)
    signal.Notify(stop, os.Interrupt)
    select {
    case <-stop:
        err = c.Shutdown()
        if err != nil {
            fmt.Printf("shutdown Consumer error: %s", err.Error())
        }
    }

I expected the steps for the same order to arrive in order, but they do not:

INFO[0003] Receive message: Ordered Message Step -> 2, orderId: 0 
INFO[0003] Receive message: Ordered Message Step -> 1, orderId: 1 
INFO[0003] Receive message: Ordered Message Step -> 0, orderId: 2 
INFO[0003] Receive message: Ordered Message Step -> 4, orderId: 2 
INFO[0003] Receive message: Ordered Message Step -> 1, orderId: 0 
INFO[0003] Receive message: Ordered Message Step -> 0, orderId: 1 
INFO[0003] Receive message: Ordered Message Step -> 4, orderId: 1 
INFO[0003] Receive message: Ordered Message Step -> 3, orderId: 2 
INFO[0003] Receive message: Ordered Message Step -> 3, orderId: 0 
INFO[0003] Receive message: Ordered Message Step -> 2, orderId: 1 
INFO[0003] Receive message: Ordered Message Step -> 1, orderId: 2 
INFO[0003] Receive message: Ordered Message Step -> 0, orderId: 0 
INFO[0003] Receive message: Ordered Message Step -> 4, orderId: 0 
INFO[0003] Receive message: Ordered Message Step -> 3, orderId: 1 
INFO[0003] Receive message: Ordered Message Step -> 2, orderId: 2 

Another observation: when I send orderly messages with the Java SDK, it works fine, and the Go client consumer receives the messages in order.

Java SDK producer

public class OrderProducer {
    public static void main(String[] args) throws Exception
    {
        DefaultMQProducer producer = new DefaultMQProducer(Config.PRODUCER_GROUP_NAME);
        producer.setNamesrvAddr(Config.NAME_SRV_ADDR);
        producer.start();

        for (int i = 1; i < 3; i++) {
            int orderId = i;
            for (int j = 1; j <= 5;j++){
                Message msg = new Message(
                        Config.TOPIC,
                        "Order_"+orderId,
                        "KEY" + orderId,
                        ("Order Step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );

                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            }
        }

        // close producer
        producer.shutdown();
    }
}

The golang consumer client received ordered messages:

INFO[0000] Consumer Started.                            
INFO[0063] Receive message: Order Step 1, orderId: KEY2 
INFO[0063] Receive message: Order Step 2, orderId: KEY2 
INFO[0063] Receive message: Order Step 1, orderId: KEY4 
INFO[0063] Receive message: Order Step 2, orderId: KEY4 
INFO[0063] Receive message: Order Step 3, orderId: KEY4 
INFO[0063] Receive message: Order Step 4, orderId: KEY4 
INFO[0063] Receive message: Order Step 5, orderId: KEY4 
INFO[0063] Receive message: Order Step 3, orderId: KEY2 
INFO[0063] Receive message: Order Step 4, orderId: KEY2 
INFO[0063] Receive message: Order Step 5, orderId: KEY2 
INFO[0063] Receive message: Order Step 1, orderId: KEY1 
INFO[0063] Receive message: Order Step 2, orderId: KEY1 
INFO[0063] Receive message: Order Step 3, orderId: KEY1 
INFO[0063] Receive message: Order Step 1, orderId: KEY3 
INFO[0063] Receive message: Order Step 2, orderId: KEY3 
INFO[0063] Receive message: Order Step 3, orderId: KEY3 
INFO[0063] Receive message: Order Step 4, orderId: KEY3 
INFO[0063] Receive message: Order Step 5, orderId: KEY3 
INFO[0063] Receive message: Order Step 4, orderId: KEY1 
INFO[0063] Receive message: Order Step 5, orderId: KEY1 
INFO[0063] Receive message: Order Step 1, orderId: KEY5 
INFO[0063] Receive message: Order Step 2, orderId: KEY5 
INFO[0063] Receive message: Order Step 3, orderId: KEY5 
INFO[0063] Receive message: Order Step 4, orderId: KEY5 
INFO[0063] Receive message: Order Step 5, orderId: KEY5 

I do not know what the problem is. I need help, thanks.

yangjian102621 commented 2 years ago

I found the reason: I should have specified a QueueSelector for the producer.

type MyQueueSelector struct{}

func (q MyQueueSelector) Select(msg *primitive.Message, mqs []*primitive.MessageQueue) *primitive.MessageQueue {
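    orderId, err := strconv.Atoi(msg.GetShardingKey())
    if err != nil || orderId < 0 {
        // fall back to the first queue when the key is not a non-negative integer
        return mqs[0]
    }
    // the same order id always maps to the same queue
    return mqs[orderId%len(mqs)]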
}

// then create producer with specified queue selector
p, err := rocketmq.NewProducer(
        producer.WithNameServer(config.NameSrvAddr),
        producer.WithGroupName(config.ProducerGroup),
        producer.WithRetry(2),
        producer.WithQueueSelector(MyQueueSelector{}),
    )

Then it worked perfectly.

INFO[0063] Receive message: Ordered Message Step -> 0, orderId: 0 
INFO[0063] Receive message: Ordered Message Step -> 1, orderId: 0 
INFO[0063] Receive message: Ordered Message Step -> 2, orderId: 0 
INFO[0063] Receive message: Ordered Message Step -> 3, orderId: 0 
INFO[0063] Receive message: Ordered Message Step -> 4, orderId: 0 
INFO[0063] Receive message: Ordered Message Step -> 0, orderId: 1 
INFO[0063] Receive message: Ordered Message Step -> 1, orderId: 1 
INFO[0063] Receive message: Ordered Message Step -> 2, orderId: 1 
INFO[0063] Receive message: Ordered Message Step -> 3, orderId: 1 
INFO[0063] Receive message: Ordered Message Step -> 4, orderId: 1 
INFO[0063] Receive message: Ordered Message Step -> 0, orderId: 2 
INFO[0063] Receive message: Ordered Message Step -> 1, orderId: 2 
INFO[0063] Receive message: Ordered Message Step -> 2, orderId: 2 
INFO[0063] Receive message: Ordered Message Step -> 3, orderId: 2 
INFO[0063] Receive message: Ordered Message Step -> 4, orderId: 2
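
Why the selector matters can be shown with a small standalone simulation. This is only a sketch using the standard library, not the client's code: it assumes that ordering is preserved only within a single queue and that a producer without a key-aware selector spreads messages across queues (modeled here as round-robin). The names roundRobin, byKey, and queuesPerOrder are made up for illustration.

```go
package main

import (
	"fmt"
	"strconv"
)

// roundRobin returns a hypothetical selector that ignores the key
// and cycles through the queue indexes.
func roundRobin(queues int) func(key string) int {
	next := 0
	return func(string) int {
		q := next % queues
		next++
		return q
	}
}

// byKey returns a selector in the spirit of MyQueueSelector:
// the same numeric key always yields the same queue index.
func byKey(queues int) func(key string) int {
	return func(key string) int {
		id, err := strconv.Atoi(key)
		if err != nil || id < 0 {
			return 0
		}
		return id % queues
	}
}

// queuesPerOrder simulates 3 orders with 5 steps each and records
// which distinct queues each order's messages land in.
func queuesPerOrder(sel func(string) int) map[string]map[int]bool {
	seen := make(map[string]map[int]bool)
	for i := 0; i < 3; i++ {
		key := strconv.Itoa(i)
		seen[key] = make(map[int]bool)
		for j := 0; j < 5; j++ {
			seen[key][sel(key)] = true
		}
	}
	return seen
}

func main() {
	rr := queuesPerOrder(roundRobin(4))
	keyed := queuesPerOrder(byKey(4))
	for _, key := range []string{"0", "1", "2"} {
		fmt.Printf("order %s: round-robin used %d queue(s), key-based used %d queue(s)\n",
			key, len(rr[key]), len(keyed[key]))
	}
}
```

With four simulated queues, every order is scattered over several queues under round-robin but stays in exactly one queue with the key-based selector, which matches the before and after logs in this thread.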

I also found that there is no example for an orderly message producer. Would you consider adding one?

Victor1319 commented 2 years ago

You can use NewHashQueueSelector as your selector.
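
The idea behind a hash-based selector can be sketched with the standard library alone. This is an illustration under assumptions, not the library's implementation: queueIndex is a hypothetical helper, and FNV-1a is simply one convenient hash. The point is that a hash works for any string key, whereas the strconv.Atoi approach above only handles numeric order ids.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// queueIndex maps a sharding key to a queue index in [0, queues).
// queues must be greater than zero. Hypothetical helper for illustration.
func queueIndex(key string, queues int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on an FNV hash never returns an error
	return int(h.Sum32() % uint32(queues))
}

func main() {
	// The same key always maps to the same queue, so its messages stay ordered.
	for _, key := range []string{"order-A", "order-B", "order-A"} {
		fmt.Printf("%s -> queue %d\n", key, queueIndex(key, 4))
	}
}
```

In the producer from this issue, the built-in selector would presumably be passed the same way as the custom one, for example producer.WithQueueSelector(producer.NewHashQueueSelector()), with the sharding key still set through msg.WithShardingKey.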

yangjian102621 commented 2 years ago

@Victor1319 got it, thanks.