jwarlander closed this 3 years ago.
Hey @jwarlander,
Nice find! Yes, indeed, it will wait indefinitely if there are no new messages.
I think a good solution would be to set `lastRecordOffset` to -1 to indicate that no records were available, and to update the loop condition to `(properties.isConsumeAllOffsetsEnabled() && recordOffset >= 0 && recordOffset < partitionEndOffset)`. This should also cover the empty topic case. What do you think?
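A minimal sketch of the proposed condition, under a simplified model where each poll returns the offsets of the records it fetched. `SentinelLoopSketch`, `pollsUntilDone` and `batches` are hypothetical names for illustration, not the connector's actual code; only the `-1` sentinel and the shape of the loop condition come from the proposal above.

```scala
object SentinelLoopSketch {
  // Hypothetical model: each element of `batches` is the list of record
  // offsets returned by one poll. An empty list means the poll fetched nothing.
  // Returns how many polls were made before the loop stopped.
  def pollsUntilDone(
      consumeAllOffsetsEnabled: Boolean,
      partitionEndOffset: Long,
      batches: Seq[Seq[Long]]
  ): Int = {
    val polls = batches.iterator
    var pollCount = 0
    var recordOffset = -1L
    var keepGoing = true
    while (keepGoing && polls.hasNext) {
      val batch = polls.next()
      pollCount += 1
      // -1 signals "no records were available in this poll".
      recordOffset = if (batch.isEmpty) -1L else batch.max
      keepGoing = consumeAllOffsetsEnabled &&
        recordOffset >= 0 &&
        recordOffset < partitionEndOffset
    }
    pollCount
  }

  def main(args: Array[String]): Unit = {
    // Already caught up: the first poll is empty, so the loop stops at once.
    println(pollsUntilDone(true, 9L, Seq(Seq.empty, Seq(10L))))
    // Records remain: keep polling until offset 9 (the end) is reached.
    println(pollsUntilDone(true, 9L, Seq(Seq(5L, 6L), Seq(7L, 8L, 9L), Seq.empty)))
  }
}
```

In this model, one trade-off of the sentinel is that any empty poll ends the run, even if records remain (for example, because the poll timed out); those records would then be picked up by a later import.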
I am going to send a fix soon.
Sounds like a good plan! Do you think you'll be able to add integration tests for both of these conditions, e.g. "already caught up" and "empty topic"?
Yes, definitely. I will ask for your review again.
If I run an import with CONSUME_ALL_OFFSETS = 'true' on a topic where I'm already caught up, the polling loop will keep spinning until a new message appears on the topic, because `lastRecordOffset` is initialized to 0 instead of the current `partitionEndOffset`. Then, if no records appeared during the poll, 0 would be compared to the `partitionEndOffset` in `shouldContinue()`: https://github.com/exasol/kafka-connector-extension/blob/7fc77ad6b3f96cbae4993b52b5f386e30f7b76af/src/main/scala/com/exasol/cloudetl/kafka/consumer/KafkaRecordConsumer.scala#L70-L95
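To make the failure mode concrete, here is a hypothetical model of the reported behaviour (not the code at the link above). `CaughtUpBugSketch`, `pollsWhenCaughtUp` and `maxPolls` are illustrative names; `maxPolls` exists only so the simulation terminates, whereas the loop described in the issue has no such cap.

```scala
object CaughtUpBugSketch {
  // Hypothetical model with CONSUME_ALL_OFFSETS enabled: lastRecordOffset
  // starts at 0 and only changes when a poll actually returns records.
  def pollsWhenCaughtUp(partitionEndOffset: Long, maxPolls: Int): Int = {
    var lastRecordOffset = 0L
    var pollCount = 0
    var keepPolling = true
    while (keepPolling) {
      val polledOffsets = Seq.empty[Long] // caught up: every poll is empty
      pollCount += 1
      if (polledOffsets.nonEmpty) lastRecordOffset = polledOffsets.max
      // 0 < partitionEndOffset stays true as long as nothing new is polled,
      // so only the artificial maxPolls cap ends this simulation.
      keepPolling = lastRecordOffset < partitionEndOffset && pollCount < maxPolls
    }
    pollCount
  }

  def main(args: Array[String]): Unit = {
    // Partition whose last record is at offset 9: runs into the 1000-poll cap.
    println(pollsWhenCaughtUp(partitionEndOffset = 9L, maxPolls = 1000))
  }
}
```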
The fix would probably be to just initialize `lastRecordOffset` to the value of `partitionEndOffset` before the polling loop. However, I'm not sure what happens if the topic partition is actually empty. Would the Kafka consumer return the end offset as 0? In that case `partitionEndOffset` (the end offset minus one) would become -1, `lastRecordOffset` would also be initialized to -1, and all would be fine.
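A minimal sketch of the suggested fix, assuming CONSUME_ALL_OFFSETS is enabled and that the end offset reported for a never-written partition is 0. Kafka's `KafkaConsumer.endOffsets` documents the end offset as the offset of the last available message plus one, which is why the sketch subtracts one. `InitFromEndOffsetSketch`, `pollsUntilDone` and `batches` are hypothetical names, and polls are modelled as lists of record offsets rather than real Kafka calls.

```scala
object InitFromEndOffsetSketch {
  // Hypothetical model: `endOffset` stands in for the partition's end offset
  // (last available offset + 1), and each element of `batches` is the list of
  // record offsets returned by one poll. Returns the number of polls made.
  def pollsUntilDone(endOffset: Long, batches: Seq[Seq[Long]]): Int = {
    val partitionEndOffset = endOffset - 1 // last record's offset, or -1 if empty
    // The fix: start from partitionEndOffset instead of 0, so a poll that
    // returns nothing leaves the loop condition false.
    var lastRecordOffset = partitionEndOffset
    val polls = batches.iterator
    var pollCount = 0
    var keepPolling = true
    while (keepPolling && polls.hasNext) {
      val batch = polls.next()
      pollCount += 1
      if (batch.nonEmpty) lastRecordOffset = batch.max
      keepPolling = lastRecordOffset < partitionEndOffset
    }
    pollCount
  }

  def main(args: Array[String]): Unit = {
    // Caught up at offset 9: one empty poll, then stop.
    println(pollsUntilDone(10L, Seq(Seq.empty, Seq(10L))))
    // Empty partition: end offset 0, so both values start at -1 and we stop.
    println(pollsUntilDone(0L, Seq(Seq.empty)))
    // Records remain: stop once offset 9 has been read.
    println(pollsUntilDone(10L, Seq(Seq(7L, 8L), Seq(9L), Seq.empty)))
  }
}
```

In this model, the empty-partition case needs no special handling: -1 < -1 is false, so the loop exits after the first poll.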