rabbitmq / rabbitmq-server

Open source RabbitMQ: core server and tier 1 (built-in) plugins
https://www.rabbitmq.com/

STOMP: close client connections after sending a consumer cancellation notification #2562

Open michaelklishin opened 4 years ago

michaelklishin commented 4 years ago

See rabbitmq/rabbitmq-server#2410.

STOMP 1.2 assumes that every ERROR frame signals a fatal event and must be followed by a TCP connection closure. A distinction between soft and hard errors is present in some protocols (AMQP 0-9-1, AMQP 1.0) but not others. Consumer cancellation notifications are one such "soft error" initiated by the server. Another is a resource alarm notification, but it's not relevant for this discussion.
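To make that concrete, the only server-to-client error signal STOMP offers is the ERROR frame, and under STOMP 1.2 it is terminal. A minimal sketch of what such a frame looks like on the wire (frame layout per the STOMP 1.2 spec; the header values and body text below are illustrative, not the exact frames RabbitMQ emits):

```javascript
// Serialize an illustrative STOMP ERROR frame: command line, headers,
// blank line, body, NUL terminator.
function errorFrame(message, body = "") {
    const headers = [
        `message:${message}`,
        "content-type:text/plain",
        `content-length:${Buffer.byteLength(body)}`,
    ];
    return `ERROR\n${headers.join("\n")}\n\n${body}\0`;
}

const frame = errorFrame(
    "subscription has been cancelled",
    "The queue backing this subscription was deleted"
);
```

After writing such a frame, a STOMP 1.2 server is expected to close the TCP connection, which is exactly the change proposed here.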

This change would be counterproductive in some ways, and it's unfortunate that there is no way to notify a STOMP client of something it could easily recover from, but there are no alternative options available to us.

eigengrau commented 4 years ago

Hi Michael! Thanks for tracking this. I agree with you that as it stands, in STOMP 1.2 there is nothing a client could reasonably do to recover from an ERROR frame anyway, apart from closing the connection on its side.

In our case, closing the connection would be desirable. We are using Spring’s STOMP client, which does the right thing out of the box in the case of the transport going down, but has no special handling for ERROR frames. I also had a look at a Go client implementation we are using. That one actually special cases ERROR frames and closes the connection of its own accord. However, since the outcome remains the same, the situation wouldn’t be worse if the RabbitMQ STOMP plugin were to close the connection instead.

IIRC this is also what happens when a client attempts to subscribe to a non-existing exchange (via the /exchange/unicorn STOMP destination), so there is added consistency in closing the connection on cancellations, as well, even when another protocol might allow a client to recover.

michaelklishin commented 4 years ago

@eigengrau can you post what SUBSCRIBE frame was used in your test and whether the queue was pre-declared?

When you shut down a node that has a leader queue replica, the behavior of the queue depends on whether it is of a replicated type, durable or not, and so on. In some cases, RabbitMQ will re-register the consumer with a newly elected leader transparently, but it still might send a notification in that case (I'd have to try it myself to be sure). Closing STOMP client connections, in that case, would indeed be unnecessary and counter-productive.

So we'd like to have a reasonably complete step-by-step description of your test so that we can conclude which scenario exactly we are looking at. Spring's STOMP client may handle reconnection transparently, but not every client would behave the same way.

eigengrau commented 4 years ago

Sure! We were subscribed to a gateway-managed queue, /queue/foo. Only x-message-ttl was set in the SUBSCRIBE headers, so, as expected, the plugin created a durable, non-replicated queue.
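For reference, the raw SUBSCRIBE frame behind such a subscription would look roughly like this (a sketch; the id, destination, and TTL values are made-up examples, and with @stomp/stompjs these extra headers are passed as the third argument to `client.subscribe()`):

```javascript
// Sketch of the SUBSCRIBE frame a client sends. The STOMP plugin turns
// x-message-ttl into a queue argument when it declares the backing queue.
function subscribeFrame(id, destination, extraHeaders = {}) {
    const headers = { id, destination, ack: "auto", ...extraHeaders };
    const lines = Object.entries(headers).map(([k, v]) => `${k}:${v}`);
    return `SUBSCRIBE\n${lines.join("\n")}\n\n\0`;
}

const frame = subscribeFrame("sub-0", "/queue/foo", { "x-message-ttl": "60000" });
```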

The issue we were hitting (stale subscriptions in the client) originally occurred when a node in a three-node cluster was shut down. While investigating this, we also saw some crashes, which may have been caused by an overloaded machine. Additionally, we saw what looked like a crash in the context of a currently connected user being deleted by our Hashicorp Vault system. I’ll paste some logs below.

Since we found subscription cancellation messages in the client logs when analyzing this, we reproduced the issue in a controlled context, where we would just delete the client’s queue on a single instance RabbitMQ system. We used the node.js script below to create a subscription. After running the script, we delete one of the queues the client is subscribed to. We then observe that the client’s connection is left open and the client still receives messages for its remaining subscription.

Script

```javascript
// npm install @stomp/stompjs ws
const stompjs = require("@stomp/stompjs");

// Websocket polyfill. Cf.
// <https://stomp-js.github.io/guide/stompjs/rx-stomp/ng2-stompjs/pollyfils-for-stompjs-v5.html>
Object.assign(global, {WebSocket: require("ws")});

function onConnect(frame) {
    console.log(`connected via STOMP: ${JSON.stringify(frame)}`);
    client.subscribe("/queue/deleteme", onMessage);
    client.subscribe("/queue/letmelive", onMessage);
}

function onDisconnect(frame) {
    console.log("disconnected");
}

function onError(frame) {
    console.log(`Broker reported error: ${frame.headers["message"]}
Additional details: ${frame.body}`);
}

function onMessage(msg) {
    console.log(`${msg.body}
headers: ${JSON.stringify(msg.headers, undefined, 4)}`);
}

var serverURL = "ws://localhost:15674/ws";

var client = new stompjs.Client({
    brokerURL: serverURL,
    connectHeaders: {
        host: "fixme",
        login: "fixme",
        passcode: "fixme",
    }
});

client.onStompError = onError;
client.onConnect = onConnect;
client.onDisconnect = onDisconnect; // was OnDisconnect, which stompjs silently ignores
client.activate();
```
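To reproduce, once the script is connected, the queue can be deleted via the management HTTP API (a sketch; it assumes the default management port 15672, the default vhost, and guest credentials, none of which are stated above):

```javascript
// Build the management-API URL for deleting the queue. The vhost must be
// URL-encoded ("/" becomes "%2F"). Host, port, and credentials are assumptions.
const vhost = encodeURIComponent("/");
const url = `http://localhost:15672/api/queues/${vhost}/deleteme`;

// With Node 18+ this could be executed as:
// await fetch(url, {
//     method: "DELETE",
//     headers: {Authorization: "Basic " + Buffer.from("guest:guest").toString("base64")},
// });
console.log(url);
```

After the DELETE, the client stays connected and keeps receiving messages on /queue/letmelive, which is the stale-subscription behavior described above.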

Logs of unrelated(?) crash

I’m unsure what to make of these and whether they are relevant to this particular issue. We are using the Hashicorp Vault system to automatically create/delete RabbitMQ users. It looks like the underlying connection was closed, so this probably did not lead to a stale subscription in the client.

2020-07-13 09:17:53.400 [warning] <0.902.0> Connection (<0.902.0>) closing: received hard error {'connection.close',320,"user 'token-52bbcf84-eb20-8273-f8b1-6e1e0fb55055-redacted-28f60a77-169a-2bb4-94ee-6ae4d10251b4' is deleted",0,0} from server
2020-07-13 09:17:53.410 [warning] <0.899.0> STOMP connection redacted:58804 -> redacted:61613 terminated with reason {channel_died,{shutdown,{connection_closing,{server_initiated_close,320,"user 'token-52bbcf84-eb20-8273-f8b1-6e1e0fb55055-redacted-28f60a77-169a-2bb4-94ee-6ae4d10251b4' is deleted"}}}}, closing it
2020-07-13 09:17:53.442 [error] <0.1927.0> Error on AMQP connection <0.1927.0> (redacted:47508 -> redacted:5672, vhost: 'redacted', user: 'token-a91f01cb-f328-f252-de61-29d12ce6f44b-redacted-9b8ee49b-fab6-bb3c-85af-a8b80c048749', state: running), channel 0:
 operation none caused a connection exception connection_forced: "user 'token-a91f01cb-f328-f252-de61-29d12ce6f44b-redacted-9b8ee49b-fab6-bb3c-85af-a8b80c048749' is deleted"
2020-07-13 09:17:53.620 [error] <0.899.0> ** Generic server <0.899.0> terminating
** Last message in was {'EXIT',<0.908.0>,{shutdown,{connection_closing,{server_initiated_close,320,"user 'token-52bbcf84-eb20-8273-f8b1-6e1e0fb55055-redacted-28f60a77-169a-2bb4-94ee-6ae4d10251b4' is deleted"}}}}
** When Server state == {reader_state,#Port<0.49>,"redacted:58804 -> redacted:61613",{resume,#Fun<rabbit_stomp_frame.0.35630898>},{proc_state,"session-rtlQaGpu2ecPRnh0MOoNiw",<0.908.0>,<0.902.0>,#{<<"T_0">> => {subscription,"/queue/redacted-0",auto,false,"id='0'"}},"1.2",undefined,undefined,{stomp_configuration,<<"guest">>,<<"guest">>,false,false,false},{set,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[<<"redacted-0">>],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},#{},#Fun<rabbit_stomp_processor.5.43045224>,{amqp_adapter_info,{0,0,0,0,0,65535,44049,5},61613,{0,0,0,0,0,65535,2597,24324},58804,<<"redacted:58804 -> redacted:61613">>,{'STOMP',0},[{channels,1},{channel_max,1},{frame_max,0},{client_properties,[{<<"product">>,longstr,<<"STOMP client">>}]},{ssl,false}]},#Fun<rabbit_stomp_reader.3.52755567>,none,{0,0,0,0,0,65535,44049,5},true,stomp_headers,<<"token-52bbcf84-eb20-8273-f8b1-6e1e0fb55055-redacted-28f60a77-169a-2bb4-94ee-6ae4d10251b4">>,<<"amq.topic">>,true},running,false,true,{state,fine,5000,#Ref<0.3134612131.1120403458.33411>},undefined,<0.902.0>,<0.898.0>,{<0.912.0>,<0.913.0>},{10,10}}
** Reason for termination == 
** {channel_died,{shutdown,{connection_closing,{server_initiated_close,320,"user 'token-52bbcf84-eb20-8273-f8b1-6e1e0fb55055-redacted-28f60a77-169a-2bb4-94ee-6ae4d10251b4' is deleted"}}}}
2020-07-13 09:17:53.647 [error] <0.899.0> CRASH REPORT Process <0.899.0> with 0 neighbours exited with reason: {channel_died,{shutdown,{connection_closing,{server_initiated_close,320,"user 'token-52bbcf84-eb20-8273-f8b1-6e1e0fb55055-redacted-28f60a77-169a-2bb4-94ee-6ae4d10251b4' is deleted"}}}} in gen_server2:terminate/3 line 1183
2020-07-13 09:17:53.647 [error] <0.897.0> Supervisor {<0.897.0>,rabbit_stomp_client_sup} had child rabbit_stomp_reader started with rabbit_stomp_reader:start_link(<0.898.0>, {acceptor,{0,0,0,0,0,0,0,0},61613}, {stomp_configuration,<<"guest">>,<<"guest">>,false,false,false}) at <0.899.0> exit with reason {channel_died,{shutdown,{connection_closing,{server_initiated_close,320,"user 'token-52bbcf84-eb20-8273-f8b1-6e1e0fb55055-redacted-28f60a77-169a-2bb4-94ee-6ae4d10251b4' is deleted"}}}} in context child_terminated
2020-07-13 09:17:53.647 [error] <0.897.0> Supervisor {<0.897.0>,rabbit_stomp_client_sup} had child rabbit_stomp_reader started with rabbit_stomp_reader:start_link(<0.898.0>, {acceptor,{0,0,0,0,0,0,0,0},61613}, {stomp_configuration,<<"guest">>,<<"guest">>,false,false,false}) at <0.899.0> exit with reason reached_max_restart_intensity in context shutdown
michaelklishin commented 4 years ago

The log snippet at the bottom simply suggests a connection was force-closed because its user was deleted. This plugin could handle it a bit more gracefully, so that it's just a basic error log entry, but the behavior is expected: as of 3.7.0, deleting a user will force close all of their active connections regardless of the protocol.

@eigengrau I guess what I'm trying to understand first and foremost is whether in your case the client was connected to the node where the queue/destination was declared or another node. If you no longer have that information, that's fine, we will eventually try both scenarios.

eigengrau commented 4 years ago

I guess what I'm trying to understand first and foremost is whether in your case the client was connected to the node where the queue/destination was declared or another node.

When we originally ran into this (a Spring client talking to a three-node cluster), can we infer that the client must have been connected to a node other than the one holding the queue? When the transport goes away, the application would have reconnected and created new subscriptions (this has worked reliably so far). In this case, we observed no reconnection in the application logs, and in the RabbitMQ management UI we could see that the respective queue had no consumers, even though the application was still connected.

When we reproduced this using the script, we just had one instance running locally, so it must have been holding both the queue and the connection.

The log snippet at the bottom simply suggests a connection was force closed because its user was deleted.

Good to know, thanks! So I take it this was just one of the Erlang processes crashing ("let it crash" is what I still recall from my incredibly brief foray into Erlang via "Seven Languages in Seven Weeks").

ikavgo commented 1 year ago

When the server sees an invalid command, it sends an ERROR frame but doesn't close the connection.

ikavgo commented 1 year ago

It looks like when a client has an auto-reconnect feature (like stomp.py), not closing the TCP connection after an ERROR frame lets the client reuse the TCP connection with mangled state. The client tries to reconnect, and at that point the server process crashes:

2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>     errorContext: child_terminated
2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>     reason: {function_clause,
2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>                 [{rabbit_stomp_processor,process_connect,
2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>                      [no_implicit,
2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>                       {stomp_frame,"STOMP",
2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>                           [{"passcode","guest"},
2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>                            {"login","guest"},
2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>                            {"accept-version","1.1"}],
2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>                           []},
2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>                       {proc_state,"session-4kjrlXD0KljbP5b7X2Gaag",
2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>                           <0.1469.0>,<0.1461.0>,#{},"1.1",undefined,undefined,
2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>                           {stomp_configuration,<<"guest">>,<<"guest">>,false,
2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>                               false,false},
2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>                           {set,0,16,16,8,80,48,
2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>                               {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>                                []},
2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>                               {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
2023-07-20 18:39:40.151597+02:00 [error] <0.1456.0>                                 []}}},

because the channel pid is still saved inside the state.

michaelklishin commented 1 year ago

According to the STOMP spec:

The server MAY send ERROR frames if something goes wrong. In this case, it MUST then close the connection just after sending the ERROR frame.

Closing the connection won't allow stomp.py to reuse the TCP connection.

Then there is also this bit on connection lingering but I don't see anything that would conflict with the idea of simply closing the TCP socket after sending an ERROR frame.
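On the client side, the defensive counterpart is to treat every ERROR frame as fatal. A minimal sketch of that policy (the connection object is a stub standing in for a real client; with @stomp/stompjs the corresponding hooks are `onStompError` and `deactivate()`):

```javascript
// Treat any ERROR frame as fatal: log it and mark the connection as dead
// so it is never reused. "connection" is a stub, not a real STOMP client.
function makeErrorHandler(connection) {
    return (frame) => {
        console.error(`broker error: ${frame.headers["message"]}`);
        connection.closed = true; // never reuse this TCP connection
    };
}

const connection = {closed: false};
const onStompError = makeErrorHandler(connection);
onStompError({headers: {message: "subscription cancelled"}, body: ""});
```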

Or was the above comment just to clarify what may be going on with some clients, @ikvmw?

ikavgo commented 1 year ago

Yeah, it was an example of what we may see in the real world. Essentially, +1 to closing after ERROR.