eclipse-mosquitto / mosquitto

Eclipse Mosquitto - An open source MQTT broker
https://mosquitto.org

Mosquitto Bridge not respecting max_inflight_messages #2301

Open canique opened 3 years ago

canique commented 3 years ago

Mosquitto Version: 2.0.11

When using the MQTTv50 bridge protocol, Mosquitto sends a burst of retained messages to the remote broker in quick succession. In my case, more than 60 messages are sent in well under 1 second.

The remote broker then closes the connection because of "too many concurrent publish messages" (default limit on the remote broker is 10).

Is there any way to rate-limit publish messages? max_inflight_messages is set to 1, without effect. With the MQTTv31 bridge protocol, however, max_inflight_messages seems to be respected.

canique commented 3 years ago

After some investigation, I need to clarify: with MQTTv5, Mosquitto takes the maximum allowed message quota directly from the remote broker. That is why the max_inflight_messages setting has no effect.

But the remote message quota is still exceeded, so there is a bug somewhere.

P.S.: The bug happens irrespective of the MQTT protocol version. The only edge case where I think it does not happen is when max_inflight_messages is set to 1. By chance I had it set to that value, which is why I thought MQTTv31 was not affected.
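
For readers unfamiliar with the quota mentioned above: in MQTT v5 the receiver announces a Receive Maximum in CONNACK, and the sender keeps a send quota that starts at that value, drops by one per QoS 1/2 PUBLISH, and rises by one per PUBACK or PUBCOMP. The following is only a minimal sketch of that bookkeeping. The struct and function names are hypothetical and this is not Mosquitto's code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical model of a sender's MQTT v5 send quota. Not Mosquitto code. */
struct send_quota {
    uint16_t maximum; /* Receive Maximum announced by the peer in CONNACK */
    uint16_t quota;   /* QoS 1/2 PUBLISH packets that may still be sent */
};

void quota_init(struct send_quota *q, uint16_t receive_maximum)
{
    /* MQTT v5 treats a Receive Maximum of 0 as a protocol error. */
    assert(receive_maximum > 0);
    q->maximum = receive_maximum;
    q->quota = receive_maximum;
}

/* Returns true and consumes one unit of quota if a QoS>0 PUBLISH may be sent. */
bool quota_try_send(struct send_quota *q)
{
    if (q->quota == 0) {
        return false; /* must wait for an acknowledgement first */
    }
    q->quota--;
    return true;
}

/* Called for each PUBACK or PUBCOMP; the quota never grows past the maximum. */
void quota_on_ack(struct send_quota *q)
{
    if (q->quota < q->maximum) {
        q->quota++;
    }
}
```

With a Receive Maximum of 10, as on the remote broker in this report, a sender following this model would stop after ten unacknowledged QoS 1/2 publishes, which is the behaviour the bridge appears not to honour.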

canique commented 3 years ago

I think the bug is in db__message_delete_outgoing() in database.c

1st bug: After the first DL_FOREACH_SAFE() loop, it can happen that the received MID does not map to any of the inflight outbound messages. This case needs to be handled explicitly. My proposal (immediately after the first DL_FOREACH_SAFE()):

if(context->msgs_out.inflight_maximum != 0 && !foundMid){
    util__decrement_send_quota(context);
    return MOSQ_ERR_NOT_FOUND;
}

"foundMid" needs to be initialized to false and set to true in the first DL_FOREACH_SAFE() loop when the MID comparison matches.

I'm decrementing the quota here because it was incremented earlier in handle_pubackcomp.c for an invalid MID. Once the MID has been checked and found to be invalid, I revert that increment.

Whether the early return is appropriate, I can't tell. The persistent_changes variable might need to be updated.
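
To make the first proposal easier to follow outside of database.c, here is a standalone sketch of the same idea. It uses a plain array instead of Mosquitto's utlist macros, and every type, function, and return code in it is hypothetical. It assumes, as described above, that the caller has already incremented the quota before the MID is looked up.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

enum { ACK_OK = 0, ACK_NOT_FOUND = 1 }; /* hypothetical return codes */

struct pending_msg {
    uint16_t mid;
    bool inflight;
};

struct outgoing {
    struct pending_msg *msgs;
    size_t count;
    uint16_t inflight_maximum; /* 0 means "no limit", as in the check above */
    uint16_t inflight_quota;
};

/* Assumes the caller already incremented inflight_quota for this ack. */
int ack_outgoing(struct outgoing *s, uint16_t mid)
{
    bool found_mid = false;

    assert(s != NULL);
    for (size_t i = 0; i < s->count; i++) {
        if (s->msgs[i].inflight && s->msgs[i].mid == mid) {
            s->msgs[i].inflight = false; /* message is now fully acknowledged */
            found_mid = true;
            break;
        }
    }

    if (s->inflight_maximum != 0 && !found_mid) {
        /* Unknown MID: undo the increment the caller made for this ack. */
        if (s->inflight_quota > 0) {
            s->inflight_quota--;
        }
        return ACK_NOT_FOUND;
    }
    return ACK_OK;
}
```

The point of the flag is that an acknowledgement for an unknown MID leaves the quota exactly where it was before the packet arrived.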

canique commented 3 years ago

2nd bug:

In the second DL_FOREACH_SAFE() loop there is no check of the send quota. The loop dequeues everything it can find, starting with the message after the one whose MID matched. After the first if check within the loop, there should be a second one:

if(context->msgs_out.inflight_maximum != 0 && context->msgs_out.inflight_quota == 0){
    //break if no quota left
    break;
}

Otherwise, I have seen the loop run an iteration although inflight_quota was already 0.
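
The quota check in the dequeue loop can be illustrated in isolation. This is a sketch under stated assumptions: the array-based queue, the struct, and the function name are hypothetical and stand in for the second DL_FOREACH_SAFE() loop. Only the break condition mirrors the proposal.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

struct queued_msg {
    uint16_t mid;
    bool inflight; /* false = still waiting in the queue */
};

/* Moves queued messages to inflight while quota remains.
 * Returns the number of messages promoted. */
size_t promote_queued(struct queued_msg *msgs, size_t count,
                      uint16_t inflight_maximum, uint16_t *inflight_quota)
{
    size_t promoted = 0;

    assert(msgs != NULL && inflight_quota != NULL);
    for (size_t i = 0; i < count; i++) {
        if (msgs[i].inflight) {
            continue; /* already sent, nothing to do */
        }
        if (inflight_maximum != 0 && *inflight_quota == 0) {
            break; /* no quota left: stop dequeuing */
        }
        msgs[i].inflight = true;
        if (inflight_maximum != 0) {
            (*inflight_quota)--;
        }
        promoted++;
    }
    return promoted;
}
```

Without the break, the loop would keep promoting messages after the quota reached 0, which matches the extra iteration described above.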

canique commented 3 years ago

3rd bug (bridge mode)

If the connection to the remote broker drops:

1) Before reconnecting, db__message_reconnect_reset_outgoing() is called and recalculates the inflight quota by counting the messages that are in flight. It first sets the quota to the maximum and then decrements it step by step.
2) After reconnecting, when the CONNACK packet is received, the RECEIVE_MAXIMUM property is read, the new inflight maximum is set, and (this is the critical point) the inflight quota is set to the maximum without looking at which messages are in flight.

So the inflight quota will be at its maximum even though there are, for example, outbound messages in flight that have not been successfully sent yet. Hence you can end up exceeding the quota.

Here is a proposal for handle__connack(), although it is not perfect. Consider the edge case where the remote broker restarts with a reduced RECEIVE_MAXIMUM value. If you have more messages in flight than this value allows, and the remote broker closes the connection when the limit is exceeded, you'll end up in an endless reconnection loop. A possible approach would be to loop through the inflight messages again and put them back in the queue if their number is higher than the new receive maximum.

uint16_t new_inflight_maximum = 0;
mosquitto_property_read_int16(properties, MQTT_PROP_RECEIVE_MAXIMUM,
        &new_inflight_maximum, false);

if (new_inflight_maximum > context->msgs_out.inflight_maximum){
    uint16_t inflight_maximum_diff = new_inflight_maximum - context->msgs_out.inflight_maximum;
    log__printf(NULL, MOSQ_LOG_DEBUG, "CONNACK: Increasing quota from %d/%d to %d/%d.", context->msgs_out.inflight_quota, context->msgs_out.inflight_maximum, context->msgs_out.inflight_quota + inflight_maximum_diff, new_inflight_maximum);
    context->msgs_out.inflight_quota += inflight_maximum_diff;
} else if (new_inflight_maximum < context->msgs_out.inflight_maximum){
    uint16_t inflight_maximum_diff = context->msgs_out.inflight_maximum - new_inflight_maximum;
    uint16_t inflight_quota_reduced = (context->msgs_out.inflight_quota < inflight_maximum_diff) ? 0 : (context->msgs_out.inflight_quota - inflight_maximum_diff);
    log__printf(NULL, MOSQ_LOG_DEBUG, "CONNACK: Decreasing quota from %d/%d to %d/%d.", context->msgs_out.inflight_quota, context->msgs_out.inflight_maximum, inflight_quota_reduced, new_inflight_maximum);
    context->msgs_out.inflight_quota = inflight_quota_reduced;
}

context->msgs_out.inflight_maximum = new_inflight_maximum;
//if we do this we break the quota - since we're not considering what's already in flight
//context->msgs_out.inflight_quota = context->msgs_out.inflight_maximum;

If there are cases where db__message_reconnect_reset_outgoing() is not called before the CONNACK, then my solution won't fit.
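
An alternative that does not depend on the reset having run first would be to recount the inflight messages when the CONNACK arrives and to move any excess back to the queue, as suggested for the reduced RECEIVE_MAXIMUM case. The following is a rough sketch only. The struct and function are hypothetical, a plain array replaces Mosquitto's message lists, and it ignores the complication that a QoS 2 message already partway through its handshake cannot simply be re-queued.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

struct bridge_msg {
    uint16_t mid;
    bool inflight; /* false = waiting in the queue */
};

/* Keeps at most receive_maximum messages inflight, moves the rest back to
 * the queue, and returns the send quota that remains afterwards. */
uint16_t resync_after_connack(struct bridge_msg *msgs, size_t count,
                              uint16_t receive_maximum)
{
    uint16_t kept = 0;

    /* MQTT v5 treats a Receive Maximum of 0 as a protocol error. */
    assert(receive_maximum > 0);
    for (size_t i = 0; i < count; i++) {
        if (!msgs[i].inflight) {
            continue;
        }
        if (kept < receive_maximum) {
            kept++; /* still fits within the new limit */
        } else {
            msgs[i].inflight = false; /* excess: back to the queue */
        }
    }
    return (uint16_t)(receive_maximum - kept);
}
```

Because the quota is derived from what is actually in flight, it cannot end up at the maximum while messages are still outstanding, whichever order the reset and the CONNACK handling run in.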

entirosadmin commented 3 years ago

We have the same problem with inflight messages set to 1. We confirmed the bug by stopping the central broker (in this case a HiveMQ broker) while Mosquitto builds up a queue. When HiveMQ is up again, Mosquitto seems to "forget" the inflight=1 setting and pushes as much as it can to HiveMQ (which in this case disconnects the client, as it has an incoming limit of 100).

GalletFlorian commented 1 month ago

Hello @canique

We have the same issue in version 2.0.18. To your knowledge, has Eclipse fixed this bug?

Best,

Florian

canique commented 1 month ago

Hi @GalletFlorian I think if it was fixed, this bug wouldn't still be open :)

GalletFlorian commented 1 month ago

Thanks for the fast reply!

Since it's a 2021 bug with a proposed solution, I thought it had somehow been included in the versions newer than 2.0.11...