kafka-dev / kafka

A distributed publish/subscribe messaging service
http://sna-projects.com/kafka
Apache License 2.0

Why does Kafka Streams require DESCRIBE access to other topics when the Kafka cluster is configured with SASL_SSL? #65

Open ankitdeora opened 4 years ago

ankitdeora commented 4 years ago

I have a 3-node Kafka cluster (Kafka version 1.1.1) with SASL_SSL security enabled using pluggable JAAS configurations. When Kafka producers and consumers with the proper security configuration communicate with the secured brokers, write and read operations work without any issue. With Kafka Streams, however, I am seeing weird behaviour: the Streams application works fine on the client side, but the server side logs an error showing that the broker threads handling the Streams application's requests are making DESCRIBE requests for all other topics on the broker. My Streams application is very simple: it copies data from the topic "testA" to another topic "testB". Why are the broker-side threads for the Streams application trying to describe other topics, which the Streams application is not even authorised to describe? If my understanding is correct, the Streams application should only describe the topics "testA" and "testB".

The error log is :

[2020-01-16 09:28:02,235] ERROR Requested Operation : DESCRIBE for Resource : Topic:dmp-tgd8uye1am cannot be authorized due to invalid/missing claims in the JWT_TOKEN claims : { Topic:testA=[READ, WRITE, DESCRIBE], Topic:testB=[READ, WRITE, DESCRIBE], Group:*=[READ, WRITE, DESCRIBE] } (com.fico.dmp.core.security.providers.kafka.authorization.DmpJwtAuthorizer)

The JWT_TOKEN is set on the client side with permissions for "testA" and "testB". Why, in the error log above, is the stream thread making a DESCRIBE call for the topic "dmp-tgd8uye1am"?

This token is validated on the server side in two steps: a) Authentication and b) Authorisation.

a) Authentication with SaslServer class : This step is successful.

b) Authorisation with DmpJwtAuthorizer class (implementation details below) : This is the step where the weird logs show up. In the broker process logs I can see multiple threads performing authorisation on behalf of the Streams application. The threads that request DESCRIBE access for "testA" and "testB" pass. The remaining threads request DESCRIBE access on other topics, such as "dmp-tgd8uye1am", which the Streams application has nothing to do with.
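To make the denial in the error log concrete, here is a minimal sketch of the claim check using plain strings instead of Kafka's `Resource` and `AclOperation` types. The class `ClaimCheckSketch` and its methods are hypothetical names for illustration, not part of DmpJwtAuthorizer, and the lookup is an exact-name match only, so the `Group:*` wildcard is not expanded here. The claims are copied from the log line above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the claim check; not the real DmpJwtAuthorizer.
final class ClaimCheckSketch {

    // Claims as they appear in the error log: resource -> allowed operations.
    static Map<String, Set<String>> claimsFromLog() {
        Set<String> ops = new HashSet<>(Arrays.asList("READ", "WRITE", "DESCRIBE"));
        Map<String, Set<String>> claims = new LinkedHashMap<>();
        claims.put("Topic:testA", ops);
        claims.put("Topic:testB", ops);
        claims.put("Group:*", ops);
        return claims;
    }

    // Exact-name lookup: allowed only if the resource is listed with that operation.
    static boolean isAllowed(Map<String, Set<String>> claims, String resource, String operation) {
        Set<String> allowed = claims.get(resource);
        return allowed != null && allowed.contains(operation);
    }

    public static void main(String[] args) {
        Map<String, Set<String>> claims = claimsFromLog();
        // The two topics named in the token.
        System.out.println("testA DESCRIBE allowed: "
                + isAllowed(claims, "Topic:testA", "DESCRIBE"));
        System.out.println("testB DESCRIBE allowed: "
                + isAllowed(claims, "Topic:testB", "DESCRIBE"));
        // The topic from the error log, which the token does not mention.
        System.out.println("dmp-tgd8uye1am DESCRIBE allowed: "
                + isAllowed(claims, "Topic:dmp-tgd8uye1am", "DESCRIBE"));
    }
}
```

Any DESCRIBE check for a topic that is absent from the token fails in this model, which is consistent with one error line being logged per such topic.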

The broker security configurations are as follows :

  1. Kafka-server-start.sh

    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.11-1.1.1/kafka-config/kafka_server_jaas.conf"
  2. kafka_server_jaas.conf

    KafkaServer {
    com.fico.dmp.core.security.providers.kafka.authentication.DmpJwtLoginModule required
    username="kafkabroker"
    password="HWBcKgfL5bQN"
    user_kafkabroker="HWBcKgfL5bQN";
    };
    Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="kafka"
    password="HWBcKgfL5bQN";
    };
  3. Server.properties

    
    listeners=SSL://:9093,SASL_SSL://:9094
    advertised.listeners=SSL://:9093,SASL_SSL://:9094
    sasl.enabled.mechanisms=PLAIN
    authorizer.class.name=com.core.security.providers.kafka.authorization.DmpJwtAuthorizer
    principal.builder.class=com.core.security.providers.kafka.authentication.DmpJwtPrincipalBuilder
    security.inter.broker.protocol=SSL
    ssl.client.auth=required
    ssl.truststore.location=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.amzn2.0.1.x86_64/jre/lib/security/cacerts
    ssl.truststore.password=******
    ssl.keystore.location=/opt/kafka_2.11-1.1.1/kafka-config/keystore
    ssl.keystore.password=*****

    default.replication.factor=3
    min.insync.replicas=2
    auto.leader.rebalance.enable=true


The Kafka Streams client configs and application logic are as follows :

configStreamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
configStreamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
configStreamProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * ...);
configStreamProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, ...);
configStreamProperties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, ...);
configStreamProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
configProperties.put("security.protocol", "SASL_SSL");
configProperties.put("sasl.mechanism", "PLAIN");
configProperties.put("sasl.jaas.config", "com.core.security.providers.kafka.authentication.DmpJwtClient required;");

final StreamsBuilder builder = new StreamsBuilder();

String inputTopic = (String) inputOptions.get("testA");
String outputTopic = (String) outputOptions.get("testB");

builder.stream(inputTopic).to(outputTopic);

final KafkaStreams streams = new KafkaStreams(builder.build(), configStreamProperties);

streams.cleanUp();
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));


DmpJwtAuthorizer Class on Server Side :

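import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import kafka.network.RequestChannel.Session;
import kafka.security.auth.Operation;
import kafka.security.auth.Resource;
import org.apache.kafka.common.acl.AclOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DmpJwtAuthorizer extends kafka.security.auth.SimpleAclAuthorizer {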

// SimpleAclAuthorizer is part of the Kafka libraries and implements the kafka.security.auth.Authorizer interface

private static final Logger logger = LoggerFactory.getLogger(DmpJwtAuthorizer.class);

@Override
public boolean authorize(Session session, Operation operation, Resource resource) {
    boolean authorized = false;

    DmpKafkaPrincipal principal = (DmpKafkaPrincipal) session.principal();
    String user = principal.getName();
    Map<Resource, List<AclOperation>> allowedResources = principal.getAllowedResources();

    if ((user != null) && (!user.isEmpty())) {
        Iterator<Entry<Resource, List<AclOperation>>> iterator = allowedResources.entrySet().iterator();

        while(iterator.hasNext() && !authorized) {
            Entry<Resource, List<AclOperation>> entry = iterator.next();
            Resource allowedResource = entry.getKey();
            List<AclOperation> allowedOperations = entry.getValue();

            AclOperation requestedACLOperation = operation.toJava();

            if(resource.resourceType().name().equals(allowedResource.resourceType().name())
                    && isMatchingResourceName(resource, allowedResource)
                    && allowedOperations.contains(requestedACLOperation)){

                authorized = true;
            }
        }

        if (!authorized) {
            logger.error("User : {} Requested Operation : {} for Resource : {} cannot be authorized due to invalid/missing claims in the JWT_TOKEN claims : {} THREAD NAME : {} THREAD ID : {}",
                    user, operation.toJava(), resource, allowedResources,
                    Thread.currentThread().getName(), Thread.currentThread().getId());
        }
    }

    return authorized;
}

public boolean isMatchingResourceName(Resource requestedResource, Resource allowedResource){
    String requestedResourceName = requestedResource.name();
    String allowedResourceName = allowedResource.name();

    String regex = ("\\Q" + allowedResourceName + "\\E").replace("*", "\\E.*\\Q");
    return requestedResourceName.matches(regex);
}

}
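
The wildcard handling in isMatchingResourceName can be tried in isolation, without any Kafka classes. The sketch below copies the regex construction from the method above into a hypothetical class, `WildcardMatchSketch`, that takes plain strings. The resource names in `main` come from the error log, except `some-group`, which is made up for illustration.

```java
// Hypothetical standalone copy of the name-matching logic, using plain strings.
final class WildcardMatchSketch {

    // Quote the allowed name literally with \Q...\E, then turn each '*' into ".*"
    // by closing the quote, inserting ".*", and reopening the quote.
    static boolean matches(String requestedName, String allowedName) {
        String regex = ("\\Q" + allowedName + "\\E").replace("*", "\\E.*\\Q");
        return requestedName.matches(regex);
    }

    public static void main(String[] args) {
        // Exact name from the token.
        System.out.println(matches("testA", "testA"));
        // Topic from the error log, checked against an allowed topic name.
        System.out.println(matches("dmp-tgd8uye1am", "testA"));
        // "*" as used for Group:* matches any name.
        System.out.println(matches("some-group", "*"));
    }
}
```

With this logic, a topic name is accepted only if it equals an allowed name or falls under an allowed wildcard, so a DESCRIBE check on "dmp-tgd8uye1am" cannot pass against claims that list only "testA" and "testB".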