gojekfarm / xtools

XTools is a submodule based repo to host re-usable Golang code.
MIT License
5 stars 0 forks source link

xkafka: Sequential & Async Consumer changes #26

Closed sonnes closed 6 months ago

sonnes commented 8 months ago

Fixes #25

The processing mode is determined by the xkafka.Concurrency option. By default, the consumer is initialized with enable.auto.offset.store=false. The offset is "stored" after the message is processed. The offset is "committed" based on the enable.auto.commit & auto.commit.interval.ms options.

It is important to understand the difference between "store" and "commit". The offset is "stored" in the consumer's memory and is "committed" to Kafka. The offset is "stored" after the message is processed and the message.Status is Success or Skip. The stored offsets will be automatically committed, unless the enable.auto.commit option is set to false.

Error Handling

By default, xkafka.Consumer will stop processing, commit last stored offset, and exit if there is a Kafka error or if the handler returns an error.

Errors can be handled by using one or more of the following options:

xkafka.ErrorHandler is called for every error that is not handled by the handler or the middlewares. It is also called for errors returned by underlying Kafka client.

Sequential Processing

Sequential processing is the default mode. It is same as xkafka.Concurrency(1).

Async Processing

Async processing is enabled by setting xkafka.Concurrency to a value greater than 1. The consumer will use a pool of Go routines to process messages concurrently.

Manual Commit

By default, the consumer will automatically commit the offset based on the enable.auto.commit & auto.commit.interval.ms options. The offsets are committed asynchronously in the background.

The consumer can be configured to commit the offset manually by setting xkafka.EnableManualCommit option to true. When ManualCommit is enabled, the consumer will synchronously commit the offset after each message is processed.

NOTE: Enabling ManualCommit will add an overhead to each message. It is recommended to use ManualCommit only when necessary.

codecov-commenter commented 7 months ago

Codecov Report

Attention: 2 lines in your changes are missing coverage. Please review.

Comparison is base (e525f9f) 99.18% compared to head (81cd850) 98.97%.

Files Patch % Lines
xkafka/consumer.go 96.55% 2 Missing :warning:
Additional details and impacted files ```diff @@ Coverage Diff @@ ## main #26 +/- ## ========================================== - Coverage 99.18% 98.97% -0.22% ========================================== Files 35 35 Lines 737 781 +44 ========================================== + Hits 731 773 +42 - Misses 6 8 +2 ```

:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.