AbsaOSS / hyperdrive

Extensible streaming ingestion pipeline on top of Apache Spark
Apache License 2.0

DeduplicateKafkaSinkTransformer should work with fields from payload #239

Open kevinwallimann opened 3 years ago

kevinwallimann commented 3 years ago

When changing https://github.com/AbsaOSS/hyperdrive/blob/develop/driver/src/test/scala/za/co/absa/hyperdrive/driver/drivers/KafkaToKafkaDeduplicationAfterRetryDockerTest.scala#L73-L74

to

      "transformer.[kafka.deduplicator].source.id.columns" -> "value.record_id",
      "transformer.[kafka.deduplicator].destination.id.columns" -> "value.record_id"

the test fails with the following exception:

org.apache.spark.SparkException: Malformed records are detected in record parsing.
Caused by: org.apache.avro.AvroTypeException: Found null, expecting string

Although this behavior is documented, it may never have worked.
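For context, the deduplicator is expected to drop source records whose configured id columns already appear in the destination topic. The sketch below shows that idea in plain Scala, without Spark, Kafka, or Avro; `Record`, `committedIds`, and `sourceRecords` are hypothetical names for illustration only, not Hyperdrive's actual API, and the id column is assumed to live in the payload as in `value.record_id` above.

```scala
// Conceptual sketch of id-based deduplication on a payload field.
// All names here are illustrative, not Hyperdrive's actual API.
case class Record(value: Map[String, String])

object DedupSketch {
  def main(args: Array[String]): Unit = {
    // Ids already committed to the destination topic (hypothetical data).
    val committedIds: Set[String] = Set("id-1", "id-2")

    // Records re-read from the source topic after a retry.
    val sourceRecords: Seq[Record] = Seq(
      Record(Map("record_id" -> "id-1")), // already committed, should be dropped
      Record(Map("record_id" -> "id-3"))  // new, should be kept
    )

    // Deduplicate on the payload field, i.e. value.record_id.
    val deduplicated: Seq[Record] =
      sourceRecords.filter(r => !committedIds.contains(r.value("record_id")))

    println(deduplicated.map(_.value("record_id"))) // List(id-3)
  }
}
```

The reported exception suggests that when the id columns are taken from the payload rather than the key, the lookup against the destination records does not resolve `value.record_id` and produces a null, which then fails Avro decoding with `Found null, expecting string`.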