rabbitmq / rabbitmq-server

Open source RabbitMQ: core server and tier 1 (built-in) plugins
https://www.rabbitmq.com/
Other
12.07k stars 3.91k forks source link

Possible race condition with the `amqp10_client:flow_link_credit` function #2579

Open AntoineGagne opened 5 years ago

AntoineGagne commented 5 years ago

It seems as if there might a race condition with amqp10_client:flow_link_credit function. I have code similar to the following snippet:

init([SessionPid]) ->
    %% ...
    {ok, State, {continue, attach}}.

handle_continue(attach, State=#state{credits = Credits,
                                     session_pid = SessionPid}) ->
    %% ...
    {ok, LinkRef} = amqp10_client:attach_receiver_link(SessionPid, Name, Address, settled,
                                                       unsettled_state, Filters),
    {noreply, State#state{link_ref = LinkRef}};

handle_continue(_Continue, State) ->
    {noreply, State}.

handle_info({amqp10_event, {link, LinkRef, attached}}, State=#state{credits = Credits}) ->
    ok = amqp10_client:flow_link_credit(LinkRef, Credits, never),
    {noreply, State#state{link_ref = LinkRef}};

handle_info({amqp10_event, {link, LinkRef, {detached, Reason}}}, State) ->
    {noreply, State};

%% ...

If there is a detached event, then the whole session process crashes with a badmatch error.

The only way to fix this seems to be to put the amqp10_client:flow_link_credit right after the amqp10_client:attach_receiver_link. However, this make me feel as if the same could happen if the attached and detachedevents are received right after the amqp10_client:attach_receiver_link and before the amqp10_client:flow_link_credit. Is this the expected behaviour?

Sorry if the question isn't that clear!

michaelklishin commented 5 years ago

Can you please paste the stack trace? Is there a runnable test case you can share?

AntoineGagne commented 5 years ago

Here is the relevant log of the code I posted:

when=2019-06-13T18:02:55.781475+00:00 level=error pid=<0.525.0> at=proc_lib:crash_report/4:508 report="[[{initial_call,{amqp10_client_session,init,['Argument__1']}},{pid,<0.525.0>},{registered_name,[]},{error_inf
o,{exit,{{badmatch,#{0 => {link,<<\"receiver-link-0\">>,{link_ref,receiver,<0.525.0>,0},attached,<0.528.0>,0,0,receiver,undefined,{pid,<0.528.0>},0,30,30,0,false,undefined,never},1 => {link,<<\"receiver-link-1\">
>,{link_ref,receiver,<0.525.0>,1},attached,<0.531.0>,1,1,receiver,undefined,{pid,<0.531.0>},0,30,30,0,false,undefined,never},3 => {link,<<\"receiver-link-3\">>,{link_ref,receiver,<0.525.0>,3},attached,<0.537.0>,3
,3,receiver,undefined,{pid,<0.537.0>},0,30,30,0,false,undefined,never}}},[{amqp10_client_session,send_flow,5,[{file,\"src/amqp10_client_session.erl\"},{line,612}]},{amqp10_client_session,mapped,2,[{file,\"src/amq
p10_client_session.erl\"},{line,278}]},{gen_fsm,handle_msg,8,[{file,\"gen_fsm.erl\"},{line,486}]},{proc_lib,init_p_do_apply,3,[{file,\"proc_lib.erl\"},{line,249}]}]},[{gen_fsm,terminate,7,[{file,\"gen_fsm.erl\"},
{line,609}]},{proc_lib,init_p_do_apply,3,[{file,\"proc_lib.erl\"},{line,249}]}]}},{ancestors,[<0.524.0>,<0.519.0>,amqp10_client_sup,<0.412.0>]},{message_queue_len,0},{messages,[]},{links,[<0.524.0>]},{dictionary,
[]},{trap_exit,true},{status,running},{heap_size,6772},{stack_size,27},{reductions,97202}],[]]" label={proc_lib,crash}
when=2019-06-13T18:02:55.783130+00:00 level=warning pid=<0.517.0> at=event_hub_amqp_session:handle_info/2:88 what=session_crashed reason="{{badmatch,#{0 => {link,<<\"receiver-link-0\">>,{link_ref,receiver,<0.525.
0>,0},attached,<0.528.0>,0,0,receiver,undefined,{pid,<0.528.0>},0,30,30,0,false,undefined,never},1 => {link,<<\"receiver-link-1\">>,{link_ref,receiver,<0.525.0>,1},attached,<0.531.0>,1,1,receiver,undefined,{pid,<
0.531.0>},0,30,30,0,false,undefined,never},3 => {link,<<\"receiver-link-3\">>,{link_ref,receiver,<0.525.0>,3},attached,<0.537.0>,3,3,receiver,undefined,{pid,<0.537.0>},0,30,30,0,false,undefined,never}}},[{amqp10_
client_session,send_flow,5,[{file,\"src/amqp10_client_session.erl\"},{line,612}]},{amqp10_client_session,mapped,2,[{file,\"src/amqp10_client_session.erl\"},{line,278}]},{gen_fsm,handle_msg,8,[{file,\"gen_fsm.erl\
"},{line,486}]},{proc_lib,init_p_do_apply,3,[{file,\"proc_lib.erl\"},{line,249}]}]}" monitor_reference=#Ref<0.1786877685.349962242.237527>

I think the error comes from the link that is removed on the detached event and is no longer there when the send_flow/2 function is called. A call to maps:find/2 and handling the case where the link is non-existent would probably fix this if that is the case.

michaelklishin commented 5 years ago

Feel free to submit a PR.