Please check if the PR fulfills these requirements
[x] The commit messages are descriptive
[ ] Tests for the changes have been added (for bug fixes/features)
[ ] Docs have been added/updated (for bug fixes/features)
[x] An issue has been created for the pull requests. Some issues might require the previous discussion.
What kind of change does this PR introduce? (Bug fix, feature, docs update, ...)
Bug fix #552
What is the current behavior? (You can also link to an open issue here)
With the Kafka backend, as soon as the message puller retrieves any messages (records.count() > 0), it proceeds to callback.initialLoadFinish(), even if more messages are still available in the topic.
What is the new behavior (if this is a feature change)?
When messages are retrieved (records.count() > 0), the application resets the retry counter (times = 0) and runs the iteration again to fetch more messages.
Only when no messages are available to fetch (records.count() == 0) does the application retry, and once the retry limit is reached (times >= config.getKafkaBackendConsumerRetries()) it continues to callback.initialLoadFinish().
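For reviewers, the new control flow can be sketched roughly as follows. This is a minimal illustration rather than the code in this PR: the class name `InitialLoadSketch`, the method `drain`, and the `IntSupplier` standing in for the Kafka consumer poll are all hypothetical, and `maxRetries` stands in for the value of config.getKafkaBackendConsumerRetries().

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.IntSupplier;

public class InitialLoadSketch {

    /**
     * Polls until maxRetries consecutive polls return no records.
     * pollCount is a hypothetical stand-in for records.count() on each poll.
     * Returns the total number of records seen before the load is finished.
     */
    static int drain(IntSupplier pollCount, int maxRetries) {
        int times = 0;   // consecutive empty polls so far
        int total = 0;   // records retrieved so far
        while (times < maxRetries) {
            int count = pollCount.getAsInt();
            if (count > 0) {
                total += count;
                times = 0;   // got data: reset the counter and poll again
            } else {
                times++;     // empty poll: count it as a retry
            }
        }
        // At this point the real code would call callback.initialLoadFinish().
        return total;
    }

    public static void main(String[] args) {
        // Simulated poll results: data, a gap, more data, then an empty topic.
        Deque<Integer> batches = new ArrayDeque<>(List.of(3, 0, 2, 0, 0, 0));
        IntSupplier fakePoll = () -> batches.isEmpty() ? 0 : batches.poll();
        System.out.println("records loaded: " + drain(fakePoll, 3));
    }
}
```

In this sketch a single empty poll between two batches does not end the initial load, because the counter is reset whenever records arrive; only an unbroken run of `maxRetries` empty polls does.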
Does this PR introduce a breaking change? (What changes might users need to make in their application due to this PR?)
No breaking changes. However, with the Kafka backend the initial load will take slightly longer (depending on the KafkaBackendConsumerRetries config), because the application now makes sure it has retrieved all available messages from the topic.
… continuing to build the topology plan