twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.7+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License

fetch using topic id #790

Open arjunnair1997 opened 1 month ago

arjunnair1997 commented 1 month ago

Hello,

I'm trying to use the franz-go Kafka client to consume records from a source cluster. I'm using a direct consumer because the workload is to periodically consume from an arbitrary list of (topic ID, partition ID, offset) tuples.

I'm trying to achieve this by keeping a single client alive. When it receives a new list of (topic ID, partition ID, offset) tuples to consume from, it removes the partitions it is currently consuming using RemoveConsumePartitions (or even PurgeTopicsFromClient) and then adds the new ones using AddConsumePartitions.
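To make the swap concrete, here is a minimal sketch of how the two arguments could be built. It uses only the standard library: `TopicPartition` and `planSwap` are hypothetical names, and a plain `int64` stands in for the client's offset type. The map shapes are meant to mirror what RemoveConsumePartitions and AddConsumePartitions accept as I understand them, so treat that as an assumption to check against the kgo documentation.

```go
package main

import (
	"fmt"
	"sort"
)

// TopicPartition identifies one partition of a topic by name (hypothetical helper type).
type TopicPartition struct {
	Topic     string
	Partition int32
}

// planSwap builds the two maps for a full swap of the consumed partitions:
// everything in current is scheduled for removal, and everything in desired
// is scheduled to be added at its start offset.
// The int64 offset is a stand-in for the client's own offset type.
func planSwap(current map[TopicPartition]bool, desired map[TopicPartition]int64) (map[string][]int32, map[string]map[int32]int64) {
	remove := make(map[string][]int32)
	add := make(map[string]map[int32]int64)

	for tp := range current {
		remove[tp.Topic] = append(remove[tp.Topic], tp.Partition)
	}
	// Sort partition lists so the result does not depend on map iteration order.
	for _, ps := range remove {
		sort.Slice(ps, func(i, j int) bool { return ps[i] < ps[j] })
	}

	for tp, offset := range desired {
		if add[tp.Topic] == nil {
			add[tp.Topic] = make(map[int32]int64)
		}
		add[tp.Topic][tp.Partition] = offset
	}
	return remove, add
}

func main() {
	current := map[TopicPartition]bool{
		TopicPartition{Topic: "orders", Partition: 1}: true,
		TopicPartition{Topic: "orders", Partition: 0}: true,
	}
	desired := map[TopicPartition]int64{
		TopicPartition{Topic: "payments", Partition: 2}: 42,
	}
	remove, add := planSwap(current, desired)
	fmt.Println("remove:", remove)
	fmt.Println("add:", add)
}
```

Note that both maps are keyed by topic name, which is exactly the limitation described next: nothing here proves that the name still refers to the topic ID that was requested.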

However, these APIs take the topic name as a string, not the topic ID. I think I can make sure I'm fetching from the correct topic if the FetchTopic struct also carries the topic ID and the fetches returned from PollRecords have it set.

I think it's possible to do this by:

  1. Adding a topic ID field to FetchTopic.
  2. Adding a config option to indicate that the topic ID should be set in FetchTopic.
  3. Setting the topic ID in FetchTopic on fetches (I think this is possible for request versions >= 13?)

What do you think? If you're open to this, I can implement it.

twmb commented 1 month ago

I'm on board, though I'd skip (2).

I think this is the full patch:

diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go
index 4f1ebe6f..55315644 100644
--- a/pkg/kgo/record_and_fetch.go
+++ b/pkg/kgo/record_and_fetch.go
@@ -274,6 +274,8 @@ func (p *FetchPartition) EachRecord(fn func(*Record)) {
 type FetchTopic struct {
    // Topic is the topic this is for.
    Topic string
+   // TopicID is the ID of the topic, if your cluster supports topic IDs.
+   TopicID [16]byte
    // Partitions contains individual partitions in the topic that were
    // fetched.
    Partitions []FetchPartition
@@ -560,6 +562,7 @@ func (fs Fetches) EachTopic(fn func(FetchTopic)) {
    for topic, partitions := range topics {
        fn(FetchTopic{
            topic,
+           [16]byte{},
            partitions,
        })
    }
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 7a55ef0c..cddb4189 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -979,6 +979,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe

        fetchTopic := FetchTopic{
            Topic:      topic,
+           TopicID:    rt.TopicID,
            Partitions: make([]FetchPartition, 0, len(rt.Partitions)),
        }

I can push this after I cut a patch release. There aren't many new features at the moment, so I may wait on releasing this for a little bit -- is that alright? (potentially a month?)
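For illustration, a caller could use the new field along these lines. This is only a sketch under stated assumptions: `FetchTopic` below is a local stand-in that mirrors the patched struct rather than the real kgo type, `mismatchedTopics` is a hypothetical helper, and treating an all-zero ID as "unknown" is an assumption based on the `[16]byte{}` placeholder in the patch.

```go
package main

import "fmt"

// FetchTopic is a local stand-in mirroring the patched struct;
// only the fields needed for the check are included.
type FetchTopic struct {
	Topic   string
	TopicID [16]byte
}

// mismatchedTopics returns the names of fetched topics whose ID differs from
// the ID the caller expected for that name.
// Assumption: an all-zero TopicID means the ID is unknown (for example, the
// cluster does not support topic IDs), so such topics are skipped.
func mismatchedTopics(fetched []FetchTopic, want map[string][16]byte) []string {
	var bad []string
	for _, ft := range fetched {
		expected, ok := want[ft.Topic]
		if !ok || ft.TopicID == ([16]byte{}) {
			continue
		}
		if ft.TopicID != expected {
			bad = append(bad, ft.Topic)
		}
	}
	return bad
}

func main() {
	oldID := [16]byte{1}
	newID := [16]byte{2}

	// The caller asked for "orders" with oldID, but the topic was recreated
	// under the same name and now has newID.
	want := map[string][16]byte{"orders": oldID, "payments": oldID}
	fetched := []FetchTopic{
		{Topic: "orders", TopicID: newID},
		{Topic: "payments", TopicID: oldID},
	}
	fmt.Println(mismatchedTopics(fetched, want))
}
```

Records from any topic reported by such a check could then be dropped, since they come from a different topic than the one requested by ID.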

arjunnair1997 commented 1 month ago

Hey, that's great, thanks. If you decide to merge it now, I can pin franz-go master and start using it. Otherwise, I can fork franz-go for a bit until you decide to merge it.

twmb commented 1 month ago

I was burned in the past by merging a feature then having to revert it to issue a quick bugfix release; if you're open to using the branch for now, please do that 😅

I'll leave this issue open; it'll be closed once I merge the PR, which I'll do right as I'm about to cut a release.