AbsaOSS / hyperdrive

Extensible streaming ingestion pipeline on top of Apache Spark

Fix Deduplicator config #232

Closed kevinwallimann closed 3 years ago

kevinwallimann commented 3 years ago

Description

After the SASL_SSL upgrade, the deduplicator failed with error messages like

2021-06-11 16:06:21 DEBUG NetworkClient:916 - [Consumer clientId=hyperdrive_consumer_ab0eabf9-8af1-43da-8466-09527850170e, groupId=hyperdrive_group_10b826ce-21fa-44a8-a9b9-25f544d1806c] Give up sending metadata request since no node is available

Unexpectedly, the corresponding Kafka config log output showed

    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT

Obviously, security.protocol should have been SASL_SSL, which explains the error message.
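
For reference, a minimal sketch of the consumer security settings one would expect to end up on the consumer after the upgrade (broker address, SASL mechanism and truststore path are placeholders, not values from this deployment):

    import java.util.Properties

    // Sketch of the intended consumer security settings; all values below are
    // placeholders and depend on the actual cluster setup.
    val expectedConsumerProps = new Properties()
    expectedConsumerProps.put("bootstrap.servers", "broker1:9093")
    expectedConsumerProps.put("security.protocol", "SASL_SSL") // was PLAINTEXT in the faulty config
    expectedConsumerProps.put("sasl.mechanism", "GSSAPI")
    expectedConsumerProps.put("ssl.truststore.location", "/path/to/truststore.jks")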

Analysis

Indeed, the extra Kafka config was applied incorrectly in the deduplicator. In https://github.com/AbsaOSS/hyperdrive/blob/v4.4.0/ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/deduplicate/kafka/TestDeduplicateKafkaSinkTransformerObject.scala#L48-L56, the test expects the extra Kafka options to be

      "kafka.security.protocol" -> "SASL_PLAINTEXT",
      "failOnDataLoss" -> "false"

However, this is wrong. First, the kafka. prefix only applies when the options are passed to Spark, which uses the prefix to identify Kafka-related config and strips it internally before configuring the consumer. Here, the consumer is configured directly, so the key should be just "security.protocol" instead of "kafka.security.protocol". Second, failOnDataLoss is irrelevant: it is a Spark option, not a Kafka property, so it makes no sense to set it on the Kafka consumer. A simplified sketch of the difference follows below.
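
To illustrate (this is a simplified sketch, not the code changed in this PR): when the options go through Spark they carry the kafka. prefix, but when a KafkaConsumer is instantiated directly, the prefix has to be stripped and Spark-only options dropped:

    // Simplified sketch, not the actual PR change: strip the "kafka." prefix
    // from Spark-style extra options and drop Spark-only options such as
    // failOnDataLoss before configuring a KafkaConsumer directly.
    val extraOptions = Map(
      "kafka.security.protocol" -> "SASL_SSL",
      "failOnDataLoss" -> "false" // Spark option, must not reach the Kafka consumer
    )

    val kafkaPrefix = "kafka."
    val consumerOptions = extraOptions.collect {
      case (key, value) if key.startsWith(kafkaPrefix) => key.stripPrefix(kafkaPrefix) -> value
    }
    // consumerOptions == Map("security.protocol" -> "SASL_SSL")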

In addition to fixing the Kafka properties, the extra properties for the schema registry (basic.auth.user.info and basic.auth.credentials.source) should be configured for the KafkaAvroDeserializer.
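
A minimal sketch of what that could look like (schema registry URL and credentials are placeholders, not values from this PR):

    import io.confluent.kafka.serializers.KafkaAvroDeserializer
    import scala.collection.JavaConverters._

    // Sketch only: pass the schema registry basic-auth properties to the
    // KafkaAvroDeserializer; URL and credentials are placeholders.
    val schemaRegistryConfig = Map(
      "schema.registry.url" -> "https://schema-registry:8081",
      "basic.auth.credentials.source" -> "USER_INFO",
      "basic.auth.user.info" -> "user:password"
    )

    val deserializer = new KafkaAvroDeserializer()
    deserializer.configure(schemaRegistryConfig.asJava, false) // false = configure as value deserializer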