rabbitmq / rabbitmq-stream-go-client

A client library for RabbitMQ streams
MIT License

consumer auto commit flush interval reset by processed messages #337

Closed Darthmineboy closed 3 months ago

Darthmineboy commented 3 months ago

Is your feature request related to a problem? Please describe.

The stream consumer has an optional auto-commit strategy which, by default, commits the offset every 10,000 messages or every 5 seconds (the flush interval). However, the timer for the flush interval is reset every time a message is processed. As a result, if each message arrives before the flush interval has elapsed since the previous one, the time-based flush never fires.

If one message is processed every second, the default flush interval of 5 seconds is never reached, and the offset is committed only when the consumer is closed gracefully or once 10,000 messages have been processed. If the connection to the broker is lost or the consumer is not closed gracefully, a lot of messages may be replayed, up to 10,000 by default.

Describe the solution you'd like

The offset should be committed at the flush interval regardless of incoming messages.

Describe alternatives you've considered

For low-volume systems, messageCountBeforeStorage can be lowered drastically. In the worst case, however, each message arrives just before the flush interval elapses, so the offset may stay uncommitted for up to messageCountBeforeStorage * flushInterval. If I lower messageCountBeforeStorage to 100 and keep flushInterval at 5 seconds, the offset may not be committed for up to 500 seconds. That is too long, and lowering the count even further would commit the offset so often that it hurts performance.

Additional context

No response

Darthmineboy commented 3 months ago

I will try to fix this

Gsantomaggio commented 3 months ago

Thank you @Darthmineboy

Gsantomaggio commented 3 months ago

> The timer for the flush interval is however reset every time a message is processed.

oh! wow this bug deserves a meme store

Darthmineboy commented 3 months ago

> > The timer for the flush interval is however reset every time a message is processed.
>
> oh! wow this bug deserves a meme store

haha that's great