Closed zambrovski closed 2 years ago
I believe that the AsyncFetcher should not just accept that the running fetch task silently closes, but should propagate the error if the fetch task is closed.
Hi Simon, thanks a lot for creating the issue. Your analysis of the problem seems correct. I'm not sure yet what the best way to fix it would be. It would be nice if it restarted the KafkaConsumer on certain errors, but I'm not sure whether that happens when we propagate the error. In any case, we need to be careful to clean up the earlier work.
I'll dive deeper into this when I have some time, hopefully in about a week.
Fixed. We'll probably release 4.5.4 soon, which contains the fix.
Basic information
Steps to reproduce
We are using the Kafka Extension with Azure Event Hubs and ran into the following error. For some Azure reasons, we received a TopicAuthorizationException, which caused the Fetcher to quit the FetchEventTask. After that, the processor does NOT go into the error state and a new StreamableKafkaMessageSource is not re-created. As a result, the message processor idles, thinking that there are no messages (since nothing delivers messages into the message buffer), and no error processing is started.
We analyzed the code a little and it seems that if any exception is thrown on consumer.poll() inside the FetchEventTask, the fetch task is just closed. We could not find any logic that restarts the fetching.
Expected behaviour
Processor goes into an error mode and tries to recover (in our case we would detect the error and it would even be able to reconnect at some point).
Actual behaviour
On error, the processor does not detect that the fetch task has failed.
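To make the expected behaviour concrete, here is a minimal sketch of a fetch loop that reports a failed poll to an error handler instead of closing silently. It is not the extension's actual code: PropagatingFetchTask, poller, buffer and errorHandler are hypothetical names, and poller merely stands in for consumer.poll() so that the example runs without a Kafka dependency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class FetchTaskSketch {

    /** Hypothetical fetch loop; NOT the extension's FetchEventTask. */
    static final class PropagatingFetchTask implements Runnable {
        private final Supplier<List<String>> poller;    // stand-in for consumer.poll()
        private final Consumer<List<String>> buffer;    // stand-in for the message buffer
        private final Consumer<Throwable> errorHandler; // e.g. moves the processor into error mode
        private volatile boolean running = true;

        PropagatingFetchTask(Supplier<List<String>> poller,
                             Consumer<List<String>> buffer,
                             Consumer<Throwable> errorHandler) {
            this.poller = poller;
            this.buffer = buffer;
            this.errorHandler = errorHandler;
        }

        @Override
        public void run() {
            try {
                while (running) {
                    List<String> records = poller.get();
                    if (!records.isEmpty()) {
                        buffer.accept(records);
                    }
                }
            } catch (RuntimeException e) {
                // Stop the loop, but hand the failure to the owner
                // instead of swallowing it.
                running = false;
                errorHandler.accept(e);
            }
        }

        boolean isRunning() {
            return running;
        }

        void stop() {
            running = false;
        }
    }

    public static void main(String[] args) {
        List<String> buffered = new ArrayList<>();
        int[] calls = {0};
        PropagatingFetchTask task = new PropagatingFetchTask(
                () -> {
                    // First poll succeeds, second poll fails like an authorization error.
                    if (calls[0]++ == 0) {
                        return List.of("event-1");
                    }
                    throw new IllegalStateException("simulated authorization failure");
                },
                buffered::addAll,
                error -> System.out.println("fetch task failed: " + error.getMessage()));
        task.run();
        System.out.println("buffered before failure: " + buffered);
    }
}
```

With a handler like this, the owner of the task learns about the failure and can decide whether to enter an error state or re-create the message source, rather than waiting on a buffer that nothing fills anymore.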