IBM / sarama

Sarama is a Go library for Apache Kafka.
MIT License
11.56k stars 1.76k forks source link

The client is not authorized to access this topic #2974

Open aethir-paas opened 2 months ago

aethir-paas commented 2 months ago
Description

When I use SASL OAUTHBEARER authentication, is there an internal mechanism to automatically refresh the token? Currently, my service encounters the error: "The client is not authorized to access this topic."

Versions
Sarama Kafka Go
v1.43.2 v3.5.1 V1.19
Configuration
type accessTokenProvider struct {
    awsRegion   string
    awsProfiles string
    token       string
    mu          sync.RWMutex
}

func (m *accessTokenProvider) Token() (*sarama.AccessToken, error) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    var err error
    if m.awsProfiles == "" {
        m.token, _, err = signer.GenerateAuthToken(context.Background(), m.awsRegion)
        return &sarama.AccessToken{Token: m.token}, err
    } else {
        m.token, _, err = signer.GenerateAuthTokenFromProfile(context.Background(), m.awsRegion, m.awsProfiles)
        return &sarama.AccessToken{Token: m.token}, err
    }
}

func (m *accessTokenProvider) RefreshToken() {
    for {
        var token string
        var err error
        if m.awsProfiles == "" {
            token, _, err = signer.GenerateAuthToken(context.Background(), m.awsRegion)
        } else {
            token, _, err = signer.GenerateAuthTokenFromProfile(context.Background(), m.awsRegion, m.awsProfiles)
        }
        if err != nil {
            fmt.Println("RefreshToken error :", err)
        }

        m.mu.Lock()
        m.token = token
        m.mu.Unlock()

        // 等待令牌到期前的一段时间再刷新
        time.Sleep(10 * time.Minute)
    }
}

func InitConfig(awsRegion string, awsProfiles string, SASL, TLS bool) *sarama.Config {
    /*err := godotenv.Load()
    if err != nil {
        log.Fatal("Error loading .env file")
    }
    */
    /*awsRegion, hasEnvRegion := os.LookupEnv(awsRegion)
    if !hasEnvRegion {
        log.Fatal("AWS_REGION environment variable not set")
    }*/

    configure := sarama.NewConfig()
    if SASL {
        configure.Net.SASL.Enable = true
        configure.Net.SASL.Mechanism = sarama.SASLTypeOAuth

        tokenProvider := &accessTokenProvider{
            awsRegion:   awsRegion,
            awsProfiles: awsProfiles,
        }
        configure.Net.SASL.TokenProvider = tokenProvider
        go func() {
            tokenProvider.RefreshToken()
        }()
    }

    if TLS {
        configure.Net.TLS.Enable = true
        configure.Net.TLS.Config = &tls.Config{}
    }
    configure.Consumer.Offsets.Initial = sarama.OffsetOldest
    return configure
}
Logs
The client is not authorized to access this topic

2024-08-26 20:00:00.528033078 +0000 UTC m=+43107.359917041 kafka: Failed to produce message to topic container_and_session_event: kafka server: The client is not authorized to access this topic 2024-08-26 20:00:01.528134796 +0000 UTC m=+43108.360018739 kafka: Failed to produce message to topic container_and_session_event: kafka server: The client is not authorized to access this topic 2024-08-26 20:00:02.867247119 +0000 UTC m=+43109.699131083 kafka: Failed to produce message to topic container_and_session_event: kafka server: The client is not authorized to access this topic 2024-08-26 20:00:05.02019884 +0000 UTC m=+43111.852082706 kafka: Failed to produce message to topic container_and_session_event: kafka server: The client is not authorized to access this topic

Additional Context

I reviewed the relevant documentation, and it seems that when the producer sends a message, it triggers authenticateViaSASLv1, which retrieves the latest token through an interface class. However, this behavior is not as I expected. Currently, the authentication fails periodically after some time. I’m not sure what internal mechanism could be used to refresh the token automatically.

JunliWang commented 1 month ago

Broker has per listener config that forces clients to re-authenticate: https://kafka.apache.org/documentation/#brokerconfigs_connections.max.reauth.ms By default it is off.