Closed qianlnk closed 5 years ago
@qianlnk thanks for the PR, but I don't actually understand the bug. Also, for numerous reasons, it's not safe to expose the Go channel over which messages are delivered, so we wouldn't merge that API addition.
@mreiferson A message cannot be received from the message channel until it has been transferred to that channel, and that transfer can block. In my opinion, `messagesReceived` should be incremented after the message has been transferred to the message channel. From the user's point of view, `messagesReceived` must equal the number of messages received by the message handler.
You propose not incrementing `messagesReceived` until the handler starts processing the message. I think the original intention is to count a message as received as soon as the process receives it from nsqd (and I think this is even more important for `totalRdyCount`).
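To make the two counting points concrete, here is a minimal, self-contained sketch. It is not go-nsq's code: the `counters` type, the `receive` function, and the message names are hypothetical, and the non-blocking `select` only stands in for a send that would block in a real consumer when no handler is draining the channel.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// counters is a hypothetical stand-in for a consumer's statistics.
type counters struct {
	received  uint64 // bumped as soon as a message is read from the connection
	delivered uint64 // bumped only after the handoff to the channel succeeds
}

// receive simulates reading n messages and handing them to out.
// Nothing drains out here, so only cap(out) handoffs can succeed.
func receive(n int, out chan<- string, c *counters) {
	for i := 0; i < n; i++ {
		// Counting point 1: at receipt, before the handoff.
		atomic.AddUint64(&c.received, 1)
		select {
		case out <- fmt.Sprintf("msg-%d", i):
			// Counting point 2: after the handoff.
			atomic.AddUint64(&c.delivered, 1)
		default:
			// Channel full. A real consumer would block on the send
			// instead of skipping it; this is a simplification.
		}
	}
}

func main() {
	out := make(chan string, 2)
	var c counters
	receive(5, out, &c)
	fmt.Println(c.received, c.delivered) // prints "5 2"
}
```

The gap between the two numbers is the set of messages that arrived from the wire but never reached a handler, which is the discrepancy this thread is about.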
Hmm ... I think it might be possible for some messages to be received by the consumer process from nsqd just after `Stop()` is called and `incomingMessages` is closed, and we could possibly handle that more elegantly, though it's a tricky distributed timing problem. What you could do is call `ChangeMaxInFlight(0)`, wait a couple of seconds, and then call `Stop()`. In that case you should get consistent numbers and no dropped messages.
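The suggested shutdown sequence can be sketched as follows. The `drainer` interface, `drainAndStop`, and `fakeConsumer` are hypothetical names introduced for illustration; the interface lists only the two methods named above, so a go-nsq consumer is assumed to satisfy it. The fake keeps the example runnable without a broker.

```go
package main

import (
	"fmt"
	"time"
)

// drainer is a hypothetical interface covering the two consumer
// methods mentioned in the thread.
type drainer interface {
	ChangeMaxInFlight(maxInFlight int)
	Stop()
}

// drainAndStop stops new deliveries, waits for in-flight messages
// to finish, and only then stops the consumer.
func drainAndStop(c drainer, wait time.Duration) {
	c.ChangeMaxInFlight(0) // ask the server to stop sending messages
	time.Sleep(wait)       // give in-flight messages time to complete
	c.Stop()
}

// fakeConsumer records calls so the ordering can be observed.
type fakeConsumer struct{ calls []string }

func (f *fakeConsumer) ChangeMaxInFlight(n int) {
	f.calls = append(f.calls, fmt.Sprintf("ChangeMaxInFlight(%d)", n))
}

func (f *fakeConsumer) Stop() { f.calls = append(f.calls, "Stop") }

func main() {
	f := &fakeConsumer{}
	drainAndStop(f, 10*time.Millisecond)
	fmt.Println(f.calls) // prints "[ChangeMaxInFlight(0) Stop]"
}
```

The fixed wait is a heuristic, as the comment above admits; a suitable duration depends on how long handlers take, and this sketch makes no attempt to detect when the in-flight count actually reaches zero.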
1. `messagesReceived` should be incremented after the message has been transferred to the message channel.
2. Add a `Consume` method; use `Consume` to get the message channel if you want to customize `ack` or `nack`.