apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0

Does Consumer Affect Producer's TPS? #562

Closed LvChenhui closed 4 years ago

LvChenhui commented 5 years ago

The issue tracker is ONLY used for bug reports and feature requests. Keep in mind, please check whether an identical report already exists before you raise a new one.

Alternatively (especially if your communication is not a bug report), you can send mail to our mailing lists. We welcome any friendly suggestions, bug fixes, collaboration, and other improvements.

Please ensure that your bug report is clear and complete. Otherwise, we may be unable to understand it or to reproduce it, either of which would prevent us from fixing the bug. We strongly recommend that the report (bug report or feature request) include the following information:

BUG REPORT

  1. Please describe the issue you observed:
  2. Other information (e.g. detailed explanation, logs, related issues, suggestions on how to fix, etc.): message size is 1 KB, asynchronous disk flush

FEATURE REQUEST

  1. Please describe the feature you are requesting.

  2. Provide any additional detail on your proposed use case for this feature.

  3. Indicate the importance of this issue to you (blocker, must-have, should-have, nice-to-have). Are you currently using any workarounds to address this issue?

  4. If there are sub-tasks, use -[ ] for each sub-task and create a corresponding issue to map to it:

Liberxue commented 5 years ago

RocketMQ provides the ability to automatically read old data from the Slave.

This is mainly controlled by the slaveReadEnable parameter, which is off by default (slaveReadEnable = false by default). I recommend turning it on, on both the master and the slave. Once it is enabled, when a client pulls messages the broker checks whether the gap between the physical offset being read and the latest offset exceeds a given percentage of memory (accessMessageInMemoryMaxRatio = 40 by default). If it does, the client is told to consume from the slave instead.

If you use asynchronous replication, i.e. brokerRole is ASYNC_MASTER, saturating the slave's IO does not matter much. But if you use synchronous replication, it does matter, so in that case it is best to attach two slaves: RocketMQ's synchronous master-slave replication only requires one slave to acknowledge the write, so one slave's IO being saturated is not a big problem.
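For illustration, here is a minimal sketch of what enabling this could look like in broker.conf on both the master and the slave (slaveReadEnable and accessMessageInMemoryMaxRatio are the property names referenced above; the values are only examples):

# allow clients to be redirected to the slave for old data (false by default)
slaveReadEnable=true
# redirect once the unread backlog exceeds this percentage of physical memory (default 40)
accessMessageInMemoryMaxRatio=40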

ymwneu commented 5 years ago

Since the broker's resources are fixed, adding consumers will, in theory, have some impact on producer TPS, but based on actual test results that impact should be small. Are your producer and consumer running on the same machine? If they are, and the consumer has non-trivial consumption logic, it may take up machine resources and affect the producer's sending.

ymwneu commented 5 years ago

How exactly did you run the test? Please post the test method and parameter configuration so we can take a look.

LvChenhui commented 5 years ago

@Liberxue In the test environment I used a single master with no slave, and persisted into 10 GB of memory (memory mounted as a disk) to rule out IO as a factor.

LvChenhui commented 5 years ago

@ymwneu The broker, producer, and consumer are each deployed on different servers.

Producer demo:

    StringBuilder sb = new StringBuilder();
    for (int j = 0; j < 1020; j += 10) {
        sb.append("hello baby");
    }

    final Message msg = new Message("TagTest",
            sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));

    int jmax = 4; // simulate 4 modules
    int imax = 25; // each module has 25 threads calling send
    for (int j = 0; j < jmax; j++) {
        final ExecutorService sendThreadPool = Executors.newFixedThreadPool(imax);
        final DefaultMQProducer producer = new DefaultMQProducer("TestTagProducerGroup");
        producer.setInstanceName(UUID.randomUUID().toString());
        producer.setNamesrvAddr("*********");
        producer.setCompressMsgBodyOverHowmuch(500);
        producer.start();
        for (int i = 0; i < imax; i++) {
            sendThreadPool.execute(() -> {
                int num = 0;
                while (true) {
                    try {
                        int tagId = num % 50;
                        msg.setTags("Tag" + tagId);
                        SendResult sendResult = producer.send(msg);
                        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                            System.out.println(sendResult.getSendStatus());
                        }
                        num++;
                    } catch (MQClientException e) {
                        e.printStackTrace();
                    } catch (RemotingException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (MQBrokerException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

The consumer is just a simple push consumer:

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestTagconsumerGroup");
    consumer.setInstanceName(UUID.randomUUID().toString());
    consumer.setNamesrvAddr("**************");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TagTest", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();

The broker configuration is roughly as follows (persisting into memory):

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=2
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
messageIndexEnable=TRUE
diskMaxUsedSpaceRatio=80
sendMessageThreadPoolNums=128
pullMessageThreadPoolNums=128
listenPort=10911
autoCreateTopicEnable=true
useReentrantLockWhenPutMessage=true

Message size is 1 KB with compression enabled; NIC, CPU, and memory are not under load. Testing shows that when there are many consumer groups (the same data is consumed many times over), producer TPS is affected considerably. Adding more consumers within a consumer group also has some impact on producer TPS.

Liberxue commented 5 years ago

@Liberxue In the test environment I used a single master with no slave, and persisted into 10 GB of memory (memory mounted as a disk) to rule out IO as a factor.

Not using a slave in production means there is no "high availability" to speak of; the symbolic links also need to be handled. For the slaveReadEnable default, see https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java#L111

Liberxue commented 5 years ago

@ymwneu The broker, producer, and consumer are each deployed on different servers... (producer demo, consumer code, and broker configuration quoted in full from the comment above)


You are not doing batch production/consumption at all. I suggest using RocketMQ's built-in benchmark for stress testing.

LvChenhui commented 5 years ago

@Liberxue The official benchmark tests also show the consumer affecting producer TPS.

Liberxue commented 5 years ago

@Liberxue The official benchmark tests also show the consumer affecting producer TPS.

It will definitely have an impact.

First, you are producing and consuming on the same topic; my current approach is to use Master + Slave to share the load. Second, even if you produce and consume on different topics, TPS is still split evenly per topic.

When both the Master and the Slave are healthy, messages are read from the Master by default. If slaveReadEnable is turned on and the backlog of unconsumed messages on the Master exceeds 40% of total memory, messages will instead be read from the Slave broker (brokerId = 1).
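To make that concrete, here is a simplified, self-contained Java sketch of the check (a paraphrase of the logic in the source files linked below, not the actual broker code; the class, method, and parameter names are made up for illustration):

    // Simplified sketch, NOT the actual broker code: decide whether to suggest that
    // the client pull from the slave, based on how far the requested pull offset
    // lags behind the latest commit log offset.
    public class SuggestSlaveSketch {

        // accessMessageInMemoryMaxRatio is 40 by default
        static boolean suggestPullingFromSlave(long maxCommitLogOffset, long pullOffset,
                                               long totalPhysicalMemory, int accessMessageInMemoryMaxRatio) {
            long backlog = maxCommitLogOffset - pullOffset;
            long memoryLimit = (long) (totalPhysicalMemory * (accessMessageInMemoryMaxRatio / 100.0));
            // A backlog bigger than ~40% of memory means the data is likely no longer in
            // the page cache ("cold"), so serving it from the master would hit the disk.
            return backlog > memoryLimit;
        }

        public static void main(String[] args) {
            long totalMemory = 16L << 30;             // assume a machine with 16 GB RAM
            long maxOffset = 500L << 30;              // latest commit log physical offset
            long pullOffset = maxOffset - (8L << 30); // this consumer is reading 8 GB behind

            boolean fromSlave = suggestPullingFromSlave(maxOffset, pullOffset, totalMemory, 40);
            // 8 GB backlog > 6.4 GB threshold (40% of 16 GB), so the broker would suggest
            // the slave -- provided slaveReadEnable=true on both master and slave.
            System.out.println("suggest pulling from slave: " + fromSlave);
        }
    }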

I suggest you take a look at the source code:

https://github.com/apache/rocketmq/blob/2b692c912d18c0f9889fd73358581bcccf37bbbe/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java

https://github.com/apache/rocketmq/blob/a220364b752669c474f9795884faed9bb4e0a8c1/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Finally, I also wish there were no impact at all, haha, but....

LvChenhui commented 5 years ago
  • When the Master goes down and has not sent a heartbeat to the Namesrv for a long time, the Namesrv clears out the offline BrokerData. The TopicRouteData the Consumer then fetches from the Namesrv no longer contains the Master's BrokerData, so the Consumer can no longer find the Master's address in its own brokerAddr set and therefore sends its pull requests to the first Slave in order.

Read/write separation can indeed relieve some of the pressure on the master. But with a single master and no slave, is this situation simply unavoidable, or should it be considered normal behavior? Thanks for your reply, much appreciated.

Cicizz commented 5 years ago

During testing, after about 1 billion messages had been sent, starting the consumer to consume caused TPS to drop. Also, when the broker restarts it gets stuck after loading delayOffset.json and never executes the subsequent logic. Has anyone else run into this?

AnyOSR commented 5 years ago

It's probably related to setConsumeFromWhere and the transientStorePoolEnable-related parameters.
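For reference, transientStorePoolEnable is a broker-side store option (off by default) that only takes effect with asynchronous flush on a master broker; a minimal sketch of the related broker.conf lines, with example values only:

# write messages into a pre-allocated off-heap buffer pool first and commit to the
# page cache asynchronously; requires flushDiskType=ASYNC_FLUSH on a master broker
transientStorePoolEnable=true
# number of pre-allocated buffers in the pool (default 5)
transientStorePoolSize=5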

Ah39 commented 5 years ago

In my tests, consuming too fast does affect sending, and may even cause sends to fail.

Ah39 commented 5 years ago

Also, when the broker restarts it gets stuck after loading delayOffset.json and never executes the subsequent logic. Has anyone else run into this?

At that point it is loading the commit log and the consume queue; check store.log.

sdvdxl commented 5 years ago

Where are these parameters documented in detail? I couldn't find a detailed explanation in the Apache docs, and there are no comments in the source code either. It would be great if the configuration files shipped with the release listed all the configuration items along with descriptions.

duhenglucky commented 4 years ago

Consumer reads do affect sending-side performance to some extent, especially when reading cold data. For related documentation, see https://github.com/apache/rocketmq/tree/master/docs/cn