php-mqtt / client

An MQTT client written in and for PHP.
MIT License

Skipping received messages #73

Closed danilopinotti closed 3 years ago

danilopinotti commented 3 years ago

Hello!

First of all, thanks for writing this great library for PHP!

Well, today I was developing a solution with this library and I found a possible bug.

I made two processes:

  1. Publishes random data with a given QoS
  2. Subscribes to the topic that the first process publishes to

Both with QoS 2.

The publishing process sends about 1000 messages/second. The reading process starts out receiving all messages but, after a few seconds, begins to skip some data.

I printed some of these cases. The process that publishes: [image]
This process publishes all the values (6128, 6129, 6130, ...).

The process that subscribes: [image]
This process receives "6128" and then skips to "6264".

PS: I send the message count as 'message'.
PS2: In the "publish" script, I call the 'loop' method every 800 messages.
PS3: I used another program (MQTT Explorer) to listen to this same topic, and it receives all the data without skipping any.

Namoshek commented 3 years ago

Is logging the only thing your subscriber does?

danilopinotti commented 3 years ago

@Namoshek

Is logging the only thing your subscriber does?

Yes. I tried several scenarios and all of them skip messages. One thing I noticed is that the first messages are read fine, but after a few thousand, it begins to skip.

I generated a file logging the output of the "listen" process: https://pastebin.com/raw/cYD8c0Lu

In this example, the "Publish" script published 12436 messages at 2275 messages/second.

UPDATE: I noticed that 'allowSleep', when true, may cause this situation. I also noticed that it can occur when the subscribe callback runs a slower task.

When I turn off the allowSleep flag, the situation doesn't occur with a trivial callback, but when I put the full code inside the callback, it occurs again. The "full code" does exactly one thing: it enqueues a job that handles the data received from the broker. I'm using Laravel 8 with this library directly (not laravel-client).

Namoshek commented 3 years ago

Well, I pretty much had an explanation ready already, but nevertheless I tested it myself and can of course confirm the behavior. It is not an issue of the library though, as far as I can tell. My argument for that statement is the following line from the Mosquitto log which I used for my test:

1615919721: Outgoing messages are being dropped for client test-subscriber.

What I think is happening: the delivery queue of your subscriber is overflowing. Most MQTT brokers (if not all) use a queue size limit for delivery queues. When a message is published to a broker, it gets copied to the delivery queue of all subscribers with a matching topic. In case of QoS 0, the delivery queue can be emptied quickly through fire-and-forget. With QoS 1 and 2, due to the acknowledgement and confirmation messages, the delivery queue has to hold already sent messages until their delivery is confirmed. If too many messages are published to the delivery queue too quickly and the subscriber is not able to work through the messages, the broker will drop messages before even putting them on the delivery queue.
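
To make the overflow concrete, here is a minimal sketch of a size-limited delivery queue. The class `BoundedDeliveryQueue`, the limit of 20 and the per-tick rates are made up for illustration; this is a toy model, not how Mosquitto or any other broker is actually implemented.

```php
<?php
declare(strict_types=1);

/**
 * Toy model of a per-subscriber delivery queue with a size limit.
 * Hypothetical class for illustration; not part of php-mqtt/client or any broker.
 */
final class BoundedDeliveryQueue
{
    /** @var int[] ids of messages waiting for delivery confirmation */
    private array $pending = [];
    private int $dropped = 0;
    private int $limit;

    public function __construct(int $limit)
    {
        $this->limit = $limit;
    }

    /** Broker side: copy a published message to the queue, or drop it if the queue is full. */
    public function enqueue(int $messageId): bool
    {
        if (count($this->pending) >= $this->limit) {
            $this->dropped++;
            return false;
        }
        $this->pending[] = $messageId;
        return true;
    }

    /** Subscriber side: confirm delivery of up to $count of the oldest messages. */
    public function confirm(int $count): array
    {
        return array_splice($this->pending, 0, $count);
    }

    public function dropped(): int
    {
        return $this->dropped;
    }
}

// Assumed rates: 10 messages published per tick, 6 confirmed per tick, queue limit 20.
$queue = new BoundedDeliveryQueue(20);
$received = [];
$nextId = 1;

for ($tick = 0; $tick < 10; $tick++) {
    for ($n = 0; $n < 10; $n++) {
        $queue->enqueue($nextId++);
    }
    $received = array_merge($received, $queue->confirm(6));
}

// The subscriber sees a continuous sequence at first and gaps once the queue is full.
echo 'received: ' . count($received) . ', dropped: ' . $queue->dropped() . PHP_EOL;
```

With these numbers the subscriber keeps up for the first few ticks, then starts seeing gaps in the counter, which matches the pattern in the log above: everything arrives at first, and skipping begins only after the queue has filled up.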

Just from the publisher perspective, it seems that this shouldn't cause an issue, since the publisher completes the QoS 2 flow for all of its published messages every 800 messages (through $client->loop(true, true)). That only covers the publisher's own queue, though. The subscriber's delivery queue on the broker is handled asynchronously, and if publishing outpaces delivery for some reason, it can still overflow.

For publishing, I used the following code (based on your details):

for ($i = 0; $i < 80000; $i++) {
    $client->publish('foo/bar/baz', 'counting... ' . $i, MqttClient::QOS_EXACTLY_ONCE);

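    // Every 800 messages, run the loop until the client's queues are empty
    // (second argument) so that pending QoS 2 handshakes get completed.
    if ($i % 800 === 0) {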
        $client->loop(true, true);
    }
}

My suggestion would be to either figure out a way to reduce your message frequency or to tune your broker settings. Mosquitto, for example, has two interesting settings regarding this issue: max_inflight_messages and max_queued_messages.

Please be aware that tuning these settings can have significant impact on system stability and memory utilization of your broker, depending on the number of clients and message frequency.

If your subscriber is too slow to process messages, you may also throw received messages on a local queue (e.g. RabbitMQ or Redis) and process them with (multiple) different processes.
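
A rough sketch of that idea: the subscribe callback only records the message, and the slow work happens elsewhere. Here `SplQueue` stands in for Redis or RabbitMQ so the example runs on its own, and `drainQueue` is a hypothetical worker function. In a real setup the workers would be separate processes, and the closure would be passed as the callback to the client's subscribe call (assuming a callback that receives at least the topic and the message).

```php
<?php
declare(strict_types=1);

// Stand-in for an external queue such as Redis or RabbitMQ (assumption for the sketch).
$localQueue = new SplQueue();

// Callback in the ($topic, $message) shape. It does no real work; it only
// stores the message so that the receive loop stays fast.
$onMessage = function (string $topic, string $message) use ($localQueue): void {
    $localQueue->enqueue(['topic' => $topic, 'payload' => $message]);
};

// Hypothetical worker: drains the queue and performs the slow processing.
function drainQueue(SplQueue $queue, callable $process): int
{
    $handled = 0;
    while (!$queue->isEmpty()) {
        $process($queue->dequeue());
        $handled++;
    }
    return $handled;
}

// Simulate three incoming messages, then process them later.
foreach ([1, 2, 3] as $i) {
    $onMessage('foo/bar/baz', 'counting... ' . $i);
}

$processed = [];
$handled = drainQueue($localQueue, function (array $job) use (&$processed): void {
    $processed[] = $job['payload'];
});

echo 'handled ' . $handled . ' messages' . PHP_EOL;
```

The point of the split is that the time spent inside the callback no longer depends on how slow the actual processing is, so the client can confirm deliveries to the broker at a steady pace.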


As a side note: from the client perspective, there is no way messages can be dropped (as long as the operating system is not dropping traffic from the socket, which would most likely cause data corruption though). The reason for that is sequential processing. Available and buffered bytes are read from the socket, then the bytes are parsed until a full message is found and then the message is processed. It is a continuous stream of messages.
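
The sequential processing can be sketched roughly as follows. `extractPackets` is an illustrative helper, not the library's actual parser. It relies only on the MQTT fixed header layout: one byte for type and flags, followed by a "remaining length" of one to four bytes, each carrying 7 bits plus a continuation bit.

```php
<?php
declare(strict_types=1);

/**
 * Extracts all complete MQTT packets from the front of $buffer.
 * Returns [packets, remainder]; the remainder holds an incomplete packet, if any.
 * Illustrative helper only, not the parser used by php-mqtt/client.
 */
function extractPackets(string $buffer): array
{
    $packets = [];
    $offset = 0;
    $length = strlen($buffer);

    while ($offset < $length) {
        // Skip the type/flags byte, then decode the variable-length "remaining length".
        $pos = $offset + 1;
        $remaining = 0;
        $multiplier = 1;
        $lengthBytes = 0;
        $done = false;

        while (!$done) {
            if ($lengthBytes === 4) {
                throw new RuntimeException('Malformed remaining length');
            }
            if ($pos >= $length) {
                break 2; // The length field itself is incomplete; wait for more bytes.
            }
            $byte = ord($buffer[$pos++]);
            $remaining += ($byte & 0x7F) * $multiplier;
            $multiplier *= 128;
            $lengthBytes++;
            $done = ($byte & 0x80) === 0;
        }

        if ($pos + $remaining > $length) {
            break; // The packet body has not been fully received yet.
        }

        $packets[] = substr($buffer, $offset, $pos + $remaining - $offset);
        $offset = $pos + $remaining;
    }

    return [$packets, (string) substr($buffer, $offset)];
}

// A PINGRESP, a QoS 0 PUBLISH to "a/b" with payload "hi", and a truncated third packet.
$ping = "\xD0\x00";
$publish = "\x30\x07" . "\x00\x03" . 'a/b' . 'hi';
[$packets, $rest] = extractPackets($ping . $publish . substr($publish, 0, 3));

// Once the missing bytes arrive, the truncated packet completes without loss.
[$morePackets, $restAfter] = extractPackets($rest . substr($publish, 3));
```

Because unparsed bytes are carried over to the next read instead of being discarded, a packet that arrives split across two reads is still delivered exactly once.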

danilopinotti commented 3 years ago

Makes sense: once I limited the send rate to one message every 50 ms, the error no longer occurred. This happens in my local environment; I think it will take a long time to reach this "bottleneck" in production. I'll think of a way to scale my application or tune the broker (VerneMQ) when the time comes!

Thank you for your attention and the accurate answer!