apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

[Bug] Bundle unload can cause shared consumers to receive duplicate messages #23056

Open mawenyu opened 3 months ago

mawenyu commented 3 months ago

Search before asking

Read release policy

Version

3.0.4

Minimal reproduce step

  1. Run two broker nodes.
  2. Run three client processes that use the same Shared subscription to consume one topic.
  3. Set the consumer receiver queue size to 5.
  4. Each consumer takes 3 seconds to handle each message and then acknowledges it.
  5. Unload the topic so that it moves from broker A to broker B.
  6. The three client processes may receive messages that are already in another consumer's receiver queue, which eventually causes the same message to be handled by two consumers.

What did you expect to see?

Bundle unload should not cause message duplication.

What did you see instead?

Bundle unload causes unacknowledged messages to be delivered to other consumers, which results in data duplication.

Anything else?

I think that when a consumer reconnects, it should first tell the broker which messages it already holds, and the broker should then redeliver these messages.

Are you willing to submit a PR?

hanmz commented 3 months ago

Message duplication is difficult to avoid. For now, the better solution is to make message processing idempotent on the client side.
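A minimal sketch of client-side idempotence, assuming each message can be identified by a stable key (for example the string form of its MessageId, or a business key carried in the payload). ProcessedKeyStore, InMemoryStore and IdempotentHandler are hypothetical names, not Pulsar APIs. Because the duplicates in this issue reach different client processes, the in-memory store is only for illustration; a real deployment would need a store shared by all consumers, such as a database table with a unique key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class IdempotentHandler {

    // Hypothetical abstraction over the keys whose processing has started or finished.
    interface ProcessedKeyStore {
        // Records the key; returns true only if it was not recorded before.
        boolean markIfAbsent(String key);

        // Forgets the key so that a later redelivery can be processed again.
        void unmark(String key);
    }

    // Single-process demo store. Duplicates in this issue reach different
    // client processes, so production code needs a shared store instead.
    static class InMemoryStore implements ProcessedKeyStore {
        private final Map<String, Boolean> seen = new ConcurrentHashMap<>();

        @Override
        public boolean markIfAbsent(String key) {
            return seen.putIfAbsent(key, Boolean.TRUE) == null;
        }

        @Override
        public void unmark(String key) {
            seen.remove(key);
        }
    }

    private final ProcessedKeyStore store;
    private final Consumer<String> businessLogic;

    IdempotentHandler(ProcessedKeyStore store, Consumer<String> businessLogic) {
        this.store = store;
        this.businessLogic = businessLogic;
    }

    // Runs the business logic at most once per key and returns whether it ran.
    // The caller would acknowledge the message in both cases.
    boolean handle(String key, String payload) {
        if (!store.markIfAbsent(key)) {
            return false; // duplicate delivery: skip the side effect
        }
        try {
            businessLogic.accept(payload);
            return true;
        } catch (RuntimeException e) {
            store.unmark(key); // let a redelivery retry the failed message
            throw e;
        }
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        IdempotentHandler handler = new IdempotentHandler(new InMemoryStore(), processed::add);
        handler.handle("msg-1", "a");
        handler.handle("msg-2", "b");
        handler.handle("msg-1", "a"); // redelivered copy is skipped
        System.out.println(processed); // prints [a, b]
    }
}
```

Marking the key before processing means a process that dies mid-processing leaves the key marked, so the redelivered copy would be skipped; recording the key in the same transaction as the side effect avoids both duplicates and loss.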

mawenyu commented 3 months ago

I solved this issue by modifying the consumer source code. Could you please ask the community if they would consider adopting this approach or if there are any more elegant solutions? The main modifications are:

  1. In org.apache.pulsar.client.impl.ConsumerImpl#connectionOpened, call clearIncomingMessages() on the parent consumer (org.apache.pulsar.client.impl.MultiTopicsConsumerImpl). This is necessary because Pulsar does not perform this cleanup, and the messages left in the queue cause duplication.
  2. In org.apache.pulsar.client.impl.ConsumerImpl#connectionOpened, delay the invocation of org.apache.pulsar.client.impl.ConsumerImpl#increaseAvailablePermits by 3 seconds and execute it asynchronously. This 3-second delay gives the client time to acknowledge the messages it has already received.

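The two changes above can be modeled in isolation as follows. This is a hypothetical, self-contained sketch, not Pulsar's ConsumerImpl: the class ReconnectWorkaroundSketch, its fields, and onConnectionOpened are illustrative names that only mirror the roles of the receiver queue, connectionOpened, and increaseAvailablePermits described in the list.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the proposed workaround; NOT Pulsar's ConsumerImpl.
public class ReconnectWorkaroundSketch {
    // Stands in for the receiver queue: prefetched, not yet received by the app.
    final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
    // Stands in for the flow permits sent to the broker (increaseAvailablePermits).
    final AtomicInteger grantedPermits = new AtomicInteger();

    private final int receiverQueueSize;
    private final Duration permitDelay;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "permit-delay");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });

    ReconnectWorkaroundSketch(int receiverQueueSize, Duration permitDelay) {
        this.receiverQueueSize = receiverQueueSize;
        this.permitDelay = permitDelay;
    }

    // Models connectionOpened after the topic moves to a new broker.
    void onConnectionOpened() {
        // Change 1: drop prefetched copies. They are still unacknowledged on the
        // broker side and can be dispatched again, possibly to another consumer.
        incoming.clear();
        // Change 2: grant permits later and asynchronously, so messages already
        // handed to the application have time to be acknowledged first.
        scheduler.schedule(() -> grantedPermits.addAndGet(receiverQueueSize),
                permitDelay.toMillis(), TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

For example, with new ReconnectWorkaroundSketch(5, Duration.ofSeconds(3)), calling onConnectionOpened() empties the local queue immediately and grants 5 permits about 3 seconds later. The delay is a timing heuristic rather than a guarantee: a message whose processing takes longer than the delay is acknowledged too late and can still be redelivered to another consumer.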
lhotari commented 2 months ago

> I solved this issue by modifying the consumer source code. Could you please ask the community if they would consider adopting this approach or if there are any more elegant solutions? The main modifications are:
>
> 1. In org.apache.pulsar.client.impl.ConsumerImpl#connectionOpened, call clearIncomingMessages() on the parent consumer (org.apache.pulsar.client.impl.MultiTopicsConsumerImpl). This is necessary because Pulsar does not perform this cleanup, and the messages left in the queue cause duplication.

@mawenyu Interesting. This seems to be related to #21767, which is a draft PR of mine.

> 2. In org.apache.pulsar.client.impl.ConsumerImpl#connectionOpened, delay the invocation of org.apache.pulsar.client.impl.ConsumerImpl#increaseAvailablePermits by 3 seconds and execute it asynchronously. This 3-second delay gives the client time to acknowledge the messages it has already received.

I'm wondering whether the design or the implementation of PIP-84 (https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch) has a gap.

lhotari commented 2 months ago

@mawenyu Which Pulsar client version and Pulsar broker version are you using?

mawenyu commented 1 week ago

> @mawenyu Which Pulsar client version and Pulsar broker version are you using?

3.0.4