apache / pulsar-client-go

Apache Pulsar Go Client Library
https://pulsar.apache.org/
Apache License 2.0
659 stars 336 forks source link

Resend after reconnection cause reproduce of the same msg #731

Open WinChua opened 2 years ago

WinChua commented 2 years ago

https://github.com/apache/pulsar-client-go/blob/965045aa0d2c077826f64d0b88a5d5e55d9d50c4/pulsar/producer_partition.go#L272

After the reconnection of the producer, the msg in the pendingQueue will be resend. But not every msg in pendingQueue hasn't send to broker, the resend logic here seems to deal with the problem that producer meet a network error when producing a msg which cause the msg doesn't really send to broker.

While, the code here

https://github.com/apache/pulsar-client-go/blob/965045aa0d2c077826f64d0b88a5d5e55d9d50c4/pulsar/producer_partition.go#L793

will also trigger the reconnection logic again and again, as well as the resend logic, which cause the consumer to consume the same msg with difference msgId

WinChua commented 2 years ago

Try to present a solution, the problem is that in the resend logic after the reconnection of producer, no mater the item in pendingQueue has been sent or not, all the items will be resend to broker, which will cause the redundancy of msg. We can add sent flag in pendingItemQueue initialized as false, which will be set to true after the corresponding buffer has been sent successfully. And filter the items which has been sent after the reconnection, resend the item which has not been sent only.