mgdm / Mosquitto-PHP

A wrapper for the Eclipse Mosquitto™ MQTT client library for PHP.
BSD 3-Clause "New" or "Revised" License
535 stars 147 forks source link

Missing messages when not using loopForever() #85

Open rosshettel opened 6 years ago

rosshettel commented 6 years ago

Been using this library to listen for MQTT messages in a constant running daemon process. However, I can't use loopForever() as the daemon needs to call other methods during each cycle. I call loop() about 20 times every 5 seconds. However, during the time loop() is not being called, we miss MQTT messages coming into the client.

Our client is setup with QoS 2 and a persistent session. If I use loopForever() I can see that I don't miss any messages. Is there something I'm missing with the loop() method? I'm not sure why it's missing messages.

davidg246819 commented 6 years ago

I can confirm this is happening. Even when calling loop() every few usecs with 0 or 1 timeout I will miss messages when under heavy load (> 3k or 4k publishes per sec). When calling loopForever() all messages are published and consumed properly.

mgdm commented 6 years ago

I'll see if I can replicate this behaviour. I've not had a lot of time to work on this recently, though hopefully I can fix that soon.

hstanda commented 5 years ago

Hi @mgdm,

Thanks for the extension.

I have also noticed this behaviour if we publish messages back to back in about 100 in 1 sec. To reproduce, have a topic published in the loop of around 300 iterations.

So I switched to loopForever, but for some reason, it never exits if I give it a timeout value.

A code snippet is here: try { $cfg = $this->config->item('mqtt'); $mid = 0; $client = new Mosquitto\Client("web_api_client"); $client->setCredentials($cfg['user'], $cfg['password']); $client->onConnect(function() use ($client, &$mid, $toTopic, $toPayload) { $mid = $client->publish($toTopic, $toPayload, 1); } ); $client->onPublish(function() use ($client,$fromTopic) { $client->subscribe($fromTopic, 1); }); $client->onMessage( function($response) use ($client, &$responseTopic, &$responseMessage, &$responseReturn) { $responseTopic = $response->topic; $responseMessage = $response->payload; $client->exitLoop(); $client->disconnect(); }); $client->connect($cfg['host'], 1883, 5); $client->loopForever(100); $responseReturn = array('topic'=> $responseTopic, 'message' => $responseMessage); } catch (Exception $e) { $response = 'Exception: ' .$e->getMessage(); $responseReturn = false; }

Please let me know if I am doing something wrong.

Thanks for the help in advance.

mgdm commented 5 years ago

Hi, Apologies for the late response, I've been on holiday. The timeout you give in loopForever() means each trip around the loop is going to take up to that period of time, I wonder if that could explain it being slow. I'll see if I can reproduce this myself and try and instrument it. The messages that come in should be being buffered, but it may take several loop() calls to process one message depending on the QoS value that was set and a few other variables. Do you see some messages coming in, then some missing, and later ones coming in?

hstanda commented 5 years ago

@mgdm Thanks for replying I need to set a timeout for loopForever().

Goal: disconnect from the Broker if the given time is passed or the subscribed topic received.

I have snippet above, it will be very helpful if you can guide me to achieve the goal.