google / go-cloud

The Go Cloud Development Kit (Go CDK): A library and tools for open cloud development in Go.
https://gocloud.dev/
Apache License 2.0

pubsub/ AWS MSK Support #3398

Closed alok87 closed 4 months ago

alok87 commented 4 months ago

Does pubsub support AWS MSK (AWS serverless Kafka)?

AWS MSK requires these special token providers: https://github.com/aws/aws-msk-iam-sasl-signer-go

vangent commented 4 months ago

As far as I know, it should. The Go CDK constructors take a sarama.Config, and it looks like the required token providers are provided on that.

vangent commented 4 months ago

The default URL opener will probably not work, as it uses a MinimalConfig and there's no way to override anything from the URL. However, you can create your own URLOpener with the sarama.Config set:

https://github.com/google/go-cloud/blob/master/pubsub/kafkapubsub/kafka.go#L152

OpenTopic is here:

https://github.com/google/go-cloud/blob/master/pubsub/kafkapubsub/kafka.go#L232

alok87 commented 4 months ago

I didn't follow. Will the following work or not?

My code does the following. Won't it work?

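package main

func main() {
    ctx := context.Background()
    config := cfg.New(appEnv)

    // Start from the Go CDK minimal config, then enable SASL/OAUTHBEARER for MSK IAM auth.
    config.Pub.Kafka.Sarama = kafkapubsub.MinimalConfig()
    config.Pub.Kafka.Sarama.Net.SASL.Enable = true
    config.Pub.Kafka.Sarama.Net.SASL.Mechanism = sarama.SASLTypeOAuth
    config.Pub.Kafka.Sarama.Net.SASL.TokenProvider = pubsub.NewMSKAccessTokenProvider("ap-south-1")
    log.Infof(ctx, "MSK Bootstrap server: %v", config.Pub.Kafka.Address)

    pubService, err := pub.New(
        ctx,
        config.Pub,
        []string{"mytopic"},
    )
    if err != nil {
        log.Errorf(ctx, "could not create publisher: %v", err)
        return
    }

    // The payload here is only a placeholder.
    if err := pubService.Publish(ctx, "mytopic", []byte("hello")); err != nil {
        log.Errorf(ctx, "could not publish: %v", err)
    }
}

package pub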

import (
    "context"
    "fmt"
    "sync"

    "github.com/IBM/sarama"
    gopubsub "gocloud.dev/pubsub"
    "gocloud.dev/pubsub/kafkapubsub"

    "github.com/myorg/myworld/pkg/log"
)

// Pub is a wrapper over gocloud pubsub
// to reduce code duplication across modules
type Pub struct {
    // openTopics keeps a list of open topics
    openTopics map[string]*gopubsub.Topic

    // config can be used later also
    config Config
}

// Config has the pubsub configuration
type Config struct {
    // Kafka configurations
    Kafka *KafkaConfig
}

// KafkaConfig keeps all kafka related config
type KafkaConfig struct {
    // Sarama kafka library configurations
    Sarama *sarama.Config
    // Address where kafka is present
    Address []string
}

// New creates a new Pub and opens the given topics.
func New(
    ctx context.Context,
    config Config,
    openTopics []string,
) (*Pub, error) {
    if config.Kafka == nil {
        return nil, fmt.Errorf("only kafka supported currently")
    }

    openedTopics := make(map[string]*gopubsub.Topic)
    for _, topicName := range openTopics {
        topic, err := kafkapubsub.OpenTopic(
            config.Kafka.Address,
            config.Kafka.Sarama,
            topicName,
            nil,
        )
        if err != nil {
            return nil, fmt.Errorf(
                "could not create/establish conn with topic: %+v, err:%v",
                topicName,
                err,
            )
        }
        openedTopics[topicName] = topic
    }

    return &Pub{
        openTopics: openedTopics,
        config:     config,
    }, nil
}

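// Close waits for ctx to be cancelled, then shuts down all open topics.
func (ps *Pub) Close(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()

    <-ctx.Done()
    // Use a fresh context for shutdown: with the already-cancelled ctx,
    // Shutdown may return before pending messages are flushed.
    shutdownCtx := context.Background()
    for _, topic := range ps.openTopics {
        err := topic.Shutdown(shutdownCtx)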
        if err != nil {
            log.Errorf(ctx, "error closing pubsub topic: %v", err)
        }
    }
}

// OpenTopic creates and registers a new topic.
func (ps *Pub) OpenTopic(
    ctx context.Context,
    topicName string,
) error {
    if _, exists := ps.openTopics[topicName]; exists {
        return fmt.Errorf("topic: %v already registered", topicName)
    }

    topic, err := kafkapubsub.OpenTopic(
        ps.config.Kafka.Address,
        ps.config.Kafka.Sarama,
        topicName,
        nil,
    )
    if err != nil {
        return fmt.Errorf(
            "could not create/establish conn with topic: %+v, err:%v",
            topicName,
            err,
        )
    }
    ps.openTopics[topicName] = topic

    return nil
}

// Publish writes to the topic.
func (ps *Pub) Publish(
    ctx context.Context,
    topicName string,
    message []byte,
) error {
    topic, ok := ps.openTopics[topicName]
    if !ok || topic == nil {
        return fmt.Errorf("topic: %v unregistered", topicName)
    }

    return topic.Send(
        ctx,
        &gopubsub.Message{
            Body: message,
        },
    )
}

For now I am only testing the above with go-cloud: opening topics and publishing.

vangent commented 4 months ago

As far as I know, if you set up config.Sarama correctly for AWS MSK, it should work with the Go CDK. If you are having problems, I suggest you try it natively against AWS MSK, without the Go CDK, to make sure your config is correct.

alok87 commented 4 months ago

I am getting this error: kafka: client has run out of available brokers to talk to

I also debugged it.

It is failing at: https://github.com/google/go-cloud/blob/master/pubsub/kafkapubsub/kafka.go#L247-L250

The brokers configuration is correct; I tested it with:

sh-4.2# ./kafka-topics.sh --bootstrap-server $BS --command-config client.properties --list

alok87 commented 4 months ago

It is working now. The TLS config was the issue. Thanks, gocloud works with MSK 👍🏽