AbsaOSS / hyperdrive

Extensible streaming ingestion pipeline on top of Apache Spark
Apache License 2.0

Add deduplication logic for kafka sink in case of spark task retries #240

Open kevinwallimann opened 3 years ago

kevinwallimann commented 3 years ago

Problem description

Spark does not provide exactly-once behaviour for the Kafka sink, only at-least-once, and will probably never do so (https://github.com/apache/spark/pull/25618). Under certain assumptions (no concurrent producers, only one destination topic, micro-batches that are not too large, messages that don't change between retries), idempotency can still be achieved. See #177.

The DeduplicateKafkaSinkTransformer only addresses retries at the application level. However, retries (and therefore duplicates) may happen at lower levels as well, namely at the Spark task level, where a failed task may be retried either on the same executor (e.g. after a TopicAuthorizationException) or on a different executor (e.g. after the executor's memory is exceeded).

Solution

It might be possible to implement an org.apache.kafka.clients.producer.ProducerInterceptor which would deduplicate retried messages similarly to the DeduplicateKafkaSinkTransformer. This would capture duplicates in the scenario where the retry happens on a different executor than the original one (memory exceeded scenario). In addition, the interceptor should keep a lookup set in memory to capture duplicates that occurred due to retries on the same executor (TopicAuthorizationException scenario). A reattempt could be recognized through the attempt number of the Spark task. If messages cannot be dropped through the ProducerInterceptor, they could at least be redirected to a trash topic.
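
A minimal sketch of what such an interceptor could look like, under several assumptions: onSend is invoked on the Spark task thread so that TaskContext (and with it the attempt number) is accessible, the producer (and hence this interceptor instance) is cached and reused across task attempts on the same executor, and suspected duplicates are redirected to a trash topic rather than dropped, since returning null from onSend is not a supported way to discard records. The class name and the deduplication.trash.topic config key are hypothetical, and the in-memory key set only covers the same-executor case; deduplicating across executors would additionally require consulting the destination topic, as the DeduplicateKafkaSinkTransformer does, which is omitted here.

```scala
import java.util.{Map => JMap}

import org.apache.kafka.clients.producer.{ProducerInterceptor, ProducerRecord, RecordMetadata}
import org.apache.spark.TaskContext

import scala.collection.mutable

// Hypothetical sketch: redirects suspected duplicates from task retries to a trash topic.
class DeduplicatingProducerInterceptor extends ProducerInterceptor[Array[Byte], Array[Byte]] {

  // Keys already sent by this producer instance (covers retries on the same executor,
  // assuming the producer is cached and reused across task attempts).
  private val seenKeys = mutable.Set.empty[Seq[Byte]]

  // Trash topic name, taken from a hypothetical interceptor config key.
  private var trashTopic: String = "trash"

  override def configure(configs: JMap[String, _]): Unit =
    Option(configs.get("deduplication.trash.topic")).foreach(t => trashTopic = t.toString)

  override def onSend(record: ProducerRecord[Array[Byte], Array[Byte]]): ProducerRecord[Array[Byte], Array[Byte]] = {
    // onSend runs on the thread calling producer.send(), i.e. the Spark task thread,
    // so the task's attempt number should be accessible here (assumption).
    val isReattempt = Option(TaskContext.get()).exists(_.attemptNumber() > 0)
    val key = Option(record.key()).map(_.toSeq).getOrElse(Seq.empty[Byte])

    if (isReattempt && seenKeys.contains(key)) {
      // Suspected duplicate from a retried task: redirect to the trash topic.
      new ProducerRecord(trashTopic, record.key(), record.value())
    } else {
      seenKeys += key
      record
    }
  }

  override def onAcknowledgement(metadata: RecordMetadata, exception: Exception): Unit = ()

  override def close(): Unit = seenKeys.clear()
}
```

The interceptor would be registered on the Kafka producer through the standard interceptor.classes producer config; a production version would also need to bound the in-memory set and make it thread-safe.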