twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.8+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License

CLUSTER_AUTHORIZATION_FAILED when producing. #26

Closed ain-valtin closed 3 years ago

ain-valtin commented 3 years ago

Using franz-go 0.6.9 with Go 1.16

I create the client like this:

    client, err := kgo.NewClient(
        kgo.SeedBrokers(fmt.Sprintf("%s:%d", address, port)),
        kgo.SASL(scram.Auth{User: username, Pass: secret}.AsSha256Mechanism()),
        kgo.BatchCompression(kgo.SnappyCompression()),
        kgo.ClientID("my-client-id"),
        kgo.MaxVersions(kversion.V2_6_0()),
        kgo.WithLogger(&loggerForFranz{log, kgo.LogLevelInfo}),
    )

and use it to issue a MetadataRequest and a ListOffsetsRequest, then call client.AssignPartitions() based on the responses and consume records from the topic. This works fine. I leave a background goroutine calling PollFetches.

Using the same client, I also try to produce a record to the topic:

    err := client.Produce(ctx,
        &kgo.Record{
            Topic: "my.topic.name",
            Key:   key,
            Value: value,
        },
        func(r *kgo.Record, e error) {
            if e != nil {
                // log error
            }
        })

This logs an error from the promise: "CLUSTER_AUTHORIZATION_FAILED: Cluster authorization failed.".

Stack trace:

/app/pkg/mod/github.com/twmb/franz-go@v0.6.9/pkg/kgo/producer.go : 147 : github.com/twmb/franz-go/pkg/kgo.(*Client).finishRecordPromise
/app/pkg/mod/github.com/twmb/franz-go@v0.6.9/pkg/kgo/sink.go : 1184 : github.com/twmb/franz-go/pkg/kgo.(*recBuf).lockedFailAllRecords
/app/pkg/mod/github.com/twmb/franz-go@v0.6.9/pkg/kgo/sink.go : 1166 : github.com/twmb/franz-go/pkg/kgo.(*recBuf).failAllRecords
/app/pkg/mod/github.com/twmb/franz-go@v0.6.9/pkg/kgo/sink.go : 305 : github.com/twmb/franz-go/pkg/kgo.(*sink).drain

and logs from the lib:

initializing producer id
producer id initialization errored
map[err:CLUSTER_AUTHORIZATION_FAILED: Cluster authorization failed.]
InitProducerID or AddPartitionsToTxn, failing producer id
map[err:CLUSTER_AUTHORIZATION_FAILED: Cluster authorization failed.]
ignoring a fail producer id request due to current id being different
map[current_epoch:-1 current_err:CLUSTER_AUTHORIZATION_FAILED: Cluster authorization failed. current_id:-1 fail_epoch:0 fail_err:CLUSTER_AUTHORIZATION_FAILED: Cluster authorization failed. fail_id:0]

What am I doing wrong?

twmb commented 3 years ago

Hi!

This client by default opts in to idempotent writes and currently has no way of disabling this. I've been intending to add an option to disable this because not everybody wants this, and I'll do that shortly.

Until that is done, idempotent writes require

operation=IDEMPOTENT_WRITE, permissionType=ALLOW, resource: cluster

Is this permission missing, and if so, is it possible to add it?

ain-valtin commented 3 years ago

I think the permission is missing. What I see in kafka-ui under ACLs for the topic is just "ALLOW DESCRIBE READ WRITE". I'll have to talk to the admin to see whether it can be added.

twmb commented 3 years ago

Hi there! I introduced an option to disable idempotent writes with https://github.com/twmb/franz-go/commit/3e6bcc3. I'm going to close this issue, please reopen if this does not fix things.

ain-valtin commented 3 years ago

In the meantime, my admin tweaked something in the user rights and producing to the topic started to work. However, the code that worked before now fails to read topic offsets: the request built with kmsg.NewListOffsetsRequest() gets -1 as both the oldest and the newest offset for the topic's single partition...? It is still the same with the latest version, 0.6.10.

twmb commented 3 years ago

Interesting. Are you referring to the request in the consumer.go code, or are you issuing that request manually?

ListOffsets uses -1 for offsets if there is an error, and the ErrorCode field will be nonzero if so. Otherwise, -1 is also used if the requested partition does not exist.

If you're issuing the request manually, what does the ErrorCode field say?

If this is from the consumer.go code and the error is not retriable, the error is returned in a TopicPartition in PollFetches. If the error is retriable, the load keeps retrying and you wouldn't see an error. If there was no error but ListOffsets still returned -1, I think the partition is skipped when fetching.
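For the manual case, the ErrorCode check described above might look roughly like the sketch below. It is only an illustration: `partitionResult`, `kafkaErrorNames`, and `checkPartition` are hypothetical stand-ins written for this example, not franz-go types, and the small error table covers just a few Kafka protocol codes. In real code the response type comes from the kmsg package, and franz-go's kerr package can translate codes into errors.

```go
package main

import "fmt"

// partitionResult is a hypothetical stand-in that keeps only the fields of a
// ListOffsets response partition that matter for this check.
type partitionResult struct {
	Partition int32
	ErrorCode int16
	Offset    int64
}

// kafkaErrorNames maps a few Kafka protocol error codes to their names.
// This table is deliberately incomplete.
var kafkaErrorNames = map[int16]string{
	3:  "UNKNOWN_TOPIC_OR_PARTITION",
	31: "CLUSTER_AUTHORIZATION_FAILED",
	74: "FENCED_LEADER_EPOCH",
}

// checkPartition returns the offset only if the broker reported no error.
// An offset of -1 on its own is ambiguous, so ErrorCode is inspected first.
func checkPartition(p partitionResult) (int64, error) {
	if p.ErrorCode != 0 {
		name, ok := kafkaErrorNames[p.ErrorCode]
		if !ok {
			name = "UNKNOWN"
		}
		return -1, fmt.Errorf("partition %d: error code %d (%s)", p.Partition, p.ErrorCode, name)
	}
	if p.Offset < 0 {
		return -1, fmt.Errorf("partition %d: no offset returned", p.Partition)
	}
	return p.Offset, nil
}

func main() {
	// Values taken from the response quoted later in this thread.
	_, err := checkPartition(partitionResult{Partition: 0, ErrorCode: 74, Offset: -1})
	fmt.Println(err)
}
```

Checking the error code before reading Offset avoids treating -1 as a real position in the log.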

ain-valtin commented 3 years ago

I'm issuing it manually: first I issue a kmsg.NewMetadataRequest() for the topic I'm interested in, and using its response I build a kmsg.NewListOffsetsRequest() with -1 and -2 for Timestamp. Indeed, the response has an error code set; it turns out I was not checking the response for that:

"Partitions":[{"Partition":0,"ErrorCode":74,"OldStyleOffsets":null,"Timestamp":-1,"Offset":-1,"LeaderEpoch":-1}

twmb commented 3 years ago

So that error code is FENCED_LEADER_EPOCH. If I were to guess, you're adding partitions to the request using kmsg.ListOffsetsRequestTopicPartition{Partition: #, Timestamp: -1}, which leaves out the CurrentLeaderEpoch field, which then defaults to 0 and you'll get that error. The two options here are to either manually specify -1 for CurrentLeaderEpoch, or (more recommended for external users, I know I don't do this internally in the client), use kmsg.NewListOffsetsRequestTopicPartition() which will call Default before returning.
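The difference between the struct literal and the constructor can be illustrated with a small self-contained sketch. `listOffsetsPartition` and `newListOffsetsPartition` are hypothetical stand-ins that only mimic the pattern described above (zero value, then defaults applied); they are not the kmsg types themselves, and the comment on what -1 means is an assumption based on this discussion.

```go
package main

import "fmt"

// listOffsetsPartition is a hypothetical stand-in for the request partition
// struct, reduced to the fields discussed in this thread.
type listOffsetsPartition struct {
	Partition          int32
	CurrentLeaderEpoch int32
	Timestamp          int64
}

// newListOffsetsPartition mimics the constructor pattern: start from the zero
// value, then apply the protocol defaults before returning.
func newListOffsetsPartition() listOffsetsPartition {
	var p listOffsetsPartition
	p.CurrentLeaderEpoch = -1 // assumed meaning: no epoch known, skip the epoch check
	return p
}

func main() {
	// A plain struct literal leaves unset fields at Go's zero value.
	literal := listOffsetsPartition{Partition: 0, Timestamp: -1}
	fmt.Println("literal epoch:", literal.CurrentLeaderEpoch)

	// The constructor applies defaults first; the caller then fills in the rest.
	viaCtor := newListOffsetsPartition()
	viaCtor.Partition = 0
	viaCtor.Timestamp = -1
	fmt.Println("constructor epoch:", viaCtor.CurrentLeaderEpoch)
}
```

The literal ends up with an epoch of 0, which the broker takes as a real (and stale) epoch, while the constructor version carries -1.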

ain-valtin commented 3 years ago

Indeed, I was using kmsg.ListOffsetsRequestTopicPartition{Partition: #, Timestamp: -1}, as this is a one-liner instead of the 3 lines needed with the New... constructor. Switching to the constructor fixed it (i.e. reading works again 👍). I already use constructors in some places, so I guess I should stick to them everywhere.

BTW the timestamp 0 seems to have a special meaning too (like -1 and -2) when querying offsets but it is not mentioned in the docs?

twmb commented 3 years ago

What are you seeing with timestamp 0? I think this would behave like timestamp -1, because Kafka internally looks for the first offset that has a timestamp on or after the requested timestamp.

The constructor is recommended, but yeah a bit verbose. Unfortunately there's no nicer way to do it in Go; with Kotlin it could look something like apply.

ain-valtin commented 3 years ago

Timestamp 0 seems to return the offset of the first message present in the topic (the topic has cleanup.policy=compact). Which ofc is in line with the "Kafka internally looks for the first offset that has a timestamp on or after the requested timestamp" but not the same as -1 :)

twmb commented 3 years ago

Ahh whoops I meant -2 (earliest offsets). Cool, glad this is solved for you!
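The timestamp behaviour discussed above can be modelled with a short sketch. This is a simplified model, not broker code: `record` and `lookupOffset` are hypothetical names, the log is just a slice of retained records, "earliest" is taken to be the first retained record, and returning -1 when no record matches is an assumption of this model.

```go
package main

import "fmt"

// record is a hypothetical retained log entry.
type record struct {
	Offset    int64
	Timestamp int64 // milliseconds since the Unix epoch
}

const (
	timestampLatest   int64 = -1 // special value: end of the log
	timestampEarliest int64 = -2 // special value: start of the log
)

// lookupOffset models a ListOffsets lookup over records ordered by offset.
// For a regular timestamp it returns the first offset whose record timestamp
// is on or after the requested one, or -1 if there is none.
func lookupOffset(log []record, ts int64) int64 {
	switch ts {
	case timestampEarliest:
		if len(log) == 0 {
			return 0
		}
		return log[0].Offset
	case timestampLatest:
		if len(log) == 0 {
			return 0
		}
		return log[len(log)-1].Offset + 1
	}
	for _, r := range log {
		if r.Timestamp >= ts {
			return r.Offset
		}
	}
	return -1
}

func main() {
	// A compacted log: offsets have gaps and the first retained offset is 5.
	log := []record{{5, 1000}, {7, 2000}, {9, 3000}}
	fmt.Println("timestamp 0: ", lookupOffset(log, 0))
	fmt.Println("timestamp -2:", lookupOffset(log, timestampEarliest))
	fmt.Println("timestamp -1:", lookupOffset(log, timestampLatest))
}
```

In this model, timestamp 0 and -2 both resolve to the first retained record, because every record timestamp is on or after 0, while -1 resolves to the end of the log.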