noxdafox / rabbitmq-message-deduplication

RabbitMQ Plugin for filtering message duplicates
Mozilla Public License 2.0

Mirror crashes when policy is applied to queue with Unacknowledged messages #79

Closed (closed by luos 2 years ago)

luos commented 2 years ago

Hi,

We recently discovered a crash that occurs when a mirroring policy is applied to a queue that has unacknowledged messages:

rmq3.monitored.host | ** Last message in was {'$gen_cast',{sync_start,#Ref<52878.2029881567.138674179.105547>,<52878.25798.0>}}
rmq3.monitored.host | ** When Server state == {state,{amqqueue,{resource,<<"/">>,queue,<<"quue">>},true,false,none,[{<<"x-queue-type">>,longstr,<<"classic">>}],<52878.1916.0>,[<0.19810.0>,<52891.19807.0>,<52879.19830.0>],[],['rabbit@rmq3.monitored.host','rabbit@rmq4.monitored.host','rabbit@rmq2.monitored.host'],[{vhost,<<"/">>},{name,<<"mirror">>},{pattern,<<".*">>},{'apply-to',<<"queues">>},{definition,[{<<"ha-mode">>,<<"all">>},{<<"ha-sync-batch-size">>,256},{<<"ha-sync-mode">>,<<"automatic">>}]},{priority,432}],undefined,[{<52879.19833.0>,<52879.19830.0>},{<52891.19810.0>,<52891.19807.0>},{<0.19813.0>,<0.19810.0>},{<52878.4524.0>,<52878.1916.0>}],[],live,1,[],<<"/">>,#{user => <<"guest">>},rabbit_classic_queue,#{}},<0.19822.0>,'Elixir.RabbitMQMessageDeduplication.Queue',{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"quue">>},true,false,none,[{<<"x-queue-type">>,longstr,<<"classic">>}],<52878.1916.0>,[<0.19810.0>,<52891.19807.0>,<52879.19830.0>],[],['rabbit@rmq3.monitored.host','rabbit@rmq4.monitored.host','rabbit@rmq2.monitored.host'],[{vhost,<<"/">>},{name,<<"mirror">>},{pattern,<<".*">>},{'apply-to',<<"queues">>},{definition,[{<<"ha-mode">>,<<"all">>},{<<"ha-sync-batch-size">>,256},{<<"ha-sync-mode">>,<<"automatic">>}]},{priority,432}],undefined,[{<52879.19833.0>,<52879.19830.0>},{<52891.19810.0>,<52891.19807.0>},{<0.19813.0>,<0.19810.0>},{<52878.4524.0>,<52878.1916.0>}],[],live,1,[],<<"/">>,#{user => <<"guest">>},rabbit_classic_queue,#{}},{passthrough,rabbit_variable_queue,{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},0,{0,nil},{0,nil},{0,nil},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@rmq3.monitored.host/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/5KWL19MWJ9UJH8XJBI32W7QDJ",{#{},[]},undefined,0,32768,#Fun<rabbit_variable_queue.2.75160458>,#Fun<rabbit_variable_queue.3.75160458>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"quue">>}},{{client_msstate,...},...},...}}},...}
rmq3.monitored.host | ** Reason for termination ==
rmq3.monitored.host | ** {undef,[{rabbit_priority_queue,info,[[{{basic_message,{resource,<<"/">>,exchange,<<>>},[<<"quue">>],{content,60,{'P_basic',undefined,undefined,[],1,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined},none,none,[<<>>]},<<10,138,64,159,205,118,42,135,18,209,225,51,148,176,141,215>>,false},{message_properties,undefined,false,0}},{{basic_message,{resource,<<"/">>,exchange,<<>>},[<<"quue">>],{content,60,{'P_basic',undefined,undefined,[],1,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined},none,none,[<<>>]},<<193,135,208,163,13,98,182,166,56,243,135,199,205,130,224,103>>,false},{message_properties,undefined,false,0}},{{basic_message,{resource,<<"/">>,exchange,<<>>},[<<"quue">>],{content,60,{'P_basic',undefined,undefined,[],1,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined},none,none,[<<>>]},<<191,39,248,235,205,4,107,71,140,236,8,135,116,65,23,95>>,false},{message_properties,undefined,false,0}},{{basic_message,{resource,<<"/">>,exchange,<<>>},[<<"quue">>],{content,60,{'P_basic',undefined,undefined,[],1,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined},none,none,[<<>>]},<<106,250,253,2,168,125,130,32,245,195,222,20,238,164,179,225>>,false},{message_properties,undefined,false,0}},{{basic_message,{resource,<<"/">>,exchange,<<>>},[<<"quue">>],{content,60,{'P_basic',undefined,undefined,[],1,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined},none,none,[<<>>]},<<183,172,94,32,158,41,181,17,235,175,2,210,99,205,2,242>>,false},{message_properties,undefined,false,0}},{{basic_message,{resource,<<"/">>,exchange,<<>>},[<<"quue">>],{content,60,{'P_basic',undefined,undefined,[],1,undefined,undefined,...},...},...},...},...],...],...},...]}

The key part of the termination reason:

{undef,[{rabbit_priority_queue,info,

The result is that no queue mirrors are started.

Steps to reproduce:

  1. Create a classic queue without mirroring policy
  2. Publish a few messages (10,000 in my case)
  3. Consume the messages with prefetch=2500 but do not acknowledge them
  4. Apply the mirroring policy ha-mode=all, ha-sync-mode=automatic
  5. Observe the crash in the logs; no mirrors are started for the queue
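Step 4 corresponds to a `rabbitmqctl set_policy` invocation. The following is a minimal Python sketch that only builds the argument list (it does not run anything), assuming the policy values visible in the crash log above: name `mirror`, pattern `.*`, priority 432, `ha-sync-mode` automatic and `ha-sync-batch-size` 256. The helper name `mirror_policy_command` is hypothetical.

```python
import json
import shlex


def mirror_policy_command(name="mirror", pattern=".*", vhost="/",
                          priority=0, sync_mode="automatic", batch_size=None):
    """Build (but do not run) the argv for a classic-queue mirroring policy.

    Hypothetical helper: mirrors to all nodes (ha-mode=all), which is the
    policy applied in reproduction step 4.
    """
    definition = {"ha-mode": "all", "ha-sync-mode": sync_mode}
    if batch_size is not None:
        definition["ha-sync-batch-size"] = batch_size
    return [
        "rabbitmqctl", "set_policy",
        "-p", vhost,
        "--priority", str(priority),
        "--apply-to", "queues",
        name, pattern, json.dumps(definition),
    ]


if __name__ == "__main__":
    # Values assumed from the policy shown in the crash log.
    cmd = mirror_policy_command(priority=432, batch_size=256)
    print(shlex.join(cmd))
```

Building the argv as a list (rather than a shell string) keeps the JSON definition intact if it is later passed to `subprocess.run(cmd, check=True)` on a cluster node.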

This happens because, once a queue mirror has been synchronised, the synchronisation code calls zip_msgs_and_acks if there are messages in the Unacknowledged state:

https://github.com/rabbitmq/rabbitmq-server/blob/9aa4ed611cb33e810afc12c5160a7258c4473dc0/deps/rabbit/src/rabbit_mirror_queue_sync.erl#L465-L468

Please find the fix here.

I could only test it on a branch based on 0.5.0 with RabbitMQ 3.8.26, because a release built from 0.5.2 crashed with an out-of-memory error. Could there be an incompatibility between 3.8.26 and 0.5.2?

noxdafox commented 2 years ago

Hi,

Thanks for your contribution! Mirrored queues are not supported by the plugin; were you aware of that? What happened after you enabled the mirroring policy?

luos commented 2 years ago

Hi,

Actually, we only use exchange-level deduplication, but because the plugin swaps the backing queue, its queue code runs anyway. I was not aware that mirrored queues are not supported.

Nothing else happened; only the mirrors crashed. Other queues worked fine.

noxdafox commented 2 years ago

Oh, OK! Then never mind.

Right now, queue-level deduplication only works with standard and priority queues. Mirrored queues, quorum queues and streams are not supported.

This is not a problem if you use exchange-level deduplication. I will try to make a release this weekend.

luos commented 2 years ago

Thank you for the quick resolution! Thankfully, we are not using this plugin for the queues. :)

In any case, the workaround is simply to apply the mirroring policy before the queues receive any traffic; otherwise, there may already be unacknowledged messages. Apart from this, everything works OK.
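If the policy has to be applied to queues that already carry traffic, one option is to check first which queues currently hold unacknowledged messages. A minimal sketch, assuming the text output of something like `rabbitmqctl list_queues name messages_unacknowledged` (tab-separated; the informational and header lines differ between versions, so the parser skips any line that does not end in a count). The function name `queues_with_unacked` is hypothetical.

```python
from typing import List


def queues_with_unacked(listing: str) -> List[str]:
    """Return the names of queues whose unacknowledged count is non-zero.

    `listing` is assumed to be the text printed by
    `rabbitmqctl list_queues name messages_unacknowledged`.
    """
    busy = []
    for line in listing.splitlines():
        parts = line.rsplit("\t", 1)
        if len(parts) != 2:
            continue  # informational lines such as "Listing queues ..." have no tab
        name, count = parts
        count = count.strip()
        if not count.isdigit():
            continue  # header row naming the columns
        if int(count) > 0:
            busy.append(name)
    return busy


if __name__ == "__main__":
    sample = (
        "Listing queues for vhost / ...\n"
        "name\tmessages_unacknowledged\n"
        "quue\t2500\n"
        "idle\t0\n"
    )
    print(queues_with_unacked(sample))  # ['quue']
```

Per the analysis in this issue, queues returned by this check are the ones where an automatic sync would reach the zip_msgs_and_acks path; applying the policy before traffic starts, or once they are drained, avoids it.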

noxdafox commented 2 years ago

Version 0.5.3 released with fix. Thanks for your contribution!

For the record, queue-level deduplication works fine; it only supports traditional and priority queues because the broker lacks the necessary interfaces for the newer queue types and streams. I plan to start a conversation with the RabbitMQ developers about this but, as it will take a fair amount of time, I have not had the chance to do it yet.