aws / aws-msk-iam-sasl-signer-go

Apache License 2.0
15 stars 5 forks source link

Example for integration with confluentinc/confluent-kafka-go #12

Open bitdean opened 10 months ago

bitdean commented 10 months ago

Describe the feature

Since confluentinc/confluent-kafka-go is most generic and it uses underlaying librdkafka natively. Would it be possible that someone would provide example how to integrate with it?

Use Case

Proposed Solution

No response

Other Information

No response

Acknowledgements

aws-msk-iam-sasl-signer-go Module Versions Used

1.0.0

Go version used

1.21

nboergerotto commented 6 months ago

I got it running with the following code:

package main

import (
    "context"
    "fmt"
    "github.com/aws/aws-msk-iam-sasl-signer-go/signer"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "time"
)

func main() {
    bearerToken := createToken()

    producer := createProducerWithToken(bearerToken)
    defer producer.Close()

    produceMessagesToTopic(producer)

    flush(producer)
}

func createToken() kafka.OAuthBearerToken {
    token, tokenExpirationTime, err := signer.GenerateAuthToken(context.TODO(), "YOUR-AWS-REGION")
    if err != nil {
        panic(err)
    }
    seconds := tokenExpirationTime / 1000
    nanoseconds := (tokenExpirationTime % 1000) * 1000000
    bearerToken := kafka.OAuthBearerToken{
        TokenValue: token,
        Expiration: time.Unix(seconds, nanoseconds),
    }

    return bearerToken
}

func createProducerWithToken(bearerToken kafka.OAuthBearerToken) *kafka.Producer {
    kafkaConfig := &kafka.ConfigMap{
        "bootstrap.servers": "YOUR-BROKER-URL",
        "client.id":         "golang-test-producer",
        "security.protocol": "sasl_ssl",
        "sasl.mechanism":    "OAUTHBEARER",
    }
    producer, err := kafka.NewProducer(kafkaConfig)
    if err != nil {
        panic(err)
    }

    err = producer.SetOAuthBearerToken(bearerToken)
    if err != nil {
        panic(err)
    }
    return producer
}

func produceMessagesToTopic(producer *kafka.Producer) {
    topic := "YOUR-TOPIC-NAME"
    for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
        err := producer.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(word),
        }, nil)
        fmt.Printf("Successfully produced word %s\n", word)
        if err != nil {
            fmt.Printf("Failed to produce: %v\n", err)
        }
    }
}

func flush(producer *kafka.Producer) {
    producer.Flush(15 * 1000)
}

You have to replace the following strings in my code: YOUR-BROKER-URL, YOUR-AWS-REGION, YOUR-TOPIC-NAME

For running the code in an ECS-Task, I'm using the following Dockerfile:

# syntax=docker/dockerfile:1

FROM --platform=linux/amd64 golang:1.22 as builder
WORKDIR /app
COPY . .
RUN go mod download
RUN GOOS=linux go build -o /kafka-producer-golang-confluent

FROM --platform=linux/amd64 debian:bookworm-slim as runner
COPY --from=builder /kafka-producer-golang-confluent /kafka-producer-golang-confluent
RUN apt-get update && apt-get install -y ca-certificates && update-ca-certificates
CMD ["/kafka-producer-golang-confluent"]

For me, in order to get it running, it was very important to install ca-certificates ("RUN apt-get update && apt-get install -y ca-certificates && update-ca-certificates"), since debian:bookworm-slim doesn't have them by default. Depending on your docker base image, you may not need to do this though.

This exact code is successfully producing events on my kafka-topic, once the client-policies are correctly set as stated in the AWS documentation.

dixon14 commented 4 days ago

Hi, I encountered a problem when connecting consumer to the brokers using the IAM signer and the confluent kafka go client.

Setup the code similar as the above example. When I tried to ReadMessage from a topic, it return KafkaErrTimeOut. Looked at the debug logs - it shows no brokers is available for coordination but my brokers are healthy. If anyone can help on this?

type MessageProcessor interface {
    Process(msg *confluentkafka.Message) error
}

type Consumer struct {
    config       config.Config
    consumer     *confluentkafka.Consumer
    processorMap map[string]MessageProcessor
}

func New(
    ctx context.Context,
    config config.Config,
    processorMap map[string]MessageProcessor,
) *Consumer {
    configMap := &confluentkafka.ConfigMap{
        "bootstrap.servers":  config.KafkaBootstrapServers,
        "group.id":           config.KafkaGroupID,
        "security.protocol":  "SASL_SSL",
        "sasl.mechanism":     "OAUTHBEARER",
        "auto.offset.reset":  "latest",
        "enable.auto.commit": true,
        "session.timeout.ms": 60000,
        // "heartbeat.interval.ms": 60000,
        // "max.poll.interval.ms":  300000,
        // "debug":                 "all",
    }

    consumer, err := confluentkafka.NewConsumer(configMap)
    if err != nil {
        return nil
    }

    token, err := createToken(ctx)
    if err != nil {
        return nil
    }

    if err := consumer.SetOAuthBearerToken(token); err != nil {
        return nil
    }

    var topics []string
    for topic := range processorMap {
        topics = append(topics, topic)
    }

    // Subscribe to multiple topics
    if err = consumer.SubscribeTopics(topics, nil); err != nil {
        log.Err(err).Msg("Unable to subscribe to topics.")
        return nil
    }

    return &Consumer{
        config:       config,
        consumer:     consumer,
        processorMap: processorMap,
    }
}

// Start indicates that Kafka consumer will start polling messages
// from the subscribed topics in the kafka bootstrap servers
func (c *Consumer) Start() error {
    for {
        msg, err := c.consumer.ReadMessage(100 * time.Millisecond)
        if err != nil {
            return err
        }
        // fmt.Printf("Consumed event from topic %s: key = %-10s value = %s\n",
        //  *msg.TopicPartition.Topic, string(msg.Key), string(msg.Value))
        topic := *msg.TopicPartition.Topic
        processor := c.processorMap[topic]

        if err := processor.Process(msg); err != nil {
            log.Err(err).Msgf("Error in processing message from topic %s", topic)
            return err
        }
    }
}

func createToken(ctx context.Context) (confluentkafka.OAuthBearerToken, error) {
    token, tokenExpirationTimeMs, err := signer.GenerateAuthToken(ctx, "us-east-1")
    if err != nil {
        return confluentkafka.OAuthBearerToken{}, err
    }

    seconds := tokenExpirationTimeMs / 1000
    nanoseconds := (tokenExpirationTimeMs % 1000) * 1000000
    bearerToken := confluentkafka.OAuthBearerToken{
        TokenValue: token,
        Expiration: time.Unix(seconds, nanoseconds),
    }

    return bearerToken, nil
}