faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Does Faust Streaming Consumer retry the offset commit if it fails? #611

Open arcanjo45 opened 8 months ago

arcanjo45 commented 8 months ago

Checklist

Steps to reproduce

N/A

Expected behavior

While investigating my consumer logs, I found the following error:

[ERROR] OffsetCommit failed for group gaming-processor on partition TopicPartition(topic='my_topic', partition=3) with offset OffsetAndMetadata(offset=29426192, metadata=''): UnknownTopicOrPartitionError

This error occurred while my Kafka broker leader pod on GKE was rotating. I think the error is expected while a rebalance is in progress. My question is whether the application will retry committing this offset, or whether it will only commit the next offset in the next commit wave. I would also like to know whether this can cause the consumer to re-read older messages that were already consumed, or to lose messages that had not yet been consumed.

Actual behavior

N/A

Full traceback

N/A

Versions

wbarnha commented 7 months ago

It varies, it seems. For a worker, auto-commit is disabled: https://github.com/faust-streaming/faust/blob/master/faust/transport/drivers/aiokafka.py#L525-L552 but for a client-only consumer, it is enabled: https://github.com/faust-streaming/faust/blob/master/faust/transport/drivers/aiokafka.py#L566.

The logic for marking a message as ack'ed is in https://github.com/faust-streaming/faust/blob/master/faust/transport/consumer.py#L809-L829. The commit handling is hard to follow, and to be honest, I create my own AIOKafkaConsumer when I want to be explicit about how consumed messages are handled when they fail inside my consumer's own processing logic. There really needs to be better documentation for this...
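To make the "explicit" approach a bit more concrete, here is a minimal sketch of retrying a failed commit with backoff. It is not how Faust does it internally. `commit_with_retry` and `FlakyCommitter` are hypothetical names made up for illustration, and the fake committer stands in for a broker whose leader is moving. With a real `AIOKafkaConsumer` created with `enable_auto_commit=False`, I would expect to pass `consumer.commit` as the `commit` callable and a `{TopicPartition: offset}` dict as `offsets`, but treat that wiring as an assumption and check it against the aiokafka docs.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Hashable, Tuple, Type


async def commit_with_retry(
    commit: Callable[[Dict[Hashable, int]], Awaitable[None]],
    offsets: Dict[Hashable, int],
    *,
    attempts: int = 3,
    base_delay: float = 0.1,
    retriable: Tuple[Type[BaseException], ...] = (Exception,),
) -> bool:
    """Try to commit `offsets`, retrying with exponential backoff.

    Returns True once a commit succeeds, False if every attempt failed.
    The offset for each partition should be "last processed offset + 1".
    """
    for attempt in range(1, attempts + 1):
        try:
            await commit(offsets)
            return True
        except retriable:
            if attempt == attempts:
                return False
            # Back off before the next attempt: base, 2*base, 4*base, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    return False  # only reached when attempts < 1


class FlakyCommitter:
    """Hypothetical stand-in for a consumer: the first `failures` commits raise."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0
        self.committed: Dict[Hashable, int] = {}

    async def __call__(self, offsets: Dict[Hashable, int]) -> None:
        self.calls += 1
        if self.calls <= self.failures:
            # Simulates a transient error such as UnknownTopicOrPartitionError.
            raise ConnectionError("simulated transient commit failure")
        self.committed.update(offsets)


if __name__ == "__main__":
    fake = FlakyCommitter(failures=2)
    ok = asyncio.run(
        commit_with_retry(fake, {("my_topic", 3): 29426192}, base_delay=0.01)
    )
    print(ok, fake.calls, fake.committed)
```

If every attempt fails, the committed position simply stays where it was. A later successful commit of a higher offset covers the earlier one, and a restart before that happens resumes from the last committed offset, so the usual risk is re-reading messages rather than losing them.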

I'll get back to you; I need to re-read the internals of this myself.