confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0
4.65k stars 659 forks source link

Cant get oauthbearer to work in go #572

Closed davidmontgom closed 3 years ago

davidmontgom commented 3 years ago

Hi,

python kafka works with oauthbaerer confluent for python does not support oauthbearer Go I guess does but there are no clear examples on how to consume

Below is my code. I get no errors but I get no streams while python streams away.

So why will the go client not work with oauth?

Only output is below

Created Consumer rdkafka#consumer-1 Ignored OAuthBearerTokenRefresh

In python I connect thiks the below:

consumer = KafkaConsumer(
        group_id=None,
        bootstrap_servers=['test:9094'],
        security_protocol='SASL_SSL',
        sasl_mechanism='OAUTHBEARER',
        sasl_oauth_token_provider=TokenProvider(client_id,client_secret),  # returns a JWT 
        ssl_check_hostname=False,
        ssl_context=create_ssl_context(),
        auto_offset_reset=offset,
        enable_auto_commit=False, 
        value_deserializer=decode_topic
        )
    consumer.subscribe(topics=topic_list)

import (
    "context"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "os"
    "os/signal"
    "reflect"
    "syscall"

    "github.com/confluentinc/confluent-kafka-go/kafka"
    "golang.org/x/oauth2/clientcredentials"
)

func handleOAuthBearerTokenRefreshEvent(client kafka.Handle, e kafka.OAuthBearerTokenRefresh) {
    oauthBearerToken, retrieveErr := retrieveUnsecuredToken(e)
    if retrieveErr != nil {
        fmt.Fprintf(os.Stderr, "%% Token retrieval error: %v\n", retrieveErr)
        client.SetOAuthBearerTokenFailure(retrieveErr.Error())
    } else {
        setTokenError := client.SetOAuthBearerToken(oauthBearerToken)
        if setTokenError != nil {
            fmt.Fprintf(os.Stderr, "%% Error setting token and extensions: %v\n", setTokenError)
            client.SetOAuthBearerTokenFailure(setTokenError.Error())
        }
    }
}

func retrieveUnsecuredToken(e kafka.OAuthBearerTokenRefresh) (kafka.OAuthBearerToken, error) {

    //https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/oauthbearer_example/oauthbearer_example.go

    conf := &clientcredentials.Config{
        ClientID:     "dfdfdf",
        ClientSecret: "adfadfa",
        TokenURL:     "https://test.com/auth/realms/pro-realm/protocol/openid-connect/token",
    }
    token, _ := conf.Token(context.Background())
    extensions := map[string]string{}
    oauthBearerToken := kafka.OAuthBearerToken{
        TokenValue: token.AccessToken,
        Expiration: token.Expiry,
        Principal:  "principal=dude",
        Extensions: extensions,
    }
    fmt.Println(oauthBearerToken)
    return oauthBearerToken, nil
}

func main() {

    if len(os.Args) < 5 {
        fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <environment> <topics..>\n",
            os.Args[0])
        os.Exit(1)
    }

    broker := os.Args[1]
    group := os.Args[2]
    environment := os.Args[3]
    topics := os.Args[4:]
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    oauthConf := "principal=dude"

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":                   broker,
        "security.protocol":                   "SASL_PLAINTEXT", //"SASL_PLAINTEXT",  SASL_SSL
        "sasl.mechanisms":                     "OAUTHBEARER",
        "sasl.oauthbearer.config":             oauthConf,
        "broker.address.family":               "v4",
        "group.id":                            group,
        "session.timeout.ms":                  6000,
        "enable.ssl.certificate.verification": false,
        "auto.offset.reset":                   "earliest"})

    if err != nil {
        fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
        os.Exit(1)
    }
    fmt.Printf("Created Consumer %v\n", c)
    err = c.SubscribeTopics(topics, nil) //topics is a slice []string https://blog.golang.org/slices-intro
    run := true

    go func(eventsChan chan kafka.Event) {
        for ev := range eventsChan {
            oart, ok := ev.(kafka.OAuthBearerTokenRefresh)
            fmt.Println("wow", oart)
            if !ok {
                // Ignore other event types
                continue
            }

            handleOAuthBearerTokenRefreshEvent(c, oart)
        }
    }(c.Events())

    for run == true {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false
        default:
            ev := c.Poll(100)
            if ev == nil {
                continue
            }
            switch e := ev.(type) {
            case *kafka.Message:
                fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
                if e.Headers != nil {
                    fmt.Printf("%% Headers: %v\n", e.Headers)
                }
            case kafka.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
                if e.Code() == kafka.ErrAllBrokersDown {
                    run = false
                }
            default:
                fmt.Printf("Ignored %v\n", e)
            }
        }
    }

    fmt.Printf("Closing consumer\n")
    c.Close()

}
edenhill commented 3 years ago

You are using both c.Events() and c.Poll(), they're reading off the same channel, so you will need to choose one. As shown in your output: Ignored OAuthBearerTokenRefresh The c.Poll() gets the TokenRefresh event but ignores it.

I suggest you add a case kafka.OAuthBearerTokenRefresh to your c.Poll() loop.

davidmontgom commented 3 years ago

I added the below:= and took out the events. I am now able to see that tokens were called. '

                      case kafka.OAuthBearerTokenRefresh:
                oart, ok := ev.(kafka.OAuthBearerTokenRefresh)
                fmt.Println("wow", oart)
                if !ok {
                    // Ignore other event types
                    continue
                }
                handleOAuthBearerTokenRefreshEvent(c, oart)

Can I use SASL_SSL for "security.protocol" like my python code that works? I am using SASL_SSL with group.id 0

I now get the below error:

  FindCoordinator response error: Group authorization failed.
% Error: Broker: Group authorization failed: FindCoordinator response error: Group authorization failed.

Simultaneously my python code is working.

I assume this is correct? handleOAuthBearerTokenRefreshEvent(c, oart)

Thanks for help!!!!!!!!

edenhill commented 3 years ago

If you get "Group authorization failed" it means your client was able to connect and authenticate to the cluster, but ACLs are limiting the access to groups. Authentication vs authorization.

https://docs.confluent.io/platform/current/kafka/authorization.html

davidmontgom commented 3 years ago

Could it be Principal: "principal=dude"? I did not have to supply with python. If not principal=dude ten what option on the config am I missng?

The only differene between python and confluent now apears to be this in python:

def create_ssl_context():

    _ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    _ssl_context.options |= ssl.OP_NO_SSLv2
    _ssl_context.options |= ssl.OP_NO_SSLv3
    _ssl_context.verify_mode = ssl.CERT_NONE
    _ssl_context.check_hostname = False
    _ssl_context.load_default_certs()

    return _ssl_context

consumer = KafkaConsumer(
        group_id=None,
        bootstrap_servers=['test:9094'],
        security_protocol='SASL_SSL',
        sasl_mechanism='OAUTHBEARER',
        sasl_oauth_token_provider=TokenProvider(client_id,client_secret),  # returns a JWT 
        ssl_check_hostname=False,
        ssl_context=create_ssl_context(),
        auto_offset_reset=offset,
        enable_auto_commit=False, 
        value_deserializer=decode_topic
        )
    consumer.subscribe(topics=topic_list)
edenhill commented 3 years ago

Your user is not authorized to access the groups, check your ACLs.