Closed LeonardoBonacci closed 3 years ago
JulieOps is also adding the topic ALL operation with the prefix. https://github.com/kafka-ops/julie/blob/2f46ee0962802e3c656590c7a8308ad58775587e/src/main/java/com/purbon/kafka/topology/roles/acls/AclsBindingsBuilder.java#L176
There is also an integration test verifying the case with Streams internal topics. @LeonardoBonacci can you explain if anything is missing from this test case to verify Streams internal topic ACLs?
Hi @LeonardoBonacci, I am a bit confused by the issue you described here. As @akselh already mentioned, and as you can see from https://docs.confluent.io/platform/current/streams/developer-guide/security.html#required-acl-setting-for-secure-ak-clusters, the ACLs needed for a Kafka Streams application can be summarized as:
# Allow Streams to read the input topics:
bin/kafka-acls ... --add --allow-principal User:team1 --operation READ --topic input-topic1 --topic input-topic2
# Allow Streams to write to the output topics:
bin/kafka-acls ... --add --allow-principal User:team1 --operation WRITE --topic output-topic1 --topic output-topic2
# Allow Streams to manage its own internal topics:
bin/kafka-acls ... --add --allow-principal User:team1 --operation READ --operation DELETE --operation WRITE --operation CREATE --resource-pattern-type prefixed --topic team1-streams-app1
# Allow Streams to manage its own consumer groups:
bin/kafka-acls ... --add --allow-principal User:team1 --operation READ --operation DESCRIBE --resource-pattern-type prefixed --group team1-streams-app1
Note that the application.id here is team1-streams-app1. JulieOps creates these ACLs in the code linked above:
List<AclBinding> acls = new ArrayList<>();
readTopics.forEach(
    topic ->
        acls.add(buildTopicLevelAcl(principal, topic, PatternType.LITERAL, AclOperation.READ)));
writeTopics.forEach(
    topic ->
        acls.add(
            buildTopicLevelAcl(principal, topic, PatternType.LITERAL, AclOperation.WRITE)));
acls.add(buildTopicLevelAcl(principal, prefix, PatternType.PREFIXED, AclOperation.ALL));
acls.add(buildGroupLevelAcl(principal, prefix, PatternType.PREFIXED, AclOperation.READ));
Does this make sense?
docker-compose exec kafka kafka-acls --authorizer-properties zookeeper.connect=zookeeper:2181 --list --principal User:Streams0
ACLs for principal `User:Streams0`
Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=context.company.env.source.projectA, patternType=PREFIXED)`:
(principal=User:Streams0, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=topicB, patternType=LITERAL)`:
(principal=User:Streams0, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=context.company.env.source.projectA, patternType=PREFIXED)`:
(principal=User:Streams0, host=*, operation=ALL, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=topicD, patternType=LITERAL)`:
(principal=User:Streams0, host=*, operation=WRITE, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=topicA, patternType=LITERAL)`:
(principal=User:Streams0, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=topicC, patternType=LITERAL)`:
(principal=User:Streams0, host=*, operation=WRITE, permissionType=ALLOW)
These are all the ACLs generated with our custom example. As you can see from https://github.com/kafka-ops/julie/blob/master/src/test/java/com/purbon/kafka/topology/integration/StreamsAclIT.java, there is a full integration test suite for Streams.
I guess something else might be happening.
Apologies for being the source of confusion. I will try to supply an example to reproduce. There are two use cases: with and without applicationId in descriptor.yaml. First, without applicationId.
Using principal mapping, this descriptor...
context: "test" projects:
with plans.yaml
plans:
  default:
    alias: "default"
    config:
      replication.factor: "3"
      num.partitions: "2"
generates the following ACLs:
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test.projectX.topic-two, patternType=LITERAL)`:
(principal=User:215872, host=*, operation=WRITE, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test.projectX, patternType=PREFIXED)`:
(principal=User:215872, host=*, operation=ALL, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test.projectX.topic-one, patternType=LITERAL)`:
(principal=User:215872, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=test.projectX, patternType=PREFIXED)`:
(principal=User:215872, host=*, operation=READ, permissionType=ALLOW)
Running this simple Kafka Streams topology:
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "some-app-id");
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("test.projectX.topic-one",
        Consumed.with(Serdes.String(), Serdes.String()))
    .print(Printed.toSysOut());
results in this error:
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: some-app-id
I'm starting to understand...
Run the Streams app with this application id instead:
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test.projectX.something");
and it works.
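The two runs differ only in whether the application.id starts with the name of the PREFIXED group ACL. Kafka Streams uses the application.id as its consumer group id, and a PREFIXED resource pattern matches any resource whose name starts with the pattern name. Below is a minimal sketch of that check with no Kafka dependency. The class and method names are hypothetical, and this is an illustration of the matching rule, not the broker's authorizer code.

```java
final class PrefixedAclSketch {

    // A PREFIXED pattern matches every resource whose name starts with the pattern name.
    static boolean prefixedMatch(String patternName, String resourceName) {
        return resourceName.startsWith(patternName);
    }

    public static void main(String[] args) {
        // Name of the generated GROUP ACL (patternType=PREFIXED) from the listing above.
        String groupAclPrefix = "test.projectX";

        // application.id doubles as the consumer group id of the Streams app.
        System.out.println(prefixedMatch(groupAclPrefix, "some-app-id"));             // false
        System.out.println(prefixedMatch(groupAclPrefix, "test.projectX.something")); // true
    }
}
```

The first application id falls outside the prefix, which is consistent with the GroupAuthorizationException; the second falls inside it.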
Closing this issue, as it seems the result of my own misunderstanding. Apologies!
A Streams app needs ACLs to create internal topics.
According to the docs, "The resource name (prefix) is by default the topic name prefix in the project."
This is not what I observe to be needed. What happens is described in the post: the prefix of internal topics is the application id as specified in the Kafka Streams properties.
As an example, the following KStream config:
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "theIntegStreamsApp");
needs this ACL to work with a state store and its internal topic:
./kafka-acls \
  --bootstrap-server some-host:9092 \
  --command-config path-to-config \
  --add --allow-principal User:123456 \
  --operation All --topic theIntegStreamsApp \
  --resource-pattern-type prefixed
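To illustrate why the prefix has to be the application id: Kafka Streams names its internal topics `<application.id>-<name>-changelog` and `<application.id>-<name>-repartition`. The sketch below builds such names and checks them against a topic prefix. The store name `counts-store`, the class name, and the helper methods are hypothetical and only serve the illustration.

```java
final class InternalTopicNamesSketch {

    // Changelog topic backing a state store: <application.id>-<storeName>-changelog
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Repartition topic: <application.id>-<name>-repartition
    static String repartitionTopic(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    // A PREFIXED topic ACL covers a topic when the topic name starts with the ACL name.
    static boolean coveredByPrefix(String aclPrefix, String topic) {
        return topic.startsWith(aclPrefix);
    }

    public static void main(String[] args) {
        String appId = "theIntegStreamsApp";
        String changelog = changelogTopic(appId, "counts-store"); // hypothetical store name

        System.out.println(changelog);
        // Covered by an ACL prefixed with the application id...
        System.out.println(coveredByPrefix("theIntegStreamsApp", changelog));
        // ...but not by one prefixed with a project name the application id does not start with.
        System.out.println(coveredByPrefix("test.projectX", changelog));
    }
}
```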
Two flavors in julie's descriptor.yaml:
Adding applicationId to the descriptor.yaml only adds this read ACL, and is independent of the issue described here.
Solution
With the application id specified in the YAML, the solution is straightforward: use it to create the appropriate ACL. Without an application id specified...? Suggestions?