confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Problem writing to topics with ksqldb and Kafka ACLs #9718

Open aachenmax opened 1 year ago

aachenmax commented 1 year ago

Hello everybody, I'm currently testing ksqldb-server and have some issues connecting ksqldb to a managed Kafka cluster (MSK) that uses ACLs. Specifically, there seems to be a problem when writing to topics with ksqldb (INSERT).

For the setup, I followed the guidelines here to configure the authorization required for ksqldb. Below is an excerpt from the ACLs I'm currently using. The <ksql.service.id> is set to "test" in ksqldb-server and we prefix the output topics, i.e. ksql.output.topic.name.prefix is set to "private.test." (see also the ksqldb configuration below):

ACLs:

test.ksqldb-user:
    allow-describe-configs-cluster:
      name: "kafka-cluster"
      type: CLUSTER
      pattern: LITERAL
      host: "*"
      operation: DESCRIBE_CONFIGS
      permission: ALLOW
    allow-describe-cluster:
      name: "kafka-cluster"
      type: CLUSTER
      pattern: LITERAL
      host: "*"
      operation: DESCRIBE
      permission: ALLOW
    allow-all-ksql-groups:
      name: "_confluent-ksql-test"
      type: GROUP
      pattern: PREFIXED
      host: "*"
      operation: ALL
      permission: ALLOW
    allow-all-ksql-topics:
      name: "_confluent-ksql-test"
      type: TOPIC
      pattern: PREFIXED
      host: "*"
      operation: ALL
      permission: ALLOW
    allow-all-private-topics:
      name: private.test
      type: TOPIC
      pattern: PREFIXED
      host: "*"
      operation: ALL
      permission: ALLOW
    allow-all-private-groups:
      name: private.test
      type: GROUP
      pattern: PREFIXED
      host: "*"
      operation: ALL
      permission: ALLOW
    allow-read-all-groups:
      name: "*"
      type: GROUP
      pattern: LITERAL
      host: "*"
      operation: READ
      permission: ALLOW
    allow-describe-all-topics:
      name: "*"
      type: TOPIC
      pattern: LITERAL
      host: "*"
      operation: DESCRIBE
      permission: ALLOW
    allow-describe-all-groups:
      name: "*"
      type: GROUP
      pattern: LITERAL
      host: "*"
      operation: DESCRIBE
      permission: ALLOW
    allow-describe-configs-all-topics:
      name: "*"
      type: TOPIC
      pattern: LITERAL
      host: "*"
      operation: DESCRIBE_CONFIGS
      permission: ALLOW
    allow-describe-configs-all-groups:
      name: "*"
      type: GROUP
      pattern: LITERAL
      host: "*"
      operation: DESCRIBE_CONFIGS
      permission: ALLOW
    allow-describe-transactional-id:
      name: "test"
      type: TRANSACTIONAL_ID
      pattern: LITERAL
      host: "*"
      operation: DESCRIBE
      permission: ALLOW
    allow-write-transactional-id:
      name: "test"
      type: TRANSACTIONAL_ID
      pattern: LITERAL
      host: "*"
      operation: WRITE
      permission: ALLOW

ksqldb Configuration

image: confluentinc/cp-ksqldb-server
imageTag: 7.3.0

...

ksql:
  headless: false

...

configurationOverrides: 
  "security.protocol": "SASL_SSL" 
  "sasl.mechanism": "SCRAM-SHA-512"
  "ksql.output.topic.name.prefix": "private.test."
  "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username='test.ksqldb-user' password='######';"
  "ksql.service.id": "test"

This configuration seems to work fine and ksqldb starts without any issues. The command topic is successfully created and also filled with data. Using ksql-cli I can also successfully create a stream:

ksql> CREATE STREAM test_stream (key VARCHAR KEY, message VARCHAR) WITH (kafka_topic='private.test.test_stream', value_format='JSON', partitions=1);

 Message        
----------------
 Stream created 
----------------

However, when I try to insert data into the stream, I get the following error:

ksql> INSERT INTO test_stream (key, message) VALUES ('testKey', 'Hello World!');

Failed to insert values into 'TEST_STREAM'.
Caused by: Authorization denied to Write on topic(s):
    [private.test.test_stream]. Caused by: The producer is not authorized
    to do idempotent sends. Check that you have write permissions to the specified
    topic, and disable idempotent sends by setting 'enable.idempotent=false'  if
    necessary.

I tried the suggestion and set 'enable.idempotent=false' in the configurationOverrides. This allows inserting data into the existing stream, but new streams can no longer be created:

ksql> CREATE STREAM test_stream2 (key VARCHAR KEY, message VARCHAR) WITH (kafka_topic='private.test.test_stream2', value_format='JSON', partitions=1);

Cannot set a transactional.id without also enabling idempotence.
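
A note on naming: the Kafka producer property is spelled `enable.idempotence`, while the error text above abbreviates it. A minimal sketch of the override described here, assuming it is placed in the same `configurationOverrides` block as the configuration excerpt above:

```yaml
configurationOverrides:
  # Assumption: this is the override that was applied; the Kafka producer
  # property name is enable.idempotence.
  "enable.idempotence": "false"
```

The resulting `transactional.id` error suggests the override also reaches a producer that ksqldb configures as transactional, which would explain why CREATE STREAM stops working. That is an inference from the error message, not something confirmed in this thread.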

To double-check whether this is another authorization issue, I also wrote data into the topic with a plain Kafka producer client (confluent_kafka in Python), using the same credentials as above. This works without any problems.

It would be great to get some hints on configuration or ACLs that might be missing for ksqldb to work as expected, or to hear whether others have encountered similar issues.

IanWhitney commented 1 year ago

Also interested in this one. We had a ksql stream start failing last week and the Authorizer logs pointed us to this missing ACL. Once we added the ACL the ksql stream resumed working.

It looks like it started when we pulled the ksqldb-cli container with sha c313611af45528c03d4e79df2f7f91ec98ce3a8bc6ed0804ee8bf2d6053670a6 on November 21st. Before that, we did not need to declare the IdempotentWrite ACL.

aachenmax commented 1 year ago

@IanWhitney Which version did you use before that showed no issues?

I experimented a bit more: I set the transactional.id explicitly and re-enabled idempotence:

configurationOverrides: 
...
  "transactional.id": "test"
  "enable.idempotence": "true"

This again makes it possible to create streams without any problems. When trying to insert into the stream, a new error message is shown:

ksql> CREATE STREAM test_stream3 (key VARCHAR KEY, message VARCHAR) WITH (kafka_topic='private.test.test_stream3', value_format='JSON', partitions=1);

 Message        
----------------
 Stream created 
---------------- 

ksql> INSERT INTO test_stream3 (key, message) VALUES ('testKey', 'Hello World!');

Failed to insert values into 'TEST_STREAM3'.
Caused by: Cannot add partition private.test.test_stream3-0 to
    transaction before completing a call to initTransactions

IanWhitney commented 1 year ago

That's a good question, but the logs that track that only go back 90 days and I'm not seeing anything, so our previous version must have been more than 90 days old. That could still include 0.27.2, since it was released 4 months ago.

If necessary I could downgrade a non-prod server and track it down to a specific version.

rafael-natali-bzy commented 1 year ago

Hello @IanWhitney and @aachenmax, I'm facing a similar issue with version 0.27.2.

We are able to create the streams; however, when we try to select from the ksql tables via the CLI, we see the following error in the brokers:

[2022-12-19 14:14:19,082] INFO Principal = User:<redacted> is Denied Operation = IdempotentWrite from host = 127.0.0.6 on resource = Cluster:LITERAL:kafka-cluster
for request = InitProducerId with resourceRefCount = 1 (kafka.authorizer.logger)

and in the ksqlDB:

Error encountered trying to send record to topic _confluent-ksql-dev_ksqldb_transient_transient_TEST_BEA_2920455516171037256_1671445839475-KsqlTopic-Reduce-changelog

I added the IdempotentWrite ACL to the cluster, but the error persists:

(principal=User:svc_mda_user, host=*, operation=IDEMPOTENT_WRITE, permissionType=ALLOW)

I found this documentation, which mentions adding the --idempotent option for the topic. However, each time we run a select, ksql creates a new internal topic, so I cannot give permission directly to the internal topic.

I haven't found an alternative for this. If you have any suggestions, I'd really appreciate them.

Thanks.

rafael-natali-bzy commented 1 year ago

sorry @IanWhitney and @aachenmax!

There was a typo in the ACL command I was using :flushed:

d-t-w commented 1 year ago

Hi @aachenmax, what version of Kafka is running on the broker?

Kafka 3.2.0 changed producer behaviour to be idempotent by default. (I'm guessing that ksqlDB uses kafka-clients 3.2.0+ as a dependency, and perhaps does not specifically set non-idempotent producers).

If your brokers are older than v2.8.0 and you have ACLs configured, then you will now have to set the IDEMPOTENT_WRITE ACL, or your previously non-idempotent producers, which are now idempotent, will no longer be able to produce.

In brokers v2.8.0+ that ACL is on by default, I believe.

This would also tie in with @IanWhitney's report of only requiring this ACL after updating to a specific version of ksql.

This would also explain why your write via Python to the topic succeeded, as I think this change was only implemented in the Java client libraries (not sure if/when it will apply to the Python libraries).

More information here: https://kpow.io/articles/kafka-producer-breaking-change/
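
For reference, a sketch of what the cluster-level IDEMPOTENT_WRITE entry might look like in the ACL format used in the original post. The schema and the "kafka-cluster" resource name are copied from that excerpt; the entry key `allow-idempotent-write-cluster` is a hypothetical name, and whether the ACL tooling behind that format accepts this operation is an assumption:

```yaml
test.ksqldb-user:
    # Hypothetical entry name; fields mirror the existing CLUSTER entries
    # (allow-describe-cluster, allow-describe-configs-cluster).
    allow-idempotent-write-cluster:
      name: "kafka-cluster"
      type: CLUSTER
      pattern: LITERAL
      host: "*"
      operation: IDEMPOTENT_WRITE
      permission: ALLOW
```

This matches the resource in the broker log quoted earlier in the thread (Operation = IdempotentWrite on Cluster:LITERAL:kafka-cluster), so it is granted on the cluster rather than on individual topics.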

IanWhitney commented 1 year ago

Our entire cluster is currently running Confluent 5.4.x, which puts us at Kafka 2.4 across the board. So that lines up.