rabbitmq / rabbitmq-server

Open source RabbitMQ: core server and tier 1 (built-in) plugins
https://www.rabbitmq.com/

Connection exception when a link is reopened to same address #2589

Closed kohtala closed 3 weeks ago

kohtala commented 5 years ago

I have a number of worker AMQP 1.0 clients, built on the Qpid Proton 0.28 library, that receive tasks on one address from a broker running the rabbitmq:3.7.15 Docker image with only rabbitmq-plugins enable rabbitmq_amqp1_0 && rabbitmq-plugins enable rabbitmq_management on top. I only credit the link when the worker is ready for a new task, so RabbitMQ distributes the tasks among the workers. During a task, the client opens a receiver on another address to receive further messages related to that one task. When the worker is done with the task, it closes the corresponding task link. The rabbitmq/rabbitmq-amqp1.0#60 problem occurs here, but I handled the event as a link close event. So far this seems OK.

The problem comes once a worker receives another task that requires opening a link to the same address as in an earlier task (the logs below show continuous.00000000-0000-0000-0000-000000000004). I try to open the receiver before accepting the task, and send one more status message on a third link. But the message disposition that accepts the task never reaches RabbitMQ, because the broker hits an internal error and resets the TCP connection at its end. This is from the container log:

2019-07-01 08:45:52.794 [error] <0.3711.0> connection <0.3705.0>, channel 1 - error:
{amqp_error,not_allowed,[97,116,116,101,109,112,116,32,116,111,32,114,101,117,115,101,32,99,111,110,115,117,109,101,114,32,116,97,103,32,39,99,116,97,103,45,0,0,0,3,39],'basic.consume'}
2019-07-01 08:45:52.795 [warning] <0.3705.0> Connection (<0.3705.0>) closing: received hard error {server_initiated_close,530,[97,116,116,101,109,112,116,32,116,111,32,114,101,117,115,101,32,99,111,110,115,117,109,101,114,32,116,97,103,32,39,99,116,97,103,45,0,0,0,3,39]} from server
2019-07-01 08:45:52.796 [error] <0.3702.0> ** Generic server <0.3702.0> terminating
** Last message in was {'$gen_cast',{frame,{'v1_0.attach',{utf8,<<"7dc393e3-31f5-4318-bdad-2f223230bcec-continuous.00000000-0000-0000-0000-000000000004">>},{uint,3},true,{ubyte,2},{ubyte,0},{'v1_0.source',{utf8,<<"continuous.00000000-0000-0000-0000-000000000004">>},{uint,0},undefined,{uint,0},false,undefined,undefined,undefined,undefined,undefined,undefined},{'v1_0.target',undefined,{uint,0},undefined,{uint,0},false,undefined,undefined},undefined,undefined,{uint,0},{ulong,0},undefined,undefined,undefined},<0.3696.0>}}
** When Server state == {state,<0.3705.0>,<0.3711.0>,unlimited,<0.3696.0>,<0.3701.0>,{[],[]},{session,0,2147483646,2147483647,336,65535,65458,2,65533,65535,78,2,{0,nil},{1,{1,{outgoing_delivery,2,{symbol,<<"amqp:released:list">>}},nil,nil}}}}
** Reason for termination == 
** {{{shutdown,{connection_closing,{server_initiated_close,530,[97,116,116,101,109,112,116,32,116,111,32,114,101,117,115,101,32,99,111,110,115,117,109,101,114,32,116,97,103,32,39,99,116,97,103,45,0,0,0,3,39]}}},{gen_server,call,[<0.3711.0>,{subscribe,{'basic.consume',0,<<"continuous.00000000-0000-0000-0000-000000000004">>,<<99,116,97,103,45,0,0,0,3>>,false,false,false,false,[{<<"x-credit">>,table,[{<<"credit">>,long,0},{<<"drain">>,bool,false}]}]},<0.3702.0>},60000]}},[{gen_server,call,3,[{file,"gen_server.erl"},{line,223}]},{rabbit_amqp1_0_channel,convert_error,1,[{file,"src/rabbit_amqp1_0_channel.erl"},{line,47}]},{rabbit_amqp1_0_outgoing_link,attach,3,[{file,"src/rabbit_amqp1_0_outgoing_link.erl"},{line,58}]},{rabbit_amqp1_0_session_process,with_disposable_channel,2,[{file,"src/rabbit_amqp1_0_session_process.erl"},{line,410}]},{rabbit_amqp1_0_session_process,handle_control,2,[{file,"src/rabbit_amqp1_0_session_process.erl"},{line,230}]},{rabbit_amqp1_0_session_process,handle_cast,2,[{file,"src/rabbit_amqp1_0_session_process.erl"},{line,167}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1056}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,249}]}]}
2019-07-01 08:45:52.796 [error] <0.3702.0> CRASH REPORT Process <0.3702.0> with 0 neighbours exited with reason: {{shutdown,{connection_closing,{server_initiated_close,530,[97,116,116,101,109,112,116,32,116,111,32,114,101,117,115,101,32,99,111,110,115,117,109,101,114,32,116,97,103,32,39,99,116,97,103,45,0,0,0,3,39]}}},{gen_server,call,[<0.3711.0>,{subscribe,{'basic.consume',0,<<"continuous.00000000-0000-0000-0000-000000000004">>,<<99,116,97,103,45,0,0,0,3>>,false,false,false,false,[{<<"x-credit">>,table,[{<<"credit">>,long,0},{<<"drain">>,bool,false}]}]},<0.3702.0>},60000]}} in gen_server:call/3 line 223 in gen_server2:terminate/3 line 1172
2019-07-01 08:45:52.797 [error] <0.3700.0> Supervisor {<0.3700.0>,rabbit_amqp1_0_session_sup} had child channel started with rabbit_amqp1_0_session_process:start_link({0,<0.3696.0>,<0.3701.0>,{user,<<"guest">>,[administrator],[{rabbit_auth_backend_internal,none}]},...}) at <0.3702.0> exit with reason {{shutdown,{connection_closing,{server_initiated_close,530,[97,116,116,101,109,112,116,32,116,111,32,114,101,117,115,101,32,99,111,110,115,117,109,101,114,32,116,97,103,32,39,99,116,97,103,45,0,0,0,3,39]}}},{gen_server,call,[<0.3711.0>,{subscribe,{'basic.consume',0,<<"continuous.00000000-0000-0000-0000-000000000004">>,<<99,116,97,103,45,0,0,0,3>>,false,false,false,false,[{<<"x-credit">>,table,[{<<"credit">>,long,0},{<<"drain">>,bool,false}]}]},<0.3702.0>},60000]}} in gen_server:call/3 line 223 in context child_terminated
2019-07-01 08:45:52.798 [error] <0.3700.0> Supervisor {<0.3700.0>,rabbit_amqp1_0_session_sup} had child channel started with rabbit_amqp1_0_session_process:start_link({0,<0.3696.0>,<0.3701.0>,{user,<<"guest">>,[administrator],[{rabbit_auth_backend_internal,none}]},...}) at <0.3702.0> exit with reason reached_max_restart_intensity in context shutdown
2019-07-01 08:45:55.800 [error] <0.3696.0> closing AMQP connection <0.3696.0> (172.21.0.1:34450 -> 172.21.0.2:5672):
{handshake_error,running,<0.3702.0>,{{symbol,<<"amqp:internal-error">>},"Session error: ~p~n~p~n",[{{shutdown,{connection_closing,{server_initiated_close,530,[97,116,116,101,109,112,116,32,116,111,32,114,101,117,115,101,32,99,111,110,115,117,109,101,114,32,116,97,103,32,39,99,116,97,103,45,0,0,0,3,39]}}},{gen_server,call,[<0.3711.0>,{subscribe,{'basic.consume',0,<<"continuous.00000000-0000-0000-0000-000000000004">>,<<99,116,97,103,45,0,0,0,3>>,false,false,false,false,[{<<"x-credit">>,table,[{<<"credit">>,long,0},{<<"drain">>,bool,false}]}]},<0.3702.0>},60000]}},[{gen_server,call,3,[{file,"gen_server.erl"},{line,223}]},{rabbit_amqp1_0_channel,convert_error,1,[{file,"src/rabbit_amqp1_0_channel.erl"},{line,47}]},{rabbit_amqp1_0_outgoing_link,attach,3,[{file,"src/rabbit_amqp1_0_outgoing_link.erl"},{line,58}]},{rabbit_amqp1_0_session_process,with_disposable_channel,2,[{file,"src/rabbit_amqp1_0_session_process.erl"},{line,410}]},{rabbit_amqp1_0_session_process,handle_control,2,[{file,"src/rabbit_amqp1_0_session_process.erl"},{line,230}]},{rabbit_amqp1_0_session_process,handle_cast,2,[{file,"src/rabbit_amqp1_0_session_process.erl"},{line,167}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1056}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,249}]}]]}}
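The integer lists in the log above are Erlang character lists. As a reading aid, here is a minimal Python sketch that decodes the one from the amqp_error tuple. The names `CHARLIST`, `raw`, `tag` and `handle` are illustrative only. The last four bytes of the consumer tag decode as the big-endian integer 3, which matches the {uint,3} handle in the v1_0.attach frame. This suggests, though the log alone does not prove it, that the consumer tag is derived from the link handle.

```python
import struct

# Character list copied verbatim from the amqp_error tuple in the log.
CHARLIST = [97, 116, 116, 101, 109, 112, 116, 32, 116, 111, 32, 114, 101,
            117, 115, 101, 32, 99, 111, 110, 115, 117, 109, 101, 114, 32,
            116, 97, 103, 32, 39, 99, 116, 97, 103, 45, 0, 0, 0, 3, 39]

raw = bytes(CHARLIST)

# The consumer tag sits between the first and the last single quote.
start = raw.index(b"'") + 1
end = raw.rindex(b"'")
tag = raw[start:end]

# Human-readable part of the error message (everything before the quote).
text = raw[:start - 1].decode("ascii").strip()

# Interpret the bytes after "ctag-" as an unsigned 32-bit big-endian integer.
(handle,) = struct.unpack(">I", tag[len(b"ctag-"):])

print(text)    # readable error text
print(tag)     # raw consumer tag bytes
print(handle)  # numeric suffix of the tag
```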

Sorry for not having a client to reproduce this with. I am already pushing my limits to get even this much information to you, and it is hard to find the time to prepare a test client.

I got around this problem by keeping all links open for now, and just not crediting the links we are not interested in. So I am not asking you to spend support effort on this.
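The workaround described above can be sketched as a small cache that opens each receiver link once and afterwards only controls credit. This is a library-independent model, not Qpid Proton code: `Link`, `LinkCache`, `acquire` and `release` are hypothetical names, and `open_link` stands in for whatever call the real client uses to attach a receiver.

```python
class Link:
    """Hypothetical stand-in for a client-side receiver link."""

    def __init__(self, address):
        self.address = address
        self.credit = 0

    def flow(self, n):
        # Grant the sender permission to deliver n more messages.
        self.credit += n


class LinkCache:
    """Keeps one link per address open instead of closing and reopening it."""

    def __init__(self, open_link):
        self._open_link = open_link  # callable: address -> link (assumed)
        self._links = {}
        self._idle = set()

    def acquire(self, address, credit=1):
        link = self._links.get(address)
        if link is None:
            # First use of this address: attach the link exactly once.
            link = self._open_link(address)
            self._links[address] = link
        self._idle.discard(address)
        link.flow(credit)
        return link

    def release(self, address):
        # Deliberately no close/detach: the link stays attached and simply
        # receives no further credit until it is acquired again.
        self._idle.add(address)


# Usage: the same address is needed by two consecutive tasks.
opened = []

def open_link(address):
    opened.append(address)
    return Link(address)

cache = LinkCache(open_link)
address = "continuous.00000000-0000-0000-0000-000000000004"
first = cache.acquire(address)
cache.release(address)
second = cache.acquire(address)
print(len(opened), first is second)
```

Note that in a real client any credit still outstanding when a link goes idle would have to be used up or drained; the model does not cover that.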

kjnilsson commented 5 years ago

Thanks. OK, this is probably a problem with the AMQP 1.0 -> AMQP 0.9.1 translation that goes on. The error states that this is an attempt to reuse the consumer tag, so it could be that the 1.0 session doesn't correctly dispose of the consumer when the corresponding link is closed. I will see if it is possible to reproduce simply.
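The hypothesis above can be illustrated with a toy model. This is not RabbitMQ's code: `ToyChannel`, `ToySession`, `handle_to_ctag` and `reattach_succeeds` are hypothetical names, and the tag format is only inferred from the `<<99,116,97,103,45,0,0,0,3>>` consumer tag in the log. The model assumes a channel that rejects a second consumer with an already registered tag, and a session that derives the tag from the link handle. If detach does not cancel the consumer, reattaching with the same handle fails.

```python
import struct


def handle_to_ctag(handle):
    # Assumed tag format, inferred from the log: "ctag-" + 32-bit handle.
    return b"ctag-" + struct.pack(">I", handle)


class ToyChannel:
    """Toy channel that refuses to register a consumer tag twice."""

    def __init__(self):
        self.consumers = {}

    def basic_consume(self, queue, consumer_tag):
        if consumer_tag in self.consumers:
            raise RuntimeError(f"attempt to reuse consumer tag {consumer_tag!r}")
        self.consumers[consumer_tag] = queue

    def basic_cancel(self, consumer_tag):
        self.consumers.pop(consumer_tag, None)


class ToySession:
    """Toy 1.0 session that maps link attach/detach onto a ToyChannel."""

    def __init__(self, cancel_on_detach):
        self.channel = ToyChannel()
        self.cancel_on_detach = cancel_on_detach

    def attach(self, handle, source):
        self.channel.basic_consume(source, handle_to_ctag(handle))

    def detach(self, handle):
        if self.cancel_on_detach:
            self.channel.basic_cancel(handle_to_ctag(handle))


def reattach_succeeds(cancel_on_detach):
    # Attach, detach, then attach again with the same handle and source.
    session = ToySession(cancel_on_detach)
    source = "continuous.00000000-0000-0000-0000-000000000004"
    session.attach(3, source)
    session.detach(3)
    try:
        session.attach(3, source)
    except RuntimeError:
        return False
    return True


print(reattach_succeeds(cancel_on_detach=True))
print(reattach_succeeds(cancel_on_detach=False))
```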

ansd commented 3 weeks ago

Fixed in RabbitMQ 4.0.