FraunhoferIOSB / FROST-Server

A Complete Server implementation of the OGC SensorThings API
https://fraunhoferiosb.github.io/FROST-Server/
GNU Lesser General Public License v3.0

Receiving messages twice when more than one MQTT client is connected #8

Closed geoxanadu closed 7 years ago

geoxanadu commented 7 years ago

Hi,

If I connect to the SensorServer with more than one MQTT client (for example, from two different computers), each client receives a published message twice. As soon as one client disconnects and only one is left, that client receives the single message I expect.

I publish a JSON object to "v1.0/Observations" over MQTT:

{ "result" : 21.6, "Datastream":{"@iot.id":1} }

and the clients are subscribed to "v1.0/Datastreams(1)/Observations"

Using QoS 0, I would expect the message to be sent only once (fire and forget). Or does the problem occur because the SensorThings API Server subscribes to topics "topicFilter='#'" using QoS=1?

My log file looks like this:

2017-04-26 17:31:41,393 262 [localhost-startStop-1] INFO d.f.iosb.ilt.sta.ContextListener - Context initialised, loading settings.
2017-04-26 17:31:41,730 599 [localhost-startStop-1] INFO io.moquette.server.Server - Persistent store file: C:\instances\TC01\work\Catalina\localhost\SensorThingsService\moquette_store.mapdb
2017-04-26 17:31:41,750 619 [localhost-startStop-1] INFO i.m.s.p.MapDBPersistentStore - Starting with existing [C:\instances\TC01\work\Catalina\localhost\SensorThingsService\moquette_store.mapdb] db file
2017-04-26 17:31:42,010 879 [localhost-startStop-1] INFO i.m.s.i.ProtocolProcessorBootstrapper - Starting without ACL definition
2017-04-26 17:31:42,556 1425 [localhost-startStop-1] INFO i.m.server.netty.NettyAcceptor - Server binded host: xxx.xxx.xxx.xxx, port: 1883
2017-04-26 17:31:42,563 1432 [localhost-startStop-1] INFO i.m.server.netty.NettyAcceptor - Server binded host: xxx.xxx.xxx.xxx, port: 9876
2017-04-26 17:31:42,615 1484 [localhost-startStop-1] INFO d.f.i.i.s.m.m.MoquetteMqttServer - paho-client connecting to broker: tcp://xxx.xxx.xxx.xxx:1883
2017-04-26 17:31:42,726 1595 [nioEventLoopGroup-3-1] INFO messageLogger - C->B CONNECT client <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)>
2017-04-26 17:31:42,727 1596 [nioEventLoopGroup-3-1] INFO i.m.server.netty.NettyMQTTHandler - Received a message of type CONNECT
2017-04-26 17:31:42,728 1597 [nioEventLoopGroup-3-1] INFO i.m.spi.impl.ProtocolProcessor - CONNECT for client <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)>
2017-04-26 17:31:42,758 1627 [nioEventLoopGroup-3-1] INFO io.moquette.spi.ClientSession - cleaning old saved subscriptions for client <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)>
2017-04-26 17:31:42,767 1636 [nioEventLoopGroup-3-1] INFO i.m.spi.impl.ProtocolProcessor - Connection established
2017-04-26 17:31:42,768 1637 [nioEventLoopGroup-3-1] INFO i.m.spi.impl.ProtocolProcessor - CONNECT processed
2017-04-26 17:31:42,769 1638 [nioEventLoopGroup-3-1] INFO messageLogger - C->B SUBSCRIBE <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)> to topics [[qos=1, topicFilter='#']]
2017-04-26 17:31:42,769 1638 [nioEventLoopGroup-3-1] INFO i.m.server.netty.NettyMQTTHandler - Received a message of type SUBSCRIBE
2017-04-26 17:31:42,770 1639 [nioEventLoopGroup-3-1] INFO i.m.spi.impl.ProtocolProcessor - SUBSCRIBE client <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)>
2017-04-26 17:31:42,819 1688 [nioEventLoopGroup-3-1] INFO io.moquette.spi.ClientSession - <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)> subscribed to the topic filter <#> with QoS 1 - LEAST_ONE
2017-04-26 17:31:42,832 1701 [localhost-startStop-1] INFO d.f.i.i.s.m.m.MoquetteMqttServer - paho-client connected to broker

Connect with first Client (receiving one message):

2017-04-26 17:32:30,750 49619 [nioEventLoopGroup-3-2] INFO messageLogger - C->B CONNECT client
2017-04-26 17:32:30,751 49620 [nioEventLoopGroup-3-2] INFO i.m.server.netty.NettyMQTTHandler - Received a message of type CONNECT
2017-04-26 17:32:30,751 49620 [nioEventLoopGroup-3-2] INFO i.m.spi.impl.ProtocolProcessor - CONNECT for client
2017-04-26 17:32:30,752 49621 [nioEventLoopGroup-3-2] INFO io.moquette.spi.ClientSession - cleaning old saved subscriptions for client
2017-04-26 17:32:30,757 49626 [nioEventLoopGroup-3-2] INFO i.m.spi.impl.ProtocolProcessor - Connection established
2017-04-26 17:32:30,758 49627 [nioEventLoopGroup-3-2] INFO i.m.spi.impl.ProtocolProcessor - CONNECT processed
2017-04-26 17:32:32,071 50940 [nioEventLoopGroup-3-2] INFO messageLogger - C->B SUBSCRIBE to topics [[qos=0, topicFilter='v1.0/Datastreams(1)/Observations']]
2017-04-26 17:32:32,072 50941 [nioEventLoopGroup-3-2] INFO i.m.server.netty.NettyMQTTHandler - Received a message of type SUBSCRIBE
2017-04-26 17:32:32,072 50941 [nioEventLoopGroup-3-2] INFO i.m.spi.impl.ProtocolProcessor - SUBSCRIBE client
2017-04-26 17:32:32,073 50942 [nioEventLoopGroup-3-2] INFO io.moquette.spi.ClientSession - subscribed to the topic filter <v1.0/Datastreams(1)/Observations> with QoS 0 - MOST_ONE
2017-04-26 17:32:38,803 57672 [nioEventLoopGroup-3-1] INFO messageLogger - C->B PUBLISH <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)> to topics <v1.0/Datastreams(1)/Observations>
2017-04-26 17:32:38,803 57672 [nioEventLoopGroup-3-1] INFO i.m.server.netty.NettyMQTTHandler - Received a message of type PUBLISH
2017-04-26 17:32:38,804 57673 [nioEventLoopGroup-3-1] INFO i.m.spi.impl.ProtocolProcessor - PUB --PUBLISH--> SRV executePublish invoked with io.moquette.parser.proto.messages.PublishMessage@54ee28ff
2017-04-26 17:32:38,808 57677 [nioEventLoopGroup-3-1] INFO i.m.s.i.PersistentQueueMessageSender - send publish message to on topic <v1.0/Datastreams(1)/Observations>
2017-04-26 17:32:38,810 57679 [nioEventLoopGroup-3-1] INFO i.m.s.i.PersistentQueueMessageSender - send publish message to <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)> on topic <v1.0/Datastreams(1)/Observations>
2017-04-26 17:32:38,810 57679 [nioEventLoopGroup-3-2] INFO messageLogger - C<-B PUBLISH to topics <v1.0/Datastreams(1)/Observations>
2017-04-26 17:32:38,811 57680 [nioEventLoopGroup-3-1] INFO messageLogger - C<-B PUBLISH <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)> to topics <v1.0/Datastreams(1)/Observations>
2017-04-26 17:33:32,071 110940 [nioEventLoopGroup-3-2] INFO i.m.server.netty.NettyMQTTHandler - Received a message of type PINGREQ
2017-04-26 17:33:38,803 117672 [nioEventLoopGroup-3-1] INFO i.m.server.netty.NettyMQTTHandler - Received a message of type PINGREQ

Connect with second client (each client now receives two messages):

2017-04-26 17:34:09,652 148521 [nioEventLoopGroup-3-3] INFO messageLogger - C->B CONNECT client
2017-04-26 17:34:09,653 148522 [nioEventLoopGroup-3-3] INFO i.m.server.netty.NettyMQTTHandler - Received a message of type CONNECT
2017-04-26 17:34:09,653 148522 [nioEventLoopGroup-3-3] INFO i.m.spi.impl.ProtocolProcessor - CONNECT for client
2017-04-26 17:34:09,655 148524 [nioEventLoopGroup-3-3] INFO io.moquette.spi.ClientSession - cleaning old saved subscriptions for client
2017-04-26 17:34:09,659 148528 [nioEventLoopGroup-3-3] INFO i.m.spi.impl.ProtocolProcessor - Connection established
2017-04-26 17:34:09,660 148529 [nioEventLoopGroup-3-3] INFO i.m.spi.impl.ProtocolProcessor - CONNECT processed
2017-04-26 17:34:10,994 149863 [nioEventLoopGroup-3-3] INFO messageLogger - C->B SUBSCRIBE to topics [[qos=0, topicFilter='v1.0/Datastreams(1)/Observations']]
2017-04-26 17:34:10,994 149863 [nioEventLoopGroup-3-3] INFO i.m.server.netty.NettyMQTTHandler - Received a message of type SUBSCRIBE
2017-04-26 17:34:10,995 149864 [nioEventLoopGroup-3-3] INFO i.m.spi.impl.ProtocolProcessor - SUBSCRIBE client
2017-04-26 17:34:10,995 149864 [nioEventLoopGroup-3-3] INFO io.moquette.spi.ClientSession - subscribed to the topic filter <v1.0/Datastreams(1)/Observations> with QoS 0 - MOST_ONE
2017-04-26 17:34:15,491 154360 [nioEventLoopGroup-3-1] INFO messageLogger - C->B PUBLISH <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)> to topics <v1.0/Datastreams(1)/Observations>
2017-04-26 17:34:15,491 154360 [nioEventLoopGroup-3-1] INFO i.m.server.netty.NettyMQTTHandler - Received a message of type PUBLISH
2017-04-26 17:34:15,491 154360 [nioEventLoopGroup-3-1] INFO i.m.spi.impl.ProtocolProcessor - PUB --PUBLISH--> SRV executePublish invoked with io.moquette.parser.proto.messages.PublishMessage@29f37ac3
2017-04-26 17:34:15,492 154361 [nioEventLoopGroup-3-1] INFO i.m.s.i.PersistentQueueMessageSender - send publish message to on topic <v1.0/Datastreams(1)/Observations>
2017-04-26 17:34:15,492 154361 [nioEventLoopGroup-3-1] INFO i.m.s.i.PersistentQueueMessageSender - send publish message to on topic <v1.0/Datastreams(1)/Observations>
2017-04-26 17:34:15,492 154361 [nioEventLoopGroup-3-3] INFO messageLogger - C<-B PUBLISH to topics <v1.0/Datastreams(1)/Observations>
2017-04-26 17:34:15,492 154361 [nioEventLoopGroup-3-1] INFO i.m.s.i.PersistentQueueMessageSender - send publish message to <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)> on topic <v1.0/Datastreams(1)/Observations>
2017-04-26 17:34:15,492 154361 [nioEventLoopGroup-3-2] INFO messageLogger - C<-B PUBLISH to topics <v1.0/Datastreams(1)/Observations>
2017-04-26 17:34:15,492 154361 [nioEventLoopGroup-3-1] INFO messageLogger - C<-B PUBLISH <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)> to topics <v1.0/Datastreams(1)/Observations>
2017-04-26 17:34:15,497 154366 [nioEventLoopGroup-3-1] INFO messageLogger - C->B PUBLISH <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)> to topics <v1.0/Datastreams(1)/Observations>
2017-04-26 17:34:15,497 154366 [nioEventLoopGroup-3-1] INFO i.m.server.netty.NettyMQTTHandler - Received a message of type PUBLISH
2017-04-26 17:34:15,497 154366 [nioEventLoopGroup-3-1] INFO i.m.spi.impl.ProtocolProcessor - PUB --PUBLISH--> SRV executePublish invoked with io.moquette.parser.proto.messages.PublishMessage@6229e87b
2017-04-26 17:34:15,497 154366 [nioEventLoopGroup-3-1] INFO i.m.s.i.PersistentQueueMessageSender - send publish message to on topic <v1.0/Datastreams(1)/Observations>
2017-04-26 17:34:15,497 154366 [nioEventLoopGroup-3-1] INFO i.m.s.i.PersistentQueueMessageSender - send publish message to on topic <v1.0/Datastreams(1)/Observations>
2017-04-26 17:34:15,498 154367 [nioEventLoopGroup-3-3] INFO messageLogger - C<-B PUBLISH to topics <v1.0/Datastreams(1)/Observations>
2017-04-26 17:34:15,498 154367 [nioEventLoopGroup-3-1] INFO i.m.s.i.PersistentQueueMessageSender - send publish message to <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)> on topic <v1.0/Datastreams(1)/Observations>
2017-04-26 17:34:15,498 154367 [nioEventLoopGroup-3-2] INFO messageLogger - C<-B PUBLISH to topics <v1.0/Datastreams(1)/Observations>
2017-04-26 17:34:15,498 154367 [nioEventLoopGroup-3-1] INFO messageLogger - C<-B PUBLISH <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)> to topics <v1.0/Datastreams(1)/Observations>

Here I disconnect one client and publish one message afterwards:

2017-04-26 17:43:13,651 692520 [nioEventLoopGroup-3-3] INFO messageLogger - C->B DISCONNECT
2017-04-26 17:43:13,652 692521 [nioEventLoopGroup-3-3] INFO i.m.server.netty.NettyMQTTHandler - Received a message of type DISCONNECT
2017-04-26 17:43:13,652 692521 [nioEventLoopGroup-3-3] INFO i.m.spi.impl.ProtocolProcessor - cleaning old saved subscriptions for client
2017-04-26 17:43:13,678 692547 [nioEventLoopGroup-3-3] INFO i.m.spi.impl.ProtocolProcessor - DISCONNECT client finished
2017-04-26 17:43:13,678 692547 [nioEventLoopGroup-3-3] INFO messageLogger - Channel closed
2017-04-26 17:43:15,506 694375 [nioEventLoopGroup-3-1] INFO i.m.server.netty.NettyMQTTHandler - Received a message of type PINGREQ
2017-04-26 17:43:27,423 706292 [nioEventLoopGroup-3-1] INFO messageLogger - C->B PUBLISH <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)> to topics <v1.0/Datastreams(1)/Observations>
2017-04-26 17:43:27,424 706293 [nioEventLoopGroup-3-1] INFO i.m.server.netty.NettyMQTTHandler - Received a message of type PUBLISH
2017-04-26 17:43:27,424 706293 [nioEventLoopGroup-3-1] INFO i.m.spi.impl.ProtocolProcessor - PUB --PUBLISH--> SRV executePublish invoked with io.moquette.parser.proto.messages.PublishMessage@170df951
2017-04-26 17:43:27,427 706296 [nioEventLoopGroup-3-1] INFO i.m.s.i.PersistentQueueMessageSender - send publish message to on topic <v1.0/Datastreams(1)/Observations>
2017-04-26 17:43:27,428 706297 [nioEventLoopGroup-3-1] INFO i.m.s.i.PersistentQueueMessageSender - send publish message to <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)> on topic <v1.0/Datastreams(1)/Observations>
2017-04-26 17:43:27,428 706297 [nioEventLoopGroup-3-2] INFO messageLogger - C<-B PUBLISH to topics <v1.0/Datastreams(1)/Observations>
2017-04-26 17:43:27,428 706297 [nioEventLoopGroup-3-1] INFO messageLogger - C<-B PUBLISH <SensorThings API Server (743cf4fd-d470-4cab-a3a9-0ffce6bd76b4)> to topics <v1.0/Datastreams(1)/Observations>
2017-04-26 17:43:32,080 710949 [nioEventLoopGroup-3-2] INFO i.m.server.netty.NettyMQTTHandler - Received a message of type PINGREQ

Another issue is wildcards:

2017-04-26 10:33:05,316 19914433 [nioEventLoopGroup-3-4] INFO io.moquette.spi.ClientSession - <paho/28A7D875E8DC786DAD> subscribed to the topic filter <v1.0/#> with QoS 0 - MOST_ONE
2017-04-26 10:33:05,333 19914450 [pool-2-thread-1] ERROR d.f.i.i.sta.parser.path.PathParser - Failed to parse because (Set loglevel to trace for stack): Lexical error at line 1, column 3. Encountered: after : ""
2017-04-26 10:33:05,335 19914452 [pool-2-thread-1] ERROR d.f.i.i.s.m.s.Subscription - Not a valid path: Path is not valid.

This should normally work:

2017-04-26 17:01:34,424 43223541 [nioEventLoopGroup-3-3] INFO io.moquette.spi.ClientSession - subscribed to the topic filter <v1.0/+/Observations> with QoS 0 - MOST_ONE
2017-04-26 17:01:34,430 43223547 [pool-2-thread-12] ERROR d.f.i.i.sta.parser.path.PathParser - Failed to parse because (Set loglevel to trace for stack): Lexical error at line 1, column 2. Encountered: " " (32), after : ""
2017-04-26 17:01:34,433 43223550 [pool-2-thread-12] ERROR d.f.i.i.s.m.s.Subscription - Not a valid path: Path is not valid.

Thanks in advance

hylkevds commented 7 years ago

Indeed, that's a bug. We're sending the message to the MQTT server for each subscription. But when two clients have the same subscription we should not send that message twice, since the MQTT server already distributes it to the different clients... oops ;)
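To illustrate the idea of the fix, here is a minimal sketch in Python (not the actual FROST-Server code, which is Java). It assumes the server keeps one reference count per distinct topic instead of one entry per client subscription, and publishes to the broker once per topic. The names TopicRegistry and notify are hypothetical.

```python
from typing import Callable, Dict


class TopicRegistry:
    """Counts active subscriptions per topic (hypothetical helper, not FROST code)."""

    def __init__(self) -> None:
        self._counts: Dict[str, int] = {}

    def subscribe(self, topic: str) -> None:
        # A second client on the same topic only bumps the counter.
        self._counts[topic] = self._counts.get(topic, 0) + 1

    def unsubscribe(self, topic: str) -> None:
        remaining = self._counts.get(topic, 0) - 1
        if remaining > 0:
            self._counts[topic] = remaining
        else:
            # Last subscriber gone (or topic unknown): forget the topic.
            self._counts.pop(topic, None)

    def has_subscribers(self, topic: str) -> bool:
        return topic in self._counts


def notify(registry: TopicRegistry, topic: str, payload: str,
           publish: Callable[[str, str], None]) -> int:
    """Publish at most once per topic and return the number of publishes.

    The broker is left to fan the message out to every subscribed client.
    """
    if not registry.has_subscribers(topic):
        return 0
    publish(topic, payload)
    return 1


if __name__ == "__main__":
    registry = TopicRegistry()
    topic = "v1.0/Datastreams(1)/Observations"
    registry.subscribe(topic)  # first client
    registry.subscribe(topic)  # second client, same topic
    sent = []
    notify(registry, topic, '{"result": 21.6}', lambda t, p: sent.append((t, p)))
    print(len(sent))  # prints 1: one publish, regardless of client count
```

With this bookkeeping, the number of messages handed to the broker depends on the number of distinct topics, not on the number of connected clients.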

Wildcards are not allowed in the MQTT extension of the SensorThings API. Only paths that are also valid in the REST API are allowed.
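A client could check for this before subscribing. The sketch below is a hypothetical helper, not part of FROST-Server or any MQTT library: it only detects the two MQTT wildcard characters, "+" and "#", and does not validate that the rest of the topic is a valid SensorThings path. It assumes no legitimate topic you use contains either character.

```python
def has_mqtt_wildcard(topic: str) -> bool:
    """Return True if the topic contains an MQTT wildcard character.

    "+" is the single-level wildcard and "#" is the multi-level wildcard.
    Hypothetical helper: it does not check SensorThings path validity.
    """
    return "+" in topic or "#" in topic


if __name__ == "__main__":
    for candidate in (
        "v1.0/#",
        "v1.0/+/Observations",
        "v1.0/Datastreams(1)/Observations",
    ):
        verdict = "rejected (wildcard)" if has_mqtt_wildcard(candidate) else "ok"
        print(candidate, "->", verdict)
```

Of the three topics from this thread, only "v1.0/Datastreams(1)/Observations" passes the check.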

hylkevds commented 7 years ago

Please test the latest version, and reopen the issue if it did not fully fix the problem.