AbsaOSS / hyperdrive

Extensible streaming ingestion pipeline on top of Apache Spark
Apache License 2.0

Set starting offsets earliest when checkpoint folder is empty #203

Closed kevinwallimann closed 3 years ago

kevinwallimann commented 3 years ago

Situation: Currently, KafkaStreamReader sets startingOffsets to earliest if the checkpoint folder doesn't exist, so the query ingests from the beginning in the first microbatch. However, earliest is not set if the folder already exists but is empty. This behaviour is inconsistent and should be fixed. https://github.com/AbsaOSS/hyperdrive/blob/develop/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/reader/kafka/KafkaStreamReader.scala#L98

Desired outcome: If the checkpoint folder exists but is empty, startingOffsets should be set to earliest.
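
The desired check could look roughly like the sketch below. This is not the actual KafkaStreamReader code: the object and method names are hypothetical, and it uses java.nio.file on a local path purely to stay self-contained, whereas the real reader would presumably go through whatever filesystem abstraction it already uses for the checkpoint location.

```scala
import java.nio.file.{Files, Path}

// Hypothetical sketch, not Hyperdrive's implementation.
object StartingOffsetsSketch {

  // True when the checkpoint location holds no state: the directory is
  // missing, or it exists but contains no entries.
  // Assumption: a path that is not a directory is treated like a missing one.
  def isMissingOrEmpty(checkpointDir: Path): Boolean = {
    if (!Files.isDirectory(checkpointDir)) {
      true
    } else {
      val entries = Files.list(checkpointDir)
      try !entries.findAny().isPresent
      finally entries.close() // Files.list holds an open directory handle
    }
  }

  // Some("earliest") when there is no checkpoint state to resume from,
  // None otherwise, so the caller leaves startingOffsets unset.
  def startingOffsets(checkpointDir: Path): Option[String] =
    if (isMissingOrEmpty(checkpointDir)) Some("earliest") else None
}
```

With this shape, the missing-folder and empty-folder cases go through the same branch, which removes the inconsistency described above. A caller could apply the result with something like `startingOffsets(dir).foreach(v => reader.option("startingOffsets", v))`, where `reader` is assumed to be a Spark DataStreamReader for the Kafka source.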