Closed liusongsen closed 3 years ago
// Business logic: dump every received message and remember its receipt handle
// so the handles can be used afterwards (e.g. for acknowledging the batch).
$receiptHandles = [];
foreach ($messages as $message) {
    $receiptHandles[] = $message->getReceiptHandle();

    // One line group per message: identity, payload and consume bookkeeping.
    vprintf(
        "MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\n",
        [
            $message->getMessageId(),
            $message->getMessageTag(),
            $message->getMessageBody(),
            $message->getPublishTime(),
            $message->getFirstConsumeTime(),
            $message->getConsumedTimes(),
            $message->getNextConsumeTime(),
            $message->getMessageKey(),
        ]
    );

    // Dump the message's properties as returned by the message object.
    print_r($message->getProperties());
}
// If a message is not acknowledged before $message->getNextConsumeTime(),
// it will be delivered (consumed) again.
// The receipt handle carries a timestamp, so the same message yields a
// different handle every time it is consumed.
print_r($receiptHandles);

// Problem reported here: after consuming the messages no ack was sent at all,
// yet the server already shows the message as consumed (status: consumed).
// The exit below is the reporter's deliberate stop; it ends the script before
// the ack code that follows, so nothing after this line runs while it is present.
exit;

// Acknowledge the whole batch of receipt handles collected above.
try {
    $this->consumer->ackMessage($receiptHandles);
} catch (\Exception $e) {
    // NOTE(review): exceptions other than AckMessageException are silently
    // ignored here, exactly as in the original snippet - confirm this is intended.
    if ($e instanceof MQ\Exception\AckMessageException) {
        // Some receipt handles may have timed out, which makes the ack fail.
        printf("Ack Error, RequestId:%s\n", $e->getRequestId());
        foreach ($e->getAckMessageErrorItems() as $errorItem) {
            // Fix: the ErrorMsg placeholder previously received getErrorCode()
            // a second time, so the actual error message was never printed.
            printf(
                "\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
                $errorItem->getReceiptHandle(),
                $errorItem->getErrorCode(),
                $errorItem->getErrorMessage()
            );
        }
    }
}
print "ack finish\n";
已解决
// Business logic: dump every received message and remember its receipt handle
// so the handles can be used afterwards (e.g. for acknowledging the batch).
$receiptHandles = [];
foreach ($messages as $message) {
    $receiptHandles[] = $message->getReceiptHandle();

    // One line group per message: identity, payload and consume bookkeeping.
    vprintf(
        "MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\n",
        [
            $message->getMessageId(),
            $message->getMessageTag(),
            $message->getMessageBody(),
            $message->getPublishTime(),
            $message->getFirstConsumeTime(),
            $message->getConsumedTimes(),
            $message->getNextConsumeTime(),
            $message->getMessageKey(),
        ]
    );

    // Dump the message's properties as returned by the message object.
    print_r($message->getProperties());
}