eclipse / paho.mqtt.cpp

Other
1.02k stars 437 forks source link

Ability to set message_arrived callback per topic #202

Open flounderpinto opened 5 years ago

flounderpinto commented 5 years ago

Looking at this as an example: https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/samples/async_subscribe.cpp Is there no way to set the message_arrived() callback per topic? I can only find a way to set the callback at the client level: client.set_callback(cb); and not at the topic level: cli_.subscribe(TOPIC, QOS, nullptr, subListener_); The issue is that all messages are dumped into the message_arrived() method, and it's non-trivial to sort out which message is from which topic (due to topic wildcards). What am I missing? How it this typically handled?

fpagliughi commented 5 years ago

It's funny. I've been thinking to request that we add this to all of the Paho client libraries for the different languages! It would be very handy.

But to answer your question... no. There is no per-subscription callback mechanism. It's exactly what you see. The client sends one or more subscription requests to the broker/server, and the server sends any messages over that match any of those subscriptions. When your client receives a message, it needs to decide what to do with it, but you're right; it's complicated by wildcards and overlapping subscriptions.

MQTT v5 tries to deal with this somewhat. The subscribe message can include a "subscription identifier", and the server will include one or more of them with a message that it sends back to the client. See the standard for more info. So it may be easier to deal with this in the new code that's coming out.

In my applications, if I expect different messages coming back at a high rate, I will sometimes open multiple clients, each with different subscriptions. That way I can have separate client callbacks for each set of subscriptions. But if they're not coming too fast, I just map them myself.

flounderpinto commented 5 years ago

Thank you for the detailed response. That certainly helps clarify things.

I ended up working around this by (more or less) using the synchronous mqtt::client class, creating a separate thread for every topic, and blocking on try_consume_message_for().

I had the same multiple client thought, but I really have very few messages. The messages are just orthogonal to each other, so I didn't want to couple the code.

If it helps any I could write up the request to add this.

fpagliughi commented 3 years ago

BWT, just added an example on how to do this with the MQTT v5 subscription identifiers. https://github.com/eclipse/paho.mqtt.cpp/blob/develop/src/samples/sync_consume_v5.cpp

fpagliughi commented 2 years ago

Still thinking about this years later!

I recently stumbled on the MQTTMatcher class in the Paho Python library: https://github.com/eclipse/paho.mqtt.python/blob/master/src/paho/mqtt/matcher.py

This uses a prefix tree (trie) to keep a collection of topic filters, each with an associated value.

You can use it to iterate through all the items that match a specific topic. So this can be used to keep a collection of callback functions for all of the subscribed topic filters in a client. When a message arrives, you can go through and dispatch to all the callbacks that match the topic of the received message.

I implemented a topic_matcher class to do this in C++. It's currently in the develop branch but will be promoted soon to master and be part of the next release. https://github.com/eclipse/paho.mqtt.cpp/blob/develop/src/mqtt/topic_matcher.h

It's a bit like astd::map in that you give it a bunch of key/value pairs where the keys are topic filters (which can contain wildcards), and the values are whatever you want. It's a template, generic to type. You can get an exact match using find() or an iterator over all matches using matches(topic).

There are some hints for usage in the unit tests: https://github.com/eclipse/paho.mqtt.cpp/blob/develop/test/unit/test_topic_matcher.cpp

I'll add an example app that implements callbacks per topic.

Then I'll need to decide whether to build the capability of callback-per-topic into the client or leave it to the user, especially since the MQTT v5 way to do this with topic subscriptions would be more efficient, but only works with v5 and requires the use of subscriptions on the server.

fpagliughi commented 4 months ago

This is probably still not going to make it into a release ...yet.

But the upcoming v1.4 will have a fixed and optimized topic_matcher trie collection that is testing well to the spec, and has an iterator implementation that doesn't look like it was coded by drunk monkeys.

So coding this manually, now, should be easy.

Let's hope to get it into v1.5.