twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.6+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License
1.6k stars, 158 forks

GroupTransactSession Close hangs, preventing restart #753

Status: Open. iamnoah opened this issue 1 month ago.

iamnoah commented 1 month ago

We have a GroupTransactSession that we use in a slightly unusual way. We don't actually consume a topic, but instead use offset commit metadata as the cursor for our transactional producer (see #559).
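To make the "offset commit metadata as a cursor" idea concrete, here is a minimal sketch of encoding an application cursor into the metadata string that accompanies a committed offset, and decoding it back when offsets are fetched. The `cursor` type, its fields, and the JSON encoding are hypothetical (the issue does not describe the real cursor), and the 4096-byte limit assumes the broker default for `offset.metadata.max.bytes`; your cluster may be configured differently.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// maxMetadataBytes assumes the broker default for offset.metadata.max.bytes.
const maxMetadataBytes = 4096

// cursor is a hypothetical application cursor stored in offset metadata.
type cursor struct {
	Source   string `json:"source"`
	Position int64  `json:"position"`
}

// encodeCursor turns a cursor into the string committed as offset metadata.
func encodeCursor(c cursor) (string, error) {
	b, err := json.Marshal(c)
	if err != nil {
		return "", err
	}
	if len(b) > maxMetadataBytes {
		return "", errors.New("cursor exceeds assumed offset metadata limit")
	}
	return string(b), nil
}

// decodeCursor parses metadata read back from an offset fetch. Empty
// metadata (nothing committed yet) yields the zero cursor.
func decodeCursor(meta string) (cursor, error) {
	var c cursor
	if meta == "" {
		return c, nil
	}
	err := json.Unmarshal([]byte(meta), &c)
	return c, err
}

func main() {
	meta, err := encodeCursor(cursor{Source: "orders", Position: 42})
	if err != nil {
		panic(err)
	}
	got, err := decodeCursor(meta)
	if err != nil {
		panic(err)
	}
	fmt.Println(meta, got)
}
```

Keeping the encoding self-describing (JSON here) makes it easy to evolve the cursor later, at the cost of a few bytes of the metadata budget.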

The session is created with these options:

        kgo.ConsumerGroup(...),
        kgo.ConsumeTopics(...),
        kgo.TransactionalID(...),
        kgo.FetchIsolationLevel(kgo.ReadCommitted()),
        kgo.BlockRebalanceOnPoll(),
        kgo.DisableAutoCommit(),
        kgo.FetchMaxWait(200 * time.Millisecond),
        kgo.OnPartitionsAssigned(func(ctx context.Context, client *kgo.Client, m map[string][]int32) {
            client.PauseFetchPartitions(m) // never actually fetch anything; we don't care what is in the topic
            // ... + some non-blocking internal state management
        }),
        kgo.OnOffsetsFetched(func(ctx context.Context, client *kgo.Client, response *kmsg.OffsetFetchResponse) error {
            // internal state management
            return nil
        }),
        kgo.OnPartitionsRevoked(func(ctx context.Context, client *kgo.Client, m map[string][]int32) {
            client.ForceMetadataRefresh() // ... + some state management
        }),

When we hit the MSK auth error described in #731, we try to bail out and restart the process. Sometimes the process gets stuck in LeaveGroupContext:

goroutine 235 [select, 16 minutes]:
github.com/twmb/franz-go/pkg/kgo.(*Client).LeaveGroupContext(0xc000481008, {0x14d91a8, 0xc000d58c80?})
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:254 +0x131
github.com/twmb/franz-go/pkg/kgo.(*Client).close(0xc000481008, {0x14d91a8, 0xc000d58c80})
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/client.go:1006 +0x1ba
github.com/twmb/franz-go/pkg/kgo.(*Client).Close(...)
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/client.go:993
github.com/twmb/franz-go/pkg/kgo.(*GroupTransactSession).Close(...)
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/txn.go:157

This keeps the process from exiting, while our producer is unable to produce.

The loop that invokes Close looks like this (simplified):

    defer client.Close()

    for {
        // doing this to ensure we have updated partition assignments
        client.PollFetches(nil)

        // ... load some data ...

        client.ProduceSync(...)

        client.End(kgo.PreTxnCommitFnContext(ctx, func(request *kmsg.TxnOffsetCommitRequest) error {
            request.Topics = []kmsg.TxnOffsetCommitRequestTopic{
                {
                    Topic: ...,
                    Partitions: []kmsg.TxnOffsetCommitRequestTopicPartition{
                        {
                            Metadata: currentCursor,
                        },
                    },
                },
            }
            return nil
        }), kgo.TryCommit)
    }

Other franz-go goroutines at the time of the above trace:

goroutine 82259 [sync.Cond.Wait, 16 minutes]:
sync.runtime_notifyListWait(0xc000e4bb90, 0x0)
    runtime/sema.go:569 +0x159
sync.(*Cond).Wait(0x54aa10?)
    sync/cond.go:70 +0x85
github.com/twmb/franz-go/pkg/kgo.(*consumer).waitAndAddRebalance(0xc000481698)
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer.go:261 +0xb4
github.com/twmb/franz-go/pkg/kgo.(*Client).LeaveGroupContext.func1()
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:246 +0x32
created by github.com/twmb/franz-go/pkg/kgo.(*Client).LeaveGroupContext in goroutine 235
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:245 +0xda

goroutine 242 [chan receive, 16 minutes]:
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat(0xc000d42c80)
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:887 +0x42b
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).manage(0xc000d42c80)
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:392 +0x199
created by github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).findNewAssignments in goroutine 241
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:1786 +0x9f9

goroutine 226 [select]:
github.com/twmb/franz-go/pkg/kgo.(*Client).reapConnectionsLoop(0xc000481008)
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/broker.go:558 +0x155
created by github.com/twmb/franz-go/pkg/kgo.NewClient in goroutine 1
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/client.go:519 +0xbdd

goroutine 225 [select, 2 minutes]:
github.com/twmb/franz-go/pkg/kgo.(*Client).updateMetadataLoop(0xc000481008)
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/metadata.go:173 +0x1ef
created by github.com/twmb/franz-go/pkg/kgo.NewClient in goroutine 1
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/client.go:518 +0xb9b

goroutine 82291 [select]:
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).heartbeat(0xc000d42c80, 0xc00119d3e0, 0xc0011122e8)
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:940 +0x26c
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat.func1()
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:850 +0x11c
created by github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat in goroutine 242
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:847 +0x385

goroutine 82292 [sync.Cond.Wait, 16 minutes]:
sync.runtime_notifyListWait(0xc000e4bb90, 0x1)
    runtime/sema.go:569 +0x159
sync.(*Cond).Wait(0xc0005ad801?)
    sync/cond.go:70 +0x85
github.com/twmb/franz-go/pkg/kgo.(*consumer).waitAndAddRebalance(0xc000481698)
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer.go:261 +0xb4
github.com/twmb/franz-go/pkg/kgo.(*assignRevokeSession).assign.func1()
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:793 +0x8a
created by github.com/twmb/franz-go/pkg/kgo.(*assignRevokeSession).assign in goroutine 242
    github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:784 +0x85

Dependencies:

    github.com/twmb/franz-go v1.15.1
    github.com/twmb/franz-go/pkg/kadm v1.9.0
    github.com/twmb/franz-go/pkg/kmsg v1.7.0

What could be causing this? I suspect the ForceMetadataRefresh call, the fact that we don't commit offsets in OnPartitionsRevoked, or both. However, looking through the code, I don't understand how it hangs where it does, since the consumer group's context seems like it should already have been canceled.

twmb commented 1 month ago

Are you exiting the poll loop somewhere without allowing a rebalance? You may want to look at CloseAllowingRebalance().

iamnoah commented 1 month ago

@twmb Thanks, I will try that. GroupTransactSession doesn't expose that method directly, so should I replace GroupTransactSession.Close with GroupTransactSession.Client().CloseAllowingRebalance()?

twmb commented 4 weeks ago

That will work, but I'll leave this issue open as a reminder to spot-check whether any APIs are worth mirroring to GroupTransactSession.

twmb commented 3 weeks ago

From a quick audit, I think only AllowRebalance and CloseAllowingRebalance are worth adding to GroupTransactSession; for other APIs, a user can go through Client().
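As an illustration of what "mirroring" means here, the sketch below shows a session type that forwards the two methods to its client while keeping `Client()` as the escape hatch for everything else. `txnSession`, `clientAPI`, and `recorder` are hypothetical names used only for this example; this is not the code franz-go actually added. The two interface methods do match the no-argument, no-result `AllowRebalance()` and `CloseAllowingRebalance()` methods on `*kgo.Client`.

```go
package main

import "fmt"

// clientAPI lists the two client methods chosen for mirroring in the audit.
type clientAPI interface {
	AllowRebalance()
	CloseAllowingRebalance()
}

// txnSession is a hypothetical stand-in for a session that wraps a client
// and also exposes it through Client(), as GroupTransactSession does.
type txnSession struct {
	cl clientAPI
}

// Client is the escape hatch for every API that is not mirrored.
func (s *txnSession) Client() clientAPI { return s.cl }

// AllowRebalance and CloseAllowingRebalance are mirrored by forwarding.
func (s *txnSession) AllowRebalance()         { s.cl.AllowRebalance() }
func (s *txnSession) CloseAllowingRebalance() { s.cl.CloseAllowingRebalance() }

// recorder is a fake client that records which methods were called.
type recorder struct{ calls []string }

func (r *recorder) AllowRebalance()         { r.calls = append(r.calls, "AllowRebalance") }
func (r *recorder) CloseAllowingRebalance() { r.calls = append(r.calls, "CloseAllowingRebalance") }

func main() {
	r := &recorder{}
	s := &txnSession{cl: r}
	s.AllowRebalance()         // mirrored: no need for s.Client().AllowRebalance()
	s.CloseAllowingRebalance() // mirrored shutdown path
	fmt.Println(r.calls)
}
```

Keeping the mirrored surface to just these two methods avoids duplicating the whole client API while covering the rebalance-sensitive shutdown path discussed in this issue.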