kedacore / keda

KEDA is a Kubernetes-based Event Driven Autoscaling component. It provides event driven scale for any container running in Kubernetes
https://keda.sh
Apache License 2.0
8.04k stars 1.01k forks

Unable to create Scaled Object with Oauthbearer SASL type with Confluent Cloud cluster #5757

Open acartag7 opened 2 months ago

acartag7 commented 2 months ago

Report

We are trying to set up a Kafka ScaledObject against our Confluent Cloud dedicated cluster using SASL/OAUTHBEARER authentication, and we are getting authentication failures.

"error": "error creating kafka client: kafka: client has run out of available brokers to talk to: kafka server: SASL Authentication failed: Authentication failed during authentication due to invalid credentials with SASL mechanism OAUTHBEARER"}

I've tried following the documentation here, but it seems there is a problem with the extensions part. During my tests I saw the following:

If I set oauthExtensions: invalid=nothing I get the usual "logicalCluster is missing a cluster_id" error:

"error": "error creating kafka client: kafka: client has run out of available brokers to talk to: kafka server: SASL Authentication failed: Authentication failed: 1 extensions are invalid! They are: logicalCluster: CLUSTER_ID_MISSING_OR_EMPTY"}

Now, if I set oauthExtensions: extension_identityPoolId=pool-ebYj without the cluster id, I get the authentication failed message (the same as if I put both the pool id and the cluster id, as in the manifests below):

"error": "error creating kafka client: kafka: client has run out of available brokers to talk to: kafka server: SASL Authentication failed: Authentication failed during authentication due to invalid credentials with SASL mechanism OAUTHBEARER"}
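For context, the oauthExtensions value is a comma-separated list of key=value pairs that ends up as the SASL/OAUTHBEARER extensions map. A minimal sketch of how such a string can be split into a map (an illustration of the format, not KEDA's actual parsing code) looks like:

```go
package main

import (
	"fmt"
	"strings"
)

// parseOauthExtensions splits a comma-separated list of key=value pairs
// into a map, mirroring the oauthExtensions secret format.
func parseOauthExtensions(raw string) map[string]string {
	extensions := map[string]string{}
	for _, pair := range strings.Split(raw, ",") {
		kv := strings.SplitN(pair, "=", 2)
		if len(kv) == 2 {
			extensions[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
		}
	}
	return extensions
}

func main() {
	ext := parseOauthExtensions("extension_logicalCluster=lkc-x8ff65,extension_identityPoolId=pool-xyzs")
	fmt.Println(ext["extension_logicalCluster"])  // lkc-x8ff65
	fmt.Println(ext["extension_identityPoolId"]) // pool-xyzs
}
```

Whatever parses this string only sees opaque keys and values, so whether the extension_ prefix belongs in the key is decided by the broker, not the client-side parsing.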

I know it's not the credentials, because when I input incorrect credentials I get the following message in the operator, coming directly from Microsoft Entra ID:

"error": "error creating kafka client: kafka: client has run out of available brokers to talk to: oauth2: \"unauthorized_client\" \"XXXXXXXXX: Application with identifier 'YYYYY-YYYY-YYYY-YYYY-YYYYYYYYYxxx' was not found in the directory 'XXX AG'. This can happen if the application has not been installed by the administrator of the tenant or consented to by any user in the tenant. You may have sent your authentication request to the wrong tenant. Trace ID: xxxx-xxxx-xxx-xxxx Correlation ID: xxxx-xxxx-xxx-xxxx Timestamp: 2024-04-30 16:00:34Z\" \"https://login.microsoftonline.com/error?code=700016\""}

The ScaledObject using SASL plaintext with API keys works without issues, but we can't use that auth method in our setup.

I don't think this issue has been reported before. Any ideas on what I could try?

Expected Behavior

The Kafka scaler is active, in ready status and the deployments scale properly.

Actual Behavior

Authentication fails with SASL/OAUTHBEARER.

Steps to Reproduce the Problem

To test this you will need a Kafka cluster that authenticates with SASL/OAUTHBEARER. Deploy the manifests below:

kind: Secret
apiVersion: v1
metadata:
  name: keda-secrets-0004
stringData:
  username: superusername
  password: superpassword
  oauthTokenEndpointUri: https://login.microsoftonline.com/tenant_id/oauth2/token
  scopes: "superusername/.default"
  oauthExtensions: extension_logicalCluster=lkc-x8ff65,extension_identityPoolId=pool-xyzs
type: Opaque
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: trigger-authentication-0004
spec:
  secretTargetRef:
    - parameter: username
      name: keda-secrets-0004
      key: username
    - parameter: password
      name: keda-secrets-0004
      key: password
    - parameter: oauthTokenEndpointUri
      name: keda-secrets-0004
      key: oauthTokenEndpointUri
    - parameter: scopes
      name: keda-secrets-0004
      key: scopes
    - parameter: oauthExtensions
      name: keda-secrets-0004
      key: oauthExtensions
---
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: scaler-0004
spec:
  minReplicaCount: 0
  maxReplicaCount: 3
  scaleTargetRef:
    name: kafka-inspector-keda-test
    apiVersion: apps/v1
    kind: Deployment
  pollingInterval:  60
  triggers:
    - type: kafka
      authenticationRef:
        name: trigger-authentication-0004
      metadata:
        bootstrapServers: lkc-x8ff65.privdomxxxxx.eu-central-1.aws.confluent.cloud:9092
        topic: test-topic
        consumerGroup: app.myconsumer-dev
        lagThreshold: '50'
        activationLagThreshold: '0'
        offsetResetPolicy: latest
        sasl: "oauthbearer"
        tls: "enable"
---
# create a simple nginx deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-inspector-keda-test
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-inspector-keda-test
  template:
    metadata:
      labels:
        app: kafka-inspector-keda-test
    spec:
      containers:
      - name: nginx
        image: nginx:latest
        ports:
        - containerPort: 80

Logs from KEDA operator

2024-04-30T15:13:57Z    INFO    Reconciling ScaledObject        {"controller": "scaledobject", "controllerGroup": "keda.sh", "controllerKind": "ScaledObject", "ScaledObject": {"name":"scaler-0004","namespace":"default"}, "namespace": "default", "name": "scaler-0004", "reconcileID": "16bc803b-9af0-4f8a-8685-268e32428f04"}
2024-04-30T15:13:57Z    INFO    Creating a new HPA      {"controller": "scaledobject", "controllerGroup": "keda.sh", "controllerKind": "ScaledObject", "ScaledObject": {"name":"scaler-0004","namespace":"default"}, "namespace": "default", "name": "scaler-0004", "reconcileID": "16bc803b-9af0-4f8a-8685-268e32428f04", "HPA.Namespace": "default", "HPA.Name": "keda-hpa-scaler-0004"}
2024-04-30T15:14:19Z    ERROR   scale_handler   error resolving auth params     {"type": "ScaledObject", "namespace": "default", "name": "scaler-0004", "scalerIndex": 0, "error": "error creating kafka client: kafka: client has run out of available brokers to talk to: kafka server: SASL Authentication failed: Authentication failed during authentication due to invalid credentials with SASL mechanism OAUTHBEARER"}
2024-04-30T15:14:19Z    ERROR   Error getting scalers   {"controller": "scaledobject", "controllerGroup": "keda.sh", "controllerKind": "ScaledObject", "ScaledObject": {"name":"scaler-0004","namespace":"default"}, "namespace": "default", "name": "scaler-0004", "reconcileID": "16bc803b-9af0-4f8a-8685-268e32428f04", "error": "error creating kafka client: kafka: client has run out of available brokers to talk to: kafka server: SASL Authentication failed: Authentication failed during authentication due to invalid credentials with SASL mechanism OAUTHBEARER"}
2024-04-30T15:14:19Z    ERROR   Failed to create new HPA resource       {"controller": "scaledobject", "controllerGroup": "keda.sh", "controllerKind": "ScaledObject", "ScaledObject": {"name":"scaler-0004","namespace":"default"}, "namespace": "default", "name": "scaler-0004", "reconcileID": "16bc803b-9af0-4f8a-8685-268e32428f04", "HPA.Namespace": "default", "HPA.Name": "keda-hpa-scaler-0004", "error": "error creating kafka client: kafka: client has run out of available brokers to talk to: kafka server: SASL Authentication failed: Authentication failed during authentication due to invalid credentials with SASL mechanism OAUTHBEARER"}
2024-04-30T15:14:19Z    ERROR   failed to ensure HPA is correctly created for ScaledObject      {"controller": "scaledobject", "controllerGroup": "keda.sh", "controllerKind": "ScaledObject", "ScaledObject": {"name":"scaler-0004","namespace":"default"}, "namespace": "default", "name": "scaler-0004", "reconcileID": "16bc803b-9af0-4f8a-8685-268e32428f04", "error": "error creating kafka client: kafka: client has run out of available brokers to talk to: kafka server: SASL Authentication failed: Authentication failed during authentication due to invalid credentials with SASL mechanism OAUTHBEARER"}
2024-04-30T15:14:19Z    ERROR   Reconciler error        {"controller": "scaledobject", "controllerGroup": "keda.sh", "controllerKind": "ScaledObject", "ScaledObject": {"name":"scaler-0004","namespace":"default"}, "namespace": "default", "name": "scaler-0004", "reconcileID": "16bc803b-9af0-4f8a-8685-268e32428f04", "error": "error creating kafka client: kafka: client has run out of available brokers to talk to: kafka server: SASL Authentication failed: Authentication failed during authentication due to invalid credentials with SASL mechanism OAUTHBEARER"}

KEDA Version

2.14.0

Kubernetes Version

1.27

Platform

Any

Scaler Details

Kafka

Anything else?

No response

JorTurFer commented 1 month ago

@dttung2905 @zroubalik , you are the kafka experts šŸ˜„

fabiodellanna commented 1 month ago

Hi all, I experienced the same issue reported by @acartag7. I'm not able to autoscale a cluster with OAuthBearer authentication. Any news so far?

dttung2905 commented 1 month ago

Sorry, I missed this. Let me take a look in the next few days. I'll probably need to create a local Strimzi Kafka cluster with SASL/OAUTHBEARER authentication first :laughing:

dttung2905 commented 1 month ago

I just spent some time looking through the code but can't find anything yet. Partly, I think, because I haven't been able to set up a local Strimzi cluster with SASL/OAUTHBEARER authentication. :facepalm:

Hi @adrien-f, I see you recently contributed this PR: https://github.com/kedacore/keda/pull/5692. I think you might already have a Kafka cluster set up with SASL/OAUTHBEARER and have a much better understanding of this part of the code than me :smile: Do you mind helping us take a look at this too?

adrien-f commented 1 month ago

Greetings! I'd gladly help as much as possible!

For reference: when not configured for AWS MSK IAM, the scaler uses the Kafka OAuthBearerTokenProvider:

https://github.com/kedacore/keda/blob/d63bc7adacb8b8c35569a2f7551ebd640cbd6d04/pkg/scalers/kafka_scaler.go#L659

Since the errors mention OAuth, we can assume the configuration was parsed properly. We can also add a unit test to verify that later. What's left is to try out the token provider; you could test it manually with something like this (code written by hand, so it may need adjustments):

package main

import (
    "context"
    "fmt"

    "github.com/IBM/sarama"
    "golang.org/x/oauth2"
    "golang.org/x/oauth2/clientcredentials"
)

type TokenProvider interface {
    sarama.AccessTokenProvider
    String() string
}

type oauthBearerTokenProvider struct {
    tokenSource oauth2.TokenSource
    extensions  map[string]string
}

func OAuthBearerTokenProvider(clientID, clientSecret, tokenURL string, scopes []string, extensions map[string]string) TokenProvider {
    cfg := clientcredentials.Config{
        ClientID:     clientID,
        ClientSecret: clientSecret,
        TokenURL:     tokenURL,
        Scopes:       scopes,
    }

    return &oauthBearerTokenProvider{
        tokenSource: cfg.TokenSource(context.Background()),
        extensions:  extensions,
    }
}

func (o *oauthBearerTokenProvider) Token() (*sarama.AccessToken, error) {
    token, err := o.tokenSource.Token()
    if err != nil {
        return nil, err
    }

    return &sarama.AccessToken{Token: token.AccessToken, Extensions: o.extensions}, nil
}

func (o *oauthBearerTokenProvider) String() string {
    return "OAuthBearer"
}

func main() {
    // Fill in your own client id, client secret, token endpoint, scopes and
    // extensions (mirroring the oauthExtensions entries) before running this.
    tp := OAuthBearerTokenProvider(
        "xxx", // client id
        "xxx", // client secret
        "https://login.microsoftonline.com/tenant_id/oauth2/token",
        []string{"xxx/.default"},
        map[string]string{"extension_logicalCluster": "lkc-x8ff65", "extension_identityPoolId": "pool-xyzs"},
    )
    token, err := tp.Token()
    if err != nil {
        fmt.Println("could not get token", err)
    } else {
        fmt.Println("got token", token)
    }
}

If we do indeed have a Token, we'll have to dig deeper šŸ˜…

djmacken557 commented 1 week ago

Hello, we are encountering the same scenario trying to set up a KEDA scaler using OAuthBearer against Confluent Cloud. saslplaintext also works, as the original poster described. Are there any updates on this issue?

adrien-f commented 1 week ago

Hello @djmacken557 !

Would it be possible to run the debugging script I mentioned, to ensure the token is indeed fetched? The idea is to validate that fetching the token works before going further into the library.

rasifmahmud commented 1 week ago

@adrien-f @dttung2905 getting rid of the extension_ prefix solved the issue for us. For instance, try this: oauthExtensions: logicalCluster=lkc-x8ff65,identityPoolId=pool-xyzs. Let's get the documentation updated. I am assuming it was tested with that prefix before? If it used to work earlier but no longer does, I would guess Confluent changed this server-side recently.
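Based on that finding, the secret from the reproduction steps above would become the following (same placeholder IDs as in the original manifests, keys without the extension_ prefix):

```yaml
stringData:
  # no "extension_" prefix on the extension keys
  oauthExtensions: logicalCluster=lkc-x8ff65,identityPoolId=pool-xyzs
```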

zroubalik commented 1 week ago

@rasifmahmud thanks for the confirmation :) Are you willing to update the docs?

@adrien-f do you think it makes sense to add a unit test for this?

rasifmahmud commented 1 week ago

@zroubalik sure, I will raise a PR