Status: Closed. kevinwallimann closed this issue 3 years ago.
Problem description
Currently, the DeduplicateKafkaSinkTransformer extracts the fields for deduplication only after all records have been consumed:
https://github.com/AbsaOSS/hyperdrive/blob/bab72ccce509ff69a5defd941e1d4a99e660a8a0/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/deduplicate/kafka/DeduplicateKafkaSinkTransformer.scala#L83-L85
and
https://github.com/AbsaOSS/hyperdrive/blob/bab72ccce509ff69a5defd941e1d4a99e660a8a0/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/deduplicate/kafka/DeduplicateKafkaSinkTransformer.scala#L90-L94
Because every consumed record stays in memory until the extraction runs, this can quickly lead to avoidable out-of-memory errors.
Solution
The field extraction logic should be invoked right after each poll, so that the memory held by the polled records can be freed immediately after every round of polling:
https://github.com/AbsaOSS/hyperdrive/blob/bab72ccce509ff69a5defd941e1d4a99e660a8a0/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/KafkaUtil.scala#L80-L81
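A minimal sketch of the proposed change, not the actual KafkaUtil code. The names `Record`, `consumeAndExtract`, `poll` and `extract` are hypothetical placeholders, and the loop simply stops at the first empty batch, which is a simplifying assumption. The point is only that extraction happens per batch, so full records never accumulate across polls.

```scala
import scala.collection.mutable.ArrayBuffer

object IncrementalExtraction {
  // Hypothetical stand-in for a consumed Kafka record (not the real ConsumerRecord).
  final case class Record(key: String, payload: Array[Byte])

  /**
   * Polls until an empty batch is returned and keeps only the extracted fields.
   * `poll` stands in for the consumer poll call, `extract` for the
   * deduplication field extraction; both are assumptions for illustration.
   */
  def consumeAndExtract[R, F](poll: () => Seq[R])(extract: R => F): Seq[F] = {
    val extracted = ArrayBuffer.empty[F]
    var batch = poll()
    while (batch.nonEmpty) {
      // Extract right away; only the small extracted values are retained.
      extracted ++= batch.map(extract)
      // Reassigning `batch` drops the last reference to the previous records,
      // so they become eligible for garbage collection.
      batch = poll()
    }
    extracted.toSeq
  }
}
```

With this shape, peak memory is bounded by one batch of full records plus the extracted fields, rather than by all consumed records.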