dotnet / MQTTnet

MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
MIT License
4.51k stars 1.07k forks source link

[Server] Retained messages store optimized for database #1660

Open Int32Overflow opened 1 year ago

Int32Overflow commented 1 year ago

Describe the feature request

Currently, all retained messages are kept in the working memory, which can quickly become a problem with larger quantities. It would be a good idea to store these in a SQL database or in Redis, for example. However, the "interfaces" are currently very limited in order to implement such a concept.

Which project is your feature request related to?

Describe the solution you'd like

In the method "FilterRetainedApplicationMessages" the retained messages are no longer handled in a for loop. The topic is now passed to a method in the new interface "IMqttClientSubscriptionsManager", which can return several retained messages as a result. This interface can of course be overwritten.

https://github.com/dotnet/MQTTnet/blob/2aa4219838a67e336ae4f5902d956c455bc48c11/Source/MQTTnet/Server/Internal/MqttClientSubscriptionsManager.cs#L401-L453

chkr1011 commented 1 year ago

I am wondering how to get the matching retained messages from the database. If the topic contains no wildcards ist might be working (or some usages of #). But how to get the proper set of messages from the database if the "+" wildcard is used?

Int32Overflow commented 1 year ago

For example cache only topics in ram and load payload and other stuff on match from database.

Int32Overflow commented 1 year ago

Perhaps it is also possible to split the topics into individual segments and then use them like a tree path. PostgreSQL can map trees for example...

logicaloud commented 1 year ago

How the retained message is looked up should probably remain an extension because different storage providers will have different capabilities. For SQL, the LIKE condition would probably work, with wildcard topic characters becoming % in the query. I.e. for topic house/1/room/+/temperature the query would be something like select * from retainedmessages where topic like 'house/1/room/%/temperature'. Case sensitivity and special characters (underscore) would need to be taken care of.

Int32Overflow commented 1 year ago

@chkr1011 Can you give me an update?

I was able to successfully save and search the Mqtt topics in a Postgresql table. Postgresql supports a very fast way to search for topics that match a certain topic pattern with the help of index.

-- https://www.postgresql.org/docs/current/ltree.html

CREATE EXTENSION  ltree;

CREATE TABLE IF NOT EXISTS test (path ltree not null unique, message bytea);
CREATE INDEX path_gist_idx ON test USING GIST (path gist_ltree_ops(siglen=1000));
CREATE INDEX path_idx ON test USING BTREE (path);

INSERT INTO test VALUES ('Root1');
INSERT INTO test VALUES ('Root1.Child1.Child1.Child1');
INSERT INTO test VALUES ('Root1.Child1.Child1.Child1.A');
INSERT INTO test VALUES ('Root1.Child1.Child1.Child1.B');
INSERT INTO test VALUES ('Root1.Child1.Child1.Child2');
INSERT INTO test VALUES ('Root1.Child1.Child1.Child3');
INSERT INTO test VALUES ('Root1.Child1.Child1.Child3.A');
INSERT INTO test VALUES ('Root1.Child1.Child1.Child4');

INSERT INTO test VALUES ('Root1.Child1.Child2.Child1');
INSERT INTO test VALUES ('Root1.Child1.Child2.Child2');
INSERT INTO test VALUES ('Root1.Child1.Child2.Child3');
INSERT INTO test VALUES ('Root1.Child1.Child2.Child4');

INSERT INTO test VALUES ('Root1.Child1.Child3.Child1');
INSERT INTO test VALUES ('Root1.Child1.Child3.Child2');
INSERT INTO test VALUES ('Root1.Child1.Child3.Child3');
INSERT INTO test VALUES ('Root1.Child1.Child3.Child4');

INSERT INTO test VALUES ('Root1.Child1.Child4.Child1');
INSERT INTO test VALUES ('Root1.Child1.Child4.Child2');
INSERT INTO test VALUES ('Root1.Child1.Child4.Child3');
INSERT INTO test VALUES ('Root1.Child1.Child4.Child4');

INSERT INTO test VALUES ('Root1.Child1.Child5.Child1');
INSERT INTO test VALUES ('Root1.Child1.Child5.Child2');
INSERT INTO test VALUES ('Root1.Child1.Child5.Child3');
INSERT INTO test VALUES ('Root1.Child1.Child5.Child4');

INSERT INTO test VALUES ('Root1.Child1.Child6.Child1');
INSERT INTO test VALUES ('Root1.Child1.Child6.Child2');
INSERT INTO test VALUES ('Root1.Child1.Child6.Child3');
INSERT INTO test VALUES ('Root1.Child1.Child6.Child4');
-- Root1.Child2
INSERT INTO test VALUES ('Root1.Child2.Child1.Child1');
INSERT INTO test VALUES ('Root1.Child2.Child1.Child2');
INSERT INTO test VALUES ('Root1.Child2.Child1.Child3');
INSERT INTO test VALUES ('Root1.Child2.Child1.Child4');

INSERT INTO test VALUES ('Root1.Child2.Child2.Child1');
INSERT INTO test VALUES ('Root1.Child2.Child2.Child2');
INSERT INTO test VALUES ('Root1.Child2.Child2.Child3');
INSERT INTO test VALUES ('Root1.Child2.Child2.Child4');

INSERT INTO test VALUES ('Root1.Child2.Child3.Child1');
INSERT INTO test VALUES ('Root1.Child2.Child3.Child2');
INSERT INTO test VALUES ('Root1.Child2.Child3.Child3');
INSERT INTO test VALUES ('Root1.Child2.Child3.Child4');

INSERT INTO test VALUES ('Root1.Child2.Child4.Child1');
INSERT INTO test VALUES ('Root1.Child2.Child4.Child2');
INSERT INTO test VALUES ('Root1.Child2.Child4.Child3');
INSERT INTO test VALUES ('Root1.Child2.Child4.Child4');

INSERT INTO test VALUES ('Root1.Child2.Child5.Child1');
INSERT INTO test VALUES ('Root1.Child2.Child5.Child2');
INSERT INTO test VALUES ('Root1.Child2.Child5.Child3');
INSERT INTO test VALUES ('Root1.Child2.Child5.Child4');

INSERT INTO test VALUES ('Root1.Child2.Child6.Child1');
INSERT INTO test VALUES ('Root1.Child2.Child6.Child2');
INSERT INTO test VALUES ('Root1.Child2.Child6.Child3');
INSERT INTO test VALUES ('Root1.Child2.Child6.Child4');
-- Root1.Child3
INSERT INTO test VALUES ('Root1.Child3.Child1.Child1');
INSERT INTO test VALUES ('Root1.Child3.Child1.Child2');
INSERT INTO test VALUES ('Root1.Child3.Child1.Child3');
INSERT INTO test VALUES ('Root1.Child3.Child1.Child4');

INSERT INTO test VALUES ('Root1.Child3.Child2.Child1');
INSERT INTO test VALUES ('Root1.Child3.Child2.Child2');
INSERT INTO test VALUES ('Root1.Child3.Child2.Child3');
INSERT INTO test VALUES ('Root1.Child3.Child2.Child4');

INSERT INTO test VALUES ('Root1.Child3.Child3.Child1');
INSERT INTO test VALUES ('Root1.Child3.Child3.Child2');
INSERT INTO test VALUES ('Root1.Child3.Child3.Child3');
INSERT INTO test VALUES ('Root1.Child3.Child3.Child4');

INSERT INTO test VALUES ('Root1.Child3.Child4.Child1');
INSERT INTO test VALUES ('Root1.Child3.Child4.Child2');
INSERT INTO test VALUES ('Root1.Child3.Child4.Child3');
INSERT INTO test VALUES ('Root1.Child3.Child4.Child4');

INSERT INTO test VALUES ('Root1.Child3.Child5.Child1');
INSERT INTO test VALUES ('Root1.Child3.Child5.Child2');
INSERT INTO test VALUES ('Root1.Child3.Child5.Child3');
INSERT INTO test VALUES ('Root1.Child3.Child5.Child4');

INSERT INTO test VALUES ('Root1.Child3.Child6.Child1');
INSERT INTO test VALUES ('Root1.Child3.Child6.Child2');
INSERT INTO test VALUES ('Root1.Child3.Child6.Child3');
INSERT INTO test VALUES ('Root1.Child3.Child6.Child4');

Example 1

MQTT-Wildcard: Root1/Child1/+/Child1 corresponds to the following SQL command:

SELECT path FROM test WHERE path ~ 'Root1.Child1.*{1}.Child1'

Results:

Example 2

MQTT-Wildcard: Root1/Child1/Child1/Child1/# corresponds to the following SQL command:

SELECT path FROM test WHERE path ~ 'Root1.Child1.Child1.Child1.*'

Results: