apache / rocketmq-client-cpp

Apache RocketMQ cpp client
https://rocketmq.apache.org/
Apache License 2.0
Consume Offset Is Updated Incorrectly with Orderly Push Consumer #322

Closed: luyui closed this issue 4 years ago

luyui commented 4 years ago

BUG REPORT

  1. Please describe the issue you observed:
  2. Please tell us about your environment:

    • What is your OS?

      CentOS 7

    • What is your client version?

      b884f17df

    • What is your RocketMQ version?

      4.7.2

  3. Other information (e.g. detailed explanation, logs, related issues, suggestions on how to fix, etc.):

Here is the debug log: 14805_rocketmq-cpp.log. The message consume callback does not return at any point during the time range covered by the log.

It seems the incorrect consume offset update happened somewhere between log lines 5906 and 6200:

5906: 2020-Jul-16 11:49:46.390552:oneway updateConsumeOffsetToBroker of mq:MessageQueue [topic=dev-prefix-new-test, brokerName=broker-0.rocketmq-dev-240-141, queueId=1], its offset is:8036[updateConsumeOffsetToBroker:310]
...
6200: 2020-Jul-16 11:49:51.393309:oneway updateConsumeOffsetToBroker of mq:MessageQueue [topic=dev-prefix-new-test, brokerName=broker-0.rocketmq-dev-240-141, queueId=1], its offset is:8037[updateConsumeOffsetToBroker:310]

luyui commented 4 years ago

It seems I may have found a clue.

When the orderly consume service tries to consume the queue (ConsumeMessageOrderlyService.cpp:183):

request->takeMessages(msgs, 1);

it may move the only pending message from m_msgTreeMap into m_msgTreeMapTemp, which makes the next pull treat every message in the queue as consumed (DefaultMQPushConsumerImpl.cpp:100, DefaultMQPushConsumerImpl.cpp:122):

pullRequest->getMessage(msgs);
if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
  m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, result.nextBeginOffset);
}

thus advancing the consume progress past a message whose consume callback has not yet returned.

This only happens when there is exactly one message to be consumed in the queue and orderly consumption is used, which matches my use case exactly.
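
A minimal model of the sequence described above may help make it concrete. The sketch below is a hypothetical, heavily simplified stand-in for the client's PullRequest (the struct name BuggyPullRequest, the std::string payload, the use of std::mutex instead of boost::mutex, and the predicate wouldAdvanceOffset are all assumptions made for illustration, not the real client API). It assumes, as described in the comment, that takeMessages moves messages into the temp map and that getMessage only reads m_msgTreeMap. The offsets 8036/8037 merely echo the log excerpt.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical, simplified model of PullRequest: only the two maps discussed
// in this issue are kept. Keys are queue offsets, values are message bodies.
struct BuggyPullRequest {
  std::mutex lock;
  std::map<int64_t, std::string> msgTreeMap;      // pulled, not yet taken
  std::map<int64_t, std::string> msgTreeMapTemp;  // taken, still being consumed

  void putMessage(int64_t offset, const std::string& body) {
    std::lock_guard<std::mutex> guard(lock);
    msgTreeMap[offset] = body;
  }

  // Models takeMessages(msgs, n): move up to n messages into the temp map.
  std::vector<std::string> takeMessages(std::size_t n) {
    std::lock_guard<std::mutex> guard(lock);
    std::vector<std::string> taken;
    while (taken.size() < n && !msgTreeMap.empty()) {
      auto it = msgTreeMap.begin();
      taken.push_back(it->second);
      msgTreeMapTemp.insert(*it);
      msgTreeMap.erase(it);
    }
    return taken;
  }

  // Models the pre-fix getMessage: only untaken messages are visible.
  std::vector<std::string> getMessage() {
    std::lock_guard<std::mutex> guard(lock);
    std::vector<std::string> out;
    for (const auto& kv : msgTreeMap) out.push_back(kv.second);
    return out;
  }
};

// The quoted pull-callback check reduced to a predicate: true means the
// consume offset would be advanced to nextBeginOffset.
bool wouldAdvanceOffset(BuggyPullRequest& req, int64_t nextBeginOffset) {
  std::vector<std::string> msgs = req.getMessage();
  return msgs.empty() && nextBeginOffset > 0;
}
```

With a single cached message, taking it leaves msgTreeMap empty, so the check fires even though the message is still sitting in msgTreeMapTemp waiting for the callback.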

For now, I have changed PullRequest::getMessage to return messages from both m_msgTreeMap and m_msgTreeMapTemp. Is this change sane? Could it introduce any bugs?
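
The workaround described here can be sketched as follows. This is a hypothetical simplification, not the actual patch: the struct name MergedPullRequest, the std::string payload, and the choice to list in-flight messages before cached ones are assumptions. The point it shows is that once the temp map is included, a message that has been taken but not yet consumed keeps the result non-empty, so the offset-update branch quoted above is skipped.

```cpp
#include <cstdint>
#include <map>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical, simplified model holding only the state the workaround touches.
struct MergedPullRequest {
  std::mutex lock;
  std::map<int64_t, std::string> msgTreeMap;      // cached, not yet taken
  std::map<int64_t, std::string> msgTreeMapTemp;  // taken, still being consumed

  // Workaround: report messages from both maps so that an in-flight message
  // still counts as unconsumed. In-flight messages are listed first.
  std::vector<std::string> getMessage() {
    std::lock_guard<std::mutex> guard(lock);
    std::vector<std::string> out;
    for (const auto& kv : msgTreeMapTemp) out.push_back(kv.second);
    for (const auto& kv : msgTreeMap) out.push_back(kv.second);
    return out;
  }
};
```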

ifplusor commented 4 years ago

I think you are right, but it is better to use getCacheMsgCount() to check whether the cache is empty, and getCacheMsgCount needs to be changed as follows:

int PullRequest::getCacheMsgCount() {
  boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
  return m_msgTreeMap.size() + m_msgTreeMapTemp.size();
}
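
A rough sketch of how the suggested count-based check behaves, under stated assumptions: FixedPullRequest, takeOne, commit, and shouldAdvanceOffset are hypothetical names for a simplified model (std::mutex stands in for boost::mutex), and the exact shape of the real fix in the client is not reproduced here. getCacheMsgCount mirrors the suggestion above by summing both maps, so the offset is only advanced once nothing is cached or in flight.

```cpp
#include <cstdint>
#include <map>
#include <mutex>
#include <string>

// Hypothetical, simplified model of PullRequest with only the two maps.
class FixedPullRequest {
 public:
  void putMessage(int64_t offset, const std::string& body) {
    std::lock_guard<std::mutex> guard(lock_);
    msgTreeMap_[offset] = body;
  }

  // Models takeMessages(msgs, 1): move the first cached message to the temp map.
  void takeOne() {
    std::lock_guard<std::mutex> guard(lock_);
    if (msgTreeMap_.empty()) return;
    auto it = msgTreeMap_.begin();
    msgTreeMapTemp_.insert(*it);
    msgTreeMap_.erase(it);
  }

  // Models finishing an orderly consume: in-flight messages are dropped.
  void commit() {
    std::lock_guard<std::mutex> guard(lock_);
    msgTreeMapTemp_.clear();
  }

  // As suggested: count cached and in-flight messages together.
  int getCacheMsgCount() {
    std::lock_guard<std::mutex> guard(lock_);
    return static_cast<int>(msgTreeMap_.size() + msgTreeMapTemp_.size());
  }

 private:
  std::mutex lock_;
  std::map<int64_t, std::string> msgTreeMap_;
  std::map<int64_t, std::string> msgTreeMapTemp_;
};

// Offset-update check rewritten around getCacheMsgCount().
bool shouldAdvanceOffset(FixedPullRequest& req, int64_t nextBeginOffset) {
  return req.getCacheMsgCount() == 0 && nextBeginOffset > 0;
}
```

Compared with returning both maps from getMessage, a count check avoids copying messages only to test for emptiness.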

ifplusor commented 4 years ago

@fluyu Could you open a pull request to fix this issue?

luyui commented 4 years ago

Thanks for the suggestion. I followed it and verified the fix; it works like a charm.

My pleasure, and done: #324.