noxdafox / rabbitmq-message-deduplication

RabbitMQ Plugin for filtering message duplicates
Mozilla Public License 2.0
277 stars 34 forks source link

Queue (**without deduplication**) crashing when attempting to use dead letter exchange #26

Closed damien-roche closed 5 years ago

damien-roche commented 5 years ago

This is with 0.3.4-erl-20.

I have an exchange which uses deduplication, but I also have a few exchanges without it.

I setup a policy for a queue which is bound to a normal exchange. Such as:

$ sudo rabbitmqctl set_policy pol "results" '{"dead-letter-exchange":"dead"}' --priority 1 --apply-to queues

This queue is not related to a deduplication exchange and has no deduplication attributes of its own. However, when nacking a message from this queue, it crashes with this error:

 [error] <0.2245.0> Restarting crashed queue 'results' in vhost '/'.
[error] <0.2033.0> CRASH REPORT Process <0.2033.0> with 0 neighbours exited with reason: no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':fetchwhile(#Fun<rabbit_amqqueue_process.36.129944743>, #Fun<rabbit_amqqueue_process.42.129944743>, [], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[],<0.2033.0>,[],[],[],...},...}) line 225 in gen_server2:terminate/3 line 1166

Any ideas what is going on? I've even deleted any deduplication exchanges/queues and the same crash happens every time a message should be routed to dead letter, whether set through policy or when declaring the queue (using arguments x-dead-letter-exchange for eg).

noxdafox commented 5 years ago

Hello,

RabbitMQ provides a bit cumbersome way to extend queues. The plugin needs to override the original implementation and delegate any call to it which it is not supposed to handle.

This unfortunately means that any bug within the plugin's queue logic will affect the queues even when deduplication is not enabled.

Could you please provide a complete trace of the error you get? The provided information is too little to make a guess right now.

damien-roche commented 5 years ago

Hi.

That's what I was suspecting (hoping wasn't the case). Are there any other suggested methods of de-duping messages, perhaps without the plugin?

Is it not possible to signal something in these cases? All that happens in my consumer is the channel goes down (my consumer logic reconnects). Is this crash something I can determine from a consumer?

Here is a full error/trace:

