twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.8+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License

Question: Anyone had experience of using kgo driver with Google's PubSubLite Kafka wire protocol API? #524

Closed ns-gzhang closed 1 year ago

ns-gzhang commented 1 year ago

Update on 2023-10-07 (for those who may read this): in the end we were told by Google to migrate off Pub/Sub Lite, so there is probably no value in reading this anymore.

Thanks for the fantastic library, @twmb. I wonder if there are folks out there using kgo to interact with GCP's Pub/Sub Lite (PSL)? Any experience that can be shared would be appreciated!

We are testing it out and have encountered some incompatibilities, especially with consumer group and ListOffsets requests. In particular, we try to find a consumer group's committed offsets for a topic (https://pkg.go.dev/github.com/twmb/franz-go/pkg/kadm#Client.FetchOffsetsForTopics), but PSL always returns -1. As background, a PSL subscription is roughly a consumer group on a single topic. Here is what the GCP PSL folks told us about the challenges on their end (they claim to conform to the Confluent Kafka protocol specs):

...
In particular, the ListOffsetsForTopics method does not just make a kafka OffsetFetchRequest with the consumer group and offsets: instead it makes a list-topics request, then makes an OffsetFetchRequest for [*all* topics for the consumer group](https://github.com/twmb/franz-go/blob/b13e4c4c6074dbab4699dc36f55bc09d1c1c2e2f/pkg/kadm/groups.go#L841), and discards results that don't match the topics it listed. This makes no sense to me, as it would be easier to just make the single [OffsetFetchRequest](https://kafka.apache.org/protocol.html#The_Messages_OffsetFetch), which takes a topic list.

We do not have the concept of a "multi-topic consumer group" in Pub/Sub Lite, thus we have substituted the subscription everywhere a topic is needed on the consumer-group-side APIs. This does mean, however, that we cannot service an OffsetFetchRequest with no topics.

@twmb, what do you think? If kgo could be made more compatible with Pub/Sub Lite, that would be even more fantastic.

We also encountered a problem after v1.13.5: when a reader tries to join a consumer group, it fails with unable to join group session: ILLEGAL_GENERATION: Specified group generation id is not valid, and only eventually succeeds (there is only one consumer reader in the group). On v1.13.5 it just hung without any error.

Thanks!

twmb commented 1 year ago

What do they mean by discards the results that don't match the topics that were listed? The returned result contains all topics that were committed to as well as topics that do not have commits (for what it's worth, the function is confusing).

FetchOffsetsForTopics specifically is meant for when you want offsets even if they are not yet committed to by the group. I recommend using FetchOffsets which does just issue one OffsetFetch request: https://github.com/twmb/franz-go/blob/pkg/kadm/v1.9.0/pkg/kadm/groups.go#L792-L796

An OffsetFetchRequest with no topics is meant to return the offsets for all topics that were committed. If they don't support that (very old) behavior, then you need to build a manual OffsetFetch request and specify the single topic and all partitions in it. The kadm client doesn't have that behavior currently because generally when you want offsets, you want offsets for every topic in the group, and the protocol itself supports a nullable topics field to request all commits.
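For illustration, here is a minimal sketch of that single-request route through kadm (assuming a *kgo.Client named cl that is already configured for the cluster; fetchCommitted is a hypothetical helper, not part of the library):

    import (
        "context"
        "fmt"

        "github.com/twmb/franz-go/pkg/kadm"
        "github.com/twmb/franz-go/pkg/kgo"
    )

    // fetchCommitted wraps the kgo client in a kadm admin client and issues a
    // single OffsetFetch for the whole group via kadm.FetchOffsets.
    func fetchCommitted(ctx context.Context, cl *kgo.Client, group string) error {
        adm := kadm.NewClient(cl)
        committed, err := adm.FetchOffsets(ctx, group) // one OffsetFetch request
        if err != nil {
            return err
        }
        // OffsetResponses is keyed by topic, then partition.
        for topic, partitions := range committed {
            for partition, o := range partitions {
                if o.Err != nil {
                    fmt.Printf("%s[%d]: %v\n", topic, partition, o.Err)
                    continue
                }
                fmt.Printf("%s[%d]: committed at %d\n", topic, partition, o.At)
            }
        }
        return nil
    }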

I don't know about the generation error; can you add debug logs? It sounds like something else they may not support.

ns-gzhang commented 1 year ago

Thanks @twmb. A fundamental "limitation" with PSL, as they put it, is that a subscription = a consumer group on a single topic, and one consumer group can have only one topic. Every request involving a consumer group is serviced against a subscription (on a topic).

I've probably mixed up two different kinds of offsets in the discussion. We have two usage patterns: one consumes through a consumer group without caring which partitions messages come from; the other reads specific partitions as logs. We need to fetch/list a partition's first and last offsets as well as committed offsets. 1) For committed offsets, FetchOffsetsForTopics() was not working as expected against PSL, as mentioned above. I tested a manual OffsetFetchRequest sent through client.Request(), and it does return the committed offsets, so this is a good alternative (workaround). Thanks for the suggestion.

2) For partition-based reading, at the start of a job we check the available first and last offsets, so we use ListOffsets to find them. I saw kadm has ListStartOffsets(ctx, topics...) and ListEndOffsets(ctx, topics...). Even when I specify a single topic, PSL did not seem to return the correct end offsets. I also manually constructed a ListOffsetsRequest with one topic and sent it with client.Request(); PSL returned an EOF error. I assume client.Request() won't change the provided request, so even with a specified topic, PSL seems to have a problem. (The PSL folks' response above seemed to indicate topics are stripped from the request and then filtered from the returned results.) Here is the test code snippet:

    // Construct ListOffsetRequest
    topicPartitionFirst0 := kmsg.NewListOffsetsRequestTopicPartition()
    topicPartitionFirst0.Partition = 0
    topicPartitionFirst0.Timestamp = FirstOffset
    topicPartitionLast0 := kmsg.NewListOffsetsRequestTopicPartition()
    topicPartitionLast0.Partition = 0
    topicPartitionLast0.Timestamp = LastOffset

    topicPartitionFirst1 := kmsg.NewListOffsetsRequestTopicPartition()
    topicPartitionFirst1.Partition = 0 // typo: should be 1
    topicPartitionFirst1.Timestamp = FirstOffset
    topicPartitionLast1 := kmsg.NewListOffsetsRequestTopicPartition()
    topicPartitionLast1.Partition = 0 // typo: should be 1
    topicPartitionLast1.Timestamp = LastOffset

    topicOffsetReq := kmsg.NewListOffsetsRequestTopic()
    topicOffsetReq.Topic = topic
    topicOffsetReq.Partitions = []kmsg.ListOffsetsRequestTopicPartition{
        topicPartitionFirst0,
        topicPartitionLast0,
        topicPartitionFirst1,
        topicPartitionLast1,
    }

    listOffsetReq := kmsg.NewListOffsetsRequest()
    listOffsetReq.Topics = []kmsg.ListOffsetsRequestTopic{topicOffsetReq}

    resp, err := client.Request(context.Background(), &listOffsetReq) // return Response
    if err != nil {
        fmt.Println("list offset", "error", err, "partitions", 0, 1)
        return
    }

Debug dump:

dialing the broker
client created
[DEBUG] opening connection to broker; addr: us-west1-kafka-pubsub.googleapis.com:443, broker: seed_0
[DEBUG] connection opened to broker; addr: us-west1-kafka-pubsub.googleapis.com:443, broker: seed_0
[DEBUG] issuing api versions request; broker: seed_0, version: 3
[DEBUG] wrote ApiVersions v3; broker: seed_0, bytes_written: 31, write_wait: 25.482µs, time_to_write: 60.307µs, err: <nil>
[DEBUG] read ApiVersions v3; broker: seed_0, bytes_read: 121, read_wait: 57.926µs, time_to_read: 93.87306ms, err: <nil>
[DEBUG] issuing SASLHandshakeRequest; broker: seed_0
[DEBUG] wrote SASLHandshake v1; broker: seed_0, bytes_written: 24, write_wait: 23.095µs, time_to_write: 95.883µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: seed_0, bytes_read: 34, read_wait: 28.925µs, time_to_read: 28.827263ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: seed_0, addr: us-west1-kafka-pubsub.googleapis.com:443, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: seed_0, version: 0, step: 0
[DEBUG] wrote SASLAuthenticate v0; broker: seed_0, bytes_written: 3171, write_wait: 21.819µs, time_to_write: 100.298µs, err: <nil>
[DEBUG] read SASLAuthenticate v0; broker: seed_0, bytes_read: 16, read_wait: 25.655µs, time_to_read: 112.596419ms, err: <nil>
[DEBUG] connection initialized successfully; addr: us-west1-kafka-pubsub.googleapis.com:443, broker: seed_0
[DEBUG] wrote ApiVersions v0; broker: seed_0, bytes_written: 17, write_wait: 561.566766ms, time_to_write: 78.757µs, err: <nil>
[DEBUG] read ApiVersions v0; broker: seed_0, bytes_read: 104, read_wait: 52.064µs, time_to_read: 26.423359ms, err: <nil>
ping successful
[DEBUG] wrote Metadata v1; broker: seed_0, bytes_written: 97, write_wait: 24.805µs, time_to_write: 66.166µs, err: <nil>
[DEBUG] read Metadata v1; broker: seed_0, bytes_read: 311, read_wait: 53.498µs, time_to_read: 97.933226ms, err: <nil>
[DEBUG] sharded request; req: ListOffsets, destinations: [601576412]
[DEBUG] opening connection to broker; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] connection opened to broker; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] issuing api versions request; broker: 601576412, version: 3
[DEBUG] wrote ApiVersions v3; broker: 601576412, bytes_written: 31, write_wait: 8.396µs, time_to_write: 40.008µs, err: <nil>
[DEBUG] read ApiVersions v3; broker: 601576412, bytes_read: 121, read_wait: 29.666µs, time_to_read: 4.186362683s, err: <nil>
[DEBUG] issuing SASLHandshakeRequest; broker: 601576412
[DEBUG] wrote SASLHandshake v1; broker: 601576412, bytes_written: 24, write_wait: 12.314µs, time_to_write: 70.745µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: 601576412, bytes_read: 34, read_wait: 27.369µs, time_to_read: 1.612004859s, err: <nil>
[DEBUG] beginning sasl authentication; broker: 601576412, addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: 601576412, version: 0, step: 0
[DEBUG] wrote SASLAuthenticate v0; broker: 601576412, bytes_written: 3171, write_wait: 12.596µs, time_to_write: 81.153µs, err: <nil>
[DEBUG] read SASLAuthenticate v0; broker: 601576412, bytes_read: 16, read_wait: 24.277µs, time_to_read: 4.0765286s, err: <nil>
[DEBUG] connection initialized successfully; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] wrote ListOffsets v4; broker: 601576412, bytes_written: 170, write_wait: 17.666475959s, time_to_write: 69.933µs, err: <nil>
[DEBUG] read ListOffsets v4; broker: 601576412, bytes_read: 0, read_wait: 72.942µs, time_to_read: 2.009925653s, err: EOF
[DEBUG] read from broker errored, killing connection; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412, successful_reads: 0, err: EOF
[DEBUG] sharded request failed, resharding and reissuing; req: ListOffsets, time_since_start: 21.776269151s, tries: 0, err: EOF
[DEBUG] opening connection to broker; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] connection opened to broker; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] issuing SASLHandshakeRequest; broker: 601576412
[DEBUG] wrote SASLHandshake v1; broker: 601576412, bytes_written: 24, write_wait: 22.15µs, time_to_write: 43.205µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: 601576412, bytes_read: 34, read_wait: 23.229µs, time_to_read: 61.762482ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: 601576412, addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: 601576412, version: 0, step: 0
[DEBUG] wrote SASLAuthenticate v0; broker: 601576412, bytes_written: 3171, write_wait: 19.298µs, time_to_write: 204.905µs, err: <nil>
[DEBUG] read SASLAuthenticate v0; broker: 601576412, bytes_read: 16, read_wait: 53.882µs, time_to_read: 93.00273ms, err: <nil>
[DEBUG] connection initialized successfully; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] wrote Metadata v1; broker: 601576412, bytes_written: 97, write_wait: 210.042753ms, time_to_write: 64.377µs, err: <nil>
[DEBUG] read Metadata v1; broker: 601576412, bytes_read: 311, read_wait: 32.152µs, time_to_read: 77.616231ms, err: <nil>
[DEBUG] sharded request; req: ListOffsets, destinations: [601576412]
[DEBUG] wrote ListOffsets v4; broker: 601576412, bytes_written: 170, write_wait: 41.742µs, time_to_write: 65.736µs, err: <nil>
[DEBUG] read ListOffsets v4; broker: 601576412, bytes_read: 0, read_wait: 40.728µs, time_to_read: 107.640338ms, err: EOF
[DEBUG] read from broker errored, killing connection; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412, successful_reads: 1, err: EOF
[DEBUG] sharded request failed, resharding and reissuing; req: ListOffsets, time_since_start: 26.172443652s, tries: 1, err: EOF
[DEBUG] wrote Metadata v1; broker: seed_0, bytes_written: 97, write_wait: 100.653µs, time_to_write: 73.381µs, err: <nil>
[DEBUG] read Metadata v1; broker: seed_0, bytes_read: 311, read_wait: 40.208µs, time_to_read: 74.868222ms, err: <nil>
[DEBUG] sharded request; req: ListOffsets, destinations: [601576412]
[DEBUG] opening connection to broker; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] connection opened to broker; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] issuing SASLHandshakeRequest; broker: 601576412
[DEBUG] wrote SASLHandshake v1; broker: 601576412, bytes_written: 24, write_wait: 8.022µs, time_to_write: 41.77µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: 601576412, bytes_read: 34, read_wait: 25.742µs, time_to_read: 125.252305ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: 601576412, addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: 601576412, version: 0, step: 0
[DEBUG] wrote SASLAuthenticate v0; broker: 601576412, bytes_written: 3171, write_wait: 12.089µs, time_to_write: 99.778µs, err: <nil>
[DEBUG] read SASLAuthenticate v0; broker: 601576412, bytes_read: 16, read_wait: 25.188µs, time_to_read: 113.316354ms, err: <nil>
[DEBUG] connection initialized successfully; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] wrote ListOffsets v4; broker: 601576412, bytes_written: 170, write_wait: 301.218695ms, time_to_write: 74.49µs, err: <nil>
[DEBUG] read ListOffsets v4; broker: 601576412, bytes_read: 0, read_wait: 39.3µs, time_to_read: 101.338719ms, err: EOF
[DEBUG] read from broker errored, killing connection; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412, successful_reads: 0, err: EOF
list offset error EOF partitions 0 1

3) For the generation ID error when joining a group, we later found it could sometimes succeed without our noticing. But I have the following debug dump. Options used for the client:

    maxPartBytes := int32(64e6) // 64MB
    tlsDialer := &tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}}
    opts := []kgo.Opt{
        kgo.SeedBrokers(strings.Split(brokers, ",")...),
        kgo.ConsumeResetOffset(kgo.NewOffset().At(0)), // if offset is OutOfRange
        kgo.ConsumeTopics(topic),
        kgo.ConsumerGroup(consumerGroup),
        kgo.FetchMaxBytes(maxPartBytes),
        kgo.FetchMaxPartitionBytes(maxPartBytes),
        kgo.FetchMaxWait(time.Second),
        kgo.DisableAutoCommit(),
        kgo.RetryBackoffFn(func(n int) time.Duration {
            return 1 << (n % 6) * time.Second // up to 32 seconds
        }),
        kgo.WithLogger(kgo.BasicLogger(os.Stdout, kgo.LogLevelDebug, nil)),

        // SASL Options
        kgo.SASL(plain.Auth{
            User: user,
            Pass: password,
        }.AsMechanism()),

        // Configure TLS. Uses SystemCertPool for RootCAs by default.
        kgo.Dialer(tlsDialer.DialContext),
    }

Debug output: (the project number is redacted to 999999, nothing else touched)

dialing the broker
client created
[DEBUG] opening connection to broker; addr: us-west1-kafka-pubsub.googleapis.com:443, broker: seed_0
[INFO] immediate metadata update triggered; why: querying metadata for consumer initialization
[DEBUG] connection opened to broker; addr: us-west1-kafka-pubsub.googleapis.com:443, broker: seed_0
[DEBUG] issuing api versions request; broker: seed_0, version: 3
[DEBUG] wrote ApiVersions v3; broker: seed_0, bytes_written: 31, write_wait: 25.213µs, time_to_write: 64.328µs, err: <nil>
[DEBUG] read ApiVersions v3; broker: seed_0, bytes_read: 121, read_wait: 40.711µs, time_to_read: 90.902544ms, err: <nil>
[DEBUG] issuing SASLHandshakeRequest; broker: seed_0
[DEBUG] wrote SASLHandshake v1; broker: seed_0, bytes_written: 24, write_wait: 16.291µs, time_to_write: 72.758µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: seed_0, bytes_read: 34, read_wait: 23.459µs, time_to_read: 25.223858ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: seed_0, addr: us-west1-kafka-pubsub.googleapis.com:443, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: seed_0, version: 0, step: 0
[DEBUG] wrote SASLAuthenticate v0; broker: seed_0, bytes_written: 3171, write_wait: 21.218µs, time_to_write: 93.008µs, err: <nil>
[DEBUG] read SASLAuthenticate v0; broker: seed_0, bytes_read: 16, read_wait: 53.573µs, time_to_read: 96.156686ms, err: <nil>
[DEBUG] connection initialized successfully; addr: us-west1-kafka-pubsub.googleapis.com:443, broker: seed_0
[DEBUG] wrote ApiVersions v0; broker: seed_0, bytes_written: 17, write_wait: 263.599461ms, time_to_write: 75.754µs, err: <nil>
[DEBUG] wrote Metadata v1; broker: seed_0, bytes_written: 97, write_wait: 263.491697ms, time_to_write: 48.777µs, err: <nil>
[DEBUG] read ApiVersions v0; broker: seed_0, bytes_read: 104, read_wait: 173.979µs, time_to_read: 31.487455ms, err: <nil>
ping successful
[DEBUG] read Metadata v1; broker: seed_0, bytes_read: 311, read_wait: 31.744518ms, time_to_read: 67.250508ms, err: <nil>
[INFO] beginning to manage the group lifecycle; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub
[DEBUG] blocking commits from join&sync
[INFO] joining group; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub
[DEBUG] sharded request; req: FindCoordinator, destinations: [any]
[DEBUG] opening connection to broker; addr: 6543210.us-west1-kafka-pubsub.googleapis.com:443, broker: 6543210
[DEBUG] connection opened to broker; addr: 6543210.us-west1-kafka-pubsub.googleapis.com:443, broker: 6543210
[DEBUG] issuing api versions request; broker: 6543210, version: 3
[DEBUG] wrote ApiVersions v3; broker: 6543210, bytes_written: 31, write_wait: 15.299µs, time_to_write: 33.365µs, err: <nil>
[DEBUG] read ApiVersions v3; broker: 6543210, bytes_read: 121, read_wait: 35.054µs, time_to_read: 63.922143ms, err: <nil>
[DEBUG] issuing SASLHandshakeRequest; broker: 6543210
[DEBUG] wrote SASLHandshake v1; broker: 6543210, bytes_written: 24, write_wait: 20.108µs, time_to_write: 79.264µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: 6543210, bytes_read: 34, read_wait: 20.389µs, time_to_read: 29.620993ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: 6543210, addr: 6543210.us-west1-kafka-pubsub.googleapis.com:443, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: 6543210, version: 0, step: 0
[DEBUG] wrote SASLAuthenticate v0; broker: 6543210, bytes_written: 3171, write_wait: 12.339µs, time_to_write: 58.72µs, err: <nil>
[DEBUG] read SASLAuthenticate v0; broker: 6543210, bytes_read: 16, read_wait: 26.97µs, time_to_read: 81.334575ms, err: <nil>
[DEBUG] connection initialized successfully; addr: 6543210.us-west1-kafka-pubsub.googleapis.com:443, broker: 6543210
[DEBUG] sharded request failed, resharding and reissuing; req: FindCoordinator, time_since_start: 236.317357ms, tries: 0, err: broker is too old; the broker has already indicated it will not know how to handle the request
[DEBUG] sharded request; req: FindCoordinator, destinations: [any]
[DEBUG] opening connection to broker; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] connection opened to broker; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] issuing api versions request; broker: 601576412, version: 3
[DEBUG] wrote ApiVersions v3; broker: 601576412, bytes_written: 31, write_wait: 8.965µs, time_to_write: 39.214µs, err: <nil>
[DEBUG] read ApiVersions v3; broker: 601576412, bytes_read: 121, read_wait: 22.419µs, time_to_read: 78.209956ms, err: <nil>
[DEBUG] issuing SASLHandshakeRequest; broker: 601576412
[DEBUG] wrote SASLHandshake v1; broker: 601576412, bytes_written: 24, write_wait: 20.101µs, time_to_write: 51.711µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: 601576412, bytes_read: 34, read_wait: 26.435µs, time_to_read: 29.73184ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: 601576412, addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: 601576412, version: 0, step: 0
[DEBUG] wrote SASLAuthenticate v0; broker: 601576412, bytes_written: 3171, write_wait: 17.135µs, time_to_write: 83.713µs, err: <nil>
[DEBUG] read SASLAuthenticate v0; broker: 601576412, bytes_read: 16, read_wait: 32.505µs, time_to_read: 79.581843ms, err: <nil>
[DEBUG] connection initialized successfully; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] wrote FindCoordinator v1; broker: 601576412, bytes_written: 94, write_wait: 242.929313ms, time_to_write: 60.657µs, err: <nil>
[DEBUG] read FindCoordinator v1; broker: 601576412, bytes_read: 62, read_wait: 33.085µs, time_to_read: 22.345507ms, err: <nil>
[DEBUG] opening connection to broker; addr: 6543210.us-west1-kafka-pubsub.googleapis.com:443, broker: 6543210
[DEBUG] connection opened to broker; addr: 6543210.us-west1-kafka-pubsub.googleapis.com:443, broker: 6543210
[DEBUG] issuing SASLHandshakeRequest; broker: 6543210
[DEBUG] wrote SASLHandshake v1; broker: 6543210, bytes_written: 24, write_wait: 7.908µs, time_to_write: 44.185µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: 6543210, bytes_read: 34, read_wait: 24.625µs, time_to_read: 74.207018ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: 6543210, addr: 6543210.us-west1-kafka-pubsub.googleapis.com:443, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: 6543210, version: 0, step: 0
[DEBUG] wrote SASLAuthenticate v0; broker: 6543210, bytes_written: 3171, write_wait: 14.443µs, time_to_write: 101.703µs, err: <nil>
[DEBUG] read SASLAuthenticate v0; broker: 6543210, bytes_read: 16, read_wait: 25.296µs, time_to_read: 103.60413ms, err: <nil>
[DEBUG] connection initialized successfully; addr: 6543210.us-west1-kafka-pubsub.googleapis.com:443, broker: 6543210
[DEBUG] wrote JoinGroup v0; broker: 6543210, bytes_written: 241, write_wait: 234.083417ms, time_to_write: 52.845µs, err: <nil>
[DEBUG] read JoinGroup v0; broker: 6543210, bytes_read: 114, read_wait: 41.635µs, time_to_read: 32.307738ms, err: <nil>
[INFO] joined; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub, member_id: 7a749616-40e7-4480-9057-5c910dda1bb2, instance_id: <nil>, generation: -1, leader: false
[INFO] syncing; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub, protocol_type: consumer, protocol: cooperative-sticky
[DEBUG] wrote SyncGroup v0; broker: 6543210, bytes_written: 139, write_wait: 26.995µs, time_to_write: 64.549µs, err: <nil>
[DEBUG] read SyncGroup v0; broker: 6543210, bytes_read: 24, read_wait: 40.657µs, time_to_read: 29.077189ms, err: <nil>
[INFO] synced; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub, assigned: 
[DEBUG] unblocking commits from join&sync
[INFO] new group session begun; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub, added: , lost: 
[INFO] beginning heartbeat loop; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub
[DEBUG] entering OnPartitionsAssigned; with: map[]
[DEBUG] heartbeating; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub
[DEBUG] wrote Heartbeat v0; broker: 6543210, bytes_written: 135, write_wait: 72.538µs, time_to_write: 264.907µs, err: <nil>
[DEBUG] read Heartbeat v0; broker: 6543210, bytes_read: 10, read_wait: 88.161µs, time_to_read: 21.350199ms, err: <nil>
[DEBUG] heartbeat complete; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub, err: ILLEGAL_GENERATION: Specified group generation id is not valid.
[INFO] heartbeat errored; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub, err: ILLEGAL_GENERATION: Specified group generation id is not valid.
[DEBUG] entering OnPartitionsLost; with: map[]
[INFO] injecting fake fetch with an error; err: unable to join group session: ILLEGAL_GENERATION: Specified group generation id is not valid., why: notification of group management loop error
[INFO] assigning partitions; why: clearing assignment at end of group management session, how: unassigning everything, input: 
[ERROR] join and sync loop errored; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub, err: ILLEGAL_GENERATION: Specified group generation id is not valid., consecutive_errors: 1, backoff: 2s
Error from reader: unable to join group session: ILLEGAL_GENERATION: Specified group generation id is not valid.
[DEBUG] blocking commits from join&sync
[INFO] joining group; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub
[DEBUG] wrote JoinGroup v0; broker: 6543210, bytes_written: 273, write_wait: 31.085µs, time_to_write: 139.547µs, err: <nil>
[DEBUG] read JoinGroup v0; broker: 6543210, bytes_read: 114, read_wait: 48.222µs, time_to_read: 19.503683ms, err: <nil>
[INFO] joined; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub, member_id: 7a749616-40e7-4480-9057-5c910dda1bb2, instance_id: <nil>, generation: -1, leader: false
[INFO] syncing; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub, protocol_type: consumer, protocol: cooperative-sticky
[DEBUG] wrote SyncGroup v0; broker: 6543210, bytes_written: 139, write_wait: 16.731µs, time_to_write: 50.95µs, err: <nil>
[DEBUG] read SyncGroup v0; broker: 6543210, bytes_read: 24, read_wait: 30.239µs, time_to_read: 20.315476ms, err: <nil>
[INFO] synced; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub, assigned: 
[DEBUG] unblocking commits from join&sync
[INFO] new group session begun; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub, added: , lost: 
[INFO] beginning heartbeat loop; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub
[DEBUG] entering OnPartitionsAssigned; with: map[]
[DEBUG] heartbeating; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub
[DEBUG] wrote Heartbeat v0; broker: 6543210, bytes_written: 135, write_wait: 32.325µs, time_to_write: 168.002µs, err: <nil>
[DEBUG] read Heartbeat v0; broker: 6543210, bytes_read: 10, read_wait: 65.725µs, time_to_read: 24.083579ms, err: <nil>
[DEBUG] heartbeat complete; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub, err: ILLEGAL_GENERATION: Specified group generation id is not valid.
[INFO] heartbeat errored; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub, err: ILLEGAL_GENERATION: Specified group generation id is not valid.
[DEBUG] entering OnPartitionsLost; with: map[]
[INFO] injecting fake fetch with an error; err: unable to join group session: ILLEGAL_GENERATION: Specified group generation id is not valid., why: notification of group management loop error
[INFO] assigning partitions; why: clearing assignment at end of group management session, how: unassigning everything, input: 
[ERROR] join and sync loop errored; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub, err: ILLEGAL_GENERATION: Specified group generation id is not valid., consecutive_errors: 2, backoff: 4s
Error from reader: unable to join group session: ILLEGAL_GENERATION: Specified group generation id is not valid.
... (and it just repeats)
twmb commented 1 year ago

re 2) Can you issue the start and end as two separate requests? I think Kafka proper rejects the request as invalid if there are duplicate partitions. However, the client should be getting INVALID_REQUEST rather than the connection closed. Also you should just be able to use kadm.List{Start,End}Offsets here -- the only extra thing these functions do is a metadata request to discover the partitions in the topic.
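For reference, a minimal sketch of the two-separate-requests route through kadm (cl is an assumed *kgo.Client and listBounds a hypothetical helper, not from the thread):

    import (
        "context"
        "fmt"

        "github.com/twmb/franz-go/pkg/kadm"
        "github.com/twmb/franz-go/pkg/kgo"
    )

    // listBounds issues the start and end lookups as two separate ListOffsets
    // requests instead of mixing both timestamps for the same partition into one.
    func listBounds(ctx context.Context, cl *kgo.Client, topic string) error {
        adm := kadm.NewClient(cl)

        starts, err := adm.ListStartOffsets(ctx, topic)
        if err != nil {
            return err
        }
        ends, err := adm.ListEndOffsets(ctx, topic)
        if err != nil {
            return err
        }

        // ListedOffsets is keyed by topic, then partition.
        for t, partitions := range starts {
            for p, start := range partitions {
                fmt.Printf("%s[%d]: start %d, end %d\n", t, p, start.Offset, ends[t][p].Offset)
            }
        }
        return nil
    }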


re 3) I think you want RetryBackoffFn to use / and not %.

This is weird:

[DEBUG] wrote JoinGroup v0; broker: 6543210, bytes_written: 241, write_wait: 234.083417ms, time_to_write: 52.845µs, err: <nil>
[DEBUG] read JoinGroup v0; broker: 6543210, bytes_read: 114, read_wait: 41.635µs, time_to_read: 32.307738ms, err: <nil>
[INFO] joined; group: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub, member_id: 7a749616-40e7-4480-9057-5c910dda1bb2, instance_id: <nil>, generation: -1, leader: false

The response to the JoinGroup is success, but with a -1 generation, which is invalid. Also, the client is not the leader -- was there another client running at the same time? This is also accepted in the sync group, but then finally rejected at the first heartbeat that follows. You can see in the JoinGroup responses that follow that the generation returned is always -1 -- this is not a valid generation.

ns-gzhang commented 1 year ago

Thanks again, Travis (@twmb). I appreciate you taking the time to look into this.

re 2) Sorry, I had a typo; I meant to fetch partitions 0 and 1. I've tried with the correction, as well as pretty much every combination: partitions 0 and 1 with start offsets only, partition 0 with the start offset only, and the errors look the same, all EOF. I used kadm.List{Start,End}Offsets() before, and the PSL folks seemed to "blame" those for sending requests with too broad a scope (fetching all topics/partitions and then filtering down to the requested ones), so I switched to more specific raw requests. It made no difference.

Ironically, they claim to support the Confluent cgo Kafka driver. I tested it, and it did no better at offset fetching.

re 3) RetryBackoffFn uses % instead of / intentionally: we don't want the backoff to grow indefinitely, but rather to cycle through 1-32 seconds.

I was the only one doing the testing on the topic/subscription.

So basically we could not do consumer group reading with kgo. Interestingly, partition reading without a consumer group worked well (with the option kgo.ConsumePartitions(topicPartitionOffset)), and I could even commit out-of-band with a fake consumer group name (PSL did not care) so the monitoring tool can track lag (but FetchOffsets didn't work). Our major use cases read from partitions, so maybe we can switch consumer group reading to partition reading; I assume that level of interface is stable and less troublesome. What do you think? (The problem is that PSL support refused to support/test the kgo driver, and we would be in trouble if any new issue popped up in production...)
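For readers following along, a rough sketch of that partition-reading setup (seed broker, SASL, and TLS options are elided; consumePartition and its arguments are placeholders, not from the thread):

    import (
        "context"
        "fmt"

        "github.com/twmb/franz-go/pkg/kgo"
    )

    // consumePartition reads one partition directly, without a consumer group,
    // by handing kgo an explicit topic -> partition -> offset map.
    func consumePartition(ctx context.Context, topic string, partition int32, start int64) error {
        cl, err := kgo.NewClient(
            kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{
                topic: {partition: kgo.NewOffset().At(start)},
            }),
            // plus kgo.SeedBrokers, kgo.SASL, kgo.Dialer, etc. as in the options shown earlier
        )
        if err != nil {
            return err
        }
        defer cl.Close()

        for {
            fetches := cl.PollFetches(ctx)
            if fetches.IsClientClosed() {
                return nil
            }
            if errs := fetches.Errors(); len(errs) > 0 {
                return fmt.Errorf("fetch errors: %v", errs)
            }
            fetches.EachRecord(func(r *kgo.Record) {
                fmt.Printf("partition %d offset %d: %s\n", r.Partition, r.Offset, r.Value)
            })
        }
    }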

(As a side note, the Confluent cgo driver was able to consume messages through a consumer group after some initial connection struggles with multiple error messages from the driver; it looks like their connection timeout is too short.)


Another challenge we experienced is with the producer. We want manual routing of messages (with the option kgo.RecordPartitioner(kgo.ManualPartitioner())). It worked flawlessly from day 1 and continued to work well for a long time during our testing, but one day it suddenly stopped working, and then it worked again after many days. This seems to be a transient service reliability issue (partition reading continued to work during that period).
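A minimal sketch of that manual-routing setup, for reference (the client is assumed to have been created with kgo.RecordPartitioner(kgo.ManualPartitioner()); produceTo is a hypothetical helper):

    import (
        "context"

        "github.com/twmb/franz-go/pkg/kgo"
    )

    // produceTo sends one record to an explicit partition. With the manual
    // partitioner, kgo routes each record to whatever its Partition field says
    // rather than picking a partition from the key.
    func produceTo(ctx context.Context, cl *kgo.Client, topic string, partition int32, value []byte) error {
        rec := &kgo.Record{
            Topic:     topic,
            Partition: partition, // honored because of kgo.ManualPartitioner
            Value:     value,
        }
        return cl.ProduceSync(ctx, rec).FirstErr()
    }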

twmb commented 1 year ago

"level interface"

What do you mean by "level interface"?

Also -- Confluent driver -- so, you're able to group consume using their client with no problems? If yes -- do you know if there's a log level / logger you can enable to see how they handle the -1 generation response? I wonder if they're just converting it to 0. I just peeked at the librdkafka source, and I'm looking for a line that says JoinGroup response: GenerationId, as well as a corresponding heartbeat log Heartbeat for group. The librdkafka source doesn't do any special handling for -1 responses.


ManualPartitioner -- there's no magic here, this really sounds like a Google problem -- was one partition unavailable for a while? I wonder if Google normally expects people to retry on other partitions.

ns-gzhang commented 1 year ago

Thanks @twmb again!

"that level interface is stable ..."

I assumed the consumer group interface sits one level above the partition-reading interface, so by "that level interface" I meant the partition-reading interface.

Re 2) One piece of good news on OffsetFetch: fetching committed offsets with the following code worked correctly, FYI.

    ofrTopic := kmsg.OffsetFetchRequestTopic{
        Topic:      topic,
        Partitions: []int32{0, 1},
    }
    ofr := kmsg.OffsetFetchRequest{
        Version: 7,
        Group:   topic,
        Topics:  []kmsg.OffsetFetchRequestTopic{ofrTopic},
    }

    resp, err := client.Request(context.Background(), &ofr)
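To actually read the committed offsets out of the generic response above, one would type-assert it to the concrete kmsg type; a small sketch, assuming the request succeeded:

    // client.Request returns a kmsg.Response; assert to the concrete type.
    ofResp := resp.(*kmsg.OffsetFetchResponse)
    for _, t := range ofResp.Topics {
        for _, p := range t.Partitions {
            fmt.Printf("%s[%d]: committed %d (error code %d)\n", t.Topic, p.Partition, p.Offset, p.ErrorCode)
        }
    }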

Re 3) Consumer group debug info: I enabled debug for "consumer,cgrp" for the Confluent cgo driver. Here is the dump from the beginning to receiving the first message (like you said, it didn't seem to care about -1 for generation id):

%7|1691077959.277|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "unused": updating member id "(not-set)" -> ""
%7|1691077959.277|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed state init -> query-coord (join-state init)
%7|1691077959.278|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v2.1.1 (0x20101ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, STRIP STATIC_LINKING GCC GXX PKGCONFIG INSTALL GNULD LIBDL PLUGINS ZLIB SSL ZSTD CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC CRC32C_HW, debug 0x2100)
%7|1691077959.278|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "unused": no broker available for coordinator query: intervaled in state query-coord
%7|1691077959.278|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op SUBSCRIBE in state query-coord (join-state init)
%7|1691077959.278|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "unused": subscribe to new subscription of 1 topics (join-state init)
%7|1691077959.278|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "unused": no broker available for coordinator query: intervaled in state query-coord
%3|1691077959.288|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap]: sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 10ms in state CONNECT)
2023-08-03T15:52:39.288Z    ERROR   kafka/kafka_consumer.go:285 read message error:sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 10ms in state CONNECT)
main.(*PartitionReader).Start.func1
    /home/gzhang/go/src/kafka/kafka_consumer.go:285
2023-08-03T15:52:39.288Z    ERROR   kafka/kafka_consumer.go:285 read message error:1/1 brokers are down
main.(*PartitionReader).Start.func1
    /home/gzhang/go/src/kafka/kafka_consumer.go:285
%7|1691077960.277|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "unused": no broker available for coordinator query: intervaled in state query-coord
... <the above line is repeated 18 times removed here>
%7|1691077961.278|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "unused": no broker available for coordinator query: intervaled in state query-coord
%3|1691077961.280|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap]: sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 3ms in state CONNECT, 1 identical error(s) suppressed)
2023-08-03T15:52:41.280Z    ERROR   kafka/kafka_consumer.go:285 read message error:sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 3ms in state CONNECT, 1 identical error(s) suppressed)
main.(*PartitionReader).Start.func1
    /home/gzhang/go/src/kafka/kafka_consumer.go:285
%7|1691077961.528|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Group "unused": querying for coordinator: intervaled in state query-coord
%7|1691077961.528|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed state query-coord -> wait-coord (join-state init)
%7|1691077961.530|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Group "unused" coordinator is us-west1-kafka-pubsub.googleapis.com:443 id 6543210
%7|1691077961.530|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: Group "unused" changing coordinator -1 -> 6543210
%7|1691077961.530|COORDSET|rdkafka#consumer-1| [thrd:main]: Group "unused" coordinator set to broker sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/6543210
%7|1691077961.530|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed state wait-coord -> wait-broker-transport (join-state init)
%7|1691077961.531|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Group "unused": querying for coordinator: intervaled in state wait-broker-transport
%7|1691077961.531|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Group "unused" coordinator is us-west1-kafka-pubsub.googleapis.com:443 id 6543210
%3|1691077961.533|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: us-west1-kafka-pubsub.googleapis.com:443: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 2ms in state CONNECT)
2023-08-03T15:52:41.534Z    ERROR   kafka/kafka_consumer.go:285 read message error:GroupCoordinator: us-west1-kafka-pubsub.googleapis.com:443: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 2ms in state CONNECT)
main.(*PartitionReader).Start.func1
    /home/gzhang/go/src/kafka/kafka_consumer.go:285
%7|1691077961.608|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed state wait-broker-transport -> up (join-state init)
%7|1691077961.608|JOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": join with 0 subscribed topic(s)
%7|1691077961.608|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription only available for 0/1 topics (-1ms old)
%7|1691077961.608|JOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": postponing join until up-to-date metadata is available
%7|1691077961.608|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state init -> wait-metadata (state up)
%7|1691077961.608|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1691077961.608|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077961.608|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077961.608|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077961.608|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077961.608|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state wait-metadata (rebalance rejoin=false)
%7|1691077961.701|SUBSCRIPTION|rdkafka#consumer-1| [thrd:main]: Group "unused": effective subscription list changed from 0 to 1 topic(s):
%7|1691077961.701|SUBSCRIPTION|rdkafka#consumer-1| [thrd:main]:  Topic projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub with 2 partition(s)
%7|1691077961.701|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": subscription updated from metadata change: rejoining group in state wait-metadata
%7|1691077961.701|GRPLEADER|rdkafka#consumer-1| [thrd:main]: Group "unused": resetting group leader info: group (re)join
%7|1691077961.701|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "unused" (re)joining in join-state wait-metadata with 0 assigned partition(s): Metadata for subscribed topic(s) has changed
%7|1691077961.701|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "unused" initiating rebalance (NONE) in state up (join-state wait-metadata) with 0 assigned partition(s): Metadata for subscribed topic(s) has changed
%7|1691077961.701|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": Rejoining group without an assignment: Metadata for subscribed topic(s) has changed
%7|1691077961.701|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-metadata -> init (state up)
%7|1691077961.701|JOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": join with 1 subscribed topic(s)
%7|1691077961.701|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (0ms old)
%7|1691077961.701|JOIN|rdkafka#consumer-1| [thrd:main]: sasl_ssl://6543210.us-west1-kafka-pubsub.googleapis.com:443/6543210: Joining group "unused" with 1 subscribed topic(s) and member id ""
%7|1691077961.701|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state init -> wait-join (state up)
%7|1691077961.707|JOINGROUP|rdkafka#consumer-1| [thrd:main]: JoinGroup response: GenerationId -1, Protocol range, LeaderId 257b057c-64c9-4ac8-a779-e96c915c1e7c, my MemberId e1bb40c5-665b-4bf2-9527-334952e93436, member metadata count 0: (no error)
%7|1691077961.707|MEMBERID|rdkafka#consumer-1| [thrd:main]: Group "unused": updating member id "" -> "e1bb40c5-665b-4bf2-9527-334952e93436"
%7|1691077961.707|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-join -> wait-sync (state up)
%7|1691077961.708|SYNCGROUP|rdkafka#consumer-1| [thrd:main]: SyncGroup response: Success (10 bytes of MemberState data)
%7|1691077961.708|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077961.708|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-sync -> wait-assign-call (state up)
%7|1691077961.708|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "unused": delegating assign of 0 partition(s) to application on queue rd_kafka_cgrp_new: new assignment
%7|1691077961.709|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Group "unused": setting group assignment to 0 partition(s)
%7|1691077961.709|GRPASSIGNMENT|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077961.709|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1691077961.709|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op GET_REBALANCE_PROTOCOL in state up (join-state wait-assign-call)
%7|1691077961.709|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op ASSIGN in state up (join-state wait-assign-call)
%7|1691077961.709|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "unused": new assignment of 0 partition(s) in join-state wait-assign-call
%7|1691077961.709|CLEARASSIGN|rdkafka#consumer-1| [thrd:main]: No current assignment to clear
%7|1691077961.709|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Added 0 partition(s) to assignment which now consists of 0 partition(s) where of 0 are in pending state and 0 are being queried
%7|1691077961.709|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-assign-call -> steady (state up)
%7|1691077961.709|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1691077961.709|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077961.709|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077961.709|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077961.709|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077961.709|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1691077964.277|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1691077964.277|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077964.277|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077964.277|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077964.277|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077964.277|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1691077964.709|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1691077967.709|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1691077969.277|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1691077969.277|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077969.277|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077969.277|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077969.277|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077969.277|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1691077970.709|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1691077973.709|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1691077974.277|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1691077974.277|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077974.277|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077974.277|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077974.277|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077974.277|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1691077976.709|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1691077979.277|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1691077979.277|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077979.277|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077979.277|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077979.277|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077979.277|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1691077979.709|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1691077982.709|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1691077984.277|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1691077984.277|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077984.277|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077984.277|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077984.277|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077984.277|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1691077985.709|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1691077988.709|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1691077989.277|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1691077989.277|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077989.278|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077989.278|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077989.278|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077989.278|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1691077991.709|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1691077994.277|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1691077994.278|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077994.278|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077994.278|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077994.278|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077994.278|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1691077994.709|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1691077994.710|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Group "unused" heartbeat error response in state up (join-state steady, 0 partition(s) assigned): Broker: Group rebalance in progress
%7|1691077994.710|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "unused" is rebalancing (EAGER) in state up (join-state steady) with 0 assigned partition(s): rebalance in progress
%7|1691077994.710|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state steady -> wait-unassign-call (state up)
%7|1691077994.710|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "unused": delegating revoke of 0 partition(s) to application on queue rd_kafka_cgrp_new: rebalance in progress
%7|1691077994.711|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Group "unused": clearing group assignment
%7|1691077994.711|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op GET_REBALANCE_PROTOCOL in state up (join-state wait-unassign-call)
%7|1691077994.711|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op ASSIGN in state up (join-state wait-unassign-call)
%7|1691077994.711|CLEARASSIGN|rdkafka#consumer-1| [thrd:main]: No current assignment to clear
%7|1691077994.711|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-unassign-call -> wait-unassign-to-complete (state up)
%7|1691077994.711|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1691077994.711|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077994.711|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077994.711|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077994.711|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077994.711|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state wait-unassign-to-complete (rebalance rejoin=false)
%7|1691077994.711|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "unused": unassign done in state up (join-state wait-unassign-to-complete)
%7|1691077994.711|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": Rejoining group without an assignment: Unassignment done
%7|1691077994.711|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-unassign-to-complete -> init (state up)
%7|1691077994.711|JOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": join with 1 subscribed topic(s)
%7|1691077994.711|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (33010ms old)
%7|1691077994.711|JOIN|rdkafka#consumer-1| [thrd:main]: sasl_ssl://6543210.us-west1-kafka-pubsub.googleapis.com:443/6543210: Joining group "unused" with 1 subscribed topic(s) and member id "e1bb40c5-665b-4bf2-9527-334952e93436"
%7|1691077994.711|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state init -> wait-join (state up)
%7|1691077994.717|JOINGROUP|rdkafka#consumer-1| [thrd:main]: JoinGroup response: GenerationId 0, Protocol range, LeaderId 18b49206-aaaf-44fd-a9e3-77566d16c213, my MemberId e1bb40c5-665b-4bf2-9527-334952e93436, member metadata count 0: (no error)
%7|1691077994.717|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-join -> wait-sync (state up)
%7|1691077994.718|SYNCGROUP|rdkafka#consumer-1| [thrd:main]: SyncGroup response: Success (98 bytes of MemberState data)
%7|1691077994.718|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1691077994.718|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset INVALID
%7|1691077994.718|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset INVALID
%7|1691077994.718|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-sync -> wait-assign-call (state up)
%7|1691077994.718|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "unused": delegating assign of 2 partition(s) to application on queue rd_kafka_cgrp_new: new assignment
%7|1691077994.718|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Group "unused": setting group assignment to 2 partition(s)
%7|1691077994.718|GRPASSIGNMENT|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1691077994.718|GRPASSIGNMENT|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset INVALID
%7|1691077994.718|GRPASSIGNMENT|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset INVALID
%7|1691077994.718|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op GET_REBALANCE_PROTOCOL in state up (join-state wait-assign-call)
%7|1691077994.718|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op ASSIGN in state up (join-state wait-assign-call)
%7|1691077994.718|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "unused": new assignment of 2 partition(s) in join-state wait-assign-call
%7|1691077994.718|CLEARASSIGN|rdkafka#consumer-1| [thrd:main]: No current assignment to clear
%7|1691077994.718|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Added 2 partition(s) to assignment which now consists of 2 partition(s) where of 2 are in pending state and 0 are being queried
%7|1691077994.718|PAUSE|rdkafka#consumer-1| [thrd:main]: Resuming fetchers for 2 assigned partition(s): assign called
%7|1691077994.718|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-assign-call -> steady (state up)
%7|1691077994.718|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1691077994.718|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1691077994.718|DUMP_ALL|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset STORED
%7|1691077994.718|DUMP_ALL|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset STORED
%7|1691077994.718|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1691077994.718|DUMP_PND|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset STORED
%7|1691077994.718|DUMP_PND|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset STORED
%7|1691077994.718|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077994.718|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077994.718|SRVPEND|rdkafka#consumer-1| [thrd:main]: Querying committed offset for pending assigned partition projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1]
%7|1691077994.718|SRVPEND|rdkafka#consumer-1| [thrd:main]: Querying committed offset for pending assigned partition projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0]
%7|1691077994.719|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: Fetching committed offsets for 2 pending partition(s) in assignment
%7|1691077994.719|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Group unused OffsetFetchRequest(v0) for 2/2 partition(s)
%7|1691077994.719|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Fetch committed offsets for 2/2 partition(s)
%7|1691077994.719|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Current assignment of 2 partition(s) with 2 pending adds, 2 offset queries, 0 partitions awaiting stop and 0 offset commits in progress
%7|1691077994.750|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: Adding projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] back to pending list with offset 43
%7|1691077994.750|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: Adding projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] back to pending list with offset 318
%7|1691077994.750|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1691077994.750|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1691077994.750|DUMP_ALL|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset STORED
%7|1691077994.750|DUMP_ALL|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset STORED
%7|1691077994.750|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1691077994.750|DUMP_PND|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset 43
%7|1691077994.750|DUMP_PND|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset 318
%7|1691077994.750|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077994.750|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1691077994.750|SRVPEND|rdkafka#consumer-1| [thrd:main]: Starting pending assigned partition projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] at offset 318 (leader epoch -1)
%7|1691077994.751|SRVPEND|rdkafka#consumer-1| [thrd:main]: Starting pending assigned partition projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] at offset 43 (leader epoch -1)
%7|1691077994.751|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1691077994.751|FETCH|rdkafka#consumer-1| [thrd:main]: Partition projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] start fetching at offset 318 (leader epoch -1)
%7|1691077994.751|FETCH|rdkafka#consumer-1| [thrd:main]: Partition projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] start fetching at offset 43 (leader epoch -1)
%7|1691077994.751|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op PARTITION_JOIN in state up (join-state steady) for projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1]
%7|1691077994.751|PARTADD|rdkafka#consumer-1| [thrd:main]: Group "unused": add projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1]
%7|1691077994.751|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op PARTITION_JOIN in state up (join-state steady) for projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0]
%7|1691077994.751|PARTADD|rdkafka#consumer-1| [thrd:main]: Group "unused": add projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0]
%3|1691077994.759|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://601576412.us-west1-kafka-pubsub.googleapis.com:443/6]: sasl_ssl://601576412.us-west1-kafka-pubsub.googleapis.com:443/601576412: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 7ms in state CONNECT)
%3|1691077994.759|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://601576413.us-west1-kafka-pubsub.googleapis.com:443/6]: sasl_ssl://601576413.us-west1-kafka-pubsub.googleapis.com:443/601576413: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 8ms in state CONNECT)
2023-08-03T15:53:14.759Z    ERROR   kafka/kafka_consumer.go:285 read message error:sasl_ssl://601576412.us-west1-kafka-pubsub.googleapis.com:443/601576412: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 7ms in state CONNECT)
main.(*PartitionReader).Start.func1
    /home/gzhang/go/src/kafka/kafka_consumer.go:285
2023-08-03T15:53:14.759Z    ERROR   kafka/kafka_consumer.go:285 read message error:sasl_ssl://601576413.us-west1-kafka-pubsub.googleapis.com:443/601576413: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 8ms in state CONNECT)
main.(*PartitionReader).Start.func1
    /home/gzhang/go/src/kafka/kafka_consumer.go:285
%3|1691077995.762|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://601576412.us-west1-kafka-pubsub.googleapis.com:443/6]: sasl_ssl://601576412.us-west1-kafka-pubsub.googleapis.com:443/601576412: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 3ms in state CONNECT, 1 identical error(s) suppressed)
2023-08-03T15:53:15.763Z    ERROR   kafka/kafka_consumer.go:285 read message error:sasl_ssl://601576412.us-west1-kafka-pubsub.googleapis.com:443/601576412: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 3ms in state CONNECT, 1 identical error(s) suppressed)
main.(*PartitionReader).Start.func1
    /home/gzhang/go/src/kafka/kafka_consumer.go:285
received a message   p: 1  offset: 318
ns-gzhang commented 1 year ago

was one partition unavailable for a while?

Both partitions of my test topic were not writable with kgo for at least a few days, probably longer, and the scope was actually broader: many topics in our testing environment stopped working as well. We didn't check continuously during that period, and eventually we decided to switch back to using Kafka instead...

twmb commented 1 year ago

Can you try using the Range balancer with franz-go? librdkafka also used -1 for the generation while heartbeating -- same as franz-go -- so I wonder if the cooperative rebalancing is proving problematic (i.e., does Google not support a rejoin so quickly?)
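
If it helps, here is a minimal sketch (using the github.com/twmb/franz-go/pkg/kgo package) of switching the consumer to the range balancer; the seed broker, group, and topic names are just the ones appearing in the logs above, used as placeholders:

    cl, err := kgo.NewClient(
        kgo.SeedBrokers("us-west1-kafka-pubsub.googleapis.com:443"), // placeholder seed broker
        kgo.ConsumerGroup("unused"),
        kgo.ConsumeTopics("projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub"),
        // Replace the default cooperative-sticky balancer with the eager range balancer.
        kgo.Balancers(kgo.RangeBalancer()),
    )
    if err != nil {
        panic(err)
    }
    defer cl.Close()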

ns-gzhang commented 1 year ago

Thanks @twmb. Good news: after I switched to the Range balancer option, it eventually succeeded in reading messages, after backing off a few times at the 16s retry backoff. Here is the debug dump close to the last backoff:

...
[INFO] new group session begun; group: unused, added: , lost: 
[INFO] beginning heartbeat loop; group: unused
[DEBUG] entering OnPartitionsAssigned; with: map[]
[DEBUG] heartbeating; group: unused
[DEBUG] wrote Heartbeat v0; broker: 6543210, bytes_written: 67, write_wait: 29.649µs, time_to_write: 185.495µs, err: <nil>
[DEBUG] read Heartbeat v0; broker: 6543210, bytes_read: 10, read_wait: 70.302µs, time_to_read: 28.44802ms, err: <nil>
[DEBUG] heartbeat complete; group: unused, err: ILLEGAL_GENERATION: Specified group generation id is not valid.
[INFO] heartbeat errored; group: unused, err: ILLEGAL_GENERATION: Specified group generation id is not valid.
[DEBUG] entering OnPartitionsLost; with: map[]
[INFO] injecting fake fetch with an error; err: unable to join group session: ILLEGAL_GENERATION: Specified group generation id is not valid., why: notification of group management loop error
[INFO] assigning partitions; why: clearing assignment at end of group management session, how: unassigning everything, input: 
[ERROR] join and sync loop errored; group: unused, err: ILLEGAL_GENERATION: Specified group generation id is not valid., consecutive_errors: 4, backoff: 16s
[INFO] immediate metadata update triggered; why: waitmeta during join & sync error backoff
Error from reader: unable to join group session: ILLEGAL_GENERATION: Specified group generation id is not valid.
[DEBUG] wrote Metadata v1; broker: 601576412, bytes_written: 97, write_wait: 28.581µs, time_to_write: 129.682µs, err: <nil>
[DEBUG] read Metadata v1; broker: 601576412, bytes_read: 311, read_wait: 65.587µs, time_to_read: 78.941081ms, err: <nil>
[DEBUG] metadata refresh has identical topic partition data; topic: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub, partition: 0, leader: 601576412, leader_epoch: -1
[DEBUG] metadata refresh has identical topic partition data; topic: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub, partition: 1, leader: 601576413, leader_epoch: -1
[DEBUG] reaped connections; time_since_last_reap: 20.000106863s, reap_dur: 607.975µs, num_reaped: 2
[DEBUG] blocking commits from join&sync
[INFO] joining group; group: unused
[DEBUG] wrote JoinGroup v0; broker: 6543210, bytes_written: 188, write_wait: 125.634µs, time_to_write: 235.469µs, err: <nil>
[DEBUG] read JoinGroup v0; broker: 6543210, bytes_read: 101, read_wait: 95.011µs, time_to_read: 39.022475ms, err: <nil>
[INFO] joined; group: unused, member_id: 504cbdcb-ba4b-4258-9ea6-f3cfd0e35f0e, instance_id: <nil>, generation: 0, leader: false
[INFO] syncing; group: unused, protocol_type: consumer, protocol: range
[DEBUG] wrote SyncGroup v0; broker: 6543210, bytes_written: 71, write_wait: 21.188µs, time_to_write: 51.831µs, err: <nil>
[DEBUG] read SyncGroup v0; broker: 6543210, bytes_read: 112, read_wait: 33.224µs, time_to_read: 23.066915ms, err: <nil>
[INFO] synced; group: unused, assigned: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub[0 1]
[DEBUG] unblocking commits from join&sync
[INFO] new group session begun; group: unused, added: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub[0 1], lost: 
[INFO] beginning heartbeat loop; group: unused
[DEBUG] entering OnPartitionsAssigned; with: map[projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub:[0 1]]
[DEBUG] sharded request; req: OffsetFetch, destinations: [6543210]
[DEBUG] sharded request failed, resharding and reissuing; req: OffsetFetch, time_since_start: 110.142µs, tries: 0, err: broker is too old; the broker has already indicated it will not know how to handle the request
[DEBUG] sharded request; req: OffsetFetch, destinations: [6543210]
[DEBUG] wrote OffsetFetch v0; broker: 6543210, bytes_written: 117, write_wait: 46.289µs, time_to_write: 125.842µs, err: <nil>
[DEBUG] read OffsetFetch v0; broker: 6543210, bytes_read: 124, read_wait: 135.329µs, time_to_read: 55.839703ms, err: <nil>
[INFO] assigning partitions; why: newly fetched offsets for group unused, how: assigning everything new, keeping current assignment, input: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub[0{43 e-1 ce0} 1{368 e-1 ce0}]
[DEBUG] assign requires loading offsets
[DEBUG] opening connection to broker; addr: 601576413.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576413
[DEBUG] opening connection to broker; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] connection opened to broker; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] issuing SASLHandshakeRequest; broker: 601576412
[DEBUG] wrote SASLHandshake v1; broker: 601576412, bytes_written: 24, write_wait: 13.613µs, time_to_write: 49.577µs, err: <nil>
[DEBUG] connection opened to broker; addr: 601576413.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576413
[DEBUG] issuing SASLHandshakeRequest; broker: 601576413
[DEBUG] wrote SASLHandshake v1; broker: 601576413, bytes_written: 24, write_wait: 11.471µs, time_to_write: 73.866µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: 601576412, bytes_read: 34, read_wait: 25.406µs, time_to_read: 65.803104ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: 601576412, addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: 601576412, version: 0, step: 0
[DEBUG] wrote SASLAuthenticate v0; broker: 601576412, bytes_written: 3171, write_wait: 12.159µs, time_to_write: 101.407µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: 601576413, bytes_read: 34, read_wait: 22.828µs, time_to_read: 69.414952ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: 601576413, addr: 601576413.us-west1-kafka-pubsub.googleapis.com:443, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: 601576413, version: 0, step: 0
[DEBUG] wrote SASLAuthenticate v0; broker: 601576413, bytes_written: 3171, write_wait: 11.869µs, time_to_write: 151.899µs, err: <nil>
[DEBUG] read SASLAuthenticate v0; broker: 601576412, bytes_read: 16, read_wait: 25.199µs, time_to_read: 81.847852ms, err: <nil>
[DEBUG] connection initialized successfully; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] wrote Fetch v4; broker: 601576412, bytes_written: 134, write_wait: 198.952764ms, time_to_write: 56.327µs, err: <nil>
[DEBUG] read SASLAuthenticate v0; broker: 601576413, bytes_read: 16, read_wait: 37.924µs, time_to_read: 136.243076ms, err: <nil>
[DEBUG] connection initialized successfully; addr: 601576413.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576413
[DEBUG] wrote Fetch v4; broker: 601576413, bytes_written: 134, write_wait: 262.146103ms, time_to_write: 130.948µs, err: <nil>
[DEBUG] read Fetch v4; broker: 601576413, bytes_read: 1018702, read_wait: 76.966µs, time_to_read: 610.423072ms, err: <nil>
[DEBUG] updated uncommitted; group: unused, to: projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub[1{368=>1377 r1009}]
from an iterator! partition 1 offset 368
from an iterator! partition 1 offset 369
twmb commented 1 year ago

I wonder if PSL doesn't expect two joins so quickly, and has some bad timing issue on the backend that generates a new generation but doesn't return it to the client right away.

So summary of issues:

Is there anything else? Wondering how quickly we can move this to close :)

ns-gzhang commented 1 year ago

Appreciate your summary @twmb !

Bullet 2. As I mentioned, it apparently can work in some rare cases: we did get successful reads with the default consumer group balancer in our test deployment, with the retry-forever setting... At least we can use the range balancer to avoid the PSL problem.

Bullet 3. I tried a list offsets request with a single partition and start only; it didn't work either. I tried different combinations, and none of them worked.

Bullet 4. Sorry for the confusion. To me these requests are all similar... Here is the code and debug dump for kadm.ListStartOffsets:

    brokers := strings.Split(brokers, ",")
    tlsDialer := &tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}}
    opts := []kgo.Opt{
        kgo.SeedBrokers(brokers...),
        kgo.WithLogger(kgo.BasicLogger(os.Stdout, kgo.LogLevelDebug, nil)),

        kgo.RetryTimeout(0), // no retry timeout
        kgo.RetryBackoffFn(func(n int) time.Duration {
            return 1 << (n % 6) * time.Second // up to 32 seconds
        }),
        kgo.SASL(plain.Auth{
            User: user,
            Pass: password,
        }.AsMechanism()),
        kgo.Dialer(tlsDialer.DialContext),
    }

    fmt.Println("start to dial kafka:", brokers)
    admClient, err := kadm.NewOptClient(opts...)
    if err != nil {
        fmt.Println("creating kafka adm client", "error", err)
        return // nil, err
    }
    defer admClient.Close()

    sofs, err := admClient.ListStartOffsets(context.Background(), topic)
    if err != nil {
        fmt.Println("ListStartOffsets error:", err)
        return
    }

start to dial kafka: [us-west1-kafka-pubsub.googleapis.com:443]
[DEBUG] opening connection to broker; addr: us-west1-kafka-pubsub.googleapis.com:443, broker: seed_0
[DEBUG] connection opened to broker; addr: us-west1-kafka-pubsub.googleapis.com:443, broker: seed_0
[DEBUG] issuing api versions request; broker: seed_0, version: 3
[DEBUG] wrote ApiVersions v3; broker: seed_0, bytes_written: 31, write_wait: 39.593µs, time_to_write: 72.683µs, err: <nil>
[DEBUG] read ApiVersions v3; broker: seed_0, bytes_read: 121, read_wait: 37.657µs, time_to_read: 64.847684ms, err: <nil>
[DEBUG] issuing SASLHandshakeRequest; broker: seed_0
[DEBUG] wrote SASLHandshake v1; broker: seed_0, bytes_written: 24, write_wait: 29.357µs, time_to_write: 111.995µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: seed_0, bytes_read: 34, read_wait: 57.684µs, time_to_read: 26.503058ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: seed_0, addr: us-west1-kafka-pubsub.googleapis.com:443, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: seed_0, version: 0, step: 0
[DEBUG] wrote SASLAuthenticate v0; broker: seed_0, bytes_written: 3171, write_wait: 30.926µs, time_to_write: 77.718µs, err: <nil>
[DEBUG] read SASLAuthenticate v0; broker: seed_0, bytes_read: 16, read_wait: 25.831µs, time_to_read: 93.232193ms, err: <nil>
[DEBUG] connection initialized successfully; addr: us-west1-kafka-pubsub.googleapis.com:443, broker: seed_0
[DEBUG] wrote Metadata v1; broker: seed_0, bytes_written: 97, write_wait: 257.291252ms, time_to_write: 107.676µs, err: <nil>
[DEBUG] read Metadata v1; broker: seed_0, bytes_read: 311, read_wait: 100.396µs, time_to_read: 96.382674ms, err: <nil>
[DEBUG] opening connection to broker; addr: 6543210.us-west1-kafka-pubsub.googleapis.com:443, broker: 6543210
[DEBUG] connection opened to broker; addr: 6543210.us-west1-kafka-pubsub.googleapis.com:443, broker: 6543210
[DEBUG] issuing api versions request; broker: 6543210, version: 3
[DEBUG] wrote ApiVersions v3; broker: 6543210, bytes_written: 31, write_wait: 8.548µs, time_to_write: 40.118µs, err: <nil>
[DEBUG] read ApiVersions v3; broker: 6543210, bytes_read: 121, read_wait: 23.102µs, time_to_read: 66.417704ms, err: <nil>
[DEBUG] issuing SASLHandshakeRequest; broker: 6543210
[DEBUG] wrote SASLHandshake v1; broker: 6543210, bytes_written: 24, write_wait: 27.932µs, time_to_write: 128.174µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: 6543210, bytes_read: 34, read_wait: 65.69µs, time_to_read: 31.19184ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: 6543210, addr: 6543210.us-west1-kafka-pubsub.googleapis.com:443, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: 6543210, version: 0, step: 0
[DEBUG] wrote SASLAuthenticate v0; broker: 6543210, bytes_written: 3171, write_wait: 15.043µs, time_to_write: 130.115µs, err: <nil>
[DEBUG] read SASLAuthenticate v0; broker: 6543210, bytes_read: 16, read_wait: 47.543µs, time_to_read: 108.637651ms, err: <nil>
[DEBUG] connection initialized successfully; addr: 6543210.us-west1-kafka-pubsub.googleapis.com:443, broker: 6543210
[DEBUG] wrote Metadata v1; broker: 6543210, bytes_written: 97, write_wait: 257.952363ms, time_to_write: 126.817µs, err: <nil>
[DEBUG] read Metadata v1; broker: 6543210, bytes_read: 311, read_wait: 67.465µs, time_to_read: 85.456658ms, err: <nil>
[DEBUG] sharded request; req: ListOffsets, destinations: [601576412 601576413]
[DEBUG] opening connection to broker; addr: 601576413.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576413
[DEBUG] opening connection to broker; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] connection opened to broker; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] issuing api versions request; broker: 601576412, version: 3
[DEBUG] wrote ApiVersions v3; broker: 601576412, bytes_written: 31, write_wait: 9.257µs, time_to_write: 49.734µs, err: <nil>
[DEBUG] connection opened to broker; addr: 601576413.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576413
[DEBUG] issuing api versions request; broker: 601576413, version: 3
[DEBUG] wrote ApiVersions v3; broker: 601576413, bytes_written: 31, write_wait: 18.126µs, time_to_write: 49.114µs, err: <nil>
[DEBUG] read ApiVersions v3; broker: 601576412, bytes_read: 121, read_wait: 23.364µs, time_to_read: 62.832306ms, err: <nil>
[DEBUG] issuing SASLHandshakeRequest; broker: 601576412
[DEBUG] wrote SASLHandshake v1; broker: 601576412, bytes_written: 24, write_wait: 13.054µs, time_to_write: 89.503µs, err: <nil>
[DEBUG] read ApiVersions v3; broker: 601576413, bytes_read: 121, read_wait: 32.209µs, time_to_read: 71.252521ms, err: <nil>
[DEBUG] issuing SASLHandshakeRequest; broker: 601576413
[DEBUG] wrote SASLHandshake v1; broker: 601576413, bytes_written: 24, write_wait: 17.505µs, time_to_write: 121.556µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: 601576412, bytes_read: 34, read_wait: 62.26µs, time_to_read: 31.123958ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: 601576412, addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: 601576412, version: 0, step: 0
[DEBUG] wrote SASLAuthenticate v0; broker: 601576412, bytes_written: 3171, write_wait: 18.393µs, time_to_write: 124.18µs, err: <nil>
[DEBUG] read SASLHandshake v1; broker: 601576413, bytes_read: 34, read_wait: 47.778µs, time_to_read: 25.500823ms, err: <nil>
[DEBUG] beginning sasl authentication; broker: 601576413, addr: 601576413.us-west1-kafka-pubsub.googleapis.com:443, mechanism: PLAIN, authenticate: true
[DEBUG] issuing SASLAuthenticate; broker: 601576413, version: 0, step: 0
[DEBUG] wrote SASLAuthenticate v0; broker: 601576413, bytes_written: 3171, write_wait: 12.515µs, time_to_write: 102.532µs, err: <nil>
[DEBUG] read SASLAuthenticate v0; broker: 601576412, bytes_read: 16, read_wait: 37.6µs, time_to_read: 79.523638ms, err: <nil>
[DEBUG] connection initialized successfully; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] wrote ListOffsets v4; broker: 601576412, bytes_written: 122, write_wait: 225.118839ms, time_to_write: 148.832µs, err: <nil>
[DEBUG] read SASLAuthenticate v0; broker: 601576413, bytes_read: 16, read_wait: 26.212µs, time_to_read: 75.177888ms, err: <nil>
[DEBUG] connection initialized successfully; addr: 601576413.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576413
[DEBUG] wrote ListOffsets v4; broker: 601576413, bytes_written: 122, write_wait: 229.058099ms, time_to_write: 103.068µs, err: <nil>
[DEBUG] read ListOffsets v4; broker: 601576412, bytes_read: 0, read_wait: 167.156µs, time_to_read: 87.217329ms, err: EOF
[DEBUG] read from broker errored, killing connection; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412, successful_reads: 0, err: EOF
[DEBUG] read ListOffsets v4; broker: 601576413, bytes_read: 0, read_wait: 59.393µs, time_to_read: 89.848354ms, err: EOF
[DEBUG] read from broker errored, killing connection; addr: 601576413.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576413, successful_reads: 0, err: EOF
[DEBUG] sharded request failed, resharding and reissuing; req: ListOffsets, time_since_start: 2.656943156s, tries: 0, err: EOF
[DEBUG] sharded request; req: ListOffsets, destinations: [601576412]
[DEBUG] opening connection to broker; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412
[DEBUG] sharded request failed, resharding and reissuing; req: ListOffsets, time_since_start: 2.663458903s, tries: 0, err: EOF
[DEBUG] sharded request; req: ListOffsets, destinations: [601576413]
[DEBUG] opening connection to broker; addr: 601576413.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576413
[DEBUG] connection opened to broker; addr: 601576412.us-west1-kafka-pubsub.googleapis.com:443, broker: 601576412

And it seemed to retry forever and never returned...


This question is really exploratory; I wanted to see whether franz-go/kgo could be a feasible driver for PSL. I feel that under certain restrictions (only using the functions that work), it could. But from my point of view, we would need PSL to include our test case in their regression test suite... Do you have any other suggestions? Thanks.

twmb commented 1 year ago

Well, what's odd is the client itself internally issues a ListOffsets request in some scenarios. What are the logs for using a group consumer on a new group, with ResetOffset(NewOffset().AtEnd())? If that succeeds -- then something about how we are building list offsets is different from how the client is internally doing it.
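
For reference, a minimal sketch of that test with kgo (the option name in current kgo is ConsumeResetOffset; the group is assumed to be brand new, and the names below are only placeholders from this thread):

    cl, err := kgo.NewClient(
        kgo.SeedBrokers("us-west1-kafka-pubsub.googleapis.com:443"), // placeholder seed broker
        kgo.ConsumerGroup("brand-new-group"), // hypothetical group with no commits yet
        kgo.ConsumeTopics("projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub"),
        kgo.Balancers(kgo.RangeBalancer()),
        // With no committed offsets, start consuming at the end of each partition.
        kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
    )
    if err != nil {
        panic(err)
    }
    defer cl.Close()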

ns-gzhang commented 1 year ago

So a PSL consumer group is really a subscription (the group name does not matter; it's the topic, in the Kafka sense, that is a PSL subscription). So I created a new subscription to test a new group. When creating the subscription, you are required to choose whether it starts from the beginning or the end, so any consumer-side setting via ConsumeResetOffset() probably does not matter. But my test was successful - I did have to send new messages to the topic... In general, PSL suggests doing a Seek from the admin client or CLI to set the starting position. (It's also weird that the PSL Seek operation does a handshake with the consumer... what if the consumers are already running and have already started consuming from somewhere...)

I tested with the existing group also. ConsumeResetOffset(NewOffset().AtEnd()) had no effect as expected.

I also tested with ConsumePartitions(): neither NewOffset().AtStart() nor NewOffset().AtEnd() worked, but any real offset number worked fine. IIRC, the Confluent cgo driver didn't work with the special OffsetBeginning on PSL either.
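
For comparison, a minimal sketch of the direct-partition variant that did work, pinning each partition to a concrete offset (43 and 318 are simply the committed offsets from the earlier log, used here as example values):

    cl, err := kgo.NewClient(
        kgo.SeedBrokers("us-west1-kafka-pubsub.googleapis.com:443"), // placeholder seed broker
        // Consume exact partitions at concrete offsets; AtStart/AtEnd did not work against PSL.
        kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{
            "projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub": {
                0: kgo.NewOffset().At(43),
                1: kgo.NewOffset().At(318),
            },
        }),
    )
    if err != nil {
        panic(err)
    }
    defer cl.Close()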

The list offsets test code was run against Kafka first, and it worked. Then we tried it against PSL. Thanks!

twmb commented 1 year ago

So basically, PSL seems to not support listing offsets at either the beginning or the end -- which resolves the final two bullets in my summary just above. Is there further investigation to be done / anything leaving this issue open?

ns-gzhang commented 1 year ago

Thanks @twmb. I have no further questions. Just some additional info, if anyone is interested: usually we get the HighWaterMark from received messages so we can compute lag right from the received messages, but PSL does not send it...

I will let PSL folks know this. I will close this question now. Really appreciate your help!

ns-gzhang commented 1 year ago

BTW, Travis @twmb re:

PSL doesn't support null topics in an OffsetFetch request (which is what kadm uses)

Is it possible to enhance kadm so that, when topic/partition parameters are supplied in the call, they are passed through to the OffsetFetch request? That way PSL might work, and it seems like a legitimate option when needed.

twmb commented 1 year ago

What would the API look like?

ns-gzhang commented 1 year ago

These would be the same existing interfaces, with the supplied topics/partitions passed through, such as:

func (cl *Client) ListStartOffsets(ctx context.Context, topics ...string) (ListedOffsets, error)

func (cl *Client) ListEndOffsets(ctx context.Context, topics ...string) (ListedOffsets, error)

func (cl *Client) FetchOffsetsForTopics(ctx context.Context, group string, topics ...string) (OffsetResponses, error)

func (cl *Client) ListCommittedOffsets(ctx context.Context, topics ...string) (ListedOffsets, error)

func (cl *Client) FetchOffsets(ctx context.Context, group string) (OffsetResponses, error)

Thanks!

twmb commented 1 year ago

If you just want the topic itself, FetchOffsetsForTopics already does this. Are you requesting an API that fetches specific partitions?

ns-gzhang commented 1 year ago

Sorry, I had the impression that FetchOffsetsForTopics did not have that behavior, since it was not working as expected. It currently returns the following (not the expected commit offsets):

partition: 1 offset: -1
partition: 0 offset: -1

Actually this one is less important, as I found that a kgo client.Request() call with a manually built kmsg.OffsetFetchRequest returns the expected result. Not sure why it's different...

twmb commented 1 year ago

FetchOffsetsForTopics fills in partitions that do not yet have commits with -1 for the committed offset. I think you're asking for an API that only fetches specific partitions, for specific topics?

ns-gzhang commented 1 year ago

-1 is not the correct committed offset, as there were commits. This was returned from the following code:

    ofr, err := admClient.FetchOffsetsForTopics(context.Background(), consumerGroup, topic)

Here are the commit offsets I got with the following call with expected return:

    ofrTopic := kmsg.OffsetFetchRequestTopic{
        Topic:      topic,
        Partitions: []int32{0, 1},
    }
    ofr := kmsg.OffsetFetchRequest{
        Version: 7,
        Group:   topic,
        Topics:  []kmsg.OffsetFetchRequestTopic{ofrTopic},
    }

    resp, err := client.Request(context.Background(), &ofr)

partition: 0 commit offset: 43
partition: 1 commit offset: 596
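
For completeness, a small sketch of how that output can be produced from resp (assuming the same client and ofr from the snippet above, plus the usual fmt import; the field names are from kmsg.OffsetFetchResponse):

    fetched, ok := resp.(*kmsg.OffsetFetchResponse)
    if !ok {
        panic("unexpected response type")
    }
    for _, t := range fetched.Topics {
        for _, p := range t.Partitions {
            // Offset is -1 for partitions with no committed offset.
            fmt.Println("partition:", p.Partition, "commit offset:", p.Offset)
        }
    }
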
twmb commented 1 year ago

Ok, so what's going on with that is:

I think an option is to add FetchOffsetsForPartitions(context.Context, TopicsSet), where you manually specify all partitions you want in the response (and -1 will be added for any partitions that are missing commits).

ns-gzhang commented 1 year ago

Thanks @twmb. I feel the existing function may be good enough for PSL, if it works:

ofr, err := admClient.FetchOffsetsForTopics(context.Background(), consumerGroup, topic)

as the topic is already specified (although limited to one topic, with a redundant consumer group, for PSL). But I see your point: in the general case, both the consumer group and the topics need to be taken as constraints. What if kadm started from the topics specified (not a null topics list)? And since I have a second way that works to get commit offsets, this new function is less important compared to, say, ListStartOffsets.

twmb commented 1 year ago

You're asking for a behavior change in FetchOffsetsForTopics -- currently it returns commits for topics you ask for (and -1 for uncommitted partitions) merged with topics that are committed to, even if you did not ask for those extra topics. You're asking for it to change to just the first bit -- only topics you ask for. That might be clearer anyway. There's not really a super clear use case for FetchOffsetsForTopics. If the client used non-null topics, then topics that were committed to but you didn't ask for would not be returned.

ns-gzhang commented 1 year ago

I see. So it returns the union of the consumer group's committed topics and the specified topics. I was expecting this:

If the client used non-null topics, then topics that were committed to but you didn't ask for would not be returned.

Not sure if anyone relies on the current behavior. Seems the safe way is to use a different name for the new behavior (if you do implement one), such as FetchOffsetForTopicsOnly :-) ? Thanks!

twmb commented 1 year ago

The only public usage is in clones of the franz-go repo: https://github.com/search?q=fetchoffsetsfortopics+language%3AGo&type=code&l=Go I think it's safe to change.

ns-gzhang commented 1 year ago

Thanks @twmb Travis. I talked with the PSL technical folks and pointed them to this discussion as well. Apparently they don't care much about Kafka Go drivers other than the Confluent one. A few points from their response that are relevant to this, FYI:

Per the kgo client: I don't think this client will work with our implementation. As stated earlier, it makes assumptions about the stability of cluster topology that just don't hold for our proxy implementation. These same assumptions are not made by either of the confluent developed clients (java, librdkafka) to my knowledge.

We don't support userland balancing, instead treating all clients as followers in the group protocol, and delegate to the assignment service logic to deliver assignments.

We don't support reading special offsets (head offset, oldest offset) as doing so is an expensive operation for us and, as stated earlier, is unlikely to be a reliable backlog metric for Pub/Sub Lite. We would highly recommend that you not use this.

None of these restrictions are documented, and their documentation makes it sound like a drop-in replacement (in fact it is far from that). We got no response about including a test case using the kgo driver in their regression bucket, so we have no choice but to give up for the moment.

Thanks again!

twmb commented 1 year ago

Did they ever clarify what those special assumptions are?

I'd like to echo that...

I will eventually change FetchOffsetsForTopics per this discussion. That said -- if you set the initial offsets manually and consume with range

The kgo driver makes no assumptions about cluster stability or topology.

From what I can tell in this thread, you're able to use kgo if you bump timeouts and use the range balancer -- the same thing as the cgo driver. Is anything else preventing you from using kgo?
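
For what it's worth, a minimal sketch of that combination; the timeout values below are only illustrative examples, not recommendations:

    opts := []kgo.Opt{
        kgo.Balancers(kgo.RangeBalancer()),
        // Give the group protocol more slack against PSL's proxy; tune to taste.
        kgo.SessionTimeout(60 * time.Second),
        kgo.RebalanceTimeout(2 * time.Minute),
    }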

Google support is notoriously hostile and dismissive, so their responses here do not surprise me.

ns-gzhang commented 1 year ago

From what I can tell in this thread, you're able to use kgo if you bump timeouts and use the range balancer -- the same thing as the cgo driver. Is anything else preventing you from using kgo?

We could build a working app using the kgo driver with PubSubLite within PSL's limitations. Actually we did, and we deployed our app pipeline to a test environment in GCP with PSL. However, during our testing, the publisher with manual partition routing, which had been working since day 1 of my testing, suddenly stopped working for quite a few days (we never learned what happened). We had to switch from PSL to Kafka as an emergency action. That's why I asked them to cover our kgo driver test case in their regression bucket; we cannot have such outages in a production deployment. We will use kgo with Kafka/Redpanda. I want to move our apps to AWS or Azure... Do you have any other suggestions? Thanks!

ns-gzhang commented 1 year ago

I'd be interested if you can verify this with the cgo driver by setting the balancer to cooperative-sticky

I tested the cgo driver with the default vs. "partition.assignment.strategy": "cooperative-sticky". On the surface, I couldn't tell much difference... it always took a long time for the consumer group to become ready and start reading messages, but both options successfully got messages. I enabled debug and also printed reader.GetRebalanceProtocol() right after creating the consumer as well as after getting the first message. The value was NONE initially for both, and then EAGER and COOPERATIVE for the default and non-default settings, respectively.

Here is the debug log dump for the non-default "cooperative-sticky" setting:

%7|1692385947.090|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "unused": updating member id "(not-set)" -> ""
%7|1692385947.091|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v2.1.1 (0x20101ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, STRIP STATIC_LINKING GCC GXX PKGCONFIG INSTALL GNULD LIBDL PLUGINS ZLIB SSL ZSTD CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC CRC32C_HW, debug 0x100)
%7|1692385947.091|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed state init -> query-coord (join-state init)
%7|1692385947.091|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "unused": no broker available for coordinator query: intervaled in state query-coord
%7|1692385947.091|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op GET_REBALANCE_PROTOCOL in state query-coord (join-state init)
%7|1692385947.091|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "unused": no broker available for coordinator query: intervaled in state query-coord
RebalanceProtocol NONE
%7|1692385947.092|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op SUBSCRIBE in state query-coord (join-state init)
%7|1692385947.092|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "unused": subscribe to new subscription of 1 topics (join-state init)
%7|1692385947.092|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "unused": no broker available for coordinator query: intervaled in state query-coord
%3|1692385947.101|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap]: sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 9ms in state CONNECT)
2023-08-18T19:12:27.101Z    ERROR   kafka/kafka_consumer.go:293 read message error:sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 9ms in state CONNECT)
main.(*PartitionReader).Start.func1
    /home/gzhang/go/src/kafka/kafka_consumer.go:293
2023-08-18T19:12:27.101Z    ERROR   kafka/kafka_consumer.go:293 read message error:1/1 brokers are down
main.(*PartitionReader).Start.func1
    /home/gzhang/go/src/kafka/kafka_consumer.go:293
%7|1692385948.091|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "unused": no broker available for coordinator query: intervaled in state query-coord
%7|1692385948.091|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "unused": no broker available for coordinator query: intervaled in state query-coord
<... repeated dozens of times removed>
%7|1692385949.091|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "unused": no broker available for coordinator query: intervaled in state query-coord
%7|1692385949.162|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Group "unused": querying for coordinator: intervaled in state query-coord
%7|1692385949.162|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed state query-coord -> wait-coord (join-state init)
%7|1692385949.164|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Group "unused" coordinator is us-west1-kafka-pubsub.googleapis.com:443 id 6543210
%7|1692385949.164|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: Group "unused" changing coordinator -1 -> 6543210
%7|1692385949.164|COORDSET|rdkafka#consumer-1| [thrd:main]: Group "unused" coordinator set to broker sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/6543210
%7|1692385949.164|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed state wait-coord -> wait-broker-transport (join-state init)
%7|1692385949.164|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Group "unused": querying for coordinator: intervaled in state wait-broker-transport
%7|1692385949.166|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Group "unused" coordinator is us-west1-kafka-pubsub.googleapis.com:443 id 6543210
%3|1692385949.167|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: us-west1-kafka-pubsub.googleapis.com:443: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 2ms in state CONNECT)
2023-08-18T19:12:29.167Z    ERROR   kafka/kafka_consumer.go:293 read message error:GroupCoordinator: us-west1-kafka-pubsub.googleapis.com:443: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 2ms in state CONNECT)
main.(*PartitionReader).Start.func1
    /home/gzhang/go/src/kafka/kafka_consumer.go:293
%7|1692385949.274|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed state wait-broker-transport -> up (join-state init)
%7|1692385949.274|JOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": join with 0 subscribed topic(s)
%7|1692385949.274|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription only available for 0/1 topics (-1ms old)
%7|1692385949.274|JOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": postponing join until up-to-date metadata is available
%7|1692385949.274|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state init -> wait-metadata (state up)
%7|1692385949.274|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385949.274|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385949.274|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385949.274|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385949.274|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385949.274|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state wait-metadata (rebalance rejoin=false)
%7|1692385949.345|SUBSCRIPTION|rdkafka#consumer-1| [thrd:main]: Group "unused": effective subscription list changed from 0 to 1 topic(s):
%7|1692385949.345|SUBSCRIPTION|rdkafka#consumer-1| [thrd:main]:  Topic projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub with 2 partition(s)
%7|1692385949.345|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": subscription updated from metadata change: rejoining group in state wait-metadata
%7|1692385949.345|GRPLEADER|rdkafka#consumer-1| [thrd:main]: Group "unused": resetting group leader info: group (re)join
%7|1692385949.345|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "unused" (re)joining in join-state wait-metadata with 0 assigned partition(s): Metadata for subscribed topic(s) has changed
%7|1692385949.345|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "unused" initiating rebalance (NONE) in state up (join-state wait-metadata) with 0 assigned partition(s): Metadata for subscribed topic(s) has changed
%7|1692385949.345|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": Rejoining group without an assignment: Metadata for subscribed topic(s) has changed
%7|1692385949.345|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-metadata -> init (state up)
%7|1692385949.345|JOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": join with 1 subscribed topic(s)
%7|1692385949.345|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (0ms old)
%7|1692385949.345|JOIN|rdkafka#consumer-1| [thrd:main]: sasl_ssl://6543210.us-west1-kafka-pubsub.googleapis.com:443/6543210: Joining group "unused" with 1 subscribed topic(s) and member id ""
%7|1692385949.345|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state init -> wait-join (state up)
%7|1692385949.351|JOINGROUP|rdkafka#consumer-1| [thrd:main]: JoinGroup response: GenerationId -1, Protocol cooperative-sticky, LeaderId 82f7c4f9-93ef-4954-95de-5849ceaebd90, my MemberId 00d5053c-be7e-4ba0-aa5c-74b0aa783869, member metadata count 0: (no error)
%7|1692385949.351|MEMBERID|rdkafka#consumer-1| [thrd:main]: Group "unused": updating member id "" -> "00d5053c-be7e-4ba0-aa5c-74b0aa783869"
%7|1692385949.351|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-join -> wait-sync (state up)
%7|1692385949.356|SYNCGROUP|rdkafka#consumer-1| [thrd:main]: SyncGroup response: Success (10 bytes of MemberState data)
%7|1692385949.357|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385949.357|COOPASSIGN|rdkafka#consumer-1| [thrd:main]: Group "unused": incremental assignment: 0 newly added, 0 revoked partitions based on assignment of 0 partitions
%7|1692385949.357|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-sync -> wait-assign-call (state up)
%7|1692385949.357|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "unused": delegating incremental assign of 0 partition(s) to application on queue rd_kafka_cgrp_new: sync group assign
%7|1692385949.357|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Group "unused": 0 partition(s) being added to group assignment of 0 partition(s)
%7|1692385949.357|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Group "unused": setting group assignment to 0 partition(s)
%7|1692385949.357|GRPASSIGNMENT|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385949.357|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385949.357|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op GET_REBALANCE_PROTOCOL in state up (join-state wait-assign-call)
%7|1692385949.357|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op ASSIGN in state up (join-state wait-assign-call)
%7|1692385949.357|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Added 0 partition(s) to assignment which now consists of 0 partition(s) where of 0 are in pending state and 0 are being queried
%7|1692385949.357|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-assign-call -> steady (state up)
%7|1692385949.357|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385949.357|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385949.357|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385949.357|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385949.357|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385949.357|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1692385952.090|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385952.090|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385952.090|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385952.090|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385952.090|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385952.091|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1692385952.357|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385955.357|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385957.090|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385957.091|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385957.091|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385957.091|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385957.091|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385957.091|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1692385958.357|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385961.357|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385962.090|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385962.091|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385962.091|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385962.091|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385962.091|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385962.091|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1692385964.357|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385967.090|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385967.091|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385967.091|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385967.091|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385967.091|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385967.091|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1692385967.357|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385970.357|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385972.090|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385972.090|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385972.091|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385972.091|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385972.091|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385972.091|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1692385973.357|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385976.357|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385977.090|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385977.091|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385977.091|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385977.091|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385977.091|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385977.091|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1692385979.357|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385982.090|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385982.091|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385982.091|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385982.091|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385982.091|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385982.091|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1692385982.357|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385982.358|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Group "unused" heartbeat error response in state up (join-state steady, 0 partition(s) assigned): Broker: Group rebalance in progress
%7|1692385982.358|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": Rejoining group with 0 owned partition(s): Group is rebalancing
%7|1692385982.358|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state steady -> init (state up)
%7|1692385982.358|JOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": join with 1 subscribed topic(s)
%7|1692385982.358|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (33013ms old)
%7|1692385982.358|JOIN|rdkafka#consumer-1| [thrd:main]: sasl_ssl://6543210.us-west1-kafka-pubsub.googleapis.com:443/6543210: Joining group "unused" with 1 subscribed topic(s) and member id "00d5053c-be7e-4ba0-aa5c-74b0aa783869"
%7|1692385982.358|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state init -> wait-join (state up)
%7|1692385982.359|JOINGROUP|rdkafka#consumer-1| [thrd:main]: JoinGroup response: GenerationId 0, Protocol cooperative-sticky, LeaderId dabab31c-3848-4003-84b0-2b80c4063199, my MemberId 00d5053c-be7e-4ba0-aa5c-74b0aa783869, member metadata count 0: (no error)
%7|1692385982.359|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-join -> wait-sync (state up)
%7|1692385982.360|SYNCGROUP|rdkafka#consumer-1| [thrd:main]: SyncGroup response: Success (98 bytes of MemberState data)
%7|1692385982.360|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1692385982.360|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset INVALID
%7|1692385982.361|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset INVALID
%7|1692385982.361|COOPASSIGN|rdkafka#consumer-1| [thrd:main]: Group "unused": incremental assignment: 2 newly added, 0 revoked partitions based on assignment of 2 partitions
%7|1692385982.361|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-sync -> wait-assign-call (state up)
%7|1692385982.361|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "unused": delegating incremental assign of 2 partition(s) to application on queue rd_kafka_cgrp_new: sync group assign
%7|1692385982.361|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Group "unused": 2 partition(s) being added to group assignment of 0 partition(s)
%7|1692385982.361|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Group "unused": setting group assignment to 2 partition(s)
%7|1692385982.361|GRPASSIGNMENT|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1692385982.361|GRPASSIGNMENT|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset INVALID
%7|1692385982.361|GRPASSIGNMENT|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset INVALID
%7|1692385982.361|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op GET_REBALANCE_PROTOCOL in state up (join-state wait-assign-call)
%7|1692385982.361|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op ASSIGN in state up (join-state wait-assign-call)
%7|1692385982.361|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Added 2 partition(s) to assignment which now consists of 2 partition(s) where of 2 are in pending state and 0 are being queried
%7|1692385982.361|PAUSE|rdkafka#consumer-1| [thrd:main]: Resuming fetchers for 2 assigned partition(s): incremental assign called
%7|1692385982.361|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-assign-call -> steady (state up)
%7|1692385982.361|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385982.361|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1692385982.361|DUMP_ALL|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset STORED
%7|1692385982.361|DUMP_ALL|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset STORED
%7|1692385982.361|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1692385982.361|DUMP_PND|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset STORED
%7|1692385982.361|DUMP_PND|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset STORED
%7|1692385982.361|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385982.361|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385982.361|SRVPEND|rdkafka#consumer-1| [thrd:main]: Querying committed offset for pending assigned partition projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1]
%7|1692385982.361|SRVPEND|rdkafka#consumer-1| [thrd:main]: Querying committed offset for pending assigned partition projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0]
%7|1692385982.361|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: Fetching committed offsets for 2 pending partition(s) in assignment
%7|1692385982.361|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Group unused OffsetFetchRequest(v0) for 2/2 partition(s)
%7|1692385982.361|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Fetch committed offsets for 2/2 partition(s)
%7|1692385982.361|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Current assignment of 2 partition(s) with 2 pending adds, 2 offset queries, 0 partitions awaiting stop and 0 offset commits in progress
%7|1692385982.406|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: Adding projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] back to pending list with offset 274
%7|1692385982.406|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: Adding projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] back to pending list with offset 759
%7|1692385982.406|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385982.406|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1692385982.406|DUMP_ALL|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset STORED
%7|1692385982.406|DUMP_ALL|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset STORED
%7|1692385982.406|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1692385982.406|DUMP_PND|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset 274
%7|1692385982.406|DUMP_PND|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset 759
%7|1692385982.407|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385982.407|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385982.407|SRVPEND|rdkafka#consumer-1| [thrd:main]: Starting pending assigned partition projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] at offset 759 (leader epoch -1)
%7|1692385982.407|SRVPEND|rdkafka#consumer-1| [thrd:main]: Starting pending assigned partition projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] at offset 274 (leader epoch -1)
%7|1692385982.407|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1692385982.407|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op PARTITION_JOIN in state up (join-state steady) for projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1]
%7|1692385982.407|PARTADD|rdkafka#consumer-1| [thrd:main]: Group "unused": add projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1]
%7|1692385982.407|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op PARTITION_JOIN in state up (join-state steady) for projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0]
%7|1692385982.407|PARTADD|rdkafka#consumer-1| [thrd:main]: Group "unused": add projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0]
%3|1692385982.554|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://601576412.us-west1-kafka-pubsub.googleapis.com:443/6]: sasl_ssl://601576412.us-west1-kafka-pubsub.googleapis.com:443/601576412: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 146ms in state CONNECT)
%3|1692385982.554|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://601576413.us-west1-kafka-pubsub.googleapis.com:443/6]: sasl_ssl://601576413.us-west1-kafka-pubsub.googleapis.com:443/601576413: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 147ms in state CONNECT)
2023-08-18T19:13:02.554Z    ERROR   kafka/kafka_consumer.go:293 read message error:sasl_ssl://601576412.us-west1-kafka-pubsub.googleapis.com:443/601576412: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 146ms in state CONNECT)
main.(*PartitionReader).Start.func1
    /home/gzhang/go/src/kafka/kafka_consumer.go:293
2023-08-18T19:13:02.554Z    ERROR   kafka/kafka_consumer.go:293 read message error:sasl_ssl://601576413.us-west1-kafka-pubsub.googleapis.com:443/601576413: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 147ms in state CONNECT)
main.(*PartitionReader).Start.func1
    /home/gzhang/go/src/kafka/kafka_consumer.go:293
%3|1692385983.557|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://601576412.us-west1-kafka-pubsub.googleapis.com:443/6]: sasl_ssl://601576412.us-west1-kafka-pubsub.googleapis.com:443/601576412: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 2ms in state CONNECT, 1 identical error(s) suppressed)
2023-08-18T19:13:03.557Z    ERROR   kafka/kafka_consumer.go:293 read message error:sasl_ssl://601576412.us-west1-kafka-pubsub.googleapis.com:443/601576412: Failed to connect to broker at [2001:4860:4802:34::d4]:443: Network is unreachable (after 2ms in state CONNECT, 1 identical error(s) suppressed)
main.(*PartitionReader).Start.func1
    /home/gzhang/go/src/kafka/kafka_consumer.go:293
2023-08-18T19:13:04.067Z    DEBUG   kafka/kafka_consumer.go:297 success in reading data {"topic": "projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub", "Partition": 1}
2023-08-18T19:13:04.067Z    DEBUG   kafka/kafka_consumer.go:297 success in reading data {"topic": "projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub", "Partition": 1}
received a message   p: 1  offset: 759
2023-08-18T19:13:04.067Z    DEBUG   kafka/kafka_consumer.go:297 success in reading data {"topic": "projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub", "Partition": 1}
2023-08-18T19:13:04.068Z    DEBUG   kafka/kafka_consumer.go:297 success in reading data {"topic": "projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub", "Partition": 1}
%7|1692385984.068|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op GET_REBALANCE_PROTOCOL in state up (join-state steady)
2023-08-18T19:13:04.068Z    DEBUG   kafka/kafka_consumer.go:297 success in reading data {"topic": "projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub", "Partition": 1}
RebalanceProtocol COOPERATIVE

Here is the debug log dump for the default setting (i.e. the default partition.assignment.strategy, which the log shows as the eager "range" protocol):

%7|1692385814.428|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "unused": updating member id "(not-set)" -> ""
%7|1692385814.429|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed state init -> query-coord (join-state init)
%7|1692385814.429|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v2.1.1 (0x20101ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, STRIP STATIC_LINKING GCC GXX PKGCONFIG INSTALL GNULD LIBDL PLUGINS ZLIB SSL ZSTD CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC CRC32C_HW, debug 0x100)
%7|1692385814.429|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "unused": no broker available for coordinator query: intervaled in state query-coord
%7|1692385814.429|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op GET_REBALANCE_PROTOCOL in state query-coord (join-state init)
%7|1692385814.429|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "unused": no broker available for coordinator query: intervaled in state query-coord
RebalanceProtocol NONE
%7|1692385814.429|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op SUBSCRIBE in state query-coord (join-state init)
%7|1692385814.429|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "unused": subscribe to new subscription of 1 topics (join-state init)
%7|1692385814.429|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "unused": no broker available for coordinator query: intervaled in state query-coord
%7|1692385814.523|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Group "unused": querying for coordinator: intervaled in state query-coord
%7|1692385814.523|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed state query-coord -> wait-coord (join-state init)
%7|1692385814.525|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Group "unused" coordinator is us-west1-kafka-pubsub.googleapis.com:443 id 6543210
%7|1692385814.525|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: Group "unused" changing coordinator -1 -> 6543210
%7|1692385814.525|COORDSET|rdkafka#consumer-1| [thrd:main]: Group "unused" coordinator set to broker sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/6543210
%7|1692385814.525|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed state wait-coord -> wait-broker-transport (join-state init)
%7|1692385814.525|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Group "unused": querying for coordinator: intervaled in state wait-broker-transport
%7|1692385814.526|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: sasl_ssl://us-west1-kafka-pubsub.googleapis.com:443/bootstrap: Group "unused" coordinator is us-west1-kafka-pubsub.googleapis.com:443 id 6543210
%7|1692385814.601|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed state wait-broker-transport -> up (join-state init)
%7|1692385814.601|JOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": join with 0 subscribed topic(s)
%7|1692385814.601|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription only available for 0/1 topics (-1ms old)
%7|1692385814.601|JOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": postponing join until up-to-date metadata is available
%7|1692385814.601|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state init -> wait-metadata (state up)
%7|1692385814.601|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385814.601|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385814.601|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385814.601|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385814.601|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385814.601|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state wait-metadata (rebalance rejoin=false)
%7|1692385814.692|SUBSCRIPTION|rdkafka#consumer-1| [thrd:main]: Group "unused": effective subscription list changed from 0 to 1 topic(s):
%7|1692385814.692|SUBSCRIPTION|rdkafka#consumer-1| [thrd:main]:  Topic projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub with 2 partition(s)
%7|1692385814.692|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": subscription updated from metadata change: rejoining group in state wait-metadata
%7|1692385814.692|GRPLEADER|rdkafka#consumer-1| [thrd:main]: Group "unused": resetting group leader info: group (re)join
%7|1692385814.692|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "unused" (re)joining in join-state wait-metadata with 0 assigned partition(s): Metadata for subscribed topic(s) has changed
%7|1692385814.692|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "unused" initiating rebalance (NONE) in state up (join-state wait-metadata) with 0 assigned partition(s): Metadata for subscribed topic(s) has changed
%7|1692385814.692|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": Rejoining group without an assignment: Metadata for subscribed topic(s) has changed
%7|1692385814.692|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-metadata -> init (state up)
%7|1692385814.692|JOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": join with 1 subscribed topic(s)
%7|1692385814.692|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (0ms old)
%7|1692385814.692|JOIN|rdkafka#consumer-1| [thrd:main]: sasl_ssl://6543210.us-west1-kafka-pubsub.googleapis.com:443/6543210: Joining group "unused" with 1 subscribed topic(s) and member id ""
%7|1692385814.692|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state init -> wait-join (state up)
%7|1692385814.695|JOINGROUP|rdkafka#consumer-1| [thrd:main]: JoinGroup response: GenerationId -1, Protocol range, LeaderId 4b291182-e7ab-4433-9ef1-e8416d85ab05, my MemberId 30d4f39f-1faf-4697-9782-abeba61e2e03, member metadata count 0: (no error)
%7|1692385814.695|MEMBERID|rdkafka#consumer-1| [thrd:main]: Group "unused": updating member id "" -> "30d4f39f-1faf-4697-9782-abeba61e2e03"
%7|1692385814.695|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-join -> wait-sync (state up)
%7|1692385814.698|SYNCGROUP|rdkafka#consumer-1| [thrd:main]: SyncGroup response: Success (10 bytes of MemberState data)
%7|1692385814.698|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385814.698|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-sync -> wait-assign-call (state up)
%7|1692385814.698|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "unused": delegating assign of 0 partition(s) to application on queue rd_kafka_cgrp_new: new assignment
%7|1692385814.698|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Group "unused": setting group assignment to 0 partition(s)
%7|1692385814.698|GRPASSIGNMENT|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385814.698|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385814.698|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op GET_REBALANCE_PROTOCOL in state up (join-state wait-assign-call)
%7|1692385814.698|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op ASSIGN in state up (join-state wait-assign-call)
%7|1692385814.698|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "unused": new assignment of 0 partition(s) in join-state wait-assign-call
%7|1692385814.698|CLEARASSIGN|rdkafka#consumer-1| [thrd:main]: No current assignment to clear
%7|1692385814.698|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Added 0 partition(s) to assignment which now consists of 0 partition(s) where of 0 are in pending state and 0 are being queried
%7|1692385814.698|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-assign-call -> steady (state up)
%7|1692385814.698|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385814.698|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385814.698|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385814.698|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385814.698|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385814.698|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1692385817.698|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385819.428|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385819.428|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385819.428|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385819.428|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385819.428|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385819.428|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1692385820.698|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385823.698|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385824.428|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385824.428|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385824.428|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385824.428|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385824.428|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385824.428|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1692385826.698|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385829.428|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385829.428|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385829.428|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385829.428|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385829.428|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385829.428|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1692385829.698|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385832.698|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385834.428|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385834.429|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385834.429|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385834.429|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385834.429|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385834.429|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1692385835.698|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385838.698|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385839.428|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385839.428|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385839.428|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385839.428|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385839.428|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385839.428|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1692385841.698|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385844.428|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385844.429|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385844.429|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385844.429|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385844.429|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385844.429|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1692385844.698|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385847.698|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Heartbeat for group "unused" generation id -1
%7|1692385847.704|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Group "unused" heartbeat error response in state up (join-state steady, 0 partition(s) assigned): Broker: Group rebalance in progress
%7|1692385847.704|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "unused" is rebalancing (EAGER) in state up (join-state steady) with 0 assigned partition(s): rebalance in progress
%7|1692385847.704|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state steady -> wait-unassign-call (state up)
%7|1692385847.704|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "unused": delegating revoke of 0 partition(s) to application on queue rd_kafka_cgrp_new: rebalance in progress
%7|1692385847.704|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Group "unused": clearing group assignment
%7|1692385847.704|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op GET_REBALANCE_PROTOCOL in state up (join-state wait-unassign-call)
%7|1692385847.704|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op ASSIGN in state up (join-state wait-unassign-call)
%7|1692385847.705|CLEARASSIGN|rdkafka#consumer-1| [thrd:main]: No current assignment to clear
%7|1692385847.705|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-unassign-call -> wait-unassign-to-complete (state up)
%7|1692385847.705|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385847.705|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385847.705|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385847.705|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385847.705|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385847.705|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state wait-unassign-to-complete (rebalance rejoin=false)
%7|1692385847.705|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "unused": unassign done in state up (join-state wait-unassign-to-complete)
%7|1692385847.705|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": Rejoining group without an assignment: Unassignment done
%7|1692385847.705|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-unassign-to-complete -> init (state up)
%7|1692385847.705|JOIN|rdkafka#consumer-1| [thrd:main]: Group "unused": join with 1 subscribed topic(s)
%7|1692385847.705|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (33012ms old)
%7|1692385847.705|JOIN|rdkafka#consumer-1| [thrd:main]: sasl_ssl://6543210.us-west1-kafka-pubsub.googleapis.com:443/6543210: Joining group "unused" with 1 subscribed topic(s) and member id "30d4f39f-1faf-4697-9782-abeba61e2e03"
%7|1692385847.705|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state init -> wait-join (state up)
%7|1692385847.712|JOINGROUP|rdkafka#consumer-1| [thrd:main]: JoinGroup response: GenerationId 0, Protocol range, LeaderId 274c5d3c-9b40-4aa6-9e2c-3e591ae8ed51, my MemberId 30d4f39f-1faf-4697-9782-abeba61e2e03, member metadata count 0: (no error)
%7|1692385847.712|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-join -> wait-sync (state up)
%7|1692385847.713|SYNCGROUP|rdkafka#consumer-1| [thrd:main]: SyncGroup response: Success (98 bytes of MemberState data)
%7|1692385847.713|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1692385847.713|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset INVALID
%7|1692385847.713|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset INVALID
%7|1692385847.713|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-sync -> wait-assign-call (state up)
%7|1692385847.713|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "unused": delegating assign of 2 partition(s) to application on queue rd_kafka_cgrp_new: new assignment
%7|1692385847.713|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Group "unused": setting group assignment to 2 partition(s)
%7|1692385847.713|GRPASSIGNMENT|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1692385847.713|GRPASSIGNMENT|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset INVALID
%7|1692385847.713|GRPASSIGNMENT|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset INVALID
%7|1692385847.713|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op GET_REBALANCE_PROTOCOL in state up (join-state wait-assign-call)
%7|1692385847.714|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op ASSIGN in state up (join-state wait-assign-call)
%7|1692385847.714|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "unused": new assignment of 2 partition(s) in join-state wait-assign-call
%7|1692385847.714|CLEARASSIGN|rdkafka#consumer-1| [thrd:main]: No current assignment to clear
%7|1692385847.714|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Added 2 partition(s) to assignment which now consists of 2 partition(s) where of 2 are in pending state and 0 are being queried
%7|1692385847.714|PAUSE|rdkafka#consumer-1| [thrd:main]: Resuming fetchers for 2 assigned partition(s): assign called
%7|1692385847.714|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "unused" changed join state wait-assign-call -> steady (state up)
%7|1692385847.714|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385847.714|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1692385847.714|DUMP_ALL|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset STORED
%7|1692385847.714|DUMP_ALL|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset STORED
%7|1692385847.714|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1692385847.714|DUMP_PND|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset STORED
%7|1692385847.714|DUMP_PND|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset STORED
%7|1692385847.714|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385847.714|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385847.714|SRVPEND|rdkafka#consumer-1| [thrd:main]: Querying committed offset for pending assigned partition projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1]
%7|1692385847.714|SRVPEND|rdkafka#consumer-1| [thrd:main]: Querying committed offset for pending assigned partition projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0]
%7|1692385847.714|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: Fetching committed offsets for 2 pending partition(s) in assignment
%7|1692385847.714|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Group unused OffsetFetchRequest(v0) for 2/2 partition(s)
%7|1692385847.714|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/6543210: Fetch committed offsets for 2/2 partition(s)
%7|1692385847.714|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Current assignment of 2 partition(s) with 2 pending adds, 2 offset queries, 0 partitions awaiting stop and 0 offset commits in progress
%7|1692385847.757|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: Adding projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] back to pending list with offset 274
%7|1692385847.757|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: Adding projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] back to pending list with offset 686
%7|1692385847.757|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1692385847.757|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1692385847.757|DUMP_ALL|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset STORED
%7|1692385847.757|DUMP_ALL|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset STORED
%7|1692385847.757|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 2 partition(s):
%7|1692385847.757|DUMP_PND|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] offset 274
%7|1692385847.757|DUMP_PND|rdkafka#consumer-1| [thrd:main]:  projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] offset 686
%7|1692385847.757|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385847.757|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1692385847.757|SRVPEND|rdkafka#consumer-1| [thrd:main]: Starting pending assigned partition projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1] at offset 686 (leader epoch -1)
%7|1692385847.757|SRVPEND|rdkafka#consumer-1| [thrd:main]: Starting pending assigned partition projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0] at offset 274 (leader epoch -1)
%7|1692385847.757|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "unused": assignment operations done in join-state steady (rebalance rejoin=false)
%7|1692385847.757|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op PARTITION_JOIN in state up (join-state steady) for projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1]
%7|1692385847.757|PARTADD|rdkafka#consumer-1| [thrd:main]: Group "unused": add projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [1]
%7|1692385847.757|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op PARTITION_JOIN in state up (join-state steady) for projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0]
%7|1692385847.757|PARTADD|rdkafka#consumer-1| [thrd:main]: Group "unused": add projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub [0]
received a message   p: 1  offset: 686
2023-08-18T19:10:48.145Z    DEBUG   kafka/kafka_consumer.go:297 success in reading data {"topic": "projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub", "Partition": 1}
%7|1692385848.145|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "unused" received op GET_REBALANCE_PROTOCOL in state up (join-state steady)
2023-08-18T19:10:48.145Z    DEBUG   kafka/kafka_consumer.go:297 success in reading data {"topic": "projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub", "Partition": 1}
RebalanceProtocol EAGER
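
For reference, a rough sketch (not the exact test program) of the confluent-kafka-go consumer setup assumed for the two dumps above; the broker address is a placeholder, the SASL/TLS settings Pub/Sub Lite requires are omitted, the first run sets partition.assignment.strategy to cooperative-sticky, and the second ("default setting") leaves it at the library default:

package main

import (
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":             "us-west1-kafka-pubsub.googleapis.com:443", // placeholder
		"group.id":                      "unused",
		"partition.assignment.strategy": "cooperative-sticky", // omit for the "default setting" run
		"debug":                         "cgrp",               // produces the %7 cgrp lines above
		// security.protocol / sasl.* settings for Pub/Sub Lite omitted.
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	sub := "projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub"
	if err := c.SubscribeTopics([]string{sub}, nil); err != nil {
		panic(err)
	}

	for {
		msg, err := c.ReadMessage(5 * time.Second)
		if err != nil {
			continue // ignore timeouts in this sketch
		}
		fmt.Printf("received a message  p: %d  offset: %d\n",
			msg.TopicPartition.Partition, msg.TopicPartition.Offset)
		fmt.Println("RebalanceProtocol", c.GetRebalanceProtocol())
	}
}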

Hope this helps. Thanks.

twmb commented 1 year ago

Looked at the logs -- this is a bit weird. The only visible difference between cooperative-sticky in confluent-kafka-go and in franz-go is that PSL isn't rejecting the writes with ILLEGAL_GENERATION. The client eventually receives a 0 generation, at which point it should just ... proceed successfully.

From the logs, it looks like the cooperative-sticky confluent-kafka-go run succeeded by chance.
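
For comparison, a minimal sketch of the equivalent franz-go setup with the cooperative-sticky balancer; the seed broker, group, and subscription path are placeholders taken from the logs above, and the SASL/TLS options Pub/Sub Lite requires are again omitted:

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("us-west1-kafka-pubsub.googleapis.com:443"), // placeholder
		kgo.ConsumerGroup("unused"),
		kgo.ConsumeTopics("projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub"),
		kgo.Balancers(kgo.CooperativeStickyBalancer()), // cooperative-sticky, as in the confluent run
		// SASL/TLS options for Pub/Sub Lite omitted.
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	for {
		fetches := cl.PollFetches(context.Background())
		if errs := fetches.Errors(); len(errs) > 0 {
			fmt.Println("fetch errors:", errs)
		}
		fetches.EachRecord(func(r *kgo.Record) {
			fmt.Printf("received a message  p: %d  offset: %d\n", r.Partition, r.Offset)
		})
	}
}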

ns-gzhang commented 1 year ago

Does confluent-kafka-go repeatably/reliably succeed? Are there times where "illegal generation" is encountered anywhere in the logs?

We didn't use confluent-kafka-go in a long-term test deployment, so I'm not sure whether it would happen. But during the unit tests I conducted, it seemed to succeed consistently.

Does CooperativeSticky with franz-go repeatably/reliably fail? Are there times where "illegal generation" is not encountered?

For some reason we actually deployed it to the long-term test environment, and it could sometimes succeed and read messages (with retry-forever logic). It's just that my test runs consistently returned errors.

twmb commented 1 year ago

What I would expect is that once it started to succeed, it would succeed forever / until the group was restarted. Re: confluent -- it just seems a bit strange here since both clients are actually behaving the same, per the logs. There looks to be no real difference between the two, minus Google returning generation 0 for confluent vs. generation -1 for franz-go.

twmb commented 1 year ago

I'm going to close this, and I'll merge the referenced PR that changes FetchOffsetsForTopics. kadm will (soon) return only the requested topics, unless the old behavior is opted into with kadm.FetchAllGroupTopics.
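
For anyone landing here later, a minimal sketch of the two kadm calls discussed in this thread, assuming a *kgo.Client already configured for the PSL endpoint (seed broker shown as a placeholder, SASL/TLS omitted) and the group/subscription names from the logs above:

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	ctx := context.Background()

	cl, err := kgo.NewClient(
		kgo.SeedBrokers("us-west1-kafka-pubsub.googleapis.com:443"), // placeholder
		// SASL/TLS options for Pub/Sub Lite omitted.
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
	adm := kadm.NewClient(cl)

	// FetchOffsets issues a single OffsetFetch with no topic list, returning
	// every commit in the group.
	committed, err := adm.FetchOffsets(ctx, "unused")
	if err != nil {
		panic(err)
	}
	committed.Each(func(o kadm.OffsetResponse) {
		fmt.Printf("%s[%d] committed at %d\n", o.Topic, o.Partition, o.At)
	})

	// FetchOffsetsForTopics also returns entries for requested topics the
	// group has not committed to yet; after the referenced PR it returns only
	// the requested topics unless the kadm.FetchAllGroupTopics opt-in
	// mentioned above is used.
	sub := "projects/999999/locations/us-west1-c/subscriptions/topic-test-gz-sub"
	forTopics, err := adm.FetchOffsetsForTopics(ctx, "unused", sub)
	if err != nil {
		panic(err)
	}
	forTopics.Each(func(o kadm.OffsetResponse) {
		fmt.Printf("%s[%d] => %d\n", o.Topic, o.Partition, o.At)
	})
}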