apache / rocketmq-client-cpp

Apache RocketMQ cpp client
https://rocketmq.apache.org/
Apache License 2.0

How to set the pull number for each message? #452

Open cgeffect opened 1 year ago

cgeffect commented 1 year ago

When a large number of messages has accumulated, the consumer's callback receives several messages at once. The size of the msgs parameter is always between 1 and 32 (1 <= msgs.size() <= 32):

    virtual rocketmq::ConsumeStatus consumeMessage(const std::vector<rocketmq::MQMessageExt> &msgs);

The messages in such a batch cannot each be given their own status (CONSUME_SUCCESS or another status). I have to wait until the business logic has processed every message in the batch, and only then can the whole batch be marked CONSUME_SUCCESS or another status.
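To make the constraint concrete, here is a minimal sketch of handling a batch one message at a time while still returning a single status. It does not use the RocketMQ headers: `Msg`, `Status`, `consumeBatch`, and the `processed` set are hypothetical stand-ins for illustration, and the sketch assumes that a non-success status leads to the whole batch being delivered again, so already-handled messages are skipped by ID.

```cpp
#include <functional>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical stand-ins; the real client has its own message and status types.
enum class Status { Success, ReconsumeLater };

struct Msg {
  std::string id;
  std::string body;
};

// Handles every message in the batch individually, but reports one status
// for the whole batch. IDs of messages that were handled successfully are
// remembered in `processed`, so a redelivered batch does not repeat them.
Status consumeBatch(const std::vector<Msg>& msgs,
                    const std::function<bool(const Msg&)>& handleOne,
                    std::unordered_set<std::string>& processed) {
  bool allOk = true;
  for (const Msg& m : msgs) {
    if (processed.count(m.id) != 0) {
      continue;  // already handled in an earlier delivery of this batch
    }
    if (handleOne(m)) {
      processed.insert(m.id);
    } else {
      allOk = false;  // keep going, but the batch as a whole is not done
    }
  }
  return allOk ? Status::Success : Status::ReconsumeLater;
}
```

In a real listener, the body of `consumeMessage` would play the role of `consumeBatch`, and `processed` would need to be thread-safe and bounded. This only works around the batch-level status; it does not change how many messages the client hands to the callback.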

I called consumer->setConsumeMessageBatchMaxSize(1), but it has no effect.

For example, if msgs contains 32 messages, returning CONSUME_SUCCESS means that all 32 are treated as consumed and removed from the queue on the broker. What I actually want is to consume messages one by one rather than group by group.

With the same logic in the Java version of RocketMQ, however, I can take one message out of msgs and return CONSUME_SUCCESS; the remaining messages are still delivered later and are not removed from the broker.

Alternatively, in the Java version I can call setPullBatchSize, whose default value is 32:

public void setPullBatchSize(int pullBatchSize) { this.pullBatchSize = pullBatchSize; }

This is my C++ consumer configuration:

    // Consumer
    std::string unique_group_name = group + "" + tag;
    consumer = std::make_shared<rocketmq::DefaultMQPushConsumer>(unique_group_name);
    consumer->setNamesrvAddr(namesrv);
    consumer->setGroupName(group);
    consumer->setConsumeThreadCount(max_thread_count);
    consumer->setConsumeFromWhere(rocketmq::CONSUME_FROM_LAST_OFFSET);
    // Set the maximum number of messages pulled each time.
    // Only takes effect when messages have accumulated in the MQ.
    consumer->setConsumeMessageBatchMaxSize(1);
    consumer->setTcpTransportConnectTimeout(30 * 1000);
    consumer->setAsyncPull(false);  // set sync pull
    consumer->setMessageModel(rocketmq::CLUSTERING);
    consumer->setInstanceName(group);
    rocketmq::elogLevel inputLevel = rocketmq::eLOG_LEVEL_LEVEL_NUM;
    consumer->setLogLevel(inputLevel);

What should I do?