rabbitmq / rabbitmq-server

Open source RabbitMQ: core server and tier 1 (built-in) plugins
https://www.rabbitmq.com/
Other
12.15k stars 3.91k forks source link

Quorum Queues with reject-publish max-length commit transactions successfully even if the messages are dropped #8966

Open luos opened 1 year ago

luos commented 1 year ago

Describe the bug

Hi,

We were testing Quorum Queues with transactions, max-length, reject-publish. We've seen two unexpected behaviours. We think it is present on all versions.

Reproduction steps

  1. Create a Quorum Queue with the following arguments:
    • max-length = 5
    • x-overflow = reject-publish
  2. In a transaction, publish 3 messages and commit. These messages will be confirmed by RabbitMQ and accepted by the queue.
  3. In a transaction, publish 6 messages and commit. These messsages will be confirmed by RabbitMQ and accepted by the queue. The queue will have 9 messages, even though the limit is 5.
  4. In a transaction, publish another 6 messages. These messages will be confirmed by RabbitMQ, but not accepted by the queue. The messages will be silently dropped.

Expected behavior

Additional context

The probably cause of the second issue is the unconditional overwrite of the tx state to committing after handle_queue_actions:

https://github.com/rabbitmq/rabbitmq-server/blob/cee35f7cd2369f1063f421c704c74476d7792462/deps/rabbit/src/rabbit_channel.erl#L1717-L1719

Even though handle_queue_actions can set the tx state to be failed.

Reproduction as code:

{ok, Connection} =
    amqp_connection:start(#amqp_params_network{host = "localhost",
                                                username = "guest",
                                                password = "guest"}),
{ok, Channel} = amqp_connection:open_channel(Connection),
{ok, Connection, Channel},

% We are publishing to a queue with max-length = 5 and reject-publish

amqp_channel:call(Channel, #'queue.declare'{queue = <<"transaction-qq">>, durable = true, arguments = [
        {<<"x-queue-type">>, longstr, <<"quorum">>},
        {<<"x-max-length">>, long, 5},
        {<<"x-overflow">>, longstr, <<"reject-publish">>},
        {<<"x-delivery-limit">>, long, 100}
    ]}),

% These messages will fit into the max-length limit

#'tx.select_ok'{} = amqp_channel:call(Channel, #'tx.select'{}),
[amqp_channel:call(Chan,
                    #'basic.publish'{exchange = <<"">>, routing_key = <<"transaction-qq">>},
                    #amqp_msg{props = #'P_basic'{},
                                payload = list_to_binary(io_lib:format("this fits into the queue max-length ~p", [N]))})
    || N <- lists:seq(1, 3)],

amqp_channel:call(Chan, #'tx.commit'{}),

% queue length is now 3. We publish another 6 messages, which will overflow the queue and will have 9 messages (which is larger than max-length)

#'tx.select_ok'{} = amqp_channel:call(Channel, #'tx.select'{}),
[amqp_channel:call(Chan,
                    #'basic.publish'{exchange = <<"">>, routing_key = <<"transaction-qq">>},
                    #amqp_msg{props = #'P_basic'{},
                                payload = list_to_binary(io_lib:format("this does not but it will still go to the queue ~p", [N]))})
    || N <- lists:seq(1, 6)],

amqp_channel:call(Chan, #'tx.commit'{}),
% queue length is now 9.
ok,

% We publish these messages which will be confirmed by RabbitMQ
#'tx.select_ok'{} = amqp_channel:call(Channel, #'tx.select'{}),
[amqp_channel:call(Chan,
                    #'basic.publish'{exchange = <<"">>, routing_key = <<"transaction-qq">>},
                    #amqp_msg{props = #'P_basic'{},
                                payload = list_to_binary(io_lib:format("this will be lost but it will be confirmed ~p", [N]))})
    || N <- lists:seq(1, 6)],
% we receive tx.commit_ok
    #'tx.commit_ok'{} = amqp_channel:call(Chan, #'tx.commit'{}).
michaelklishin commented 1 year ago

Thanks for providing an executable way to reproduce and digging in.

The AMQP 0-9-1 spec does not cover additions such as x-overflow. When set to reject-publish explicitly, I agree with your analysis.

kjnilsson commented 10 months ago

reject-publish with quorum queues is documented to be subject to overflows. That said tx should handle case 2 better.