zio / zio-kafka

A Kafka client for ZIO and ZIO Streams
https://zio.dev/zio-kafka
Apache License 2.0

Close consumer when it cannot connect to the broker #175

Open egast opened 4 years ago

egast commented 4 years ago

Add an option to close the consumer when it is unable to connect to the broker. This should be configurable, e.g. after some timeout and/or after a given number of retries.

saraiva132 commented 4 years ago

I would very much love to see this! It is extremely annoying to have a consumer hang when it cannot connect to a broker.

WDYT @iravid ?

svroonland commented 4 years ago

As a workaround, have you considered a .timeout on the stream?

svroonland commented 4 years ago

@egast @saraiva132 Are you by any chance familiar with how to get the connection status from the org.apache.kafka.clients.consumer.Consumer, or with any settings that control its behavior on connection timeout?

egast commented 4 years ago

@svroonland I don't know if you can get the Kafka connection status from a KafkaConsumer. I think the connection retry behavior is handled by the consumer client. I know that Akka Kafka Streams does offer a "close consumer if it cannot connect to Kafka" option, so I took a quick look at how they do it. From what I understand, they just periodically try to fetch the topic metadata to see if Kafka is still available. Maybe something similar can be implemented for zio-kafka.
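A minimal sketch of such a periodic check, in plain Java with no Kafka dependency. `BrokerProbe`, `metadataCall`, `maxAttempts` and `delay` are hypothetical names chosen for illustration. The `Runnable` stands in for a metadata request such as `consumer.listTopics(Duration)`, and the sketch assumes that an unreachable broker shows up as an unchecked exception from that call.

```java
import java.time.Duration;

/** Hypothetical helper: decides when to give up on a broker that stays unreachable. */
final class BrokerProbe {
    private final Runnable metadataCall; // stand-in for e.g. consumer.listTopics(timeout)
    private final int maxAttempts;
    private final Duration delay;

    BrokerProbe(Runnable metadataCall, int maxAttempts, Duration delay) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.metadataCall = metadataCall;
        this.maxAttempts = maxAttempts;
        this.delay = delay;
    }

    /** Returns true as soon as one call succeeds, false once every attempt has failed. */
    boolean awaitBroker() {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                metadataCall.run();
                return true;
            } catch (RuntimeException e) {
                // Assumption: any unchecked failure means "broker not reachable yet".
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(delay.toMillis());
                    } catch (InterruptedException interrupted) {
                        // Preserve the interrupt flag and stop retrying.
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
            }
        }
        return false;
    }
}
```

A caller could close the consumer when `awaitBroker()` returns false. Both the retry count and the delay between attempts are configurable, which matches the original request.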

svroonland commented 8 months ago

We could perhaps do something with the lack of a partitions assigned event from the RebalanceListener.

erikvanoosten commented 8 months ago

> We could perhaps do something with the lack of a partitions assigned event from the RebalanceListener.

This would exclude use cases that have extra instances standing by. That includes redeployments where you first start new instances and then stop the old ones.

svroonland commented 8 months ago

Not sure, but do those instances get an empty list of assigned partitions perhaps?

erikvanoosten commented 8 months ago

> Not sure, but do those instances get an empty list of assigned partitions perhaps?

Not sure either. It might be that the rebalance listener would not be invoked at all.

svroonland commented 1 month ago

I like the idea of building a heartbeat mechanism using the listTopics method, as @egast suggested.

I don't think it's always desirable to fail the stream when the consumer cannot connect for some period. In long-running applications you may just want to wait until the broker is back online. But it could also signal that you have a wrong configuration and the connection cannot be established because of that, in which case it would be good to fail fast. There are also errors that can occur for a specific subscription, e.g. when not authorized for some topic. I don't know if we can detect those and fail the stream, while potentially keeping streams for other subscriptions running.

In any case, we could have a diagnostic event Connected / Disconnected, which would allow the user to decide how to handle connection issues. A new metric 'connection state', like the one we have for subscription state, would also allow for monitoring and alerting.
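A rough sketch of how such events could be derived from heartbeat results, in plain Java. `ConnectionEvent` and `ConnectionMonitor` are hypothetical names, not part of zio-kafka. The sketch assumes some periodic probe reports a boolean outcome and leaves the reaction to the listener.

```java
import java.util.function.Consumer;

/** Hypothetical diagnostic events, named after the proposal above. */
enum ConnectionEvent { CONNECTED, DISCONNECTED }

/** Turns a series of probe outcomes into events, emitted only on state changes. */
final class ConnectionMonitor {
    private final Consumer<ConnectionEvent> listener;
    private Boolean connected = null; // unknown until the first probe result arrives

    ConnectionMonitor(Consumer<ConnectionEvent> listener) {
        this.listener = listener;
    }

    /** Records one probe outcome and notifies the listener if the state changed. */
    void record(boolean probeSucceeded) {
        if (connected == null || connected != probeSucceeded) {
            connected = probeSucceeded;
            listener.accept(probeSucceeded
                    ? ConnectionEvent.CONNECTED
                    : ConnectionEvent.DISCONNECTED);
        }
    }
}
```

Because only transitions are reported, a long-running application can ignore a Disconnected event and wait, while another can fail fast on the first one. The same state could back a 'connection state' metric.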