rabbitmq / rabbitmq-server

Open source RabbitMQ: core server and tier 1 (built-in) plugins
https://www.rabbitmq.com/

Implement consumer timeouts in the quorum queue #11603

Open kjnilsson opened 6 days ago

kjnilsson commented 6 days ago

Is your feature request related to a problem? Please describe.

Implementing consumer timeouts in the queue is likely to be more reliable than implementing them in the channel. It also allows for greater flexibility and removes the need to implement timeouts separately for every protocol.

Describe the solution you'd like

Consumer timeouts design

rabbit_fifo will record the checkout time for each message that is assigned to a consumer. This timestamp will be used to detect messages that have been held longer than the configured consumer timeout. We do not want to use the Ra timeout effect for this, as doing so would require either expensive and frequent calculations over the full set of checked out messages or keeping lots of timers (one per message).

Instead we'd schedule an aux event every minute that scans the checked out set. If any consumer has messages with expired timeouts, the aux handler commits a new command, eval_consumer_timeouts, which does the actual work and returns the messages. This means timeouts will be evaluated some time after the expiry, but no more than ~60s later.
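The two-step approach above (a cheap periodic scan, followed by a command only when something has expired) can be sketched roughly as follows. This is a Python illustration rather than rabbit_fifo code: the `Consumer` class, the `has_expired` helper and the millisecond timestamps are assumptions made for the example, and only the name eval_consumer_timeouts comes from the proposal.

```python
from dataclasses import dataclass, field

SCAN_INTERVAL_MS = 60_000  # the "every minute" aux event from the proposal


@dataclass
class Consumer:
    # Hypothetical shape: msg_id -> checkout timestamp in milliseconds.
    checked_out: dict = field(default_factory=dict)


def has_expired(consumers, now_ms, timeout_ms):
    """Read-only scan, standing in for the periodic aux event."""
    return any(
        now_ms - ts >= timeout_ms
        for consumer in consumers.values()
        for ts in consumer.checked_out.values()
    )


def eval_consumer_timeouts(consumers, now_ms, timeout_ms):
    """Stand-in for the command that returns expired messages.

    Returns {consumer_id: [msg_ids]} for every consumer that lost messages.
    """
    returned = {}
    for consumer_id, consumer in consumers.items():
        expired = sorted(
            msg_id
            for msg_id, ts in consumer.checked_out.items()
            if now_ms - ts >= timeout_ms
        )
        for msg_id in expired:
            del consumer.checked_out[msg_id]
        if expired:
            returned[consumer_id] = expired
    return returned
```

Because the scan only runs once per interval, a message can be held for up to the timeout plus roughly one `SCAN_INTERVAL_MS` before it is returned, which matches the "no more than ~60s" bound described above.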

Consumers that let any of their message locks expire should not be assigned any further messages until they send some kind of command (settlement, lock renewal etc) to show that they are live and responding. They should be treated as "suspected" until it is known that they can reply.

This means we can probably get rid of the (undesirable but necessary with mnesia) behaviour where, when the queue received a DOWN notification with the reason noconnection, it would immediately return all messages. With mnesia this was reasonably correct: if the cluster was disconnected (even briefly), the rabbit application would typically restart itself in mysterious ways, with the ultimate result that channels were terminated. With khepri this will no longer be the case and the cluster should be able to function normally even if there are short term cluster disconnections.

So going forward when a QQ receives a noconnection for a consumer process it will only mark it as disconnected (so that new messages are not assigned until it comes back) and let the consumer timeout handle the message return in due course. This means it should be able to handle the case of short term disconnections / reconnections in the cluster without messages being returned unnecessarily.

If the consumer is already in cancelled state (cancelled but with pending messages) then all pending messages will be returned and the consumer will be removed. This is the safest option, as there are potentially faulty clients in the wild that will never ack pending messages after a cancellation.
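The consumer status rules described in the last few paragraphs can be summarised as a small state machine. The sketch below is illustrative only: the status names, the `ConsumerState` class and the handler functions are hypothetical, and it assumes the cancelled-consumer rule is applied when the noconnection notification arrives, which the text does not state explicitly.

```python
from enum import Enum


class Status(Enum):
    UP = "up"
    SUSPECTED = "suspected"        # let a message lock expire
    DISCONNECTED = "disconnected"  # DOWN with reason noconnection
    CANCELLED = "cancelled"        # cancelled but with pending messages


class ConsumerState:
    def __init__(self, status=Status.UP, pending=None):
        self.status = status
        self.pending = list(pending or [])  # checked out message ids

    def eligible_for_delivery(self):
        # Only live, responding consumers are assigned new messages.
        return self.status is Status.UP


def on_lock_expired(consumer):
    if consumer.status is Status.UP:
        consumer.status = Status.SUSPECTED


def on_consumer_command(consumer):
    # Any settlement, lock renewal etc. shows the consumer is live again.
    if consumer.status is Status.SUSPECTED:
        consumer.status = Status.UP


def on_noconnection(consumer):
    """Returns (returned_msg_ids, remove_consumer)."""
    if consumer.status is Status.CANCELLED:
        returned, consumer.pending = consumer.pending, []
        return returned, True
    # Assumption: keep the messages checked out and let the consumer
    # timeout return them later if the consumer does not come back.
    consumer.status = Status.DISCONNECTED
    return [], False


def on_reconnected(consumer):
    if consumer.status is Status.DISCONNECTED:
        consumer.status = Status.UP
```

The point of the design is visible in `on_noconnection`: a short disconnection changes only the status, so nothing is redelivered unless the consumer timeout later expires.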

This also means that locks should be relatively short (max 5 mins but ideally lower).

Single Active Consumer consumers that let their messages time out will have all pending messages returned, as well as being replaced. This is to ensure ordering invariants with SAC.
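To illustrate why all pending messages are returned together, here is a minimal sketch of the Single Active Consumer case. The `SacQueue` class and its fields are hypothetical, and the sketch assumes that returned messages go back to the head of the ready queue in their original order and that the timed out consumer is simply dropped; the proposal does not specify either detail.

```python
from collections import deque


class SacQueue:
    def __init__(self, consumers):
        self.waiting = deque(consumers)
        self.active = self.waiting.popleft() if self.waiting else None
        self.pending = []     # ids checked out to the active consumer, in order
        self.ready = deque()  # ids ready for delivery, head first

    def on_active_timeout(self):
        """Return every pending message and promote the next consumer."""
        # Put all pending messages back in front of the ready messages,
        # preserving their original delivery order.
        self.ready.extendleft(reversed(self.pending))
        self.pending = []
        timed_out = self.active
        # Assumption: the timed out consumer is not re-queued as waiting.
        self.active = self.waiting.popleft() if self.waiting else None
        return timed_out
```

If only the expired message were returned, the replacement consumer could receive it while the old consumer still held earlier or later messages, which is the ordering problem the rule avoids.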

Protocol impl:

AMQP can provide a management extension command to renew locks for messages. AMQP legacy can configure an auto renew function (implemented by the channel process / queue type) that will auto renew the lock n times on behalf of the client. This is because the legacy protocol (and other protocols such as MQTT / STOMP) doesn't have any option for implementing lock renewal.

For AMQP legacy we can default to renewing locks up to the total of the current consumer_timeout configuration.
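One possible reading of this default is that the number of automatic renewals n is chosen so that the total lock time covers the existing consumer_timeout. A rough sketch of that arithmetic, where the `auto_renewals` function and the example values are assumptions for illustration:

```python
import math


def auto_renewals(consumer_timeout_ms, lock_ms):
    """Number of renewals needed so total lock time covers consumer_timeout.

    The initial checkout counts as the first lock period, so the number of
    renewals is one less than the number of periods required.
    """
    if lock_ms <= 0:
        raise ValueError("lock_ms must be positive")
    periods = math.ceil(consumer_timeout_ms / lock_ms)
    return max(periods - 1, 0)


# Example values only: a 30 minute consumer_timeout with 5 minute locks.
n = auto_renewals(30 * 60 * 1000, 5 * 60 * 1000)
```

With these example values, six lock periods are needed, so the channel would renew five times before letting the lock expire.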

Q: Can we do lock renewal without going through Raft log?

When a message reaches its timeout the queue will notify the consumer process with a new Ra event {message_timeout, consumer_tag(), [MsgIds]} - how this is handled may depend on the protocol implementation. AMQP can emit the released or modified outcome. Other protocols don't have the same mechanism, so for AMQP legacy it is probably best to terminate the channel or initiate a broker side consumer cancellation.
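The per-protocol handling could look something like the dispatch below. This is a sketch under assumptions: the protocol labels, the `handle_message_timeout` function and the returned action tuples are all hypothetical, and it picks the released outcome and consumer cancellation from the options mentioned above.

```python
def handle_message_timeout(protocol, consumer_tag, msg_ids):
    """Map a message_timeout event to protocol-level actions.

    Returns a list of action tuples for the caller to carry out.
    """
    if protocol == "amqp":
        # AMQP can signal the outcome per message.
        return [("released", consumer_tag, msg_id) for msg_id in msg_ids]
    if protocol == "amqp_legacy":
        # No per-message mechanism, so cancel the consumer broker side
        # (terminating the channel is the other option mentioned).
        return [("cancel_consumer", consumer_tag)]
    raise ValueError(f"no timeout handling defined for {protocol!r}")
```

For example, `handle_message_timeout("amqp", "ctag-1", [4, 5])` yields one released action per message, while the legacy branch yields a single cancellation regardless of how many messages timed out.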

Describe alternatives you've considered

.

Additional context

.