segmentio / kafka-go

Kafka library in Go
MIT License

w.WriteMessages: context deadline exceeded on MSK IAM #927

Open ayoul3 opened 2 years ago

ayoul3 commented 2 years ago

**Describe the bug**
I activated IAM authentication on a Kafka 2.7.0 cluster (AWS MSK). Reading topics worked like a charm.
However, when writing to a partition I hit timeout errors on the discovery query that lists all brokers and partitions. It turns out that the brokers on port 9098 (IAM authentication) do not like the empty Metadata v8 request: specifically, the empty Topics field is serialized as 0xffffffff at the network level.

I reached out to AWS MSK to look into it, but maybe there is something to be done at the client to avoid running into this issue. The regular Kafka Java client does not have this issue, so there is a chance they'll just snooze off my bug report. We'll see.

I did not find an open issue on this, so putting this one here for visibility. Curious to know if someone else could reproduce.

Kafka Version

2.7.0 (AWS MSK)

To Reproduce

```go
package main

import (
	"context"
	"crypto/tls"
	"os"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4"
	"github.com/rs/zerolog/log"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/aws_msk_iam"
)

func main() {
	awsRegion := aws.String(os.Getenv("AWS_REGION"))
	sess, err := session.NewSession(&aws.Config{
		Region: awsRegion,
	})
	if err != nil {
		// zerolog events are only emitted by a terminal call such as Msg();
		// Fatal().Err(err) alone would neither log nor exit.
		log.Logger.Fatal().Err(err).Msg("failed to create AWS session")
	}

	kafkaDialer := &kafka.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
		SASLMechanism: &aws_msk_iam.Mechanism{
			Signer: sigv4.NewSigner(sess.Config.Credentials),
			Region: *awsRegion,
		},
		TLS: &tls.Config{},
	}

	brokers := strings.Split(os.Getenv("KAFKA_SERVERS"), ",")
	topic := os.Getenv("KAFKA_TOPIC")
	batchSize := int(10e6) // 10MB
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:   brokers,
		Topic:     topic,
		Dialer:    kafkaDialer,
		BatchSize: batchSize,
		Balancer:  &kafka.CRC32Balancer{},
	})

	err = w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte("Hello World!"),
		},
	)
	if err != nil {
		log.Logger.Err(err).Msg("error while writing batch")
	}
}
```



**Expected Behavior**

The message should be written to the topic.  
At the network level, the cluster should respond to the discovery (metadata) request with a list of brokers and partitions.

**Observed Behavior**
The client hangs on the Metadata request and finally times out:

```
{"level":"error","error":"context deadline exceeded","time":"2022-06-05T13:02:52+02:00","message":"error while writing batch"}
```

**Additional Context**
I wrote a quick-and-dirty patch to validate the hypothesis that the null Topics field is causing the issue. It might unblock people running into the same [issue](https://gist.github.com/ayoul3/a8de378740d5ce051c609594e89da1ca). Definitely a hack, though...
dominicbarnes commented 2 years ago

Unfortunately, I haven't been able to reproduce this issue on my own MSK cluster yet. I started with a serverless cluster, and I'm not sure which version of Kafka it runs. I'll try again with a provisioned cluster at v2.7.0 to see if I can reproduce.

In the interim, have you made any discoveries on your end?

ayoul3 commented 2 years ago

No, nothing. AWS support has not been very helpful. The patch I linked above works properly, but the issue is still there on a provisioned cluster. I hit the same issue with the franz-go Kafka library.

ducquangkstn commented 1 year ago

We face the same issue. Debugging it points to the same place: `pool.discover`.

I also ran the Kafka CLI (Java) to get the metadata of a topic; it turns out that Kafka with IAM is significantly slower than unauthenticated Kafka (not sure whether this is due to my setup or an AWS issue):

```
$ time bin/kafka-topics.sh --bootstrap-server xxxxx:9092 --topic xxxxx --describe
real    0m2.345s
user    0m2.788s
sys     0m0.244s

$ time bin/kafka-topics.sh --bootstrap-server xxxxx:9098 --topic xxxxx --describe
real    0m11.401s
user    0m4.580s
sys     0m0.274s
```
ducquangkstn commented 1 year ago

Btw, IMO, part of the problem with this library is the lack of error wrapping, so users have to debug manually.