> 2019-01-09 11:08:43.334 [error] <0.4202.0> Restarting crashed queue 'results' in vhost '/'.
> 2019-01-09 11:08:43.334 [error] <0.3984.0> ** Generic server <0.3984.0> terminating
> ** Last message in was {notify_down,<0.4052.0>}
> ** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[],<0.3984.0>,[],[],[],undefined,undefined,[],[],crashed,2,[],<<"/">>,#{user => <<"guest">>}},none,true,rabbit_priority_queue,{passthrough,'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[],<0.3984.0>,[],[],[],undefined,undefined,[],[],crashed,2,[],<<"/">>,#{user => <<"guest">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},1,{0,nil},{0,nil},{0,nil},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/3MOP8SFTC1FGOYT1OQBIU1T50",{#{},[]},undefined,0,32768,#Fun<rabbit_variable_queue.10.44586449>,#Fun<rabbit_variable_queue.11.44586449>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"results">>}},{{client_msstate,<0.366.0>,<<232,156,174,124,178,250,40,184,238,42,120,208,20,210,106,144>>,#{},{state,#Ref<0.4282032052.1860567042.187513>,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent",<0.371.0>,#Ref<0.4282032052.1860567042.187518>,#Ref<0.4282032052.1860567041.186610>,#Ref<0.4282032052.1860567042.187519>,#Ref<0.4282032052.1860567042.187520>,{4000,800}},{client_msstate,<0.363.0>,<<160,206,87,153,252,30,7,241,6,230,21,177,18,52,127,33>>,#{},{state,#Ref<0.4282032052.1860567041.186575>,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient",<0.364.0>,#Ref<0.4282032052.1860567041.186576>,#Ref<0.4282032052.1860567041.186573>,#Ref<0.4282032052.1860567041.186577>,#Ref<0.4282032052.1860567041.186578>,...}},...}}},...}
> ** Reason for termination ==
> ** {function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',requeue,[[],{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[],<0.3984.0>,[],[],[],undefined,undefined,[],[],crashed,2,[],<<"/">>,#{user => <<"guest">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},1,{0,nil},{0,nil},{0,nil},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/3MOP8SFTC1FGOYT1OQBIU1T50",{#{},[]},undefined,0,32768,#Fun<rabbit_variable_queue.10.44586449>,#Fun<rabbit_variable_queue.11.44586449>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"results">>}},{{client_msstate,<0.366.0>,<<232,156,174,124,178,250,40,184,238,42,120,208,20,210,106,144>>,#{},{state,#Ref<0.4282032052.1860567042.187513>,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent",<0.371.0>,#Ref<0.4282032052.1860567042.187518>,#Ref<0.4282032052.1860567041.186610>,#Ref<0.4282032052.1860567042.187519>,#Ref<0.4282032052.1860567042.187520>,{4000,800}},{client_msstate,<0.363.0>,<<160,206,87,153,252,30,7,241,6,230,21,177,18,52,127,33>>,#{},{state,#Ref<0.4282032052.1860567041.186575>,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient",<0.364.0>,#Ref<0.4282032052.1860567041.186576>,#Ref<0.4282032052.1860567041.186573>,#Ref<0.4282032052.1860567041.186577>,#Ref<0.4282032052.1860567041.186578>,{4000,800}}},true,0,4096,0,0,0,0,0,0,infinity,0,0,0,0,0,0,{rates,0.08608385627104598,0.08608385627104598,0.08608385627104598,0.08608385627104598,...},...}}],...},...]}
> 2019-01-09 11:08:43.335 [error] <0.3984.0> CRASH REPORT Process <0.3984.0> with 0 neighbours exited with reason: no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':requeue([], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[],<0.3984.0>,[],[],[],...},...}) line 279 in gen_server2:terminate/3 line 1166
> 2019-01-09 11:08:43.335 [error] <0.851.0> Supervisor {<0.851.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[],none,[],[],[],undefined,undefined,...}, declare, <0.850.0>) at <0.3984.0> exit with reason no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':requeue([], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[],<0.3984.0>,[],[],[],...},...}) line 279 in context child_terminated
> 2019-01-09 11:08:43.337 [error] <0.3985.0> ** Generic server <0.3985.0> terminating
> ** Last message in was {notify_down,<0.4059.0>}
> ** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"bulk_uptime_checks">>},true,false,none,[],<0.3985.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"guest">>}},none,true,rabbit_priority_queue,{passthrough,'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"bulk_uptime_checks">>},true,false,none,[],<0.3985.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"guest">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},0,{0,nil},{0,nil},{0,nil},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/74KYYG532VV24470DYFGC7T30",{#{},[]},undefined,0,32768,#Fun<rabbit_variable_queue.10.44586449>,#Fun<rabbit_variable_queue.11.44586449>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"bulk_uptime_checks">>}},{{client_msstate,<0.366.0>,<<73,146,104,10,126,110,113,0,194,157,242,103,191,238,88,218>>,#{},{state,#Ref<0.4282032052.1860567042.187513>,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent",<0.371.0>,#Ref<0.4282032052.1860567042.187518>,#Ref<0.4282032052.1860567041.186610>,#Ref<0.4282032052.1860567042.187519>,#Ref<0.4282032052.1860567042.187520>,{4000,800}},{client_msstate,<0.363.0>,<<29,63,153,207,161,204,26,168,220,191,176,21,99,81,232,207>>,#{},{state,#Ref<0.4282032052.1860567041.186575>,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient",<0.364.0>,#Ref<0.4282032052.1860567041.186576>,#Ref<0.4282032052.1860567041.186573>,#Ref<0.4282032052.1860567041.186577>,...}},...}}},...}
> ** Reason for termination ==
> ** {function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',requeue,[[],{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"bulk_uptime_checks">>},true,false,none,[],<0.3985.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"guest">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},0,{0,nil},{0,nil},{0,nil},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/74KYYG532VV24470DYFGC7T30",{#{},[]},undefined,0,32768,#Fun<rabbit_variable_queue.10.44586449>,#Fun<rabbit_variable_queue.11.44586449>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"bulk_uptime_checks">>}},{{client_msstate,<0.366.0>,<<73,146,104,10,126,110,113,0,194,157,242,103,191,238,88,218>>,#{},{state,#Ref<0.4282032052.1860567042.187513>,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent",<0.371.0>,#Ref<0.4282032052.1860567042.187518>,#Ref<0.4282032052.1860567041.186610>,#Ref<0.4282032052.1860567042.187519>,#Ref<0.4282032052.1860567042.187520>,{4000,800}},{client_msstate,<0.363.0>,<<29,63,153,207,161,204,26,168,220,191,176,21,99,81,232,207>>,#{},{state,#Ref<0.4282032052.1860567041.186575>,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient",<0.364.0>,#Ref<0.4282032052.1860567041.186576>,#Ref<0.4282032052.1860567041.186573>,#Ref<0.4282032052.1860567041.186577>,#Ref<0.4282032052.1860567041.186578>,{4000,800}}},true,0,4096,0,0,0,0,0,0,infinity,0,0,0,0,0,0,{rates,0.0,0.0,0.0,0.0,-576457896158870550},{0,nil},...}}],...},...]}
> 2019-01-09 11:08:43.338 [error] <0.4203.0> Restarting crashed queue 'bulk_uptime_checks' in vhost '/'.
> 2019-01-09 11:08:43.338 [error] <0.3985.0> CRASH REPORT Process <0.3985.0> with 0 neighbours exited with reason: no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':requeue([], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"bulk_uptime_checks">>},true,false,none,[],<0.3985.0>,...},...}) line 279 in gen_server2:terminate/3 line 1166
> 2019-01-09 11:08:43.338 [error] <0.846.0> Supervisor {<0.846.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"bulk_uptime_checks">>},true,false,none,[],none,[],[],[],undefined,...}, declare, <0.845.0>) at <0.3985.0> exit with reason no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':requeue([], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"bulk_uptime_checks">>},true,false,none,[],<0.3985.0>,...},...}) line 279 in context child_terminated
noxdafox commented 5 years ago

