wvanbergen / kafka

Load-balancing, resuming Kafka consumer for Go, backed by Zookeeper.
MIT License

Error not handled with cg.instance.ReleasePartition() #91

Closed bhirbec closed 8 years ago

bhirbec commented 8 years ago

Hi,

I've noticed that the error returned by ReleasePartition is not handled here: https://github.com/wvanbergen/kafka/blob/master/consumergroup/consumer_group.go#L357

defer cg.instance.ReleasePartition(topic, partition) 

If an error is returned (which I guess means that the partition wasn't released), could it prevent another consumer from claiming it?

Thanks, Benoit

wvanbergen commented 8 years ago

Yes, that could be the case.

However, there is not really much we can do in that case, except retrying. The other option would be to shut down / crash the consumer, after which the Zookeeper connection will eventually time out and the partition will be released implicitly.

I'd welcome PRs to make this more resilient.

bhirbec commented 8 years ago

What about notifying the caller (the piece of code that calls JoinConsumerGroup) by sending an error into the errors channel? This gives the caller the opportunity to close the group and call JoinConsumerGroup again.

Code would be something like this:

defer func() {
    err := cg.instance.ReleasePartition(topic, partition)
    if err != nil {
        cg.Logf("%s/%d :: FAILED to release partition: %s\n", topic, partition, err)
        cg.errors <- &sarama.ConsumerError{
            Topic:     topic,
            Partition: partition,
            Err:       err,
        }
    }
}()

Does it seem reasonable to you? If so, I'll be glad to send you a PR.

Please let me know what you think.

wvanbergen commented 8 years ago

:+1:

bhirbec commented 8 years ago

Thanks in advance for taking a look at this PR

wvanbergen commented 8 years ago

Should be fixed now that your PR is merged.