lihongjie0209 / myblog

4 stars 0 forks source link

RocketMQ: DefaultMQProduce #121

Open lihongjie0209 opened 4 years ago

lihongjie0209 commented 4 years ago

生产者发送消息默认使用的是 DefaultMQProducer 类,下面结合实际代码来详细解释,如代码清单3-8所示。

代码清单3-8 DefaultMQProduce 示例

public class ProducerQuickStart {
    public static void main(String[] args) throws MQClientException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.setInstanceName("instance1");
    producer.setRetryTimesWhenSendFailed(3);
producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
        Producer.start();
        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                Producer.send(msg, new SendCallback() {
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%s%n", sendResult);
                        sendResult.getSendStatus();
                    }
                    public void onException(Throwable e) {
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

发送消息要经过五个步骤:

1)设置 Producer 的 GroupName。

2)设置 InstanceName,当一个 Jvm 需要启动多个 Producer 的时候,通过设置不同的 InstanceName 来区分,不设置的话系统使用默认名称“DEFAULT”。

3)设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。想保证不丢消息,可以设置多重试几次。

4)设置 NameServer 地址。

5)组装消息并发送。

消息的发送有同步和异步两种方式,上面的代码使用的是异步方式。在第2章的例子中用的是同步方式。消息发送的返回状态有如下四种:FLUSH_DISK_TIMEOUT、FLUSH_SLAVE_TIMEOUT、SLAVE_NOT_AVAILABLE、SEND_OK,不同状态在不同的刷盘策略和同步策略的配置下含义是不同的。

写一个高质量的生产者程序,重点在于对发送结果的处理,要充分考虑各种异常,写清对应的处理逻辑。

lihongjie0209 commented 4 years ago

发送延迟消息

RocketMQ 支持发送延迟消息,Broker 收到这类消息后,延迟一段时间再处理,使消息在规定的一段时间后生效。

延迟消息的使用方法是在创建 Message 对象时,调用 setDelayTimeLevel(int level)方法设置延迟时间,然后再把这个消息发送出去。目前延迟的时间不支持任意设置,仅支持预设值的时间长度(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)。比如 setDelayTimeLevel(3)表示延迟 10s。

lihongjie0209 commented 4 years ago

自定义消息发送规则

一个 Topic 会有多个 Message Queue,如果使用 Producer 的默认配置,这个 Producer 会轮流向各个 Message Queue 发送消息。Consumer 在消费消息的时候,会根据负载均衡策略,消费被分配到的 Message Queue,如果不经过特定的设置,某条消息被发往哪个 Message Queue,被哪个 Consumer 消费是未知的。

如果业务需要我们把消息发送到指定的 Message Queue 里,比如把同一类型的消息都发往相同的 Message Queue,该怎么办呢?可以用 Message-QueueSelector,如代码清单3-9所示。

代码清单3-9 MessageQueueSelector 示例

public class OrderMessageQueueSelector implements MessageQueueSelector {
     public MessageQueue select(List<MessageQueue> mqs, Message msg,                                Object orderKey) {
    int id = Integer.parseInt(orderKey.toString());
    int idMainIndex  = id/100;
    int size = mqs.size();
    int index = idMainIndex%size;
    return mqs.get(index);
  }
}

发送消息的时候,把 MessageQueueSelector 的对象作为参数,使用 public SendResult send(Message msg,MessageQueueSelector selector,Object arg)函数发送消息即可。在 MessageQueueSelector 的实现中,根据传入的 Object 参数,或者根据 Message 消息内容确定把消息发往那个 Message Queue,返回被选中的 Message Queue。

lihongjie0209 commented 4 years ago

如何存储队列位置信息

实际运行中的系统,难免会遇到重新消费某条消息、跳过一段时间内的消息等情况。这些异常情况的处理,都和 Offset 有关。本节主要分析 Offset 的存储位置,以及如何根据需要调整 Offset 的值。

首先来明确一下 Offset 的含义,RocketMQ 中,一种类型的消息会放到一个 Topic 里,为了能够并行,一般一个 Topic 会有多个 Message Queue(也可以设置成一个),Offset 是指某个 Topic 下的一条消息在某个 Message Queue 里的位置,通过 Offset 的值可以定位到这条消息,或者指示 Consumer 从这条消息开始向后继续处理。

如图3-1所示是 Offset 的类结构,主要分为本地文件类型和 Broker 代存的类型两种。对于 DefaultMQPushConsumer 来说,默认是 CLUSTERING 模式,也就是同一个 Consumer group 里的多个消费者每人消费一部分,各自收到的消息内容不一样。这种情况下,由 Broker 端存储和控制 Offset 的值,使用 RemoteBrokerOffsetStore 结构。