noxdafox / rabbitmq-message-deduplication

RabbitMQ Plugin for filtering message duplicates
Mozilla Public License 2.0
277 stars 34 forks source link

Cannot ack a message from deduplicated queue #34

Closed Civilis317 closed 5 years ago

Civilis317 commented 5 years ago

Seen in version 0.40 of the deduplication plugin, running in rabbitmq 3.7.4. I want to deduplicate at queue level, so I created a direct exchange and bound queue to it with a routing key and the "x-message-deduplication" argument. Publishing went fine, duplicated messages were filtered out. However when I activated a subscriber to the queue the messages showed up but could not be acked and eventually the queue ended up in status crashed. Also when I try to "get" a message through the management gui with Ack mode "ack message requeue false" the same thing happens.

Could you please look into this (or tell me what I did wrong in the setup)?

Beneath is the logging I took from the terminal window where my rabbitmq was running in a docker-compose setup. rabbitmq | 2019-05-31 12:32:09.190 [error] <0.664.0> Restarting crashed queue 'q_tmbo_trigger' in vhost '/'. rabbitmq | 2019-05-31 12:32:09.190 [error] <0.623.0> Generic server <0.623.0> terminating rabbitmq | Last message in was {basic_get,<0.662.0>,true,<0.661.0>} rabbitmq | When Server state == {q,{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},none,false,rabbit_priority_queue,{passthrough,'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',{dqstate,{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},{vqstate,{0,{[],[]}},{0,{[],[]}},{delta,undefined,0,0,undefined},{3,{[{msg_status,3,<<78,86,250,11,140,232,135,45,211,172,106,145,123,226,196,255>>,{basic_message,{resource,<<"/">>,exchange,<<"x_gpl_sync">>},[<<"tmbo_trigger">>],{content,60,none,<<248,0,16,97,112,112,108,105,99,97,116,105,111,110,47,106,115,111,110,5,85,84,70,45,56,0,0,0,88,10,95,95,84,121,112,101,73,100,95,95,83,0,0,0,40,111,114,103,46,98,111,105,112,46,103,112,108,46,99,111,109,109,111,110,115,46,97,109,113,112,46,84,114,105,103,103,101,114,77,101,115,115,97,103,101,22,120,45,100,101,100,117,112,108,105,99,97,116,105,111,110,45,104,101,97,100,101,114,108,0,0,0,0,2,20,51,166,2,0>>,rabbit_framing_amqp_0_9_1,[<<"{\"dossierId\":34878374,\"dossierType\":\"TM\"}">>]},<<78,86,250,11,140,232,135,45,211,172,106,145,123,226,196,255>>,true},true,true,false,true,queue_index,{message_properties,undefined,false,41}}],[{msg_status,1,<<49,5,177,104,246,24,135,19,180,254,189,164,194,190,125,38>>,{basic_message,{resource,<<"/">>,exchange,<<"x_gpl_sync">>},[<<"tmbo_trigger">>],{content,60,none,<<248,0,16,97,112,112,108,105,99,97,116,105,111,110,47,106,115,111,110,5,85,84,70,45,56,0,0,0,88,10,95,95,84,121,112,101,73,100,95,95,83,0,0,0,40,111,114,103,46,98,111,105,112,46,103,112,108,46,99,111,109,109,111,110,115,46,97,109,113,112,46,84,114,105,103,103,101,114,77,101,115,115,97,103,101,22,120,45,100,101,100,117,112,...>>,...},...},...},...]}},...}}},...} rabbitmq | Reason for termination == rabbitmq | {function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',maybe_delete_cache_entry,[{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},232321],[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,426}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,242}]},{rabbit_priority_queue,fetch,2,[{file,"src/rabbit_priority_queue.erl"},{line,299}]},{rabbit_amqqueue_process,fetch,2,[{file,"src/rabbit_amqqueue_process.erl"},{line,761}]},{rabbit_amqqueue_process,handle_call,3,[{file,"src/rabbit_amqqueue_process.erl"},{line,1171}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1029}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]} rabbitmq | 2019-05-31 12:32:09.190 [error] <0.623.0> CRASH REPORT Process <0.623.0> with 0 neighbours exited with reason: no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':maybe_delete_cache_entry({amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplica...">>,...}],...}, 232321) line 426 in gen_server2:terminate/3 line 1166 rabbitmq | 2019-05-31 12:32:09.191 [error] <0.361.0> Supervisor {<0.361.0>,rabbit_amqqueue_sup} had child rabbit_amqqueue started with rabbit_prequeue:start_link({amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplica...">>,...}],...}, recovery, <0.360.0>) at <0.623.0> exit with reason no function clause matching 'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue':maybe_delete_cache_entry({amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplica...">>,...}],...}, 232321) line 426 in context child_terminated rabbitmq | 2019-05-31 12:32:09.191 [error] <0.662.0> Generic server <0.662.0> terminating rabbitmq | Last message in was {'$gen_cast',{method,{'basic.get',0,<<"q_tmbo_trigger">>,true},none,noflow}} rabbitmq | When Server state == {ch,running,rabbit_framing_amqp_0_9_1,1,0.659.0>,<0.659.0>,<0.652.0>,<<"<rabbit@d2c9db87eea7.1.652.0">>,{lstate,<0.661.0>,false},none,1,{[],[]},{user,<<"admin">>,[administrator],[{rabbit_auth_backend_internal,none}]},<<"/">>,<<>>,#{},{state,{dict,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},erlang},#{},#{},{set,0,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},<0.656.0>,{state,fine,5000,#Ref<0.581001283.1405616132.118530>},false,1,{{0,nil},{0,nil}},[],[],{{0,nil},{0,nil}},[{<<"publisher_confirms">>,bool,true},{<<"exchange_exchange_bindings">>,bool,true},{<<"basic.nack">>,bool,true},{<<"consumer_cancel_notify">>,bool,true},{<<"connection.blocked">>,bool,true},{<<"authentication_failure_close">>,bool,true}],none,0,none,flow,[]} rabbitmq | Reason for termination == rabbitmq | {{{function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',maybe_delete_cache_entry,[{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},232321],[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,426}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,242}]},{rabbit_priority_queue,fetch,2,[{file,"src/rabbit_priority_queue.erl"},{line,299}]},{rabbit_amqqueue_process,fetch,2,[{file,"src/rabbit_amqqueue_process.erl"},{line,761}]},{rabbit_amqqueue_process,handle_call,3,[{file,"src/rabbit_amqqueue_process.erl"},{line,1171}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1029}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]},{gen_server2,call,[<0.623.0>,{basic_get,<0.662.0>,true,<0.661.0>},infinity]}},[{gen_server2,call,3,[{file,"src/gen_server2.erl"},{line,329}]},{rabbit_misc,with_exit_handler,2,[{file,"src/rabbit_misc.erl"},{line,508}]},{rabbit_channel,handle_method,3,[{file,"src/rabbit_channel.erl"},{line,1139}]},{rabbit_channel,handle_cast,2,[{file,"src/rabbit_channel.erl"},{line,523}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1050}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]} rabbitmq | 2019-05-31 12:32:09.191 [error] <0.662.0> CRASH REPORT Process <0.662.0> with 1 neighbours exited with reason: {{function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',maybe_delete_cache_entry,[{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},232321],[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,426}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,242}]},...]},...} in gen_server2:call/3 line 329 in gen_server2:terminate/3 line 1166 rabbitmq | 2019-05-31 12:32:09.192 [error] <0.660.0> Supervisor {<0.660.0>,rabbit_channel_sup} had child channel started with rabbit_channel:start_link(1, <0.659.0>, <0.659.0>, <0.652.0>, <"<rabbit@d2c9db87eea7.1.652.0">>, rabbit_framing_amqp_0_9_1, {user,<<"admin">>,[administrator],[{rabbit_auth_backend_internal,none}]}, <<"/">>, [{<<"publisher_confirms">>,bool,true},{<<"exchange_exchange_bindings">>,bool,true},{<<"basic.nack">>,...},...], <0.656.0>, <0.661.0>) at <0.662.0> exit with reason {{function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',maybe_delete_cache_entry,[{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},232321],[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,426}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,242}]},...]},...} in gen_server2:call/3 line 329 in context child_terminated rabbitmq | 2019-05-31 12:32:09.192 [error] <0.660.0> Supervisor {<0.660.0>,rabbit_channel_sup} had child channel started with rabbit_channel:start_link(1, <0.659.0>, <0.659.0>, <0.652.0>, <"<rabbit@d2c9db87eea7.1.652.0">>, rabbit_framing_amqp_0_9_1, {user,<<"admin">>,[administrator],[{rabbit_auth_backend_internal,none}]}, <<"/">>, [{<<"publisher_confirms">>,bool,true},{<<"exchange_exchange_bindings">>,bool,true},{<<"basic.nack">>,...},...], <0.656.0>, <0.661.0>) at <0.662.0> exit with reason reached_max_restart_intensity in context shutdown rabbitmq | 2019-05-31 12:32:09.193 [error] <0.657.0> Supervisor {<0.657.0>,amqp_channel_sup} had child channel started with amqp_channel:start_link(direct, <0.652.0>, 1, <0.658.0>, {<"<rabbit@d2c9db87eea7.1.652.0">>,1}) at <0.659.0> exit with reason {{function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',maybe_delete_cache_entry,[{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},232321],[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,426}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,242}]},...]},...} in gen_server2:call/3 line 329 in context child_terminated rabbitmq | 2019-05-31 12:32:09.193 [error] <0.657.0> Supervisor {<0.657.0>,amqp_channel_sup} had child channel started with amqp_channel:start_link(direct, <0.652.0>, 1, <0.658.0>, {<"<rabbit@d2c9db87eea7.1.652.0">>,1}) at <0.659.0> exit with reason reached_max_restart_intensity in context shutdown rabbitmq | 2019-05-31 12:32:09.193 [warning] <0.652.0> Connection (<0.652.0>) closing: internal error in channel (<0.659.0>): {{{function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',maybe_delete_cache_entry,[{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},232321],[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,426}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,242}]},{rabbit_priority_queue,fetch,2,[{file,"src/rabbit_priority_queue.erl"},{line,299}]},{rabbit_amqqueue_process,fetch,2,[{file,"src/rabbit_amqqueue_process.erl"},{line,761}]},{rabbit_amqqueue_process,handle_call,3,[{file,"src/rabbit_amqqueue_process.erl"},{line,1171}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1029}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]},{gen_server2,call,[<0.623.0>,{basic_get,<0.662.0>,true,<0.661.0>},infinity]}},[{gen_server2,call,3,[{file,"src/gen_server2.erl"},{line,329}]},{rabbit_misc,with_exit_handler,2,[{file,"src/rabbit_misc.erl"},{line,508}]},{rabbit_channel,handle_method,3,[{file,"src/rabbit_channel.erl"},{line,1139}]},{rabbit_channel,handle_cast,2,[{file,"src/rabbit_channel.erl"},{line,523}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1050}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]} rabbitmq | 2019-05-31 12:32:09.193 [error] <0.649.0> CRASH REPORT Process <0.649.0> with 0 neighbours exited with reason: {{{{function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',maybe_delete_cache_entry,[{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},232321],[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,426}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,...}]},...]},...},...},...} in gen_server:call/3 line 214 in gen_server:call/3 line 214 rabbitmq | 2019-05-31 12:32:09.194 [error] <0.644.0> Ranch listener rabbit_web_dispatch_sup_15672, connection process <0.644.0>, stream 5 had its request process <0.649.0> exit with reason {{{{function_clause,[{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',maybe_delete_cache_entry,[{amqqueue,{resource,<<"/">>,queue,<<"q_tmbo_trigger">>},true,false,none,[{<<"x-message-deduplication">>,bool,true}],<0.623.0>,[],[],[],undefined,undefined,[],[],crashed,0,[],<<"/">>,#{user => <<"admin">>}},232321],[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,426}]},{'Elixir.RabbitMQ.MessageDeduplicationPlugin.Queue',fetch,2,[{file,"lib/rabbit_message_deduplication_queue.ex"},{line,242}]},{rabbit_priority_queue,fetch,2,[{file,"src/rabbit_priority_queue.erl"},{line,299}]},{rabbit_amqqueue_process,fetch,2,[{file,"src/rabbit_amqqueue_process.erl"},{line,761}]},{rabbit_amqqueue_process,handle_call,3,[{file,"src/rabbit_amqqueue_process.erl"},{line,1171}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1029}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,257}]}]},{gen_server2,call,[<0.623.0>,{basic_get,<0.662.0>,true,<0.661.0>},infinity]}},[{gen_server2,call,3,[{file,"src/gen_server2.erl"},{line,329}]},{rabbit_misc,with_exit_handler,2,[{file,"src/rabbit_misc.erl"},{line,508}]},{rabbit_channel,handle_method,3,[{file,"src/rabbit_channel.erl"},{line,1139}]},{rabbit_channel,handle_cast,2,[{file,"src/rabbit_channel.erl"},{line,523}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1050}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]},{gen_server,call,[<0.659.0>,{call,{'basic.get',0,<<"q_tmbo_trigger">>,true},none,<0.649.0>},60000]}} and stacktrace [{gen_server,call,3,[{file,"gen_server.erl"},{line,214}]},{rabbit_mgmt_wm_queue_get,basic_get,5,[{file,"src/rabbit_mgmt_wm_queue_get.erl"},{line,130}]},{rabbit_mgmt_wm_queue_get,basic_gets,6,[{file,"src/rabbit_mgmt_wm_queue_get.erl"},{line,89}]},{rabbit_mgmt_wm_queue_get,'-do_it/2-fun-0-',8,[{file,"src/rabbit_mgmt_wm_queue_get.erl"},{line,75}]},{rabbit_mgmt_util,with_channel,5,[{file,"src/rabbit_mgmt_util.erl"},{line,878}]},{rabbit_mgmt_util,with_decode,5,[{file,"src/rabbit_mgmt_util.erl"},{line,721}]},{rabbit_mgmt_wm_queue_get,accept_content,2,[{file,"src/rabbit_mgmt_wm_queue_get.erl"},{line,51}]},{cowboy_rest,call,3,[{file,"src/cowboy_rest.erl"},{line,1182}]}]

noxdafox commented 5 years ago

Hello,

Did you try with a more recent version of RabbitMQ? 3.7.4 is quite old and the queue deduplication might suffer from some compatibility issue.

Civilis317 commented 5 years ago

Hi, I just tried 3.7.15 and got the same problem.

noxdafox commented 5 years ago

I tried both 3.7.4 and 3.7.15 versions and could not reproduce the issue you suffer from.

Could you please provide a reproducible example? The code you have been used to test the plugin should be good enough.

Also, which version of Erlang are you using?

Civilis317 commented 5 years ago

Ok, I will shortly. I will have to rework some code as to not expose my employers secrets, then I will post a link to the github project here. Thanks for looking into it so far.

noxdafox commented 5 years ago

No need to do anything fancy, it could even be some bash scripting. Thank you for your help.

noxdafox commented 5 years ago

This is most likely a duplicate of #33. As I can now reproduce it, there is no need to provide an example. Thanks!

Civilis317 commented 5 years ago

Yes! I can confirm that it works fine with Strings. messageProperties.setHeader("x-deduplication-header", String.valueOf(n)); solved it for me.

FYI: The problem is still there for nullable header values (ie using a property for the value of the header that is sometimes null), but for me that is not unreasonable.

Thank you very much for sorting this out and thank you for providing this very useful plugin!

noxdafox commented 5 years ago

Release 0.4.1 now handles non-string headers including integers and null values.

noxdafox commented 5 years ago

Closing the issue, please re-open if the problem persists.