lovoo / goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.
BSD 3-Clause "New" or "Revised" License

Kafka broker outage takes down processor even though there are other active brokers #457

Closed akshatraika-moment closed 1 month ago

akshatraika-moment commented 1 month ago

Hi! I am seeing an issue with how the library handles broker outages, and I think I am doing something obviously wrong here.

I have a Kafka topic with a replication factor of 3, min.insync.replicas set to 2, and 32 partitions, in a Kafka cluster of 3 brokers running in AWS MSK. I have a microservice built with goka that builds tables by ingesting data from that 32-partition stream. The table topic is set up with the same configuration: replication factor of 3, min.insync.replicas set to 2, and 32 partitions.

As one would expect, once in a while MSK has a broker outage when they do emergency rolling updates to the cluster. When this happens, 1 of the 3 brokers becomes unavailable. In this situation, I would expect any Kafka client to be able to continue processing, given the topic configuration. Instead, I see the following errors and goka stops processing altogether:

error in errgroup Wait: error in processor runner: 1 error occurred:
    * error consuming from group consumer: 1 error occurred:
    * error setting up (partition=0): Setup failed. Cannot start processor for partition 4: 1 error occurred:
    * Error getting newest offset for topic/partition my-service-table/4: kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes

This ends up killing the service, which then exits.

I also see tons of these logs, but they don't lead to a shutdown:

* kafka: error while consuming my-service-table/19: EOF

2024/07/25 21:48:30 [View my-service-table > PartTable-19] Error while starting up: 1 error occurred:

2024/07/25 21:48:30 [View my-service-table > PartTable-28] Will retry in 0 seconds (retried 1 times so far)

2024/07/25 21:48:26 [View my-service-table > PartTable-31] Error while starting up: Error getting oldest offset for topic/partition my-service-table/31: write tcp some_ip_address>some_ip_addres_of_broker:9096: write: broken pipe

2024/07/25 21:48:26 [Processor my-service] error during execution of consumer group: kafka: error while consuming my-topic/17: EOF

We use librdkafka (confluent-kafka-go) in all of our other services, where we handle this pretty smoothly by just continuing to process and only logging the Kafka broker errors. I would imagine sarama + goka can do the same, but I am not finding a clear answer online.

I would really appreciate some help on this!

Kafka broker version: 3.2.0

goka version: v1.1.12

sarama version: v1.43.2

akshatraika-moment commented 1 month ago

This was fixed by updating the Kafka version set in the sarama config to 3.2. Unlike librdkafka, sarama requires you to set the version of your Kafka brokers yourself so that it uses the matching protocol version :")
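
For anyone landing here with the same symptoms, the fix described above might look roughly like the sketch below. It assumes goka's global sarama config is used (via `goka.DefaultConfig` and `goka.ReplaceGlobalConfig`), the `github.com/IBM/sarama` import path that matches sarama v1.43.2, and brokers running Kafka 3.2.0 as in this issue. The processor and view setup is omitted and would follow your own service code.

```go
package main

import (
	"log"

	"github.com/IBM/sarama"
	"github.com/lovoo/goka"
)

func main() {
	// Start from goka's default sarama config so the settings goka
	// relies on are preserved.
	cfg := goka.DefaultConfig()

	// Assumption: the brokers run Kafka 3.2.0 (the version reported in
	// this issue). Replace the string with your own broker version.
	version, err := sarama.ParseKafkaVersion("3.2.0")
	if err != nil {
		log.Fatalf("invalid Kafka version: %v", err)
	}
	cfg.Version = version

	// Install the config globally. This must happen before any goka
	// processor or view is created, since they read the global config
	// at construction time.
	goka.ReplaceGlobalConfig(cfg)

	// ... create and run processors/views as usual (omitted here).
}
```

If you build the sarama config elsewhere (for example with custom consumer group or producer builders), setting `Version` on that config instead should have the same effect.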