Esri / kafka-for-geoevent

ArcGIS GeoEvent Server sample Kafka connectors for connecting to Apache Kafka message servers.
Apache License 2.0

Add additional SSL mechanisms #21

Open QuinnBast opened 4 years ago

QuinnBast commented 4 years ago

The Kafka connector for GeoEvent only supports two communication methods currently; however, there are a number of other SSL mechanisms that could be supported.

For Kafka there are four possible security protocols which can be defined for the CommonClientConfigs.SECURITY_PROTOCOL_CONFIG property. These are: PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL.

If the selected protocol is either SASL_PLAINTEXT or SASL_SSL, additional configurations are required for SASL; specifically, the SaslConfigs.SASL_MECHANISM property can be one of the following values: PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, or SCRAM-SHA-512.

These configuration properties are then possible to be configured:

If PLAIN or SCRAM was selected:

sasl_plain_username (str) – username for sasl PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.

sasl_plain_password (str) – password for sasl PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.

If GSSAPI is selected:

sasl_kerberos_service_name (str) – Service name to include in GSSAPI sasl mechanism handshake. Default: ‘kafka’

sasl_kerberos_domain_name (str) – kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers

If OAUTHBEARER is selected:

sasl_oauth_token_provider (AbstractTokenProvider) – OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None

Source

The ConsumerConfig enum does not provide configuration strings for SASL; however, they do exist in other enum classes. Based off of this document, an example of configuration for SCRAM-SHA-256 is shown below:

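```java
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;

Properties properties = new Properties();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

// securityProtocol variable pulled from UI.
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);

// Compare strings with equals(); == compares object references in Java.
if ("SASL_SSL".equals(securityProtocol) || "SASL_PLAINTEXT".equals(securityProtocol)) {
    // saslMechanism variable pulled from UI
    properties.put(SaslConfigs.SASL_MECHANISM, saslMechanism);

    if ("PLAIN".equals(saslMechanism) || "SCRAM-SHA-256".equals(saslMechanism) || "SCRAM-SHA-512".equals(saslMechanism)) {
        // PLAIN uses PlainLoginModule; the SCRAM mechanisms use ScramLoginModule.
        String loginModule = "PLAIN".equals(saslMechanism)
                ? "org.apache.kafka.common.security.plain.PlainLoginModule"
                : "org.apache.kafka.common.security.scram.ScramLoginModule";
        properties.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\"" + username + "\" password=\"" + password + "\";");
    }
    // other mechanism if statements... (OAUTH & Kerberos)
}
// other protocol if statements (SSL, plaintext)

// end conditionals and create the consumer.
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
```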

These items would need to be implemented here to configure SSL.
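
A stricter take on the same configuration step, as an added example that is not part of the original issue or the connector's code: it rejects unknown protocols, picks the login module per mechanism, and escapes the credentials before they go into the JAAS string. The class and method names and the sample values are hypothetical; the property keys are the literal values of the `CommonClientConfigs` and `SaslConfigs` constants used above, and only PLAIN and SCRAM are covered.

```java
import java.util.Properties;
import java.util.Set;

public class SaslClientConfig {
    static final Set<String> PROTOCOLS = Set.of("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL");

    // JAAS option values are double-quoted; escape backslash and quote so a
    // credential cannot terminate the value and append extra options.
    static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Each SASL mechanism has its own login module; SCRAM does not use PlainLoginModule.
    static String loginModule(String mechanism) {
        switch (mechanism) {
            case "PLAIN":
                return "org.apache.kafka.common.security.plain.PlainLoginModule";
            case "SCRAM-SHA-256":
            case "SCRAM-SHA-512":
                return "org.apache.kafka.common.security.scram.ScramLoginModule";
            default:
                throw new IllegalArgumentException("mechanism not covered by this example: " + mechanism);
        }
    }

    static Properties build(String servers, String protocol, String mechanism, String user, String pass) {
        if (!PROTOCOLS.contains(protocol)) {
            throw new IllegalArgumentException("unknown security.protocol: " + protocol);
        }
        Properties p = new Properties();
        p.put("bootstrap.servers", servers);
        p.put("security.protocol", protocol);
        if (protocol.startsWith("SASL_")) {
            p.put("sasl.mechanism", mechanism);
            p.put("sasl.jaas.config", loginModule(mechanism) + " required username="
                    + quote(user) + " password=" + quote(pass) + ";");
        }
        return p;
    }

    public static void main(String[] args) {
        // Constructed sample values; the password contains a quote and a backslash.
        Properties p = build("broker.example:9093", "SASL_SSL", "SCRAM-SHA-256", "geoevent", "p\"ss\\word");
        System.out.println(p.getProperty("sasl.jaas.config"));
    }
}
```

With SASL_PLAINTEXT the PLAIN mechanism sends the password unencrypted, so a UI built on this would normally offer PLAIN only together with SASL_SSL.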