twmb / franz-go

franz-go contains a feature-complete, pure Go library for interacting with Kafka from 0.8.0 through 3.6+. Producing, consuming, transacting, administrating, etc.

Added a config variable to default to the leader epoch #690

Closed: Ramsey-B closed this 3 months ago

Ramsey-B commented 3 months ago

Recently I've been encountering an issue where I get a non-retriable error stating "topic <topic name> partition <partition> lost records; the client consumed to offset <last offset> but was reset to offset <earliest offset>".

After some digging I found the consumer group was stuck on epoch 0. I forced it onto the leader epoch and everything seemed to start working again, but on restart of the service it encountered the same error. I'm not setting the epoch anywhere in my code, so I'm not sure why it would jump back to epoch 0.

This is only occurring with a specific topic (the topic only has 1 partition). To commit, we call CommitRecords with the record after processing it; we don't alter the record at all.

The "unique" thing about the specific topic thats encountering this error is that we only consume messages every 3 hours. So during that 3 hours I call PauseFetchPartitions and put the goroutine to sleep. After the 3 hours I call ResumeFetchPartitions, process the records, and call CommitRecords.

My initial thought was that during the 3 hour window the leader epoch changes and I then commit a record carrying the older epoch, but that doesn't explain why it would be stuck on epoch 0. I would also assume that a change in the epoch would likely trigger a rebalance, in which case we wouldn't commit the stale records.

I made these changes to help bypass that specific error, but I'm guessing there's a risk of data loss in doing this; I'm not super familiar with how epochs work.

Ramsey-B commented 3 months ago

Adding this for future reference for anyone else who has the same issue. These changes can help you get back onto the correct epoch, but you shouldn't need them.

My issue was that the sleeping goroutine wasn't watching the quit channel, so the quit signal was never handled. To fix this I created a separate goroutine that listens for the quit channel being closed. I'm using a context in my code, so I have that goroutine call cancel, but you could do this with just the quit channel if you prefer.

func (pc *pconsumer) consume(ctx context.Context) {
    defer close(pc.done)

    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    // Listen for quit signals in a separate goroutine; it has to be separate
    // because this goroutine is blocked while sleeping.
    go func() {
        <-pc.quit
        cancel() // cancel the context when the quit signal is received
    }()

    for {
        select {
        case <-ctx.Done():
            return // Exit function if context was cancelled
        case recs := <-pc.recs:
            if err := ctx.Err(); err != nil {
                return // check whether the context was cancelled, just in case
            }

            for _, rec := range recs {
                // pause the partition and resume after the delay
                pc.cl.PauseFetchPartitions(map[string][]int32{pc.topic: {pc.partition}})
                if !sleepOrCancel(ctx, 3*time.Hour) {
                    return // Exit function if context was cancelled
                }
                pc.cl.ResumeFetchPartitions(map[string][]int32{pc.topic: {pc.partition}})

                // process the record
            }

            // commit the batch we just processed
            err := pc.cl.CommitRecords(ctx, recs...)
            if err != nil {
                // handle the error
            }
        }
    }
}

func sleepOrCancel(ctx context.Context, duration time.Duration) bool {
    select {
    case <-time.After(duration):
        // Duration passed, continue
        return true
    case <-ctx.Done():
        // Context was cancelled, exit function
        return false
    }
}
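
For completeness, here is roughly how the pconsumer above is wired up. This is a sketch loosely based on franz-go's goroutine-per-partition consuming example; the field names match my snippet, but the tp key type, the consumers map, and pollLoop are illustrative glue rather than my actual code.

// Assumed shape of the pconsumer used in consume above.
type pconsumer struct {
    cl        *kgo.Client
    topic     string
    partition int32

    quit chan struct{}      // closed on shutdown or partition revoke
    done chan struct{}      // closed by consume when it returns
    recs chan []*kgo.Record // batches of records for this one partition
}

type tp struct {
    topic     string
    partition int32
}

// A separate poll loop hands each partition's records to its consumer goroutine.
func pollLoop(ctx context.Context, cl *kgo.Client, consumers map[tp]*pconsumer) {
    for {
        fetches := cl.PollFetches(ctx)
        if fetches.IsClientClosed() || ctx.Err() != nil {
            return
        }
        fetches.EachPartition(func(p kgo.FetchTopicPartition) {
            if pc, ok := consumers[tp{p.Topic, p.Partition}]; ok {
                pc.recs <- p.Records
            }
        })
    }
}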