php-mqtt / laravel-client

An MQTT client library for Laravel.
MIT License
189 stars 20 forks source link

Multiple subscriptions to the same queue #11

Closed santilorenzo closed 3 years ago

santilorenzo commented 3 years ago

I'm using RabbitMQ as broker and I wrote a Laravel command that runs this code:

class MyCommand extends Command
{
    protected $signature = 'mqtt:listen';

    private $mqtt;

    public function __construct()
    {
        parent::__construct();
        $this->mqtt = MQTT::connection();
    }

    public function handle()
    {
        $this->mqtt->subscribe('devices.+.write', function (string $topic, string $message) {
            // Actually do stuff with the message
        }, 2);
        $this->mqtt->loop(true);
    }
}

And I set up a supervisor configuration to run 4 instances of this command:

[program:mqtt-listen]
command = php /app/artisan mqtt:listen
process_name=%(program_name)s-%(process_num)s
numprocs = 4
startsecs = 0
autostart = true
autorestart = true

What happens is that I see 4 connections (and I suppose this is correct) Schermata 2021-03-01 alle 17 10 32 but I also see 4 different queues bond to the same exchange: Schermata 2021-03-01 alle 17 11 09

The result is that i receive 4 times the same message from 4 different queues.

What I want is to run 4 listeners to the same queue.

Is this possible and how?

Namoshek commented 3 years ago

I edited your question and added the info that you are using RabbitMQ as broker - quite a valuable info.

The reason you are seeing four client queues is because that's how QoS 1 works with the RabbitMQ MQTT plugin. Messages are published to the amq.topic exchange (at least by default) and the exchange then copies the message to all matching subscription queues. Because QoS 1 guarantees message delivery, the broker needs to keep track of the messages per client, which is achieved the easiest with dedicated delivery queues.

From an MQTT standpoint, what you are trying to do makes not much sense. MQTT is a publish-subscribe protocol with the primary goal that the publisher needs no knowledge about interested subscribers. And the subscribers need (and want) no knowledge about other subscribers. If you start delivering published messages to some subscribers only, then you are violating one of the fundamental principles of MQTT - at least in my opinion.


How to solve your issue?

Well, first you should clearify what your actual goal is:

Depending on your needs, there will be different solutions:

Please clearify your intents, then I might be able to help a bit more.


On a side note, the RabbitMQ MQTT plugin has a configuration for the exchange. I have no idea what it does, but I could imagine that changing it to amq.default does what you want, since the amq.default exchange is a round-robin exchange. This will change the behavior of your entire MQTT broker though and as explained above, this is nothing you'd expect from an MQTT broker!

santilorenzo commented 3 years ago

Thanks @Namoshek for the very thorough answer,I better understood the philosophy behind MQTT. The application I'm developing involves, among other features, the data collection of (potentially) thousands of IOT devices that periodically (every 5 seconds? idk) update their statuses through an MQTT message and listen for a new configuration input as well.

For this reason I'm designing the architecture with the microservices pattern and I'll deploy it on Kubernetes, supporting replication. Since a requirement of the software is to keep track of the past statuses of the devices (for analysis and stats), every update need to be stored on a database (which dbms is currently discussed), I thought it would have been a good idea to replicate the listener instances and share the load of picking jobs from the queue and store the data on the DB.

What I understood from what you said, is that MQTT is not designed for the consumer to be aware of each other, so I guess this isn't the best strategy to adopt.

I'm not sure if I'll be able to create different consumers for different topics because the IOT devices are likely to post the data on a topic like "devices.{mac_address}.write" and receive data on "devices.{mac_address}.read". In this case it would be very difficult to distribute the load on different consumers.

If you have any advice, you'll be very welcome. Thanks anyway for the support!

Namoshek commented 3 years ago

I understand. Ad-hoc, I can think of a few different solutions:

Anyway, whatever solution you find, I would be very interested in an update if you find a viable solution for your use case!

santilorenzo commented 3 years ago

After some deeper search, I found out that the latest version of the MQTT protocol (5.0), does infact support such a mechanism; they are called shared subscriptions: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250

Sadly, seems like RabbitMQ doesn't support this version of MQTT yet, but instead VernMQ does. Also sadly, the MQTT client that you are using in this library as a dependency, only supports version 3.1 of MQTT, hence it doesn't support shared subscriptions.

