php-mqtt / client

An MQTT client written in and for PHP.
MIT License

Receiving only one message with subscribe and interrupt if max delay is reached #43

Closed ArnaudLier closed 2 years ago

ArnaudLier commented 3 years ago

Hello! I need to publish a message to /in and receive it in /out. I tried this:

    $mqtt = new MQTTClient('mqtt.mydomain.com', 1883, uniqid());
    $mqtt->connect('MyUsername', 'MyPassword');
    $response = null;
    $mqtt->subscribe($peripheral->hostname . '/out', function (string $topic, string $message) {
        $response = json_decode($message['message'], true);
        $mqtt->interrupt();
    });
    $mqtt->publish($peripheral->hostname . '/in', '$');
    sleep(2000);
    $mqtt->interrupt();
    $mqtt->close();

But it seems I'm doing something wrong. Can you help me? Thanks!

Namoshek commented 3 years ago

Not sure I understand. Are you trying to write a script which receives a message on /in and forwards it to /out? Because what you wrote, publishing something on /in and expecting to receive it on /out, is not how MQTT works. It's like calling telephone number A and expecting someone with telephone number B to receive the call. It's just not going to happen.

It would be helpful to see the whole script though. You can of course anonymize all data, as long as the important ideas are not obscured. For example, $peripheral->hostname is unknown to me at the moment, and it could also be a source of error.

ArnaudLier commented 3 years ago

No, I want to send a command to /in and receive the response on /out. I can't change how this works. I did it with another PHP MQTT library, but I would like to switch to this one because it is more up to date.

$peripheral->hostname is a unique identifier for a service and isn't null; the request is sent correctly.

Namoshek commented 3 years ago

Ok, got it. I think your only mistake is not calling $mqtt->loop() after you publish your message:

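// List $mqtt and $response in use(); a PHP closure cannot see outer variables otherwise.
// $response is captured by reference so the decoded value is available after the loop.
$mqtt->subscribe($peripheral->hostname . '/out', function (string $topic, string $message) use ($mqtt, &$response) {
    // $message is the raw payload string, so it can be decoded directly.
    $response = json_decode($message, true);

    // Escape the $mqtt->loop() by setting an exit flag which is picked up on the next iteration.
    $mqtt->interrupt();
});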

$mqtt->publish($peripheral->hostname . '/in', '$');

// Wait for messages on subscribed topics until $mqtt->interrupt() is called.
$mqtt->loop();

// Close the connection to the MQTT broker.
$mqtt->close();

It is important to understand that the callback passed to subscribe() is executed within the $mqtt->loop() method.
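
Independent of the library, a PHP closure only sees outer variables that are listed in its `use` clause, and it needs `&` to write a value back to them. The following is a minimal sketch of that behaviour; `$deliver` is a hypothetical stand-in for the client's loop, not part of php-mqtt/client:

```php
<?php

// Hypothetical stand-in for the client: it simply invokes the callback once,
// the way a message loop would when a message arrives.
$deliver = function (callable $callback): void {
    $callback('device/out', '{"ok":true}');
};

$byValue = null;
$byReference = null;

// Captured by value: the assignment only changes the closure's private copy.
$deliver(function (string $topic, string $message) use ($byValue) {
    $byValue = json_decode($message, true);
});

// Captured by reference: the assignment is visible after the callback has run.
$deliver(function (string $topic, string $message) use (&$byReference) {
    $byReference = json_decode($message, true);
});

var_dump($byValue);      // still NULL
var_dump($byReference);  // array with "ok" => true
```

This is why a `$response` variable assigned inside a subscribe callback stays `null` outside of it unless it is captured with `use (&$response)`.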

ArnaudLier commented 3 years ago

Yeah, but if no message is received (the peripheral is offline), it will hang forever, won't it?

Namoshek commented 3 years ago

Yes, the loop runs until you terminate it (however this happens). If you want to set a timeout of, let's say, 10 seconds, you have at least the following two options:

1) Use pcntl_signal with pcntl_alarm (requires ext-pcntl):

// Register a signal handler for the SIGALRM signal, which indicates a timeout.
pcntl_signal(SIGALRM, function () use ($mqtt) {
    $mqtt->interrupt();
});

// Register an alarm to be sent after 10 seconds.
pcntl_alarm(10);

// ... subscribe, publish, loop and close here ...

2) Use a loop event handler:

// Register a hook which is called once per loop and which interrupts the loop after some time.
$mqtt->registerLoopEventHandler(function (MqttClient $mqtt, float $elapsedTime) {
    if ($elapsedTime >= 10) {
        $mqtt->interrupt();
    }
});

// ... subscribe, publish, loop and close here ...

You should also check out how the hook callback is called: https://github.com/php-mqtt/client/blob/00f8ffbe4693c6b4d418a0d1a9307420e5571d10/src/MQTTClient.php#L779-L793

The important bit is that the loop will run once more after you interrupt it in the loop event handler.
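
One way to picture this is the toy model below. It is not the library's implementation; it assumes that the exit flag is checked at the top of each iteration, before the loop event handlers run, and the class `ToyLoop` with its counters is purely illustrative:

```php
<?php

// Toy model only; NOT the library's implementation. It assumes the exit flag
// is checked at the top of each iteration, before the handlers are called.
final class ToyLoop
{
    private bool $interrupted = false;
    /** @var callable[] */
    private array $handlers = [];
    public int $reads = 0;
    public int $readsAfterInterrupt = 0;

    public function registerLoopEventHandler(callable $handler): void
    {
        $this->handlers[] = $handler;
    }

    public function interrupt(): void
    {
        $this->interrupted = true;
    }

    public function loop(): void
    {
        $startedAt = microtime(true);

        while (true) {
            // The flag is only evaluated here, once per iteration.
            if ($this->interrupted) {
                $this->interrupted = false;
                break;
            }

            foreach ($this->handlers as $handler) {
                $handler($this, microtime(true) - $startedAt);
            }

            // Placeholder for reading from the socket and running callbacks.
            $this->reads++;
            if ($this->interrupted) {
                $this->readsAfterInterrupt++;
            }
            usleep(10000);
        }
    }
}

$loop = new ToyLoop();
$loop->registerLoopEventHandler(function (ToyLoop $loop, float $elapsedTime) {
    if ($elapsedTime >= 0.05) {
        $loop->interrupt();
    }
});
$loop->loop();

echo "Reads in total: {$loop->reads}\n";
echo "Reads after interrupt() was called: {$loop->readsAfterInterrupt}\n";
```

In this model, the iteration in which the handler calls interrupt() still performs its read, and the loop only exits at the start of the following iteration.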

ArnaudLier commented 3 years ago

Okay, thank you for your precious help!

ArnaudLier commented 3 years ago

Even if I send a message myself with an MQTT client, subscribing doesn't seem to work:

        $mqtt = new MQTTClient('mqtt.mydomain.fr', 1883, uniqid());
        $mqtt->connect('MyUsername', 'MyPassword');
        $response = null;

        /**$mqtt->registerLoopEventHandler(function (MqttClient $mqtt, float $elapsedTime) {
            if ($elapsedTime >= 5) {
                $mqtt->interrupt();
            }
        });**/

        $mqtt->subscribe('hello', function (string $topic, string $message) use ($response) {
            return 'hey';
            $response = json_decode($message, true);

            // Escape the $mqtt->loop() by setting an exit flag which is picked up on the next iteration.
            $mqtt->interrupt();
        });

        $mqtt->publish('hello', '$');

        // Wait for messages on subscribed topics until $mqtt->interrupt() is called.
        $mqtt->loop(true);

        // Close the connection to the MQTT broker.
        $mqtt->close();

Namoshek commented 3 years ago

I cannot reproduce your issue. It might be an issue with your broker (it probably does not send messages back to the client which published it in the first place, even if subscribed).

The following is a minimal example which does exactly what you described. It can be run within the php-mqtt/client-examples repository, for example (after installing the Composer dependencies), although it really only needs php-mqtt/client installed as a dependency in the vendor directory.

<?php

require __DIR__ . '/vendor/autoload.php';

use PhpMqtt\Client\MQTTClient;

try {
    $mqtt = new MQTTClient("test.mosquitto.org", 1883, uniqid());
    $mqtt->connect();

    $mqtt->subscribe('php-mqtt/client/test', function ($topic, $message) use ($mqtt) {
        echo sprintf("Received message on topic [%s]: %s\n", $topic, $message);

        $mqtt->interrupt();
    }, MQTTClient::QOS_AT_MOST_ONCE);

    $mqtt->publish("php-mqtt/client/test", 'This is only a test message.', MQTTClient::QOS_AT_MOST_ONCE);

    $mqtt->loop();
    $mqtt->close();
} catch (\Throwable $e) {
    echo 'An exception occurred: ' . $e->getMessage() . PHP_EOL;
}

Namoshek commented 3 years ago

Closed due to inactivity. Issue seems resolved.

abydahana commented 2 years ago

> It might be an issue with your broker (it probably does not send messages back to the client which published it in the first place, even if subscribed).

Hello @Namoshek, I have the same problem. The command is successfully sent to the broker and the device is triggered, but the response message is not received. I use Mosquitto on CentOS 7. Is there a configuration that would keep it from sending the message to the client?

Thank you.

Namoshek commented 2 years ago

@abydahana This is nothing I can answer without seeing the problematic code. Normally, there is no special configuration for Mosquitto required for it to work with this library.

abydahana commented 2 years ago

> @abydahana This is nothing I can answer without seeing the problematic code. Normally, there is no special configuration for Mosquitto required for it to work with this library.

This is my code:

// snippet using CodeIgniter framework (version 4.1.4)

public function index()
{
  $device   = service('uri')->setSilent()->getSegment(2); // tasmota_5A7EF6
  $client   = new \PhpMqtt\Client\MqttClient('localhost', 1883, $device);

  $client->connect(null, true);

  $client->subscribe('mqtt/' . $device . '/tele/STATE', function ($topic, $message) use ($client)
  {
    echo sprintf("Received message on topic [%s]: %s\n", $topic, $message);

    $client->interrupt();

  }, $client::QOS_EXACTLY_ONCE);

  $client->registerLoopEventHandler(function($client, $elapsedTime)
  {
    if($elapsedTime > 3)
    {
      $client->interrupt();
    }
  });

  $client->publish('mqtt/' . $device . '/cmnd/Power', 'TOGGLE', $client::QOS_EXACTLY_ONCE);

  $client->loop(true);

  $client->disconnect();
}

I expect it to work like this:

  1. The toggle button is clicked, which triggers the power toggle command (publish);
  2. The response is sent to the client API (not related to MQTT).

The response should look like this (I use Tasmota):

{"Time":"2021-10-28T20:57:01","Uptime":"0T07:45:23","UptimeSec":27923,"Heap":27,"SleepMode":"Dynamic","Sleep":50,"LoadAvg":19,"MqttCount":7,"POWER":"ON","Wifi":{"AP":1,"SSId":"OUTLETPINTAR.COM","BSSId":"A0:AB:1B:E4:11:59","Channel":3,"Mode":"11n","RSSI":100,"Signal":-50,"LinkCount":1,"Downtime":"0T00:00:05"}}

abydahana commented 2 years ago

Oh, sorry. I didn't realize that both subscribe and publish had to use the same topic. The response appears now. I will dig deeper to find out how to get the device responses.

Thank you.

abydahana commented 2 years ago

Hello @Namoshek, can this library fetch the device's API response in a format like the one I mentioned above? Or does it depend on the Mosquitto log output?

Namoshek commented 2 years ago

The format of the data published via MQTT is independent of the client used to publish/subscribe. So the answer is no, this library cannot change the format of the data.
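
Since the payload arrives in the subscribe callback as a plain string, interpreting it is up to the application. As a minimal sketch, assuming the device publishes JSON like the Tasmota state message quoted earlier in this thread (shortened here), it could be decoded like this:

```php
<?php

// Shortened copy of the Tasmota state payload quoted earlier in this thread.
$message = '{"Time":"2021-10-28T20:57:01","UptimeSec":27923,"POWER":"ON","Wifi":{"SSId":"OUTLETPINTAR.COM","RSSI":100,"Signal":-50}}';

// Decode into an associative array. JSON_THROW_ON_ERROR (PHP 7.3+) raises a
// JsonException on malformed input instead of silently returning null.
$state = json_decode($message, true, 512, JSON_THROW_ON_ERROR);

echo 'Power: ' . $state['POWER'] . PHP_EOL;             // Power: ON
echo 'Signal: ' . $state['Wifi']['Signal'] . PHP_EOL;   // Signal: -50
```

Inside a real callback, `$message` would be the second argument passed by the client rather than a hard-coded string.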

abydahana commented 2 years ago

> The format of the data published via MQTT is independent of the client used to publish/subscribe. So the answer is no, this library cannot change the format of the data.

Okay, got it. Thank you.