segmentio / kafka-go

Kafka library in Go
MIT License
7.3k stars 760 forks source link

#615 - listener for assignments #1305

Open nachogiljaldo opened 2 days ago

nachogiljaldo commented 2 days ago

Closes https://github.com/segmentio/kafka-go/issues/615

Adds a listener to send re-assignments to a listener.

nachogiljaldo commented 2 days ago

@achille-roussel I saw you commented on https://github.com/segmentio/kafka-go/issues/615 . Is this something you could review?

isaacd9 commented 2 days ago

I don't think adding this is a good idea. Instead, you should probably use the ConsumerGroup API to listen for a new generation and construct a new reader.

nachogiljaldo commented 2 days ago

Thanks for the feedback @isaacd9 ! Could you elaborate?

I had a quick look at the NewReader. Also it seems to use run which is private, so it's not possible to do it that way without duplicating even more code? Same for getTopics() while building the ConsumerGroupConfig.

Maybe, instead, a modification safe version of ConsumerGroup should be exposed (that only has Next but not Close) in the Reader? That allows to preserve the encapsulation of the logic to create Readers which IMHO seems important and avoids unnecessary duplication on all clients that would need this.

Something like:

type ConsumerGroupGenerationListener interface {
  Next(ctx context.Context) (*Generation, error)
}

...

func (r *Reader) ConsumerGroupGenerationListener(ctx context.Context) (ConsumerGroupGenerationListener, error) {
    if !r.useConsumerGroup() {
        return nil, errOnlyAvailableWithGroup
    }
        return r.consumerGroupGenerationListener, nil
}
nachogiljaldo commented 1 day ago

@petedannemann , sorry for the direct ping but you seem to have reviewed some of the last PRs. I wonder if this is something you could provide feedback on the approach (or the alternatives provided by Isaac or my counter proposal).