elastic / apm-queue

Abstraction layer over Kafka / GCP PubSub Lite to produce and consume records
Apache License 2.0

`Produce()` from `kafka.Producer` will always return no errors if `cfg.Sync=true` #513

Open endorama opened 2 months ago


The `Produce()` method of `kafka.Producer` has this signature: https://github.com/elastic/apm-queue/blob/a508dbd8227928c843560343574741ffc4fca720/kafka/producer.go#L210

It also has two modes, asynchronous and synchronous production, selected by the `cfg.Sync` configuration option.

In the async case, any errors that occur are passed to `cfg.ProduceCallback`, if one is specified. This is correct.

In the sync case, errors are not collected in any way and are never returned to the caller. Worse, the caller never learns that an error happened.

This is the relevant code that produces to Kafka: https://github.com/elastic/apm-queue/blob/a508dbd8227928c843560343574741ffc4fca720/kafka/producer.go#L243-L258

I'm not sure this behavior is intended, and I would consider it a bug. One solution could be to collect the errors with `errors.Join` and return the resulting error on line 261: https://github.com/elastic/apm-queue/blob/a508dbd8227928c843560343574741ffc4fca720/kafka/producer.go#L260-L264

This way, with `cfg.Sync=true` errors would be collected and returned, while with `cfg.Sync=false` the code would not wait and would return `nil`, keeping the current behavior.