wvanbergen / kafka

Load-balancing, resuming Kafka consumer for go, backed by Zookeeper.
MIT License
373 stars 141 forks source link

Support manual committing and optional batching #104

Closed JamesOwenHall closed 8 years ago

JamesOwenHall commented 8 years ago

The consumer group library has the nice ability to batch updates to avoid flooding Zookeeper with writes. However, there is no way to detect errors when committing offsets.

The ConsumerGroup.CommitUpto() function has a return type of error but it's misleading because the function is hardcoded to return nil.

My suggestion is to make the time-based batching optional and to allow the clients to manually flush the offsets to Zookeeper. Client's can now opt out of time-based committing by setting the commit interval to 0. They can also manually commit the offsets by calling Flush().

cc @wvanbergen

Highstead commented 8 years ago

LGTM

wvanbergen commented 8 years ago

I think this is a nice feature to add.

For the normal case where you're not Flushing manually, can we send the commit errors to the ConsumerGroups#Errors() channel? The fact that this currently does not happen is probably just a result of me being lazy.

JamesOwenHall commented 8 years ago

Good idea. I updated the PR to retrofit the error into a *sarama.ConsumerError so that we don't change the API.

JamesOwenHall commented 8 years ago

It now uses a chan error.

wvanbergen commented 8 years ago

:ship: