twmb / franz-go

franz-go contains a feature-complete, pure Go library for interacting with Kafka from 0.8.0 through 3.7+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License

Possible race condition in kadm.CommitOffsets #167

Zach-Johnson closed this issue 2 years ago.

Zach-Johnson commented 2 years ago

Hi, I've noticed some strange behavior when using kadm.CommitOffsets. If I use this function with a groupID that does not exist yet, I see

    error UNKNOWN_MEMBER_ID: The coordinator is not aware of this member.

if the admin commit runs sufficiently long after the regular client is set up. If I instead call this function immediately after creating the initial client, the request succeeds and everything behaves as I expect: I get a new consumer group that begins consuming at the offsets I specified.

For context, I was previously trying to set up a new consumer group with specific starting offsets, and I used to do something like this:

    client.PollRecords(ctx, 1)
    client.SetOffsets(startingPositions)
    ...

to force the consumer group to be created. This turns out to work badly in some cases. I think I'm abusing SetOffsets here, since not all of the topics/partitions will necessarily have been consumed from by the initial PollRecords call.

So I suspect there is some race happening in kadm that I'm unaware of, although maybe I'm abusing the behavior here as well. In general, is there a good pattern for forcing a consumer group to start from a specific set of offsets? Perhaps I should do something like this instead?

    client.PollRecords(ctx, 1) // Force the consumer group creation if it doesn't exist
    client.CommitRecords(....)

The race in kadm seems easy to reproduce with something like the following, using a groupID that doesn't exist yet:

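    package main

    import (
        "context"
        "fmt"
        "os"
        "time"

        "github.com/twmb/franz-go/pkg/kadm"
        "github.com/twmb/franz-go/pkg/kgo"
    )

    // Placeholder configuration (assumed; the original opts are not shown).
    // The race needs a client that joins groupID as a consumer.
    var (
        groupID = "new-group-that-does-not-exist-yet"
        topic   = "my-topic"
        opts    = []kgo.Opt{
            kgo.SeedBrokers("localhost:9092"),
            kgo.ConsumerGroup(groupID),
            kgo.ConsumeTopics(topic),
        }
    )

    func main() {
        e := func(err error) {
            if err != nil {
                fmt.Println("error", err)
                os.Exit(1)
            }
        }

        cl, err := kgo.NewClient(opts...)
        e(err)
        defer cl.Close()

        adm := kadm.NewClient(cl)

        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel() // release the context's timer

        // time.Sleep(time.Second) // Uncommenting this results in the error

        resp, err := adm.CommitOffsets(ctx, groupID, kadm.Offsets{topic: map[int32]kadm.Offset{
            0: {Topic: topic, Partition: 0, At: 4, LeaderEpoch: -1},
            1: {Topic: topic, Partition: 1, At: 4, LeaderEpoch: -1},
            2: {Topic: topic, Partition: 2, At: 8, LeaderEpoch: -1},
        }})
        e(err)
        e(resp.Error()) // per-partition commit errors

        fmt.Println("resp", resp)
    }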

twmb commented 2 years ago

The admin-level CommitOffsets cannot be used on an active group, so this is likely a race in your usage. I'd actually expect ILLEGAL_GENERATION, but perhaps there's some other race when committing with the first generation. If you commit immediately, it's likely that the kgo client guts have not joined the group yet, so your admin commit goes through first and creates a commit that the group then starts from.
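To make the two orderings concrete, here is a toy, stdlib-only Go model of a group coordinator. It is a hypothetical simplification, not Kafka's or franz-go's code: `toyGroup`, `join`, and `adminCommit` are made-up names. The model accepts a member-less (admin) commit only while the group has no members, and otherwise returns UNKNOWN_MEMBER_ID, the error reported in this issue; a real broker's exact checks (and whether it reports ILLEGAL_GENERATION instead) may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnknownMember stands in for the UNKNOWN_MEMBER_ID error reported in this
// issue; it is not a franz-go or Kafka client value.
var errUnknownMember = errors.New("UNKNOWN_MEMBER_ID")

// toyGroup is a hypothetical, heavily simplified view of one consumer group.
type toyGroup struct {
	members   map[string]bool // active member IDs
	committed map[int32]int64 // committed offset per partition
}

func newToyGroup() *toyGroup {
	return &toyGroup{members: map[string]bool{}, committed: map[int32]int64{}}
}

// join models a kgo group consumer joining the group.
func (g *toyGroup) join(memberID string) { g.members[memberID] = true }

// adminCommit models an admin-level commit, which carries no member ID.
// The model only accepts it while the group has no active members.
func (g *toyGroup) adminCommit(offsets map[int32]int64) error {
	if len(g.members) > 0 {
		return errUnknownMember
	}
	for p, at := range offsets {
		g.committed[p] = at
	}
	return nil
}

func main() {
	want := map[int32]int64{0: 4, 1: 4, 2: 8}

	// Admin commit wins the race: the group is still empty.
	early := newToyGroup()
	fmt.Println("commit before join:", early.adminCommit(want)) // prints "commit before join: <nil>"
	early.join("member-1")                                       // in reality the group would now start from 4, 4, 8

	// The consumer joins first (e.g. after the time.Sleep): the commit is rejected.
	late := newToyGroup()
	late.join("member-1")
	fmt.Println("commit after join:", late.adminCommit(want)) // prints "commit after join: UNKNOWN_MEMBER_ID"
}
```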

If you want a group to start from specific offsets, you need to commit before the group is joined for the first time (or while the group is completely empty). Otherwise, if you're only consuming with one member, you can use the kgo.Client SetOffsets function, which sets the partitions to the given offsets within the client. Once partitions eventually become "dirty" (that is, once they have been consumed from), commits for those dirty partitions will go through; partitions that have not been consumed from are not dirty, so commits for them will not go through.
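The "dirty" rule explains why the PollRecords-then-SetOffsets approach only partly worked. Below is a hypothetical, stdlib-only Go model of it; it is not kgo.Client's implementation, and `toyConsumer`, `setOffsets`, `consume`, and `commitable` are illustrative names. Setting offsets only moves positions. A partition becomes dirty, and therefore part of a commit, only after records are consumed from it.

```go
package main

import (
	"fmt"
	"sort"
)

// toyConsumer is a hypothetical single-member model of the behavior described
// above; it is not kgo.Client's implementation.
type toyConsumer struct {
	position map[int32]int64 // next offset to consume per partition
	dirty    map[int32]bool  // partitions consumed from, eligible for commit
}

func newToyConsumer() *toyConsumer {
	return &toyConsumer{position: map[int32]int64{}, dirty: map[int32]bool{}}
}

// setOffsets mirrors the idea of SetOffsets: it moves positions but does not
// mark any partition dirty.
func (c *toyConsumer) setOffsets(offsets map[int32]int64) {
	for p, at := range offsets {
		c.position[p] = at
	}
}

// consume models receiving n records from partition p.
func (c *toyConsumer) consume(p int32, n int64) {
	c.position[p] += n
	c.dirty[p] = true
}

// commitable returns the partitions a commit would include, sorted.
func (c *toyConsumer) commitable() []int32 {
	var ps []int32
	for p, d := range c.dirty {
		if d {
			ps = append(ps, p)
		}
	}
	sort.Slice(ps, func(i, j int) bool { return ps[i] < ps[j] })
	return ps
}

func main() {
	c := newToyConsumer()
	c.setOffsets(map[int32]int64{0: 4, 1: 4, 2: 8})
	c.consume(0, 1) // only partition 0 returned records in the first poll
	// Partitions 1 and 2 were positioned but never consumed, so they are not committed.
	fmt.Println("partitions committed:", c.commitable()) // prints "partitions committed: [0]"
}
```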

I recommend using the admin commit before starting the group.

Zach-Johnson commented 2 years ago

Understood, thank you!