twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.6+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License

PollFetches takes a long time for the first fetch #732

Closed: secmask closed this issue 1 month ago

secmask commented 1 month ago

I'm testing against a local development Kafka node, and PollFetches always seems to take a few seconds to return the first time. Is this normal? Is there a config option that I'm missing?

Here's my code:

    // l (a structured logger) and topic are defined elsewhere.
    seeds := []string{"localhost:9093"}

    cl, err := kgo.NewClient(
        kgo.SeedBrokers(seeds...),
        kgo.ConsumerGroup("my-group-identifier"),
        kgo.ConsumeTopics(topic),
        kgo.DisableAutoCommit(),
        kgo.FetchMaxBytes(200),
        kgo.FetchMaxWait(time.Second),
    )
    if err != nil {
        panic(err)
    }
    defer cl.Close()
    for {
        l.Info("fetching...")
        fetches := cl.PollFetches(context.Background())
        if errs := fetches.Errors(); len(errs) > 0 {
            l.Error("got errs", "errs", errs)
            panic(errs)
        }
        records := fetches.Records()
        l.Info("fetch complete", "count", len(records))
        for _, record := range records {
            l.Info("record", "data", string(record.Value))
        }
    }

Result:

time=2024-05-21T23:43:53.217+07:00 level=INFO msg=fetching...
time=2024-05-21T23:43:56.304+07:00 level=INFO msg="fetch complete" count=2
time=2024-05-21T23:43:56.304+07:00 level=INFO msg=record data="message: 1"
time=2024-05-21T23:43:56.304+07:00 level=INFO msg=record data="message: 2"
time=2024-05-21T23:43:56.304+07:00 level=INFO msg=fetching...
time=2024-05-21T23:43:56.314+07:00 level=INFO msg="fetch complete" count=2
time=2024-05-21T23:43:56.314+07:00 level=INFO msg=record data="message: 3"
time=2024-05-21T23:43:56.314+07:00 level=INFO msg=record data="message: 4"
time=2024-05-21T23:43:56.314+07:00 level=INFO msg=fetching...
time=2024-05-21T23:43:56.324+07:00 level=INFO msg="fetch complete" count=2
time=2024-05-21T23:43:56.324+07:00 level=INFO msg=record data="message: 5"
time=2024-05-21T23:43:56.324+07:00 level=INFO msg=record data="message: 6"
...

As you can see, the first fetch takes 3-4 seconds, while the following fetches return in about 10 ms.

secmask commented 1 month ago

I added a logger. Most of the time is spent waiting to read the JoinGroup response:

time=2024-05-23T10:45:17.230+07:00 level=DEBUG msg="read FindCoordinator v4" broker=1 bytes_read=57 read_wait=38.879µs time_to_read=4.379832ms err=<nil>
time=2024-05-23T10:45:17.231+07:00 level=DEBUG msg="opening connection to broker" addr=localhost:9093 broker=1
time=2024-05-23T10:45:17.231+07:00 level=DEBUG msg="connection opened to broker" addr=localhost:9093 broker=1
time=2024-05-23T10:45:17.231+07:00 level=DEBUG msg="connection initialized successfully" addr=localhost:9093 broker=1
time=2024-05-23T10:45:17.231+07:00 level=DEBUG msg="wrote JoinGroup v9" broker=1 bytes_written=137 write_wait=587.209µs time_to_write=13.092µs err=<nil>
time=2024-05-23T10:45:17.240+07:00 level=DEBUG msg="read JoinGroup v9" broker=1 bytes_read=66 read_wait=38.105µs time_to_read=8.385049ms err=<nil>
time=2024-05-23T10:45:17.240+07:00 level=INFO msg="join returned MemberIDRequired, rejoining with response's MemberID" group=my-group-identifier member_id=kgo-2d3e0e05-dd52-4cb6-bb09-fcc2969a1d7a
time=2024-05-23T10:45:17.240+07:00 level=DEBUG msg="wrote JoinGroup v9" broker=1 bytes_written=177 write_wait=26.809µs time_to_write=10.857µs err=<nil>
time=2024-05-23T10:45:52.167+07:00 level=DEBUG msg="read JoinGroup v9" broker=1 bytes_read=193 read_wait=30.731µs time_to_read=34.926236626s err=<nil>
time=2024-05-23T10:45:52.167+07:00 level=INFO msg="joined, balancing group" group=my-group-identifier member_id=kgo-2d3e0e05-dd52-4cb6-bb09-fcc2969a1d7a instance_id=<nil> generation=2 balance_protocol=sticky leader=true
time=2024-05-23T10:45:52.167+07:00 level=INFO msg="balancing group as leader"
time=2024-05-23T10:45:52.167+07:00 level=INFO msg="balance group member" id=kgo-2d3e0e05-dd52-4cb6-bb09-fcc2969a1d7a interests="interested topics: [foo], previously owned: "
time=2024-05-23T10:45:52.167+07:00 level=INFO msg=balanced plan=kgo-2d3e0e05-dd52-4cb6-bb09-fcc2969a1d7a{foo[0]}
time=2024-05-23T10:45:52.167+07:00 level=INFO msg=syncing group=my-group-identifier protocol_type=consumer protocol=sticky
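
For reference, the length of the stall can be computed from the timestamps of the second "wrote JoinGroup v9" line and the "read JoinGroup v9" line that follows it. A minimal Go sketch, where gapBetween is a helper name made up for illustration; the result agrees with the time_to_read=34.926236626s value in the log:

```go
package main

import (
	"fmt"
	"time"
)

// gapBetween parses two RFC 3339 timestamps (the format of the time= field
// in the log lines) and returns how much time passed between them.
func gapBetween(start, end string) (time.Duration, error) {
	s, err := time.Parse(time.RFC3339Nano, start)
	if err != nil {
		return 0, err
	}
	e, err := time.Parse(time.RFC3339Nano, end)
	if err != nil {
		return 0, err
	}
	return e.Sub(s), nil
}

func main() {
	// Timestamps copied from the second "wrote JoinGroup v9" line and the
	// "read JoinGroup v9" line that follows it.
	gap, err := gapBetween(
		"2024-05-23T10:45:17.240+07:00",
		"2024-05-23T10:45:52.167+07:00",
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(gap) // prints "34.927s"
}
```
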

twmb commented 1 month ago

There are two potential culprits:

1) On any new group, Kafka internally forces a 3s wait; see the broker config value group.initial.rebalance.delay.ms for more information.

2) If you restart your consumer using the same group, the new consumer gets a new member ID in the existing group. Kafka triggers a rebalance (a new JoinGroup round), and only after the previous consumer "dies" (misses the session timeout) does Kafka allow the rebalance to continue. In your code above, I don't see any room for a ctrl+c to be handled, and it doesn't look like there's a proper LeaveGroup. The deferred cl.Close does a LeaveGroup internally, but your for loop is infinite, so the deferred Close never runs and the client never properly leaves the group.

Given that the JoinGroup wait in the logs is about 35s (time_to_read=34.926236626s), I suspect it's (2).

Let me know if you think it's something else.
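
One way to address culprit (2) is to give the poll loop a way to exit, so that the deferred Close runs and the member leaves the group before the process stops. The following is a minimal, stdlib-only sketch of that pattern, not franz-go code: groupConsumer, fakeConsumer, consume, and demo are hypothetical names introduced here for illustration. With a real client, Poll would presumably wrap cl.PollFetches(ctx), Close would be cl.Close(), and ctx would typically come from signal.NotifyContext(context.Background(), os.Interrupt) so that ctrl+c cancels it.

```go
package main

import (
	"context"
	"fmt"
)

// groupConsumer is a hypothetical stand-in for the two client methods the
// loop needs. With franz-go, Poll would wrap cl.PollFetches(ctx) and Close
// would be cl.Close(), which leaves the group.
type groupConsumer interface {
	Poll(ctx context.Context) []string
	Close()
}

// consume polls until ctx is cancelled and then returns, so the deferred
// Close runs. It reports how many records were handled.
func consume(ctx context.Context, c groupConsumer) int {
	defer c.Close() // runs on return, unlike a defer above an infinite loop
	handled := 0
	for {
		records := c.Poll(ctx)
		if ctx.Err() != nil {
			return handled // shutdown requested: stop polling
		}
		handled += len(records)
	}
}

// fakeConsumer simulates a client for the demo: it returns one record per
// poll and cancels the context during the third poll, as ctrl+c would.
type fakeConsumer struct {
	polls  int
	cancel context.CancelFunc
	closed bool
}

func (f *fakeConsumer) Poll(ctx context.Context) []string {
	f.polls++
	if f.polls == 3 {
		f.cancel() // simulate the interrupt arriving mid-poll
		return nil
	}
	return []string{fmt.Sprintf("message: %d", f.polls)}
}

func (f *fakeConsumer) Close() { f.closed = true }

// demo runs the loop against the fake and reports what happened.
func demo() (handled int, closed bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	f := &fakeConsumer{cancel: cancel}
	handled = consume(ctx, f)
	return handled, f.closed
}

func main() {
	handled, closed := demo()
	fmt.Println(handled, closed) // prints "2 true"
}
```

The point of the sketch is only the control flow: the loop checks the context after every poll, so cancellation ends the loop and Close is reached. If the process is killed without that path running, the broker still has to wait for the old member to time out.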