The loglines:

** Reason for termination ==
** ** {function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',requeue,[[],{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"bulk_uptime_checks">>},true,false,none,[],<0.3985.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"guest">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},0,{0,nil},{0,nil},{0,nil},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/74KYYG532VV24470DYFGC7T30",{#{},[]},undefined,0,32768,#Fun<rabbit_variable_queue.10.44586449>,#Fun<rabbit_variable_queue.11.44586449>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"bulk_uptime_checks">>}},{{client_msstate,<0.366.0>,<<73,146,104,10,126,110,113,0,194,157,242,103,191,238,88,218>>,#{},{state,#Ref<0.4282032052.1860567042.187513>,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent",<0.371.0>,#Ref<0.4282032052.1860567042.187518>,#Ref<0.4282032052.1860567041.186610>,#Ref<0.4282032052.1860567042.187519>,#Ref<0.4282032052.1860567042.187520>,{4000,800}},{client_msstate,<0.363.0>,<<29,63,153,207,161,204,26,168,220,191,176,21,99,81,232,207>>,#{},{state,#Ref<0.4282032052.1860567041.186575>,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient",<0.364.0>,#Ref<0.4282032052.1860567041.186576>,#Ref<0.4282032052.1860567041.186573>,#Ref<0.4282032052.1860567041.186577>,#Ref<0.4282032052.1860567041.186578>,{4000,800}}},true,0,4096,0,0,0,0,0,0,infinity,0,0,0,0,0,0,{rates,0.0,0.0,0.0,0.0,-576457896158870550},{0,nil},...}}],...},...]}

Suggests me this is a duplicate of issue #22 which was fixed in commit 046a36a3b1d6fa859f4b142cf159f43ef58d61c5 and released in version 0.3.6. Could you please give it a try?

damien-roche commented 5 years ago

I've just upgraded to 0.3.6 (confirmed through rabbitmq-plugins list, restarted server) but it still appears to crash the queue. This is happening when it should be routing to a dead letter through a policy or defined in arguments when declaring queue:

> 2019-01-09 19:17:09.400 [error] <0.900.0> Restarting crashed queue 'results' in vhost '/'.
> 2019-01-09 19:17:09.401 [error] <0.419.0> ** Generic server <0.419.0> terminating
> ** Last message in was {'$gen_cast',{reject,false,[0],<0.863.0>}}
> ** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[],<0.419.0>,[],[],[],[{vhost,<<"/">>},{name,<<"pol">>},{pattern,<<"results">>},{'apply-to',<<"queues">>},{definition,[{<<"alternate-exchange">>,<<"dead">>},{<<"dead-letter-exchange">>,<<"dead">>},{<<"dead-letter-routing-key">>,<<"dead">>}]},{priority,10}],undefined,[],[],live,3,[],<<"/">>,#{user => <<"guest">>}},none,true,rabbit_priority_queue,{passthrough,'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[],<0.419.0>,[],[],[],undefined,undefined,[],undefined,live,2,[],<<"/">>,#{user => <<"guest">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},1,{0,nil},{0,nil},{1,{0,{msg_status,0,<<194,109,141,110,41,204,17,93,130,28,141,247,79,232,27,6>>,{basic_message,{resource,<<"/">>,exchange,<<>>},[<<"results">>],{content,60,{'P_basic',undefined,undefined,[],1,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined},none,none,[<<"red">>]},<<194,109,141,110,41,204,17,93,130,28,141,247,79,232,27,6>>,false},false,true,false,false,queue_index,{message_properties,undefined,false,3}},nil,nil}},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/3MOP8SFTC1FGOYT1OQBIU1T50",{#{},[]},undefined,0,32768,#Fun<rabbit_variable_queue.10.44586449>,#Fun<rabbit_variable_queue.11.44586449>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"results">>}},{{client_msstate,<0.371.0>,<<208,22,179,222,16,110,126,58,79,51,86,112,176,48,135,237>>,#{},{state,#Ref<0.3491105379.333316097.155864>,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent",...},...},...}}},...}
> ** Reason for termination ==
> ** {function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',ackfold,[#Fun<rabbit_amqqueue_process.42.129944743>,[],{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[],<0.419.0>,[],[],[],undefined,undefined,[],undefined,live,2,[],<<"/">>,#{user => <<"guest">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},1,{0,nil},{0,nil},{1,{0,{msg_status,0,<<194,109,141,110,41,204,17,93,130,28,141,247,79,232,27,6>>,{basic_message,{resource,<<"/">>,exchange,<<>>},[<<"results">>],{content,60,{'P_basic',undefined,undefined,[],1,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined},none,none,[<<"red">>]},<<194,109,141,110,41,204,17,93,130,28,141,247,79,232,27,6>>,false},false,true,false,false,queue_index,{message_properties,undefined,false,3}},nil,nil}},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/3MOP8SFTC1FGOYT1OQBIU1T50",{#{},[]},undefined,0,32768,#Fun<rabbit_variable_queue.10.44586449>,#Fun<rabbit_variable_queue.11.44586449>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"results">>}},{{client_msstate,<0.371.0>,<<208,22,179,222,16,110,126,58,79,51,86,112,176,48,135,237>>,#{},{state,#Ref<0.3491105379.333316097.155864>,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent",<0.374.0>,#Ref<0.3491105379.333316097.155865>,#Ref<0.3491105379.333316097.155854>,#Ref<0.3491105379.333316097.155866>,#Ref<0.3491105379.333316097.155867>,{4000,800}},{client_msstate,<0.368.0>,<<"¨¾Ó~÷ãû4\bà*©¸Ð­D">>,#{},{state,#Ref<0.3491105379.333316097.155819>,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9..."},...}},...}},...],...},...]}
> 2019-01-09 19:17:09.401 [error] <0.419.0> CRASH REPORT Process <0.419.0> with 0 neighbours exited with reason: no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':ackfold(#Fun<rabbit_amqqueue_process.42.129944743>, [], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[],<0.419.0>,[],[],[],...},...}, [0]) line 299 in gen_server2:terminate/3 line 1166
> 2019-01-09 19:17:09.402 [error] <0.418.0> Supervisor {<0.418.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[],<0.5264.0>,[],[],[],undefined,...}, recovery, <0.417.0>) at <0.419.0> exit with reason no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':ackfold(#Fun<rabbit_amqqueue_process.42.129944743>, [], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[],<0.419.0>,[],[],[],...},...}, [0]) line 299 in context child_terminated

With routing declared in arguments for queue directly:

> 2019-01-09 19:22:35.295 [error] <0.1600.0> Restarting crashed queue 'results' in vhost '/'.
> 2019-01-09 19:22:35.296 [error] <0.1558.0> ** Generic server <0.1558.0> terminating
> ** Last message in was {'$gen_cast',{reject,false,[0],<0.1572.0>}}
> ** When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[{<<"x-dead-letter-exchange">>,longstr,<<"dead">>}],<0.1558.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"guest">>}},none,true,rabbit_priority_queue,{passthrough,'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[{<<"x-dead-letter-exchange">>,longstr,<<"dead">>}],<0.1558.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"guest">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},1,{0,nil},{0,nil},{1,{0,{msg_status,0,<<9,249,79,179,215,72,252,252,18,94,150,209,58,81,226,54>>,{basic_message,{resource,<<"/">>,exchange,<<>>},[<<"results">>],{content,60,{'P_basic',undefined,undefined,[],1,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined},none,none,[<<"red">>]},<<9,249,79,179,215,72,252,252,18,94,150,209,58,81,226,54>>,false},false,true,false,false,queue_index,{message_properties,undefined,false,3}},nil,nil}},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/3MOP8SFTC1FGOYT1OQBIU1T50",{#{},[]},undefined,0,32768,#Fun<rabbit_variable_queue.10.44586449>,#Fun<rabbit_variable_queue.11.44586449>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"results">>}},{{client_msstate,<0.371.0>,<<191,22,70,16,151,172,19,132,174,192,73,230,200,123,233,245>>,#{},{state,#Ref<0.3491105379.333316097.155864>,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent",<0.374.0>,#Ref<0.3491105379.333316097.155865>,#Ref<0.3491105379.333316097.155854>,#Ref<0.3491105379.333316097.155866>,#Ref<0.3491105379.333316097.155867>,...},...},...}}},...}
> ** Reason for termination ==
> ** {function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',ackfold,[#Fun<rabbit_amqqueue_process.42.129944743>,[],{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[{<<"x-dead-letter-exchange">>,longstr,<<"dead">>}],<0.1558.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"guest">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},1,{0,nil},{0,nil},{1,{0,{msg_status,0,<<9,249,79,179,215,72,252,252,18,94,150,209,58,81,226,54>>,{basic_message,{resource,<<"/">>,exchange,<<>>},[<<"results">>],{content,60,{'P_basic',undefined,undefined,[],1,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined},none,none,[<<"red">>]},<<9,249,79,179,215,72,252,252,18,94,150,209,58,81,226,54>>,false},false,true,false,false,queue_index,{message_properties,undefined,false,3}},nil,nil}},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/3MOP8SFTC1FGOYT1OQBIU1T50",{#{},[]},undefined,0,32768,#Fun<rabbit_variable_queue.10.44586449>,#Fun<rabbit_variable_queue.11.44586449>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"results">>}},{{client_msstate,<0.371.0>,<<191,22,70,16,151,172,19,132,174,192,73,230,200,123,233,245>>,#{},{state,#Ref<0.3491105379.333316097.155864>,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent",<0.374.0>,#Ref<0.3491105379.333316097.155865>,#Ref<0.3491105379.333316097.155854>,#Ref<0.3491105379.333316097.155866>,#Ref<0.3491105379.333316097.155867>,{4000,800}},{client_msstate,<0.368.0>,<<212,29,121,30,237,113,35,124,139,202,131,111,67,177,106,154>>,#{},{state,#Ref<0.3491105379.333316097.155819>,...},...}},...}},...],...},...]}
> 2019-01-09 19:22:35.296 [error] <0.1558.0> CRASH REPORT Process <0.1558.0> with 0 neighbours exited with reason: no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':ackfold(#Fun<rabbit_amqqueue_process.42.129944743>, [], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[{<<"x-dead-letter-exc...">>,...}],...},...}, [0]) line 299 in gen_server2:terminate/3 line 1166
> 2019-01-09 19:22:35.297 [error] <0.1557.0> Supervisor {<0.1557.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[{<<"x-dead-letter-exchange">>,...}],...}, declare, <0.1556.0>) at <0.1558.0> exit with reason no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':ackfold(#Fun<rabbit_amqqueue_process.42.129944743>, [], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[{<<"x-dead-letter-exc...">>,...}],...},...}, [0]) line 299 in context child_terminated
noxdafox commented 5 years ago

What version of Erlang and RabbitMQ are you using?

damien-roche commented 5 years ago

I'm on RMQ 3.7.8 and Erlang OTP 20 (erts-9.2).

noxdafox commented 5 years ago

Note that this time the reason for the crash is way different:

** Reason for termination ==
** {function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',ackfold,[#Fun<rabbit_amqqueue_process.42.129944743>,[],{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"results">>},true,false,none,[{<<"x-dead-letter-exchange">>,longstr,<<"dead">>}],<0.1558.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"guest">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{0,{[],[]}},{0,{[],[]}},1,{0,nil},{0,nil},{1,{0,{msg_status,0,<<9,249,79,179,215,72,252,252,18,94,150,209,58,81,226,54>>,{basic_message,{resource,<<"/">>,exchange,<<>>},[<<"results">>],{content,60,{'P_basic',undefined,undefined,[],1,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined},none,none,[<<"red">>]},<<9,249,79,179,215,72,252,252,18,94,150,209,58,81,226,54>>,false},false,true,false,false,queue_index,{message_properties,undefined,false,3}},nil,nil}},{qistate,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/queues/3MOP8SFTC1FGOYT1OQBIU1T50",{#{},[]},undefined,0,32768,#Fun<rabbit_variable_queue.10.44586449>,#Fun<rabbit_variable_queue.11.44586449>,{0,nil},{0,nil},[],[],{resource,<<"/">>,queue,<<"results">>}},{{client_msstate,<0.371.0>,<<191,22,70,16,151,172,19,132,174,192,73,230,200,123,233,245>>,#{},{state,#Ref<0.3491105379.333316097.155864>,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent"},rabbit_msg_store_ets_index,"/var/lib/rabbitmq/mnesia/rabbit@deathstar/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent",0.374.0>,#Ref<0.3491105379.333316097.155865>,#Ref<0.3491105379.333316097.155854>,#Ref<0.3491105379.333316097.155866>,#Ref<0.3491105379.333316097.155867>,{4000,800}},{client_msstate,<0.368.0>,<<212,29,121,30,237,113,35,124,139,202,131,111,67,177,106,154>>,#{},{state,#Ref<0.3491105379.333316097.155819>,...},...}},...}},...],...},...]}

I had a quick look at the code and did not notice anything evident. Therefore, I'll need to reproduce this issue to dig further.

Could you please provide a sequence of commands to reproduce the issue?

That would really simplify the investigations.

damien-roche commented 5 years ago

Sure, here is how I'm reproducing:

$ rabbitmqadmin declare exchange name=dead-letter type=fanout
=> exchange declared

$ rabbitmqadmin declare exchange name=my-exchange type=direct
=> exchange declared

$ rabbitmqadmin declare queue name=my-queue durable=true
=> queue declared

$ rabbitmqadmin declare binding source="my-exchange" destination_type="queue" destination="my-queue" routing_key="my-route"
=> binding declared

$ rabbitmqctl set_policy DLX "my-queue" '{"dead-letter-exchange":"dead-letter"}' --priority 1 --apply-to queues
=> Setting policy "DLX" for pattern "my-queue" to "{"dead-letter-exchange":"dead-letter"}" with priority "1" for vhost "/" ...

$ rabbitmqadmin publish exchange=my-exchange routing_key="my-route" payload="hello"
=> Message published

Error (different from above -- sadly cannot seem to reproduce that error):

2019-01-09 22:28:31.520 [error] <0.7211.0> CRASH REPORT Process <0.7211.0> with 0 neighbours exited with reason: no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':fetchwhile(#Fun<rabbit_amqqueue_process.36.129944743>, #Fun<rabbit_amqqueue_process.42.129944743>, [], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"my-queue">>},true,false,none,[],<0.7211.0>,[],[],[],...},...}) line 234 in gen_server2:terminate/3 line 1166
2019-01-09 22:28:31.521 [error] <0.6764.0> Supervisor {<0.6764.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"my-queue">>},true,false,none,[],none,[],[],[],undefined,undefined,...}, declare, <0.6763.0>) at <0.7211.0> exit with reason no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':fetchwhile(#Fun<rabbit_amqqueue_process.36.129944743>, #Fun<rabbit_amqqueue_process.42.129944743>, [], {dqstate,{amqqueue,{resource,<<"/">>,queue,<<"my-queue">>},true,false,none,[],<0.7211.0>,[],[],[],...},...}) line 234 in context child_terminated

