jcabotc / hare

Some AMQP abstractions

Graceful shutdown failure #7

Open archseer opened 7 years ago

archseer commented 7 years ago

Admittedly, this is from before the Actor change on the main branch (our workers seem to be incompatible with the latest layout). If we try to gracefully shut down an app that has a running (but idle) consumer using :init.stop, it crashes:

time=20:44:39.657 level=error       GenServer #PID<0.1105.0> terminating
** (stop) :unexpected_delivery_and_no_default_consumer
Last message: {:consumer_call, {:"basic.cancel_ok", "amq.ctag-2jitkFZKtZsCaO-HgGA7nQ"}, {:"basic.cancel", "amq.ctag-2jitkFZKtZsCaO-HgGA7nQ", false}}
State: {:state, :amqp_selective_consumer, {:state, {:dict, 0, 16, 16, 8, 80, 48, {[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []}, {{[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []}}}, :undefined, {:dict, 0, 16, 16, 8, 80, 48, {[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []}, {{[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []}}}, :none}}
time=20:44:39.666 level=error       GenServer #PID<0.1106.0> terminating
** (stop) exited in: :gen_server2.call(#PID<0.1105.0>, {:consumer_call, {:"basic.cancel_ok", "amq.ctag-2jitkFZKtZsCaO-HgGA7nQ"}, {:"basic.cancel", "amq.ctag-2jitkFZKtZsCaO-HgGA7nQ", false}}, :infinity)
    ** (EXIT) :unexpected_delivery_and_no_default_consumer
    (rabbit_common) src/deps/rabbit_common/src/gen_server2.erl:327: :gen_server2.call/3
    (amqp_client) src/deps/amqp_client/src/amqp_channel.erl:745: :amqp_channel.handle_method_from_server1/3
    (stdlib) gen_server.erl:601: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:667: :gen_server.handle_msg/5
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_cast", {:method, {:"basic.cancel_ok", "amq.ctag-2jitkFZKtZsCaO-HgGA7nQ"}, :none, :noflow}}
State: {:state, 23, #PID<0.567.0>, #PID<0.1105.0>, :network, {[{{#PID<0.1108.0>, #Reference<0.0.1.627>}, #PID<0.1108.0>, {:"basic.cancel", "amq.ctag-2jitkFZKtZsCaO-HgGA7nQ", false}, :none, :noflow}], []}, false, #PID<0.1107.0>, :none, :none, 0, true, :none, {0, nil}, {0, nil}, true, false}
time=20:44:39.667 level=warn        Connection (#PID<0.567.0>) closing: internal error in channel (#PID<0.1106.0>): {:unexpected_delivery_and_no_default_consumer,
 {:gen_server2, :call,
  [#PID<0.1105.0>,
   {:consumer_call, {:"basic.cancel_ok", "amq.ctag-2jitkFZKtZsCaO-HgGA7nQ"},
    {:"basic.cancel", "amq.ctag-2jitkFZKtZsCaO-HgGA7nQ", false}}, :infinity]}}

time=20:44:39.674 level=error       GenServer #PID<0.567.0> terminating
** (stop) {:channel0_died, :method_field_shortstr_overflow}
Last message: {:channel_exit, 0, :method_field_shortstr_overflow}
State: {:state, :amqp_network_connection, {:state, #Port<0.12584>, "client 127.0.0.1:62178 -> 127.0.0.1:5672", 60, #PID<0.573.0>, 131072, #PID<0.565.0>, {:internal_error, 541, "<0.1106.0>:{unexpected_delivery_and_no_default_consumer,\n               {gen_server2,call,\n                   [<0.1105.0>,\n                    {consumer_call,\n                        {'basic.cancel_ok',\n                            <<\"amq.ctag-2jitkFZKtZsCaO-HgGA7nQ\">>},\n                        {'basic.cancel',\n                            <<\"amq.ctag-2jitkFZKtZsCaO-HgGA7nQ\">>,false}},\n                    infinity]}}"}, false}, #PID<0.571.0>, {:amqp_params_network, "guest", "guest", "vhost", 'localhost', 5672, 0, 0, 0, :infinity, :none, [&:amqp_auth_mechanisms.plain/3, &:amqp_auth_mechanisms.amqplain/3], [], []}, 0, [{"capabilities", :table, [{"publisher_confirms", :bool, true}, {"exchange_exchange_bindings", :bool, true}, {"basic.nack", :bool, true}, {"consumer_cancel_notify", :bool, true}, {"connection.blocked", :bool, true}, {"consumer_priorities", :bool, true}, {"authentication_failure_close", :bool, true}, {"per_consumer_qos", :bool, true}, {"direct_reply_to", :bool, true}]}, {"cluster_name", :longstr, "rabbit@local"}, {"copyright", :longstr, "Copyright (C) 2007-2016 Pivotal Software, Inc."}, {"information", :longstr, "Licensed under the MPL.  
See http://www.rabbitmq.com/"}, {"platform", :longstr, "Erlang/OTP"}, {"product", :longstr, "RabbitMQ"}, {"version", :longstr, "3.6.6"}], :none, {:closing, :internal_error, {:"connection.close", 541, "<0.1106.0>:{unexpected_delivery_and_no_default_consumer,\n               {gen_server2,call,\n                   [<0.1105.0>,\n                    {consumer_call,\n                        {'basic.cancel_ok',\n                            <<\"amq.ctag-2jitkFZKtZsCaO-HgGA7nQ\">>},\n                        {'basic.cancel',\n                            <<\"amq.ctag-2jitkFZKtZsCaO-HgGA7nQ\">>,false}},\n                    infinity]}}", 0, 0}, :none}}

I think that's because the process tree starts the connection first, then the consumers. So on shutdown, the connection is probably terminated before the consumer :/
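
One way to check this hypothesis is a small, self-contained OTP sketch. OTP supervisors stop their children in the reverse of their start order, so under a single supervisor a consumer started after the connection would be stopped before it. The module name OrderProbe and the :connection / :consumer labels are made up for illustration; this is not hare's actual supervision tree.

```elixir
defmodule OrderProbe do
  # Hypothetical probe process: reports its name when it is shut down.
  use GenServer

  def start_link({name, report_to}),
    do: GenServer.start_link(__MODULE__, {name, report_to})

  # Give each probe a distinct child id so two can share one supervisor.
  def child_spec({name, _report_to} = arg),
    do: %{id: name, start: {__MODULE__, :start_link, [arg]}}

  @impl true
  def init(state) do
    # Trap exits so terminate/2 runs when the supervisor shuts us down.
    Process.flag(:trap_exit, true)
    {:ok, state}
  end

  @impl true
  def terminate(_reason, {name, report_to}) do
    send(report_to, {:stopped, name})
  end
end

# Start order mirrors the issue: connection first, then consumer.
children = [
  {OrderProbe, {:connection, self()}},
  {OrderProbe, {:consumer, self()}}
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
:ok = Supervisor.stop(sup)

# Collect the shutdown notifications in arrival order.
order =
  for _ <- 1..2 do
    receive do
      {:stopped, name} -> name
    after
      1_000 -> :timeout
    end
  end

IO.inspect(order)
```

If the real tree behaves differently, the connection and the consumers may live under different supervisors, or the connection may be going down for another reason (the log above shows it closing after the internal error in the channel).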

archseer commented 7 years ago

I went ahead and updated to the latest master; this is how the failure looks now:

(stop) :unexpected_delivery_and_no_default_consumer
Last message: {:consumer_call, {:"basic.cancel_ok", "amq.ctag-hxDXVXp2GQX0CdN5EbPcyg"}, {:"basic.cancel", "amq.ctag-hxDXVXp2GQX0CdN5EbPcyg", false}}
State: {:state, :amqp_selective_consumer, {:state, {:dict, 0, 16, 16, 8, 80, 48, {[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []}, {{[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []}}}, :undefined, {:dict, 0, 16, 16, 8, 80, 48, {[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []}, {{[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []}}}, :none}}
time=21:11:14.259 level=error       GenServer #PID<0.1884.0> terminating
** (stop) exited in: :gen_server2.call(#PID<0.1883.0>, {:consumer_call, {:"basic.cancel_ok", "amq.ctag-hxDXVXp2GQX0CdN5EbPcyg"}, {:"basic.cancel", "amq.ctag-hxDXVXp2GQX0CdN5EbPcyg", false}}, :infinity)
    ** (EXIT) :unexpected_delivery_and_no_default_consumer
    (rabbit_common) src/deps/rabbit_common/src/gen_server2.erl:327: :gen_server2.call/3
    (amqp_client) src/deps/amqp_client/src/amqp_channel.erl:745: :amqp_channel.handle_method_from_server1/3
    (stdlib) gen_server.erl:601: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:667: :gen_server.handle_msg/5
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_cast", {:method, {:"basic.cancel_ok", "amq.ctag-hxDXVXp2GQX0CdN5EbPcyg"}, :none, :noflow}}
State: {:state, 23, #PID<0.1363.0>, #PID<0.1883.0>, :network, {[{{#PID<0.1886.0>, #Reference<0.0.1.14688>}, #PID<0.1886.0>, {:"basic.cancel", "amq.ctag-hxDXVXp2GQX0CdN5EbPcyg", false}, :none, :noflow}], []}, false, #PID<0.1885.0>, :none, :none, 0, true, :none, {0, nil}, {0, nil}, true, false}
time=21:11:14.259 level=warn        Connection (#PID<0.1363.0>) closing: internal error in channel (#PID<0.1884.0>): {:unexpected_delivery_and_no_default_consumer,
 {:gen_server2, :call,
  [#PID<0.1883.0>,
   {:consumer_call, {:"basic.cancel_ok", "amq.ctag-hxDXVXp2GQX0CdN5EbPcyg"},
    {:"basic.cancel", "amq.ctag-hxDXVXp2GQX0CdN5EbPcyg", false}}, :infinity]}}

time=21:11:14.260 level=error       GenServer #PID<0.1363.0> terminating
** (stop) {:channel0_died, :method_field_shortstr_overflow}
Last message: {:channel_exit, 0, :method_field_shortstr_overflow}
State: {:state, :amqp_network_connection, {:state, #Port<0.60171>, "client 127.0.0.1:62688 -> 127.0.0.1:5672", 60, #PID<0.1374.0>, 131072, #PID<0.1361.0>, {:internal_error, 541, "<0.1884.0>:{unexpected_delivery_and_no_default_consumer,\n               {gen_server2,call,\n                   [<0.1883.0>,\n                    {consumer_call,\n                        {'basic.cancel_ok',\n                            <<\"amq.ctag-hxDXVXp2GQX0CdN5EbPcyg\">>},\n                        {'basic.cancel',\n                            <<\"amq.ctag-hxDXVXp2GQX0CdN5EbPcyg\">>,false}},\n                    infinity]}}"}, false}, #PID<0.1368.0>, {:amqp_params_network, "guest", "guest", "nimbus", 'localhost', 5672, 0, 0, 0, :infinity, :none, [&:amqp_auth_mechanisms.plain/3, &:amqp_auth_mechanisms.amqplain/3], [], []}, 0, [{"capabilities", :table, [{"publisher_confirms", :bool, true}, {"exchange_exchange_bindings", :bool, true}, {"basic.nack", :bool, true}, {"consumer_cancel_notify", :bool, true}, {"connection.blocked", :bool, true}, {"consumer_priorities", :bool, true}, {"authentication_failure_close", :bool, true}, {"per_consumer_qos", :bool, true}, {"direct_reply_to", :bool, true}]}, {"cluster_name", :longstr, "rabbit@hyrule"}, {"copyright", :longstr, "Copyright (C) 2007-2016 Pivotal Software, Inc."}, {"information", :longstr, "Licensed under the MPL.  
See http://www.rabbitmq.com/"}, {"platform", :longstr, "Erlang/OTP"}, {"product", :longstr, "RabbitMQ"}, {"version", :longstr, "3.6.6"}], :none, {:closing, :internal_error, {:"connection.close", 541, "<0.1884.0>:{unexpected_delivery_and_no_default_consumer,\n               {gen_server2,call,\n                   [<0.1883.0>,\n                    {consumer_call,\n                        {'basic.cancel_ok',\n                            <<\"amq.ctag-hxDXVXp2GQX0CdN5EbPcyg\">>},\n                        {'basic.cancel',\n                            <<\"amq.ctag-hxDXVXp2GQX0CdN5EbPcyg\">>,false}},\n                    infinity]}}", 0, 0}, :none}}
time=21:11:14.261 level=error       GenServer Hare.Conn terminating
** (stop) exited in: :gen_server.call(#PID<0.1363.0>, {:command, {:close, {:"connection.close", 200, "Goodbye", 0, 0}, :infinity}}, :infinity)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (stdlib) gen_server.erl:212: :gen_server.call/3
    (hare) lib/hare/adapter/amqp.ex:29: Hare.Adapter.AMQP.close_connection/1
    (hare) lib/hare/core/conn/state/bridge.ex:128: Hare.Core.Conn.State.Bridge.disconnect/1
    (hare) lib/hare/core/conn/state.ex:91: Hare.Core.Conn.State.disconnect/1
    (stdlib) gen_server.erl:629: :gen_server.try_terminate/3
    (stdlib) gen_server.erl:795: :gen_server.terminate/7
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: :open_channel
State: %Hare.Core.Conn.State{bridge: %Hare.Core.Conn.State.Bridge{adapter: Hare.Adapter.AMQP, backoff: [0, 10, 100, 1000, 5000], config: [host: "localhost", port: 5672, username: "guest", password: "guest", virtual_host: "vhost"], given: %AMQP.Connection{pid: #PID<0.1363.0>}, next_intervals: nil, ref: #Reference<0.0.2.2156>, status: :connected}, reply: &Connection.reply/2, waiting: %Hare.Core.Conn.State.Waiting{clients: []}}
time=21:11:14.262 level=error       GenServer Nimbus.Publisher.Inbound terminating
** (stop) exited in: :gen_server.call(Hare.Conn, :open_channel, 5000)
    ** (EXIT) exited in: :gen_server.call(#PID<0.1363.0>, {:command, {:close, {:"connection.close", 200, "Goodbye", 0, 0}, :infinity}}, :infinity)
        ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (stdlib) gen_server.erl:212: :gen_server.call/3
    (hare) lib/hare/actor/state.ex:19: Hare.Actor.State.up/1
    (hare) lib/hare/actor.ex:110: Hare.Actor.connect/2
    (connection) lib/connection.ex:741: Connection.connect/3
    (stdlib) gen_server.erl:601: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:667: :gen_server.handle_msg/5
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:DOWN, #Reference<0.0.1.14330>, :process, #PID<0.1440.0>, {:shutdown, {:connection_closing, {:internal_error, 541, "<0.1884.0>:{unexpected_delivery_and_no_default_consumer,\n               {gen_server2,call,\n                   [<0.1883.0>,\n                    {consumer_call,\n                        {'basic.cancel_ok',\n                            <<\"amq.ctag-hxDXVXp2GQX0CdN5EbPcyg\">>},\n                        {'basic.cancel',\n                            <<\"amq.ctag-hxDXVXp2GQX0CdN5EbPcyg\">>,false}},\n                    infinity]}}"}}}}
State: %Hare.Actor.State{chan: nil, conn: Hare.Conn, given: %Hare.Publisher.State{config: [exchange: [name: "inbound", opts: [durable: true]]], declaration: %Hare.Publisher.Declaration{context: Hare.Context, steps: [declare_exchange: [export_as: :exchange, name: "inbound", opts: [durable: true]]]}, exchange: %Hare.Core.Exchange{chan: %Hare.Core.Chan{adapter: Hare.Adapter.AMQP, given: %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.1363.0>}, pid: #PID<0.1440.0>}}, name: "inbound"}, given: %{}, mod: Nimbus.Publisher.Inbound}, mod: Hare.Publisher, ref: nil}
time=21:11:14.263 level=error       GenServer Nimbus.Publisher.Outbound terminating
** (stop) exited in: :gen_server.call(Hare.Conn, :open_channel, 5000)
    ** (EXIT) exited in: :gen_server.call(#PID<0.1363.0>, {:command, {:close, {:"connection.close", 200, "Goodbye", 0, 0}, :infinity}}, :infinity)
        ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (stdlib) gen_server.erl:212: :gen_server.call/3
    (hare) lib/hare/actor/state.ex:19: Hare.Actor.State.up/1
    (hare) lib/hare/actor.ex:110: Hare.Actor.connect/2
    (connection) lib/connection.ex:741: Connection.connect/3
    (stdlib) gen_server.erl:601: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:667: :gen_server.handle_msg/5
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:DOWN, #Reference<0.0.2.2215>, :process, #PID<0.1444.0>, {:shutdown, {:connection_closing, {:internal_error, 541, "<0.1884.0>:{unexpected_delivery_and_no_default_consumer,\n               {gen_server2,call,\n                   [<0.1883.0>,\n                    {consumer_call,\n                        {'basic.cancel_ok',\n                            <<\"amq.ctag-hxDXVXp2GQX0CdN5EbPcyg\">>},\n                        {'basic.cancel',\n                            <<\"amq.ctag-hxDXVXp2GQX0CdN5EbPcyg\">>,false}},\n                    infinity]}}"}}}}
State: %Hare.Actor.State{chan: nil, conn: Hare.Conn, given: %Hare.Publisher.State{config: [exchange: [name: "outbound", opts: [durable: true]]], declaration: %Hare.Publisher.Declaration{context: Hare.Context, steps: [declare_exchange: [export_as: :exchange, name: "outbound", opts: [durable: true]]]}, exchange: %Hare.Core.Exchange{chan: %Hare.Core.Chan{adapter: Hare.Adapter.AMQP, given: %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.1363.0>}, pid: #PID<0.1444.0>}}, name: "outbound"}, given: %{}, mod: Nimbus.Publisher.Outbound}, mod: Hare.Publisher, ref: nil}

Basically, from what I understand, the consumer cancels itself and the server acknowledges that with basic.cancel_ok, but we never wait for that reply. So we either need to trap exits in the consumer and await it, or add a default consumer that will swallow these unexpected deliveries.
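
The first option could look roughly like the sketch below. It uses plain OTP only: FakeBroker is a hypothetical stand-in for the AMQP channel, and DrainingConsumer, the {:cancel, pid, tag} / {:cancel_ok, tag} messages, and the :notify option are all invented for illustration. None of this is hare's or amqp_client's real API; it only shows the shape of trapping exits and waiting for the cancel acknowledgement in terminate/2.

```elixir
defmodule FakeBroker do
  # Hypothetical stand-in for the channel: answers a cancel request
  # with a cancel-ok message, the way the broker acknowledges a cancel.
  def start, do: spawn(fn -> loop() end)

  defp loop do
    receive do
      {:cancel, from, tag} ->
        send(from, {:cancel_ok, tag})
        loop()
    end
  end
end

defmodule DrainingConsumer do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    # Without this flag, a shutdown signal from the parent kills the
    # process without calling terminate/2.
    Process.flag(:trap_exit, true)

    {:ok,
     %{
       broker: Keyword.fetch!(opts, :broker),
       tag: Keyword.fetch!(opts, :tag),
       notify: Keyword.get(opts, :notify)
     }}
  end

  @impl true
  def terminate(_reason, %{broker: broker, tag: tag, notify: notify}) do
    # Ask for cancellation, then block until the acknowledgement for
    # this consumer tag arrives (or give up after one second).
    send(broker, {:cancel, self(), tag})

    result =
      receive do
        {:cancel_ok, ^tag} -> :cancelled
      after
        1_000 -> :timeout
      end

    if notify, do: send(notify, {:consumer_stopped, result})
    :ok
  end
end
```

Because the consumer stays alive until the acknowledgement arrives, the reply has somewhere to go. In a real supervision tree the child's shutdown timeout would need to be longer than the wait inside terminate/2.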