IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License

Error when describing consumer groups #2244

Closed danielli-ziprecruiter closed 2 years ago

danielli-ziprecruiter commented 2 years ago
Versions
Sarama   Kafka     Go     GitSHA
1.34.0   2.4.1.1   1.17   59a3d390ffc7
Logs

kafka: insufficient data to decode packet, more bytes expected

Problem Description

Upgraded from v1.33.0 to v1.34.0. Now, when calling admin.DescribeConsumerGroups, I get the following error: insufficient data to decode packet, more bytes expected. I believe this issue was introduced in 59a3d390ffc7.

dnwe commented 2 years ago

@danielli-ziprecruiter hmm I'm unable to reproduce this using v1.34.0 against either kafka 2.4.1 or kafka 3.1.0 backends and V2_4_1_0 for config.Version. Can you confirm what config.Version you were using and if there's anything special about your consumer groups — are they all Sarama-based, are they a mixture of Java+Sarama etc.

I was using the following code that I happened to have lying around to describe consumer groups and output the details — can you compare to your code to see if you are doing anything differently?

    desc, err := admin.DescribeConsumerGroups(groups)
    if err != nil {
        log.Fatal(err)
    }
    sort.Slice(desc, func(i, j int) bool {
        return desc[i].GroupId < desc[j].GroupId
    })
    for _, details := range desc {
        if len(details.Members) == 0 {
            continue
        }
        fmt.Println("########################################################################")
        fmt.Printf("#### Group: %s\n", details.GroupId)
        if !strings.EqualFold(details.State, "stable") {
            fmt.Printf("⚠️⚠️⚠️⚠️ State: %s\n", details.State)
        }
        for memberID, desc := range details.Members {
            var metadata *sarama.ConsumerGroupMemberMetadata
            if len(desc.MemberMetadata) > 0 {
                metadata, err = desc.GetMemberMetadata()
                if err != nil {
                    log.Fatalf("failed to get member metadata - got %v", err)
                }
            }
            var assignment *sarama.ConsumerGroupMemberAssignment
            if len(desc.MemberAssignment) > 0 {
                assignment, err = desc.GetMemberAssignment()
                if err != nil {
                    log.Fatalf("failed to get member assignment - got %v", err)
                }
            }
            fmt.Printf("\tMember: %s @ %s (%s)\n", desc.ClientId, desc.ClientHost, memberID)
            if assignment == nil || metadata == nil {
                fmt.Printf("\t\t       <no assigned topic partitions>\n")
                continue
            }
            sort.Strings(metadata.Topics)
            for _, topic := range metadata.Topics {
                if strings.TrimSpace(topic) == "" {
                    continue
                }
                partitions, ok := assignment.Topics[topic]
                if !ok {
                    fmt.Printf("\t\tTopic: %s\tPartitions: []\n", topic)
                    continue
                }
                sort.Slice(partitions, func(i, j int) bool {
                    return partitions[i] < partitions[j]
                })
                fmt.Printf("\t\tTopic: %s\tPartitions: %s\n", topic, strings.Join(strings.Fields(fmt.Sprint(partitions)), ","))
            }
        }
        fmt.Println()
        fmt.Println("########################################################################")
        fmt.Println()
    }

danielli-ziprecruiter commented 2 years ago

@dnwe We have a mix of consumer groups (Sarama, Kafka Connect, AWS Canary). I have done some more testing and believe this occurs when attempting to describe multiple consumer groups. We are passing in a slice of about 10 consumer groups. I can describe each consumer group individually, but get this error when attempting to describe 2 or 3 groups (depending on the combination of consumer groups).

aiquestion commented 2 years ago

Eh, this seems to be a bug.
AuthorizedOperations should be inside the GroupDescription, according to the Kafka protocol code here and the Kafka protocol. So describing only one consumer group works, but describing multiple consumer groups fails.

I can work on a fix for it.

dnwe commented 2 years ago

@danielli-ziprecruiter please can you test 1.34.1?

danielli-ziprecruiter commented 2 years ago

It's working now for me with 1.34.1. Thank you!

dnwe commented 2 years ago

Thanks for reporting!