emqx / emqx

The message queue size may exceed the maximum limit after setting topic priority #13409

Open tigercl opened 2 months ago

tigercl commented 2 months ago

What happened?

The corresponding code in EMQX is as follows:

```erlang
-spec in(message(), mqueue()) -> {option(message()), mqueue()}.
in(Msg = #message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) ->
    {_Dropped = Msg, MQ};
in(
    Msg = #message{topic = Topic},
    MQ =
        #mqueue{
            default_p = Dp,
            p_table = PTab,
            q = Q,
            len = Len,
            max_len = MaxLen,
            dropped = Dropped
        } = MQ
) ->
    Priority = get_priority(Topic, PTab, Dp),
    PLen = ?PQUEUE:plen(Priority, Q),
    Msg1 = with_ts(Msg),
    case MaxLen =/= ?MAX_LEN_INFINITY andalso PLen =:= MaxLen of
        true ->
            %% reached max length, drop the oldest message
            {{value, DroppedMsg}, Q1} = ?PQUEUE:out(Priority, Q),
            Q2 = ?PQUEUE:in(Msg1, Priority, Q1),
            {without_ts(DroppedMsg), MQ#mqueue{q = Q2, dropped = Dropped + 1}};
        false ->
            {_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg1, Priority, Q)}}
    end.
```

PLen is the length of the queue for a single priority, not of the whole message queue. The code above compares PLen with MaxLen, so the limit is enforced per priority and the actual total message queue length can exceed the maximum queue length I set.

For example, if I assign distinct priorities to 10 topics and set the maximum message queue length to 1000, the actual maximum message queue length will be 10 * 1000 = 10000.
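To make the arithmetic concrete, here is a minimal, self-contained sketch of the per-priority check. It is a simplified model and not the EMQX code: the module name `mqueue_sketch`, the map-of-queues representation, and `demo/0` are all hypothetical names chosen for illustration, and the sketch assumes `MaxLen > 0`.

```erlang
-module(mqueue_sketch).
-export([in/3, total_len/1, demo/0]).

%% Hypothetical model: the state is {Qs, MaxLen}, where Qs maps each
%% priority to its own FIFO queue. As in the issue, the cap is checked
%% against the length of one priority only.
in(Msg, Priority, {Qs, MaxLen}) ->
    Q = maps:get(Priority, Qs, queue:new()),
    case queue:len(Q) =:= MaxLen of
        true ->
            %% This priority is full: drop its oldest message.
            {{value, Dropped}, Q1} = queue:out(Q),
            {Dropped, {Qs#{Priority => queue:in(Msg, Q1)}, MaxLen}};
        false ->
            {undefined, {Qs#{Priority => queue:in(Msg, Q)}, MaxLen}}
    end.

%% Total number of messages across all priorities.
total_len({Qs, _MaxLen}) ->
    maps:fold(fun(_P, Q, Acc) -> Acc + queue:len(Q) end, 0, Qs).

%% Enqueue 1500 messages on each of 10 priorities with MaxLen = 1000.
demo() ->
    Empty = {#{}, 1000},
    Final = lists:foldl(
        fun(P, MQ0) ->
            lists:foldl(
                fun(N, MQ) ->
                    {_Dropped, MQ1} = in({P, N}, P, MQ),
                    MQ1
                end,
                MQ0,
                lists:seq(1, 1500)
            )
        end,
        Empty,
        lists:seq(1, 10)
    ),
    total_len(Final).
```

In this model, `mqueue_sketch:demo()` returns 10000: each priority stops growing at 1000 messages, but nothing bounds the sum across priorities.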

Below is my test result, showing that the current queue length 2 has exceeded the maximum queue length 1:

image-20240704141634027

And here is the HTTP response returned by EMQX:

image-20240704141443943

What did you expect to happen?

The total length of all priority queues should not exceed the maximum queue length.
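As a rough illustration of that expectation, the sketch below enforces the cap on the total length instead of on a single priority. It is only a model under stated assumptions, not a proposed patch: the module name `mqueue_total_cap`, the `{Qs, Len, MaxLen}` state, and the eviction policy (drop the oldest message of the numerically lowest non-empty priority, treating a larger number as higher priority) are all hypothetical, and `MaxLen > 0` is assumed.

```erlang
-module(mqueue_total_cap).
-export([new/1, in/3, len/1]).

%% Hypothetical state: {Qs, Len, MaxLen}. Qs maps priority -> non-empty
%% queue, Len is the total message count, MaxLen (> 0) is the global cap.
new(MaxLen) ->
    {#{}, 0, MaxLen}.

len({_Qs, Len, _MaxLen}) ->
    Len.

in(Msg, Priority, {Qs, Len, MaxLen}) when Len >= MaxLen ->
    %% Total cap reached. Assumed policy: evict the oldest message of
    %% the lowest non-empty priority, then enqueue the new message.
    Victim = lists:min(maps:keys(Qs)),
    {{value, Dropped}, VQ} = queue:out(maps:get(Victim, Qs)),
    Qs1 =
        case queue:is_empty(VQ) of
            true -> maps:remove(Victim, Qs);
            false -> Qs#{Victim => VQ}
        end,
    Q = maps:get(Priority, Qs1, queue:new()),
    %% One message out, one in: the total length is unchanged.
    {Dropped, {Qs1#{Priority => queue:in(Msg, Q)}, Len, MaxLen}};
in(Msg, Priority, {Qs, Len, MaxLen}) ->
    Q = maps:get(Priority, Qs, queue:new()),
    {undefined, {Qs#{Priority => queue:in(Msg, Q)}, Len + 1, MaxLen}}.
```

With `MaxLen = 3` and one message on each of priorities 1, 2, and 3, a fourth message on priority 2 evicts the priority 1 message and `len/1` stays at 3. Note that this simple policy would evict a queued message even when the incoming one has a lower priority; which message to drop is a design decision this sketch does not settle.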

How can we reproduce it (as minimally and precisely as possible)?

No response

Anything else we need to know?

No response

EMQX version

EMQX 5.7.1

OS version

```console
# On Linux:
$ cat /etc/os-release
# paste output here
$ uname -a
# paste output here
```

Log files

ieQu1 commented 2 months ago

It's not hard to fix, but after reading the code, it looks like it was designed this way deliberately.

There are two possible ways to fix it:

What is the best solution @tigercl ?

tigercl commented 1 month ago

@ieQu1 I think the latter is better. That mechanism should already exist, though; it's just that the message queue reaches its maximum limit later than the user expects.

I'm just worried that if the actual effective value is much larger than the value set by the user, it may cause OOM.