bsm / sarama-cluster

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 [DEPRECATED]
MIT License

Upon closed dead subscription, Resubscribe #261

Closed: steve-gray closed this issue 6 years ago

steve-gray commented 6 years ago

It appears that when a partition subscription dies, potentially because of slow consumption or something else, this code in Sarama is hit:


func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
    for _, child := range newSubscriptions {
        bc.subscriptions[child] = none{}
        Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
    }

    for child := range bc.subscriptions {
        select {
        case <-child.dying:
            Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
            close(child.trigger)
            delete(bc.subscriptions, child)
        default:
            break
        }
    }
}

(From consumer.go in Sarama)
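To make the second loop easier to follow, here is a minimal stand-alone sketch of the same non-blocking "check and reap" pattern. It does not use Sarama: the `subscription` type and the `reapDead` function are hypothetical stand-ins invented for illustration, and only the `select` with a `default` branch mirrors the quoted code.

```go
package main

import "fmt"

// subscription is a hypothetical stand-in for Sarama's partitionConsumer.
type subscription struct {
	name  string
	dying chan struct{} // closed when the subscription should be dropped
}

// reapDead removes every subscription whose dying channel has been closed
// and returns the names of the ones it removed. The select with a default
// branch makes the check non-blocking: live subscriptions are skipped.
func reapDead(subs map[*subscription]struct{}) []string {
	var reaped []string
	for s := range subs {
		select {
		case <-s.dying:
			// A receive on a closed channel succeeds immediately.
			delete(subs, s) // deleting during range is safe in Go
			reaped = append(reaped, s.name)
		default:
			// Still alive: leave it in place.
		}
	}
	return reaped
}

func main() {
	live := &subscription{name: "events/0", dying: make(chan struct{})}
	dead := &subscription{name: "events/1", dying: make(chan struct{})}
	subs := map[*subscription]struct{}{live: {}, dead: {}}

	close(dead.dying) // simulate the partition consumer dying

	fmt.Println("reaped:", reapDead(subs))
	fmt.Println("remaining:", len(subs))
}
```

Note that nothing in this pattern re-adds a reaped entry; once removed from the map it stays removed unless something else subscribes again, which is the gap this issue is about.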

Once the subscription is dead, there seems to be no consumer-group-level hook to periodically check for this condition, so as long as the process remains alive it holds its own lease on the partition and stops any other process from claiming it.

dim commented 6 years ago

@steve-gray I am currently working on a PR to integrate consumer groups into sarama itself (https://github.com/Shopify/sarama/pull/1099). As part of the new API, you should be able to address these issues in your implementation, i.e. trigger a rebalance by exiting a handler when a partition is stuck/slow.
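As a rough illustration of "exiting a handler when a partition is stuck", the sketch below uses only the Go standard library. It is not the API from the PR: `consumeUntilStalled`, `errStalled`, `demoIdle` and the plain `chan string` are hypothetical stand-ins for a per-partition handler loop and its message channel. It also assumes that "no message within the idle timeout" means the partition is stalled, which a real consumer on a quiet topic would need to treat more carefully.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errStalled is a hypothetical sentinel returned when the source goes quiet.
var errStalled = errors.New("partition stalled: no messages within idle timeout")

// demoIdle is an arbitrary timeout chosen for this example only.
const demoIdle = 50 * time.Millisecond

// consumeUntilStalled passes each message to handle. It returns nil when the
// channel is closed, or errStalled when no message arrives within idle.
// Returning from the loop is the point: the caller gets control back and can
// release the claim instead of holding it forever.
func consumeUntilStalled(messages <-chan string, idle time.Duration, handle func(string)) error {
	for {
		select {
		case msg, ok := <-messages:
			if !ok {
				return nil // source closed cleanly
			}
			handle(msg)
		case <-time.After(idle): // a fresh timer on every iteration
			return errStalled
		}
	}
}

func main() {
	messages := make(chan string, 2)
	messages <- "m1"
	messages <- "m2"
	// No further sends and no close: the source has gone quiet.

	err := consumeUntilStalled(messages, demoIdle, func(m string) {
		fmt.Println("handled", m)
	})
	fmt.Println("handler exited:", err)
}
```

In a consumer-group setting, the caller would react to `errStalled` by giving up the partition so that a rebalance can hand it to another member; how that is done depends on the client library and is left out here.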