As an aside, if I apply the policy to the exchange, I receive no errors when nacking:

$ rabbitmqctl set_policy DLX "my-exchange" '{"dead-letter-exchange":"dead-letter"}' --priority 1 --apply-to exchanges
=> Setting policy "DLX" for pattern "my-exchange" to "{"dead-letter-exchange":"dead-letter"}" with priority "1" for vhost "/" ...

$ rabbitmqadmin publish exchange=my-exchange routing_key="my-route" payload="hello"
=> Message published

$ rabbitmqadmin get queue=my-queue ackmode=reject_requeue_false
+-------------+-------------+---------------+---------+---------------+------------------+------------+-------------+
| routing_key |  exchange   | message_count | payload | payload_bytes | payload_encoding | properties | redelivered |
+-------------+-------------+---------------+---------+---------------+------------------+------------+-------------+
| my-route    | my-exchange | 0             | hello   | 5             | string           |            | False       |

..though no errors are produced when nacking, messages do not end up in my dead letter exchange (though unsure if that is misconfiguration on my part based on commands above).

damien-roche commented 5 years ago

I finally have some messages routing to dead letter by disabled rabbitmq_message_deduplication and declaring x-dead-letter-exchange in args when declaring queue:

$ rabbitmqadmin declare queue name=queue-with-args durable=true \
     'arguments={"x-dead-letter-exchange":"dead-letter"}'
=> queue declared

When nacking messages on this queue they are, as expected, sent to the dead letter exchange.

However, when I enable rabbitmq_message_deduplication I'm back to the error in my previous reply.

noxdafox commented 5 years ago

Found the issue which affects several functions.

def ackfold(function, A, state = dqstate(queue_state: qs), acks) do
  ...
end

Note how the second parameter of the above function is a capital letter A. It is a copy-paste from rabbitmq-common function definitions which are in Erlang. In Erlang variables are supposed to start with capital letter. But in Elixir Capital letters have completely different meaning. Hence, the function_clause error.

Will fix it and make a new release.

damien-roche commented 5 years ago

Good find! I'll give it a try when I get hands on release.

noxdafox commented 5 years ago

New release 0.3.7 is ready, please let me know if it fixes your issue.

damien-roche commented 5 years ago

Working great now! :+1: :tada:

Thanks :)

noxdafox commented 5 years ago

np, thanks for reporting this!