I ended up writing a simple node script (using the mqtt package: https://www.npmjs.com/package/mqtt ) that connects to a shared topic and writes data in a database table.

You can find more details in this blog post: https://www.hivemq.com/blog/mqtt-client-load-balancing-with-shared-subscriptions/

Namoshek commented 3 years ago

Thanks for the update!

I wasn't aware that MQTT 5 supports shared subscriptions, even though MQTT 5 support for the underlying php-mqtt/client is on my list already. From the description it doesn't really sound like a change to the protocol itself though, only to the server implementation. If I ever find the time (and motivation) to add support for MQTT 5, this wrapper library will receive support for it as well. Unfortunately, MQTT 5 is quite a change and will for sure take some time to implement.

As you've found a satisfying solution, I'll close this for now.

Namoshek commented 3 years ago

Because I find shared subscriptions an interesting feature, I played around a bit. And surprise, it even works with MQTT 3.1. :tada:

To be fair, it wasn't really surprising to me after learning about shared subscriptions, since the broker is doing all the work (distributing messages across groups). But due to a little detail, it took me some time to figure out how it can be achieved with this library. But first, here some very simple code:

$logger = new SimpleLogger(LogLevel::INFO);
$client = new MqttClient(MQTT_BROKER_HOST, MQTT_BROKER_PORT, null, MqttClient::MQTT_3_1, null, $logger);
$client->connect(null, true);

$client->subscribe('$share/test-01/foo/#', function (string $topic, string $message, bool $retained) use ($logger, $client) {
    $logger->info('We received a {typeOfMessage} on topic [{topic}]: {message}', [
        'topic' => $topic,
        'message' => $message,
        'typeOfMessage' => $retained ? 'retained message' : 'message',
    ]);

    // After receiving the first message on the subscribed topic, we want the client to stop listening for messages.
    $client->interrupt();
}, MqttClient::QOS_AT_MOST_ONCE);

$client->loop(true);
$client->disconnect();

One would expect to see some log output when publishing messages to any topic starting with foo/. And, in fact, we are receiving messages. But due to the way how the client processes incoming messages from the broker (matching their topics to subscriptions), we don't see our subscription callback being called.

The underlying issue for that is the $share/test-01 part of the topic. The client is comparing the received topic name to all registered subscriptions. And there is the problem already: a shared subscription has the form $share/<group>/<topic>, while published messages will only include <topic> as topic. Therefore they of course will never match, since this feature / case has not been implemented in the topic matcher yet.

But luckily, there are event handlers for all of the important features - one of them being the message received event handler. We can utilize such a handler to process the received messages:

$logger = new SimpleLogger(LogLevel::INFO);
$client = new MqttClient(MQTT_BROKER_HOST, MQTT_BROKER_PORT, null, MqttClient::MQTT_3_1, null, $logger);
$client->connect(null, true);

$handler = function (MqttClient $client, string $topic, string $message, int $qualityOfService, bool $retained) use ($logger) {
    $logger->info('Received message on topic [{topic}] with QoS {qos}: {message}', [
        'topic' => $topic,
        'message' => $message,
        'qos' => $qualityOfService,
    ]);
};
$client->registerMessageReceivedEventHandler($handler);

$client->subscribe('$share/test-01/foo/#', function (string $topic, string $message, bool $retained) use ($logger, $client) {
    // This callback is basically useless and will never be called.
}, MqttClient::QOS_AT_MOST_ONCE);

$client->loop(true);
$client->disconnect();

If multiple shared subscriptions with different topics were needed, we could either spawn multiple workers or implement custom matching logic for the topics in the event handler callback.

Note: I tested this with Mosquitto v1.6.12.


TL;DR: use a message received event handler to retrieve messages from shared subscriptions.

Namoshek commented 3 years ago

Because it seems to be a common theme that shared subscriptions are supported by brokers even for clients using MQTT 3.1, I went ahead and implemented it in v1.1.0 of php-mqtt/client on which this package depends. A simple composer update should do the trick.

santilorenzo commented 3 years ago

Thank you @Namoshek ! I'll try this out as soon as I can :)