rocketmq-baiji / rocketmq

Mirror of Apache RocketMQ
Apache License 2.0
4 stars 20 forks source link

全局排序的Consumer #2

Open rocketmq-baiji opened 5 years ago

rocketmq-baiji commented 5 years ago

RocketMQ的Topic的分队列存储的,队列的特点如下:

  1. 单个队列内,按照发送时间进行排序;
  2. 队列之间无序。

有些场景,比如股票撮合,需要全局对多个队列进行排序。

可以考虑在客户端实现一个全局排序的OrderedConsumer

class OrderedConsumer{
        //按照发送时间(getBornTimestamp)进行排序
       MessageExt receive();
}

可以利用PullConsumer进行实现:

Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

实现时,可以逐步来,先假定MessageQueue的数量是固定。

huangxinkaty commented 5 years ago

对于客户端来说,不能并行消费,如果开了很多个线程去消费数据,则消费端又把顺序打乱了。 数据量大了,客户端消费很慢,mq中一定会堆积很多数据。

9组 青衿

TingyanXiang commented 5 years ago

一般情况下,在单partition(在rocketMQ单queue )中数据是有序的,但是全局有序是很难保证的。即便实现了全局有序性,性能也很差,也就失去了意义。 所以我们应该在业务上避免需要在rocketMQ中进行全局排序的需求。

9组 蹊兮

xjtushilei commented 5 years ago

高可用方面

正常情况下,我们可以通过从一个topic的不同的queue中去取数据,然后来在客户端进行排序。但是一旦broker挂掉了,发生了broker的切换,则这时候我们之前的想法就会乱掉。这种情况应该要考虑到,但是目前还没有比较好的的实现。

所以,还是要避免全局有序,或者系统并不需要很高的可用性和性能。

9组 寻剑

qiwei94 commented 5 years ago

思路一:可以将负载均衡的调度算法做一个修改,不使用类似于RR,DRR这样的算法,而是根据队列对头元素的时间戳大小进行选择,选择最小的进行输出。 思路二:获取当前所有的队列数量n,在client处维持一个大小为n的队列,每次取出每个队列的队头元素,使用插入排序,client消费这个大小为n的队列,直到消耗完毕,重复此过程。 9组 钢羽

GreatDreamer-W commented 5 years ago

思路1

假设只有一个Consumer的情况下,并假定MessageQueue的数量是固定的。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。在拉取过程中,获取到相应的queue的PullResult信息,解析出MessageExt的列表。首先取出该列表的第一个元素,获取到它的bornTimeStamp信息。这个时间戳就是该消息创建时的时间戳,可以根据这个属性判断消息的创建时间。具体实现的算法是:

  1. 获取n个queue的PullResult。进而获得它的List字段。解析出第一个MessageExt元素的bornTimeStamp。
  2. 对比获取到的n条bornTimeStamp信息,选取值最小的消息进行。
  3. 取出被消费的列表中的下一个MessageExt,并获取bornTimeStamp字段。
  4. 将新加入的bornTimeStamp与之前参与比较的n-1条bornTimeStamp再做比较,取出值最小的一条进行消费。
  5. 重复3~4的过程,从刚刚被消费的queue中取出新的条目继续进行比较。知道所有的queue中的消息被取出。

该算法根据每个queue中存放的消息的bornTimeStamp来判断创建时间,并且通过n个queue中消息的时间戳排序得到全局有序后交由Consumer进行消费。

思路2

因为在发送消息的时候,消息发送默认是采用轮询的方式发送到不同的Queue。在一条Queue里面,RocketMQ的确可以保证FIFO的。

可能存在如下情况:如果有一个订单,包括多条消息,这些消息必须顺序消费,这种情况下,使用默认的发送方式无法满足。可以考虑直接在生产端将消息投递到同一个Queue中。

RocketMQ消息生产端示例代码如下:

int orderId = i % 10;
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

SendResult = sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

System.out.println(sendResult);

按照这个示例,把订单号取模再丢到selector中,selector保证同一个模都会投递到同一条Queue中。 即:相同的订单号->有相同的模->有相同的Queue。

这样同一批需要做到顺序消费的肯定会投递到同一个queue,同一个queue肯定会投递到同一个消费实例,同一个消费实例肯定是顺序拉取并顺序提交线程池的,只要保证消费端顺序消费即可。如果消费端是使用MessageListenerOrderly则自带此实现,如果是使用MessageListenerConcurrently,则需要把线程池改为单线程模式。

SerenaYr commented 5 years ago

思路一:一个比较直接的思路是取出每个队列中前k个消息进行排序,取出其中最早的k个消息。考虑到队列中消息是按时间有序的,对于每个队列中第一个消息组成的集合,则全局最早的一个消息一定在这个集合中。同理,对于每个队列中前k个消息组成的集合,则全局最早的k个消息一定在这个集合中。 思路二:思路一中对每个队列进行遍历,一次只能取出一个消息,效率较低。可以用最小堆维持每个队列中第一个消息,将时间最早的消息出列,则所有消息出列的顺序即为全局有序。

freshmanGDUT commented 5 years ago

2 [issue 2] 第六组第三题-闭麦听歌,神码的都对

第三题解题思路:使用两个最小堆,其大小均为32*n,n为队列数量。先在每个队列中选取max_num为32的数据量放入到第一个最小堆,将第二个最小堆作为缓存堆,之后循环读取队列中的数据,写入缓存堆中,当缓存堆写入完毕后,将数据从缓存堆的最顶端写入到第一个最小堆中,每写入一个输出一个,从而实现全局排序。