siddhi-io / siddhi-io-kafka

Extension that can be used to receive events from a Kafka cluster and to publish events to a Kafka cluster
https://siddhi-io.github.io/siddhi-io-kafka/
Apache License 2.0

[KafkaSink] Send events to error store when the broker is not available #143

Closed dilini-muthumala closed 3 years ago

dilini-muthumala commented 3 years ago

Description: The analysis below was done by Janith Weerasinghe (@janithcmw).

A user has requested retry logic for the Siddhi Kafka sink for the case where the broker is not available.

When analyzing this requirement against the current implementation, it was observed that if the Kafka broker(s) is unavailable for longer than the Kafka-level retry period, the event is dropped at the Siddhi level as well.

Also, given the current error-handling behaviour, it appears that Siddhi-level error handling cannot be used.

On further analysis, it was observed that the Kafka client throws the following timeout exception when the brokers are not available:

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kafka_result_topic-0: 20036 ms has passed since batch creation plus linger time
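To make the pattern concrete, here is a minimal sketch (not the actual extension code) of mapping that Kafka-level timeout to Siddhi's connection-unavailable signal in synchronous mode. The `ConnectionUnavailableException` class below is a local stand-in for `io.siddhi.core.exception.ConnectionUnavailableException`, and the send is simulated with a `Callable`; in the real sink the call would be `producer.send(record).get()`, which throws an `ExecutionException` wrapping the timeout.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

class SyncSendSketch {

    // Local stand-in for io.siddhi.core.exception.ConnectionUnavailableException.
    static class ConnectionUnavailableException extends Exception {
        ConnectionUnavailableException(String msg, Throwable cause) {
            super(msg, cause);
        }
    }

    // Synchronous publish: wait for the send to complete, unwrap the
    // timeout, and re-throw it as ConnectionUnavailableException so
    // Siddhi's error handling (e.g. a fault stream) can take over.
    static void sendSync(Callable<?> send) throws ConnectionUnavailableException {
        try {
            send.call();
        } catch (Exception e) {
            Throwable cause = (e instanceof ExecutionException) ? e.getCause() : e;
            if (cause instanceof TimeoutException) {
                throw new ConnectionUnavailableException(
                        "Kafka broker unavailable: " + cause.getMessage(), cause);
            }
            throw new RuntimeException(e);
        }
    }
}
```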

Then, by changing the Kafka extension codebase, it was possible to enable Siddhi-level error handling (tested with a fault stream) by catching the timeout exception and throwing Siddhi's "ConnectionUnavailableException". This worked only in synchronous mode; to enable synchronous mode, the following parameter was set at the Siddhi sink level:

is.synchronous='true'
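For context, a sketch of how that flag and a fault stream might be wired together in a Siddhi app. The stream/topic names are illustrative, and apart from `is.synchronous` (quoted from the analysis above) the exact annotation details should be checked against the Kafka sink docs; `@OnError(action='STREAM')` with a `!`-prefixed fault stream is standard Siddhi.

```sql
@OnError(action='STREAM')
@sink(type='kafka',
      topic='kafka_result_topic',
      bootstrap.servers='localhost:9092',
      is.synchronous='true',
      @map(type='json'))
define stream ResultStream (symbol string, price float);

-- Events that fail to publish would be routed to the fault stream
-- !ResultStream instead of being dropped.
from !ResultStream
select *
insert into FailedEventsStream;
```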

For asynchronous mode, this cannot be implemented because publishing is handled on a separate thread triggered from the Siddhi Sink publisher level. However, even in asynchronous mode, it was possible to trap the same timeout exception.
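The asynchronous limitation can be illustrated in isolation: when the send runs on its own thread, an exception thrown there never reaches a try/catch on the caller's thread, so a `ConnectionUnavailableException` thrown from the publishing thread cannot propagate back to the Siddhi publisher. A generic sketch (plain executor, no Kafka classes):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AsyncSendSketch {

    // The failure is observable only on the publisher thread; a try/catch
    // around the submit call on the caller's thread never fires, which is
    // why the exception cannot be thrown back to the Sink publisher in
    // asynchronous mode.
    static Throwable runAsync() {
        AtomicReference<Throwable> seenByWorker = new AtomicReference<>();
        ExecutorService publisher = Executors.newSingleThreadExecutor();
        try {
            publisher.execute(() -> {
                try {
                    // Stand-in for the Kafka send that expires the batch.
                    throw new RuntimeException("TimeoutException: batch expired");
                } catch (RuntimeException e) {
                    // The timeout CAN still be trapped here, on the worker thread.
                    seenByWorker.set(e);
                }
            });
        } finally {
            publisher.shutdown();
        }
        try {
            publisher.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seenByWorker.get();
    }
}
```

This mirrors the observation above: the timeout is trappable on the publishing thread, but synchronous mode is needed for it to surface to Siddhi.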

Please share your thoughts on this.

Affected Product Version:

OS, DB, other environment details and versions:
N/A

Steps to reproduce: N/A