twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.6+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License

Metadata Refresh Doesn't Update Topic IDs After Topic Recreation #676

Closed aratz-lasa closed 4 months ago

aratz-lasa commented 4 months ago

Summary

Metadata refresh does not update Topic IDs after a topic is recreated, causing continuous Fetch failures.

Environment

Problem Description

When a Kafka topic is deleted and subsequently recreated with the same name, the franz-go library's metadata refresh mechanism does not appear to update the Topic ID associated with the topic. This mismatch between the metadata and the broker state leads to continuous failures in subsequent Fetch requests.

Expected Behavior

During a metadata refresh, the following should occur:

  1. The library should check both the topic name and the Topic ID.
  2. If a topic name exists, but the associated Topic ID has changed, it indicates that the topic has been recreated.
  3. The library should update its internal metadata to reflect the new Topic ID.

Steps to Reproduce

  1. Create a Kafka topic (e.g., "mytopic").
  2. Use the franz-go client to perform operations on the topic (produce/consume).
  3. Delete the Kafka topic "mytopic".
  4. Recreate a Kafka topic with the same name "mytopic".
  5. Attempt to perform Fetch requests using the franz-go client.

Observed Behavior

Fetch requests continuously fail, likely due to the client using the outdated Topic ID from before the topic's recreation.

Here is a script to reproduce the issue:

package main

import (
   "context"
   "fmt"
   "time"

   "github.com/twmb/franz-go/pkg/kadm"
   "github.com/twmb/franz-go/pkg/kgo"
)

const topic = "franzgo_bug_report_topic"

func main() {
   // Group consumer subscribed to the topic that will be deleted.
   consumer, err := kgo.NewClient(
       kgo.SeedBrokers("localhost:9092"),
       kgo.ConsumerGroup("franzgo_bug_report_consumer_group"),
       kgo.ConsumeTopics(topic),
   )
   if err != nil {
       panic(fmt.Errorf("error creating consumer: %w", err))
   }

   // Separate client used only for producing.
   producer, err := kgo.NewClient(
       kgo.SeedBrokers("localhost:9092"),
   )
   if err != nil {
       panic(fmt.Errorf("error creating producer: %w", err))
   }

   // Admin client used to create and delete the topic.
   admin, err := kadm.NewOptClient(
       kgo.SeedBrokers("localhost:9092"),
   )
   if err != nil {
       panic(fmt.Errorf("error creating admin: %w", err))
   }

   // One partition, replication factor 1, default topic configs.
   _, err = admin.CreateTopic(context.Background(), 1, 1, nil, topic)
   if err != nil {
       panic(fmt.Errorf("error creating topic: %w", err))
   }
   fmt.Println("TOPIC CREATED")

   // Produce one record per second and print any produce error.
   go func() {
       for {
           ctx, cc := context.WithTimeout(context.Background(), 5*time.Second)
           results := producer.ProduceSync(ctx, &kgo.Record{
               Key:   []byte("Key"),
               Value: []byte("Value"),
               Topic: topic,
           })
           cc()
           if err := results.FirstErr(); err != nil {
               fmt.Printf("ERROR: %s\n", err.Error())
           }
           time.Sleep(time.Second)
       }
   }()

   // Poll continuously and print any fetch error.
   go func() {
       for {
           ctx, cc := context.WithTimeout(context.Background(), 5*time.Second)
           fetches := consumer.PollFetches(ctx)
           cc()

           if err := fetches.Err(); err != nil {
               fmt.Printf("ERROR: %s\n", err.Error())
           }
       }
   }()

   // Let both loops run for a while, then delete the topic underneath them.
   fmt.Println("SLEEPING 10s BEFORE DELETING TOPIC")
   time.Sleep(10 * time.Second)
   _, err = admin.DeleteTopics(context.Background(), topic)
   if err != nil {
       panic(fmt.Errorf("error deleting topic: %w", err))
   }
   fmt.Println("TOPIC DELETED")

   // Block forever so the producer and consumer loops keep running.
   select {}
}

Impact

This bug can disrupt applications relying on the franz-go library for Kafka interactions. When topics are recreated (which might happen for various reasons like configuration changes), clients cannot interact with the new topic.

Workaround

Currently, the only known workaround is to restart the application using the franz-go library. This forces a full metadata refresh, picking up the correct Topic ID.

twmb commented 4 months ago

This is intentional, and IIRC, part of the reason topic IDs were introduced in Kafka itself -- to guard against a client automatically producing to or consuming from a topic that has been deleted and recreated.

If you want to keep the client alive, you could PurgeTopicsFromClient and then AddConsumeTopics.

From what I remember, it's not a common workflow to want to delete and recreate topics while a client is active, and it's actually more important to be notified when such a thing happens. I'm not sure if it's worth adding an option to automatically internally update for new topic IDs... maybe?

aratz-lasa commented 4 months ago

Thanks for sharing your perspective. I read about Topic IDs in KIP-516 being for brokers, but it makes sense that there could be issues on the client side too. Like, accidentally producing to a new topic thinking it's the old one seems like a real risk.

I agree that a flag may be a good solution, since it lets users decide what behavior they expect for their workload.

twmb commented 4 months ago

Actually I remember trying to do this, for https://github.com/twmb/franz-go/pull/444

I remember running into odd internal race conditions and quickly deemed this way too hard to automate. One conflicting problem is that Kafka itself doesn't guarantee consistent metadata responses. One broker can report that a topic exists while another reports that it doesn't (shortly after creation or deletion). Mixing that with trying to detect recreated topics, and tracking what is changing when, starts to get way more complicated than it's worth (in that I spent multiple evenings figuring out what would need to change where, and even that wasn't sufficient and I had to back out of my changes). I'd rather leave it to the end user to manually purge the topic and re-consume it.
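
To illustrate why inconsistent metadata complicates detection, here is a rough sketch of one naive mitigation on the application side: only treat a topic as recreated after the same new Topic ID has been seen on several consecutive metadata observations. This is not what franz-go does and it does not address the internal races described above; `idTracker`, `observe`, and the threshold `need` are hypothetical names for this example.

```go
package main

import "fmt"

// TopicID stands in for Kafka's 16-byte topic UUID (hypothetical type for this sketch).
type TopicID [16]byte

// idTracker accepts a new Topic ID only after it has been observed
// `need` times in a row, to ride out brokers that briefly disagree.
type idTracker struct {
	current   TopicID // ID the application currently trusts
	candidate TopicID // most recent differing ID
	seen      int     // consecutive observations of candidate
	need      int     // observations required before switching
}

// observe records one metadata observation. found is false when the
// responding broker did not know the topic. It returns true once the
// tracker has switched to a new ID.
func (t *idTracker) observe(id TopicID, found bool) bool {
	if !found || id == t.current {
		t.seen = 0 // streak broken: missing topic or the old ID again
		return false
	}
	if id != t.candidate {
		t.candidate, t.seen = id, 0
	}
	t.seen++
	if t.seen < t.need {
		return false
	}
	t.current, t.seen = id, 0
	return true
}

func main() {
	tr := &idTracker{current: TopicID{1}, need: 2}
	// A broker reports the new ID, another does not know the topic yet,
	// then two consecutive responses agree on the new ID.
	fmt.Println(tr.observe(TopicID{2}, true))  // false
	fmt.Println(tr.observe(TopicID{}, false))  // false
	fmt.Println(tr.observe(TopicID{2}, true))  // false
	fmt.Println(tr.observe(TopicID{2}, true))  // true
}
```

Even with such a threshold, the application must still decide what to do with in-flight records and committed offsets that belong to the old topic, which is part of why the decision is left to the end user.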

I'm going to close this given just how complicated it is and because an end-user workaround exists. Let me know if you think this is important enough to add as an option but, fwiw, I'll likely add a help-wanted tag then and leave it for an eventual community contribution.