segmentio / kafka-go

Kafka library in Go
MIT License
7.59k stars 789 forks

Offset for consumer group #197

Open maeglindeveloper opened 5 years ago

maeglindeveloper commented 5 years ago

Hey everyone,

I'm trying to use kafka-go with a Reader and a consumer group. I have not found a way to learn the last offset of the partition assigned to my reader until it fetches its first message. Without a consumer group, this works perfectly using the ReadLastOffset method of a Conn object (which means you need to know the partition, by the way). Any ideas?

Thanks for your help :)

@achille-roussel

achille-roussel commented 5 years ago

Hello @maeglindeveloper

This feature is not currently supported by kafka-go. However, if you have spare cycles, we would welcome a contribution to add it!

robfig commented 5 years ago

I need this feature, and I'd be happy to implement it if someone could provide an outline of what to do. @achille-roussel

achille-roussel commented 5 years ago

When using consumer groups, a reader may be assigned more than one partition and may consume from more than one topic, so a single "last offset" doesn't cover these use cases. We would need a set of topic/partition/offset entries.

The Client API exposes a method to retrieve this information for a consumer group, and the ConsumerGroup API offers a lower-level access to the consumer group management as well.

If none of these options are helpful, I'll be happy to hear ideas on new APIs we could introduce to cover your use cases.

robfig commented 5 years ago

The use case is rebuilding some derived data store by reading the complete contents of a Topic. Presently, there is no way to know when you've reached the end of a topic (e.g. the high watermark at the time you started reading).

The current best workaround I've seen is to treat a context.DeadlineExceeded as "no more messages", but that obviously won't work if messages continue to come in. To address that, I believe it's necessary to configure the topic with LogAppendTime timestamps and then compare message timestamps against the time you started reading to determine when to stop. As you point out, though, you may be reading past that point in one partition while messages from before that timestamp are still unread in another.

Perhaps there's no good way to do this with Kafka.

achille-roussel commented 5 years ago

We actually use kafka-go for a very similar use case at Segment. Maybe a consumer group is the wrong tool here?

Have you considered using a Conn directly? You can call ReadPartitions to discover all existing partitions, open a connection to each partition, and call ReadOffsets to get the bounds of the partition. Then you know where the end is.

robfig commented 5 years ago

Yeah, I agree that now seems like the best approach. Thanks!

achille-roussel commented 3 years ago

Just leaving an update that we are still looking into this issue.

perbu commented 2 years ago

Would it be appropriate to have the reader fail when trying to set the offset?

The reason I ask is that I spent a fair amount of time trying to figure out why this wasn't working before I found this issue. Or perhaps mention it in the README?

I have no idea how far into the future the fix is, so in the meantime I think it would be good to make it somewhat clearer that this doesn't work.

Also, for me the appropriate workaround was to call this on the reader:

r.SetOffsetAt(context.Background(), startTime)

All the best, Per.

achille-roussel commented 2 years ago

Hello @perbu!

First, I'm sorry to hear you had to waste time on something that could have been better documented in kafka-go.

We always welcome contributions. Since you are close to the problem, you might be in a good position to provide a valuable improvement to our documentation.

Would you happen to have the time to submit a pull request for the changes that would have been useful for you to have in the first place?

perbu commented 2 years ago

@achille-roussel I have two suggestions. One would be for SetOffset() to return an error when given -1. This would tell people that this isn't supported (yet).

The other would be to add a snippet to the README.

My suggestion would be for the API to fail the request, since it doesn't work. But if that is not possible, adding it to the README would be helpful.

Do I read you correctly that you don't want to fail SetOffset(-1)? And you just want this behavior documented?

edit: I'm no longer available to contribute. I've migrated to another library. All the best.