exasol / kafka-connector-extension

Exasol Kafka Extension for accessing Apache Kafka
MIT License

Kafka consumer fails to break polling loop when it has to reset the offset, but can't read any new records #41

Closed jwarlander closed 3 years ago

jwarlander commented 3 years ago

If we try to import, using CONSUME_ALL_OFFSETS = 'true', from a Kafka topic that once held messages but whose messages have all expired (e.g. through retention time), we get stuck in a loop (at least until some new messages appear).

This may sound reasonable (waiting for messages), but in our case we're trying to set up a scheduled import from a pre-production topic, which may or may not get new messages on any given day (or week). We just want the import to bail out after the first poll attempt if there's nothing to be found.

Here's what happens, more or less:

  1. We start with current offset = 0 (the start offset), as the table in Exasol is empty
  2. We poll, and the consumer returns empty records
  3. Meanwhile, the consumer resets its offset to the topic's start offset (which equals the end offset in this case, due to the expired records)
  4. Since empty records were returned, we take the maximum of -1 (nothing emitted) and 0 (our current offset), and so stay on 0
  5. We loop back to (2)
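
The steps above can be reproduced with a small, Kafka-free model. This is only a sketch: `ExpiredPartition`, `lastEmittedOffset` and `pollsUntilExit` are hypothetical stand-ins rather than classes from the extension, and it assumes the loop compares against the end offset minus one. The poll count is capped so the demonstration terminates.

```scala
// Simplified model of the polling loop described above.
// All names are hypothetical stand-ins, not the extension's real classes.
object StuckLoopSketch {

  // Fake partition in which every record has expired, so the first
  // available offset equals the next offset to be written.
  final class ExpiredPartition(val logStart: Long) {
    private var position: Long = 0L

    def seek(offset: Long): Unit = { position = offset }

    // An out-of-range position is reset to the log start; nothing is
    // returned because no records remain.
    def poll(): Seq[Long] = {
      if (position < logStart) position = logStart
      Seq.empty
    }
  }

  // Stand-in for emitRecords: last emitted offset, or -1 for an empty batch.
  def lastEmittedOffset(records: Seq[Long]): Long =
    if (records.isEmpty) -1L else records.last

  // Applies the max-based update rule and returns the number of polls
  // performed, capped at maxPolls.
  def pollsUntilExit(logStart: Long, maxPolls: Int): Int = {
    val partition = new ExpiredPartition(logStart)
    partition.seek(0L) // empty target table: start at offset 0
    // Assumption: the loop compares against the offset of the last record
    // ever written, i.e. end offset - 1.
    val partitionEndOffset = logStart - 1
    var recordOffset = 0L
    var polls = 0
    var keepGoing = true
    while (keepGoing && polls < maxPolls) {
      val records = partition.poll()
      polls += 1
      recordOffset = math.max(recordOffset, lastEmittedOffset(records))
      keepGoing = recordOffset < partitionEndOffset
    }
    polls
  }

  def main(args: Array[String]): Unit =
    println(pollsUntilExit(logStart = 10L, maxPolls = 5))
}
```

With a log start of 10 the loop only stops because of the cap, so `main` prints 5. With a log start of 0 (a topic that never held messages) the same model exits after a single poll, which matches the observation that only topics with expired messages are affected.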

morazow commented 3 years ago

Hey @jwarlander,

Thanks for the update! I see how this can be a problem when resetting offsets. As discussed, I am going to try to set up a test case for the offset reset strategy that covers record expiry.

jwarlander commented 3 years ago

Great!

The way to fix it, once you've got a failing test, probably looks something like this:

diff --git a/src/main/scala/com/exasol/cloudetl/kafka/consumer/KafkaRecordConsumer.scala b/src/main/scala/com/exasol/cloudetl/kafka/consumer/KafkaRecordConsumer.scala
index ff6cf04..67dedff 100644
--- a/src/main/scala/com/exasol/cloudetl/kafka/consumer/KafkaRecordConsumer.scala
+++ b/src/main/scala/com/exasol/cloudetl/kafka/consumer/KafkaRecordConsumer.scala
@@ -49,7 +49,10 @@ class KafkaRecordConsumer(
         val records = consumer.poll(timeout)
         recordCount = records.count()
         totalRecordCount += recordCount
-        recordOffset = math.max(recordOffset, emitRecords(iterator, records))
+        recordOffset = emitRecords(iterator, records) match {
+          case -1     => getPartitionCurrentOffset()
+          case offset => offset
+        }
         logger.info(
           s"Polled '$recordCount' records, total '$totalRecordCount' records for partition " +
             s"'$partitionId' in node '$nodeId' and vm '$vmId'."
@@ -114,6 +117,13 @@ class KafkaRecordConsumer(
       .isConsumeAllOffsetsEnabled() && recordOffset < partitionEndOffset) ||
       (recordCount >= minRecordsPerRun && totalRecordCount < maxRecordsPerRun)

+  private[this] def getPartitionCurrentOffset(): Long = {
+    val topicPartition = new TopicPartition(topic, partitionId)
+    val currentOffset = consumer.position(topicPartition) - 1
+    logger.info(s"The current record offset for partition '$partitionId' is '$currentOffset'.")
+    currentOffset
+  }
+
   private[this] def getPartitionEndOffset(): Long = {
     val topicPartition = new TopicPartition(topic, partitionId)
     val partitionEndOffsets = consumer.endOffsets(Arrays.asList(topicPartition))
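
To see why the proposed change lets the loop exit, the two update rules can be compared in isolation. This is a minimal sketch, not the code merged in the repository: `oldUpdate`, `patchedUpdate` and `shouldContinue` are hypothetical names, and the numbers assume a partition whose ten records (offsets 0 to 9) have all expired, so the consumer's position after the reset is 10.

```scala
// Side-by-side comparison of the two offset update rules from the diff.
// Function names are hypothetical; only the arithmetic mirrors the patch.
object OffsetUpdateSketch {

  // Rule before the patch: an empty batch (-1) never moves the offset forward.
  def oldUpdate(recordOffset: Long, emitted: Long): Long =
    math.max(recordOffset, emitted)

  // Rule in the proposed patch: on an empty batch, fall back to the
  // consumer's current position minus one.
  def patchedUpdate(emitted: Long, consumerPosition: Long): Long =
    emitted match {
      case -1L    => consumerPosition - 1
      case offset => offset
    }

  // Consume-all part of the loop condition shown in the diff context.
  def shouldContinue(recordOffset: Long, partitionEndOffset: Long): Boolean =
    recordOffset < partitionEndOffset

  def main(args: Array[String]): Unit = {
    val partitionEndOffset = 9L // assumption: end offset (10) minus one
    val positionAfterReset = 10L

    val before = oldUpdate(recordOffset = 0L, emitted = -1L)
    val after = patchedUpdate(emitted = -1L, consumerPosition = positionAfterReset)

    println(s"old rule:     offset=$before continue=${shouldContinue(before, partitionEndOffset)}")
    println(s"patched rule: offset=$after continue=${shouldContinue(after, partitionEndOffset)}")
  }
}
```

Under these assumptions the old rule leaves the offset at 0 and keeps polling, while the patched rule moves it to 9, which is no longer below the partition end offset, so the loop ends after the first empty poll.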

morazow commented 3 years ago

Fixed in #42