segmentio / kafka-go

Kafka library in Go
MIT License

AWS CloudWatch can't find groupID #930

Open: kscooo opened this issue 2 years ago

kscooo commented 2 years ago

Describe the bug

kafka.NewConsumerGroup doesn't automatically create the group ID in AWS CloudWatch.

Kafka Version

  • kafka 2.6.2
  • kafka-go v0.4.32

To Reproduce

package main

import (
	"context"
	"crypto/tls"
	"time"

	"github.com/aws/aws-sdk-go/aws/credentials"
	sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/aws_msk_iam"
)

// Option and options are the reporter's own configuration types; their
// definitions were not included in the report.

// NewMulReader joins the consumer group through the ConsumerGroup API.
func NewMulReader(opts ...Option) *kafka.ConsumerGroup {
	// Read the config file.
	cfg := new(options)
	for _, opt := range opts {
		opt.apply(cfg)
	}

	// Init the ConsumerGroup: authenticate to MSK with IAM (SigV4) over TLS.
	creds := credentials.NewStaticCredentials(cfg.auth.accessKeyId, cfg.auth.secretAccessKey, "")
	mechanism := &aws_msk_iam.Mechanism{
		Signer: sigv4.NewSigner(creds),
		Region: cfg.region,
	}

	dialer := &kafka.Dialer{
		Timeout:       10 * time.Second,
		DualStack:     true,
		SASLMechanism: mechanism,
		TLS:           &tls.Config{},
	}

	c, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
		ID:      cfg.groupID, // ListGroups finds this group ID, but it never appears in AWS CloudWatch
		Brokers: cfg.brokers,
		Dialer:  dialer,
		Topics:  []string{cfg.topic},
	})
	if err != nil {
		panic(err)
	}
	// Messages are then read as in this example:
	// https://pkg.go.dev/github.com/segmentio/kafka-go#example-Generation.Start-ConsumerGroupParallelReaders
	return c
}

// NewKafkaClient joins the same group through a Reader configured with GroupID.
func NewKafkaClient(opts ...Option) {
	// Read the config file.
	cfg := new(options)
	for _, opt := range opts {
		opt.apply(cfg)
	}

	// Same IAM (SigV4) over TLS authentication as above.
	creds := credentials.NewStaticCredentials(cfg.auth.accessKeyId, cfg.auth.secretAccessKey, "")
	mechanism := &aws_msk_iam.Mechanism{
		Signer: sigv4.NewSigner(creds),
		Region: cfg.region,
	}

	dialer := &kafka.Dialer{
		Timeout:       10 * time.Second,
		DualStack:     true,
		SASLMechanism: mechanism,
		TLS:           &tls.Config{},
	}

	r := kafka.NewReader(kafka.ReaderConfig{
		GroupID: cfg.groupID, // this group ID does appear in AWS CloudWatch
		Brokers: cfg.brokers,
		Topic:   cfg.topic,
		Dialer:  dialer,
	})
	defer r.Close()

	// Read one message, then close the reader.
	if _, err := r.ReadMessage(context.Background()); err != nil {
		panic(err)
	}
}
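
One difference between the two code paths that may be relevant, offered as an assumption rather than something established in this thread: a Reader configured with GroupID commits offsets automatically from ReadMessage, while with ConsumerGroup the application has to commit through Generation.CommitOffsets itself (the linked Generation.Start example does this). A group that never commits has no stored offsets, which could explain both why it drops out of ListGroups and why CloudWatch, whose consumer-lag metrics are presumably computed from committed offsets, never shows it. The sketch below only builds the map passed to CommitOffsets; commitMap is a hypothetical helper, not part of kafka-go.

```go
package main

import "fmt"

// commitMap is a hypothetical helper (not part of kafka-go). It turns the
// offset of the last message processed on each partition into the
// map[topic]map[partition]offset shape accepted by Generation.CommitOffsets.
// Kafka expects the committed offset to be the next offset to read, hence +1.
func commitMap(topic string, lastProcessed map[int]int64) map[string]map[int]int64 {
	offsets := make(map[int]int64, len(lastProcessed))
	for partition, offset := range lastProcessed {
		offsets[partition] = offset + 1
	}
	return map[string]map[int]int64{topic: offsets}
}

func main() {
	// Partition 0 last delivered offset 41, partition 1 offset 7.
	commits := commitMap("my-topic", map[int]int64{0: 41, 1: 7})
	fmt.Println(commits["my-topic"][0], commits["my-topic"][1]) // prints "42 8"
	// Inside a generation the map would then be committed with:
	//   err := gen.CommitOffsets(commits)
}
```

Committing periodically from inside the function passed to gen.Start, rather than only when the generation ends, keeps the stored offsets current if the process stops unexpectedly.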

Expected Behavior

The group ID used by NewConsumerGroup should be created automatically in AWS CloudWatch.

Observed Behavior

NewReader's group ID is created automatically in AWS CloudWatch, but NewConsumerGroup's is not. ListGroups finds both group IDs.

Additional Context

I've consulted AWS technical support, but the engineer isn't familiar with the code. He suspects that AWS MSK may behave differently from open-source Kafka, so that group IDs are not created automatically in AWS CloudWatch, and he suggested that I ask the kafka-go community first to see whether they can help.

kscooo commented 2 years ago

New finding: after some time (a few minutes), ListGroups no longer returns the group ID created by NewConsumerGroup.

achille-roussel commented 2 years ago

Hello @kscooo

We might need a few more details to understand whether the issue comes from kafka-go or from MSK. What you describe resembles a mismatch between the client and server heartbeat intervals: if the client doesn't send heartbeats often enough, the server may assume that the consumer group is inactive and delete it.

kscooo commented 2 years ago

MSK configuration:

auto.create.topics.enable=false
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000

kscooo commented 2 years ago

After I shut down the consumer group, ListGroups can no longer find its group ID after a while, and the group ID never appears in CloudWatch.

After I close the single Reader, its group ID still exists, both in ListGroups and in CloudWatch.

The topic always exists.

achille-roussel commented 2 years ago

Thanks for the extra information.

Do you know the value of offsets.retention.minutes on your server?

kscooo commented 2 years ago

RetentionTime in both ReaderConfig and ConsumerGroupConfig was left at the library's default.

Setting RetentionTime in ConsumerGroupConfig to 7 × 24 hours made no difference.

achille-roussel commented 2 years ago

Hello @kscooo

I wanted to check if you had updates to share on this issue?

kscooo commented 2 years ago

@achille-roussel Not for the time being. Other work took priority, so I stopped following up on this.

achille-roussel commented 1 year ago

Hello @kscooo!

Do you know if you are still experiencing this issue?

kscooo commented 1 year ago

Sorry, I switched to franz-go to get the job done because of authentication issues with AWS, and I have stopped tracking this issue.