silviucpp / erlkaf

Erlang kafka driver based on librdkafka
MIT License

Manually committing offsets #54

Closed nobrayner closed 1 year ago

nobrayner commented 1 year ago

Hi! :wave:

I was poking around to see whether there is a way to manually commit offsets to the Kafka broker, and found this commit function in the librdkafka docs. As far as I can tell, though, it isn't exposed by the erlkaf wrapper?

Is it possible to make this call in user-land, or would supporting it require changes to erlkaf?

Cheers :raised_hands:

nobrayner commented 1 year ago

Oh, and to try to avoid the XY problem: what I am after is ensuring that after I process an event, the offset is committed back to Kafka. I need to ensure that a given event isn't processed more than once for this particular consumer.

silviucpp commented 1 year ago

If you look at the implementation https://github.com/silviucpp/erlkaf/blob/d73d035e38012e066e00aa35acab3447e30ffd32/src/erlkaf_consumer.erl#L195 you can see that each time handling an event returns {ok, NewState}, the consumer process calls rd_kafka_offset_store. The stored offset is then committed (written) to the broker (or file) according to auto_commit_interval_ms, which is configurable.

You can set a value of 100 ms or even lower if you want to be that drastic, but performance will certainly decrease considerably. The only way to lose a commit is if your application is killed or the VM crashes in the window between the moment the offset is stored and the moment it is committed (at most auto_commit_interval_ms).
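
As a rough illustration of the suggestion above, here is a minimal sys.config sketch that lowers the commit interval to 100 ms. Only auto_commit_interval_ms and the value 100 come from this thread. The overall layout, the client name my_consumer, the group, topic, and callback module names, the broker address, and the placement of the option under client_options are all assumptions for illustration, so check the erlkaf README for the exact key names and where this option belongs.

```erlang
%% sys.config fragment (sketch; names and layout are assumptions, see above)
[
 {erlkaf, [
   {global_client_options, [
     %% hypothetical broker address
     {bootstrap_servers, <<"localhost:9092">>}
   ]},
   {clients, [
     %% hypothetical client id
     {my_consumer, [
       {type, consumer},
       {group_id, <<"my_group">>},
       {topics, [
         {<<"my_topic">>, [
           %% hypothetical module implementing the consumer callbacks
           {callback_module, my_handler},
           {callback_args, []}
         ]}
       ]},
       {client_options, [
         %% commit stored offsets every 100 ms, as suggested in the thread;
         %% the placement of this key is an assumption
         {auto_commit_interval_ms, 100}
       ]}
     ]}
   ]}
 ]}
].
```

Even with a short interval, an offset that has been stored but not yet committed can be lost on a crash, so the message would be delivered again after a restart. A handler that must not act twice on the same event would still need to tolerate that redelivery.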