moscajs / mosca

MQTT broker as a module
mosca.io
3.2k stars 513 forks source link

Ensuring atomicity of transactions involved in "Server#publish" method #703

Open prabathabey opened 6 years ago

prabathabey commented 6 years ago

Please refer the following code-snippet.

Server.prototype.publish = function publish(packet, client, callback) {
...
  that.storePacket(newPacket, function() {
    if (that.closed) {
      logger.debug({ packet: newPacket }, "not delivering because we are closed");
      return;
    }

    that.ascoltatore.publish(
      newPacket.topic,
      newPacket.payload,
      opts,
      function() {
        that.published(newPacket, client, function() {
          if( newPacket.topic.indexOf( '$SYS' ) >= 0 ) {
            logger.trace({ packet: newPacket }, "published packet");
          } else {
            logger.debug({ packet: newPacket }, "published packet");
          }
          that.emit("published", newPacket, client);
          callback(undefined, newPacket);
        });
      }
    );
  });
}; 

@mcollina As it appears, in the above implementation, "ascoltatore.publish()" method will be invoked even when "storePacket" returns an error as the there's no means to handle such errors at the moment. Similarly, no error is processed within the callback passed into "ascoltatore.publish()" as well, which leads to an inconsistent state of the system as, if "ascoltatore.publish()" fails, at that point, there's no option to rollback what's committed within "Server.storePacket()" method. To me, this looks to me like a bug. What are your thoughts and feedback on this?

mcollina commented 6 years ago

I would say: use http://npm.im/aedes.

If you would like to send a PR, I’ll be happy to review (add unit tests!).

prabathabey commented 6 years ago

@mcollina Thanks for the feedback. Adapting aedes is indeed part of the plans. I've noticed that Aedes supports useful new features such as backpressure, etc, and the implementation is lean and elegant as well, which is another motivation for us to adapt it. However, the timelines would depend on how soon Mosca will be deprecated (if it ever will), and also on the fact that if it already solves some of the issues highlighted in the ticket.

In addition, load balancing subscriber groups (i.e. shared subscriptions) is also one of the critical functionalities that we needed, and therefore, something that was being implemented on top of Mosca, by bridging that with Kafka (since kafka already solves the problem of subscriber load-balancing and it also is part of the platform that we develop already) through ascoltatori. We'd already spent some time implementing the above, and succeeded to a great extent. As far as I have seen, the same can be implemented through Aedes-persistence module as well. I'd appreciate if you could shed some light as to how trivial this would be.

Further, do we by any chance already have any performance benchmark document for Aedes?

Thanks in advance!

mcollina commented 6 years ago

There is no official document for Aedes.