NimzyMaina / flask_kafka

Flask Kafka consumer full implementation example. Ideal for a microservices architecture.
43 stars 7 forks source link

Exception handling for flask-kafka #3

Closed ManashJKonwar closed 3 years ago

ManashJKonwar commented 3 years ago

Hello @NimzyMaina Thanks for creating this awesome module. However, after consistently using it, I realized that during kafka led errors such as CommitFailedError, the consumer stops working and gets hung up and also stops consuming next messages in the queue. I want to log this exception for certain messages and then move steadily to consume the next message. Can you please guide me on this? CommitFailedError_Author

NimzyMaina commented 3 years ago

@ManashJKonwar The consumer is designed to stop consuming after a critical failure occurs. But from the error log you have posted, I can see that the error comes from the underlying library kafka-python. From the error log, it says you should play around with max_poll_records or max_poll_interval_ms. You can pass the same arguments to FlaskKafka, it will just pass them over to kafka-python

From the kafka-python docs Kafka Consumer, the default value for max_poll_records is 500 & max_poll_interval_ms is 300000 (5 mins). This is quite large especially if your consumers are doing a lot of work. I would reduce it to a smaller value. Feel free to experiment with what works for you.

bus = FlaskKafka(INTERRUPT_EVENT,
                 bootstrap_servers=",".join(["localhost:9092"]),
                 group_id="consumer-grp-id",
                 max_poll_records=10
                 )

Please let me know if this solves your problem.