rabbitmq / rabbitmq-server

Open source RabbitMQ: core server and tier 1 (built-in) plugins
https://www.rabbitmq.com/

Fix message IDs settlement order #11560

Closed ansd closed 6 days ago

ansd commented 1 week ago

What?

This commit fixes issues that were present only on the main branch and were introduced by #9022.

  1. Classic queues (specifically rabbit_queue_consumers:subtract_acks/3) expect message IDs to be (n)acked in the order in which they were delivered to the channel / session proc. Hence, the lists:usort(MsgIds0) in rabbit_classic_queue:settle/5 was wrong: it caused some messages not to be acked, which also introduced a regression for AMQP 0.9.1.
  2. The order in which the session proc requeues or rejects multiple message IDs at once is important. For example, if the client sends a DISPOSITION with first=3 and last=5, the message IDs corresponding to delivery IDs 3, 4, 5 must be requeued or rejected in exactly that order. Quorum queues, for instance, use this order of message IDs in https://github.com/rabbitmq/rabbitmq-server/blob/34d3f943742bdcf7d34859edff8d45f35e4007d4/deps/rabbit/src/rabbit_fifo.erl#L226-L234 to dead letter in that order.
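
To illustrate point 1, here is a minimal Python model of a single-pass ack matcher. It is not the Erlang code of rabbit_queue_consumers:subtract_acks/3; the function below and the example message IDs are hypothetical. It assumes that message IDs can reach the channel in non-ascending order, and that acks are matched head-first against the pending list.

```python
from collections import deque


def subtract_acks(ack_ids, pending):
    """Simplified model: walk `pending` (delivery order) once and match
    each entry against the head of `ack_ids`.

    Returns (still_pending, unmatched_acks).
    """
    acks = deque(ack_ids)
    still_pending = []
    for msg_id in pending:
        if acks and acks[0] == msg_id:
            acks.popleft()  # acked: drop it from the pending list
        else:
            still_pending.append(msg_id)
    return still_pending, list(acks)


# Hypothetical delivery order in which the message IDs are not ascending.
delivered = [7, 2, 5]

# Acks passed in delivery order: every message is matched.
in_delivery_order = subtract_acks(delivered, delivered)

# Acks sorted and de-duplicated first (the effect of lists:usort/1):
# message 7 is never matched because it is no longer at the head.
after_usort = subtract_acks(sorted(set(delivered)), delivered)
```

In this model, sorting the IDs leaves message 7 pending even though the client acked it, which is the kind of "not all messages acked" behaviour described above.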

How?

The session proc will settle (internal) message IDs to queues in ascending (AMQP) delivery ID order, i.e. in the order messages were sent to the client and in the order messages were settled by the client.
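
A minimal sketch of this idea in Python, assuming the unsettled state is a map from delivery ID to a (queue name, message ID) pair. The function name, the tuple layout, and the sample data are hypothetical, and AMQP 1.0 serial number wrap-around of delivery IDs is ignored for brevity.

```python
def settle_range(unsettled, first, last):
    """Remove delivery IDs first..last from `unsettled` and return the
    message IDs to settle, grouped per queue, in ascending delivery ID order.
    """
    per_queue = {}
    for delivery_id in range(first, last + 1):
        entry = unsettled.pop(delivery_id, None)
        if entry is None:
            continue  # already settled or never sent
        queue_name, msg_id = entry
        # Appending while iterating in ascending delivery ID order keeps
        # each queue's message IDs in delivery order; nothing is sorted.
        per_queue.setdefault(queue_name, []).append(msg_id)
    return per_queue


# Hypothetical state: delivery ID -> (queue name, internal message ID).
unsettled = {3: ("q1", 12), 4: ("q2", 0), 5: ("q1", 9), 6: ("q1", 10)}

# DISPOSITION with first=3 and last=5.
settled = settle_range(unsettled, 3, 5)
```

Here q1 receives message IDs 12 and then 9, matching delivery IDs 3 and 5, rather than the numerically sorted 9 and then 12.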

This commit chooses to keep the session's outgoing_unsettled_map as a map data structure.

An alternative would have been to use a queue or lqueue for the outgoing_unsettled_map as done in

Whether a queue (as done by rabbit_channel) or a map (as done by rabbit_amqp_session) performs better depends on the pattern in which clients ack messages.

A queue will likely perform well enough because usually the oldest delivered messages will be acked first. However, given that there can be many different consumers on an AMQP 0.9.1 channel or AMQP 1.0 session, this commit favours a map because it will likely generate less garbage and is very efficient when, for example, a single new message (or a few new messages) gets acked while many (older) messages are still checked out by the session (but possibly by different AMQP 1.0 receivers).
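
The trade-off can be modelled roughly in Python. This is only an illustration of the access pattern, not a benchmark and not the Erlang data structures; both helper functions are hypothetical. The queue version counts how many entries it has to examine before it finds the acked delivery ID.

```python
from collections import deque


def ack_from_queue(unsettled, delivery_id):
    """Remove `delivery_id` from a deque ordered oldest-first.

    Returns the number of entries examined.
    """
    examined = 0
    skipped = deque()
    while unsettled:
        item = unsettled.popleft()
        examined += 1
        if item == delivery_id:
            break
        skipped.append(item)
    # Put the older, still unsettled entries back at the front, in order.
    unsettled.extendleft(reversed(skipped))
    return examined


def ack_from_map(unsettled, delivery_id):
    """Remove `delivery_id` from a dict; returns True if it was present."""
    return unsettled.pop(delivery_id, None) is not None


queue_state = deque(range(1000))
oldest_cost = ack_from_queue(queue_state, 0)    # oldest first: cheap
newest_cost = ack_from_queue(queue_state, 999)  # newest first: full scan

map_state = {delivery_id: "msg" for delivery_id in range(1000)}
found = ack_from_map(map_state, 999)            # direct lookup by key
```

Acking the oldest entry touches one element of the queue, while acking the newest one walks past every older entry and rebuilds the front of the queue. The map lookup does not depend on how many older deliveries are still unsettled.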