moscajs / aedes

Barebone MQTT broker that can run on any stream server, the node way
MIT License
1.78k stars 231 forks

[feat!]: change to named exports to enable monkey patching #888

Open getlarge opened 1 year ago

getlarge commented 1 year ago

Is your feature request related to a problem? Please describe.

I am currently trying to build an OpenTelemetry instrumentation library for Aedes so that traces can include what happens inside Aedes. OpenTelemetry provides a library containing a base class with many helpers for patching the module to instrument. It relies heavily on shimmer, require-in-the-middle and import-in-the-middle to achieve this. My plan is to start by patching:

After a first attempt, I noticed a few obstacles:

  1. The handlers are (kind of) default exports, and it is not possible (unless proven otherwise) to patch a default export (I guess it is not a mutable object?).
  2. The methods assigned during the class (or function) instantiation (for example the aedes instance method handle) cannot be patched, as they are not explicitly assigned to Aedes.prototype.
  3. The span context needs to travel from the published packet to the delivered packet, which means adding “hidden” properties to the packet that are replicated when a new packet instance is created. So aedes-packet would need to be patched as well, and updated to use a named export. mqtt-packet also needs to be patched at runtime to store and retrieve the serialized context, following this proposal.
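
Obstacles 1 and 2 can be reproduced without Aedes. What follows is a minimal sketch, not the actual Aedes code: `wrap` is a simplified stand-in for what shimmer does, and `namedExports`, `Broker` and `traced` are hypothetical names used only for illustration.

```javascript
'use strict'

// Simplified stand-in for shimmer's wrap(): swaps a property on an object.
function wrap (target, name, wrapper) {
  target[name] = wrapper(target[name])
}

// Records the name of every wrapped function that gets called.
const seen = []
function traced (original) {
  return function (...args) {
    seen.push(original.name)
    return original.apply(this, args)
  }
}

// Obstacle 1: wrap() needs an object and a property name.
// `module.exports = { handle }` provides both;
// `module.exports = handle` exposes only the function itself.
function handle () { return 'handled' }
const namedExports = { handle }
wrap(namedExports, 'handle', traced)
namedExports.handle()

// Obstacle 2: methods assigned in the constructor never exist on the
// prototype, so there is nothing to wrap before an instance is created.
function Broker () {
  this.handle = function instanceHandle () { return 'instance' }
}
Broker.prototype.close = function close () { return 'closed' }

wrap(Broker.prototype, 'close', traced)
const broker = new Broker()
broker.close() // goes through the wrapper
broker.handle() // bypasses it: only the instance owns `handle`

console.log(seen)
console.log('handle' in Broker.prototype)
```

Only the named export and the prototype method end up in `seen`; the constructor-assigned `handle` is never intercepted.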

Describe the solution you'd like

Describe alternatives you've considered

Additional context

getlarge commented 1 year ago

This could be an opportunity to revise how modules are exported and solve #878 at the same time.

getlarge commented 12 months ago

I will work on a PR soon. For reference, I am copying the patch that I applied for aedes-otel-instrumentation.

diff --git a/node_modules/aedes/aedes.js b/node_modules/aedes/aedes.js
index c02d289..668b162 100644
--- a/node_modules/aedes/aedes.js
+++ b/node_modules/aedes/aedes.js
@@ -7,7 +7,7 @@ const series = require('fastseries')
 const { v4: uuidv4 } = require('uuid')
 const reusify = require('reusify')
 const { pipeline } = require('stream')
-const Packet = require('aedes-packet')
+const { Packet } = require('aedes-packet')
 const memory = require('aedes-persistence')
 const mqemitter = require('mqemitter')
 const Client = require('./lib/client')
@@ -45,6 +45,7 @@ function Aedes (opts) {
   // +1 when construct a new aedes-packet
   // internal track for last brokerCounter
   this.counter = 0
+  this.concurrency = opts.concurrency
   this.queueLimit = opts.queueLimit
   this.connectTimeout = opts.connectTimeout
   this.maxClientsIdLength = opts.maxClientsIdLength
@@ -52,24 +53,19 @@ function Aedes (opts) {
     concurrency: opts.concurrency,
     matchEmptyLevels: true // [MQTT-4.7.1-3]
   })
-  this.handle = function handle (conn, req) {
-    conn.setMaxListeners(opts.concurrency * 2)
-    // create a new Client instance for a new connection
-    // return, just to please standard
-    return new Client(that, conn, req)
-  }
+
   this.persistence = opts.persistence || memory()
   this.persistence.broker = this
   this._parallel = parallel()
   this._series = series()
   this._enqueuers = reusify(DoEnqueues)

-  this.preConnect = opts.preConnect
-  this.authenticate = opts.authenticate
-  this.authorizePublish = opts.authorizePublish
-  this.authorizeSubscribe = opts.authorizeSubscribe
-  this.authorizeForward = opts.authorizeForward
-  this.published = opts.published
+  this._preConnect = opts.preConnect
+  this._authenticate = opts.authenticate
+  this._authorizePublish = opts.authorizePublish
+  this._authorizeSubscribe = opts.authorizeSubscribe
+  this._authorizeForward = opts.authorizeForward
+  this._published = opts.published

   this.decodeProtocol = opts.decodeProtocol
   this.trustProxy = opts.trustProxy
@@ -250,6 +246,15 @@ function removeSharp (sub) {
   return code !== 43 && code !== 35
 }

+// assigning to prototype is a breaking change as it is required to bind the Aedes instance to the function
+// @example net.createServer(broker.handle.bind(broker)) or net.createServer((socket) => broker.handle(socket))
+Aedes.prototype.handle = function handle (conn, req) {
+    conn.setMaxListeners(this.concurrency * 2)
+    // create a new Client instance for a new connection
+    // return, just to please standard
+    return new Client(this, conn, req)
+}
+
 function callPublished (_, done) {
   this.broker.published(this.packet, this.client, done)
   this.broker.emit('publish', this.packet, this.client)
@@ -338,6 +343,30 @@ Aedes.prototype.close = function (cb = noop) {

 Aedes.prototype.version = require('./package.json').version

+Aedes.prototype.preConnect = function (client, packet, callback) {
+  this._preConnect(client, packet, callback)
+}
+
+Aedes.prototype.authenticate = function (client, username, password, callback) {
+  this._authenticate(client, username, password, callback)
+}
+
+Aedes.prototype.authorizePublish = function (client, packet, callback) {
+  this._authorizePublish(client, packet, callback)
+}
+
+Aedes.prototype.authorizeSubscribe = function (client, sub, callback) {
+  this._authorizeSubscribe(client, sub, callback)
+}
+
+Aedes.prototype.authorizeForward = function (client, packet) {
+  return this._authorizeForward(client, packet)
+}
+
+Aedes.prototype.published = function (packet, client, callback) {
+  this._published(packet, client, callback)
+}
+
 function defaultPreConnect (client, packet, callback) {
   callback(null, true)
 }
diff --git a/node_modules/aedes/lib/client.js b/node_modules/aedes/lib/client.js
index 414d8e5..e525712 100644
--- a/node_modules/aedes/lib/client.js
+++ b/node_modules/aedes/lib/client.js
@@ -4,12 +4,12 @@ const mqtt = require('mqtt-packet')
 const EventEmitter = require('events')
 const util = require('util')
 const eos = require('end-of-stream')
-const Packet = require('aedes-packet')
-const write = require('./write')
+const { Packet } = require('aedes-packet')
+const { write } = require('./write')
 const QoSPacket = require('./qos-packet')
-const handleSubscribe = require('./handlers/subscribe')
-const handleUnsubscribe = require('./handlers/unsubscribe')
-const handle = require('./handlers')
+const { handleSubscribe } = require('./handlers/subscribe')
+const { handleUnsubscribe } = require('./handlers/unsubscribe')
+const { handle } = require('./handlers')
 const { pipeline } = require('stream')
 const { through } = require('./utils')

diff --git a/node_modules/aedes/lib/handlers/connect.js b/node_modules/aedes/lib/handlers/connect.js
index a4c32d0..bd2d8cb 100644
--- a/node_modules/aedes/lib/handlers/connect.js
+++ b/node_modules/aedes/lib/handlers/connect.js
@@ -2,10 +2,10 @@

 const retimer = require('retimer')
 const { pipeline } = require('stream')
-const write = require('../write')
+const { write } = require('../write')
 const QoSPacket = require('../qos-packet')
 const { through } = require('../utils')
-const handleSubscribe = require('./subscribe')
+const { handleSubscribe } = require('./subscribe')
 const uniqueId = require('hyperid')()

 function Connack (arg) {
@@ -264,4 +264,4 @@ function emptyQueueFilter (err, client, packet) {
   }
 }

-module.exports = handleConnect
+module.exports = { handleConnect }
diff --git a/node_modules/aedes/lib/handlers/index.js b/node_modules/aedes/lib/handlers/index.js
index a5dfaa8..b611293 100644
--- a/node_modules/aedes/lib/handlers/index.js
+++ b/node_modules/aedes/lib/handlers/index.js
@@ -1,13 +1,13 @@
 'use strict'

-const handleConnect = require('./connect')
-const handleSubscribe = require('./subscribe')
-const handleUnsubscribe = require('./unsubscribe')
-const handlePublish = require('./publish')
-const handlePuback = require('./puback')
-const handlePubrel = require('./pubrel')
-const handlePubrec = require('./pubrec')
-const handlePing = require('./ping')
+const { handleConnect } = require('./connect')
+const { handleSubscribe } = require('./subscribe')
+const { handleUnsubscribe } = require('./unsubscribe')
+const { handlePublish } = require('./publish')
+const { handlePuback } = require('./puback')
+const { handlePubrel } = require('./pubrel')
+const { handlePubrec } = require('./pubrec')
+const { handlePing } = require('./ping')

 function handle (client, packet, done) {
   if (packet.cmd === 'connect') {
@@ -74,4 +74,4 @@ function finish (conn, packet, done) {
   done(error)
 }

-module.exports = handle
+module.exports = { handle }
diff --git a/node_modules/aedes/lib/handlers/ping.js b/node_modules/aedes/lib/handlers/ping.js
index a4c042c..69b3ded 100644
--- a/node_modules/aedes/lib/handlers/ping.js
+++ b/node_modules/aedes/lib/handlers/ping.js
@@ -1,6 +1,6 @@
 'use strict'

-const write = require('../write')
+const { write } = require('../write')
 const pingResp = {
   cmd: 'pingresp'
 }
@@ -10,4 +10,4 @@ function handlePing (client, packet, done) {
   write(client, pingResp, done)
 }

-module.exports = handlePing
+module.exports = { handlePing }
diff --git a/node_modules/aedes/lib/handlers/puback.js b/node_modules/aedes/lib/handlers/puback.js
index e4b419c..8376861 100644
--- a/node_modules/aedes/lib/handlers/puback.js
+++ b/node_modules/aedes/lib/handlers/puback.js
@@ -8,4 +8,4 @@ function handlePuback (client, packet, done) {
   })
 }

-module.exports = handlePuback
+module.exports = { handlePuback }
diff --git a/node_modules/aedes/lib/handlers/publish.js b/node_modules/aedes/lib/handlers/publish.js
index e30c9db..5c3e167 100644
--- a/node_modules/aedes/lib/handlers/publish.js
+++ b/node_modules/aedes/lib/handlers/publish.js
@@ -1,6 +1,6 @@
 'use strict'

-const write = require('../write')
+const { write } = require('../write')

 function PubAck (packet) {
   this.cmd = 'puback'
@@ -62,4 +62,4 @@ function authorizePublish (packet, done) {
   this.broker.authorizePublish(this, packet, done)
 }

-module.exports = handlePublish
+module.exports = { handlePublish }
diff --git a/node_modules/aedes/lib/handlers/pubrec.js b/node_modules/aedes/lib/handlers/pubrec.js
index 5c914dd..dc7a7f0 100644
--- a/node_modules/aedes/lib/handlers/pubrec.js
+++ b/node_modules/aedes/lib/handlers/pubrec.js
@@ -1,6 +1,6 @@
 'use strict'

-const write = require('../write')
+const { write } = require('../write')

 function PubRel (packet) {
   this.cmd = 'pubrel'
@@ -27,4 +27,4 @@ function handlePubrec (client, packet, done) {
   }
 }

-module.exports = handlePubrec
+module.exports = { handlePubrec }
diff --git a/node_modules/aedes/lib/handlers/pubrel.js b/node_modules/aedes/lib/handlers/pubrel.js
index 09dcc86..672b697 100644
--- a/node_modules/aedes/lib/handlers/pubrel.js
+++ b/node_modules/aedes/lib/handlers/pubrel.js
@@ -1,6 +1,6 @@
 'use strict'

-const write = require('../write')
+const { write } = require('../write')

 function ClientPacketStatus (client, packet) {
   this.client = client
@@ -47,4 +47,4 @@ function pubrelDel (arg, done) {
   persistence.incomingDelPacket(this.client, arg.packet, done)
 }

-module.exports = handlePubrel
+module.exports = { handlePubrel }
diff --git a/node_modules/aedes/lib/handlers/subscribe.js b/node_modules/aedes/lib/handlers/subscribe.js
index 2470427..e2007aa 100644
--- a/node_modules/aedes/lib/handlers/subscribe.js
+++ b/node_modules/aedes/lib/handlers/subscribe.js
@@ -1,10 +1,10 @@
 'use strict'

 const fastfall = require('fastfall')
-const Packet = require('aedes-packet')
+const { Packet } = require('aedes-packet')
 const { through } = require('../utils')
 const { validateTopic, $SYS_PREFIX } = require('../utils')
-const write = require('../write')
+const { write } = require('../write')

 const subscribeTopicActions = fastfall([
   authorize,
@@ -245,4 +245,4 @@ function completeSubscribe (err) {

 function noop () { }

-module.exports = handleSubscribe
+module.exports = { handleSubscribe }
diff --git a/node_modules/aedes/lib/handlers/unsubscribe.js b/node_modules/aedes/lib/handlers/unsubscribe.js
index e08c317..b9cd7ef 100644
--- a/node_modules/aedes/lib/handlers/unsubscribe.js
+++ b/node_modules/aedes/lib/handlers/unsubscribe.js
@@ -1,6 +1,6 @@
 'use strict'

-const write = require('../write')
+const { write } = require('../write')
 const { validateTopic, $SYS_PREFIX } = require('../utils')

 function UnSubAck (packet) {
@@ -101,4 +101,4 @@ function completeUnsubscribe (err) {

 function noop () { }

-module.exports = handleUnsubscribe
+module.exports = { handleUnsubscribe }
diff --git a/node_modules/aedes/lib/qos-packet.js b/node_modules/aedes/lib/qos-packet.js
index 5527fe1..07c1581 100644
--- a/node_modules/aedes/lib/qos-packet.js
+++ b/node_modules/aedes/lib/qos-packet.js
@@ -1,6 +1,6 @@
 'use strict'

-const Packet = require('aedes-packet')
+const { Packet } = require('aedes-packet')
 const util = require('util')

 function QoSPacket (original, client) {
diff --git a/node_modules/aedes/lib/write.js b/node_modules/aedes/lib/write.js
index 716d81a..a5d186c 100644
--- a/node_modules/aedes/lib/write.js
+++ b/node_modules/aedes/lib/write.js
@@ -21,4 +21,4 @@ function write (client, packet, done) {
   setImmediate(done, error, client)
 }

-module.exports = write
+module.exports = { write }
diff --git a/node_modules/aedes/types/client.d.ts b/node_modules/aedes/types/client.d.ts
index 2906213..c415fce 100644
--- a/node_modules/aedes/types/client.d.ts
+++ b/node_modules/aedes/types/client.d.ts
@@ -6,10 +6,10 @@ import {
   Subscriptions,
   UnsubscribePacket
 } from './packet'
-import { Connection } from './instance'
+import Aedes, { Connection } from './instance'
 import { EventEmitter } from 'node:events'

-export interface Client extends EventEmitter {
+export class Client extends EventEmitter {
   id: Readonly<string>;
   clean: Readonly<boolean>;
   version: Readonly<number>;
@@ -19,6 +19,8 @@ export interface Client extends EventEmitter {
   connected: Readonly<boolean>;
   closed: Readonly<boolean>;

+  constructor(broker: Aedes, conn: Connection, req?: IncomingMessage)
+
   on(event: 'connected', listener: () => void): this;
   on(event: 'error', listener: (error: Error) => void): this;
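
The breaking change noted in the patch comment above `Aedes.prototype.handle` can be illustrated with a small sketch. It assumes a stand-in `Broker` (not the real Aedes class) and a hypothetical `callAsListener` helper that mimics a server invoking its connection listener with a `this` other than the broker:

```javascript
'use strict'

// Stand-in for the patched Aedes: `handle` lives on the prototype
// and reads its settings from `this`.
function Broker (opts) {
  this.concurrency = opts.concurrency
}
Broker.prototype.handle = function handle (conn) {
  return { broker: this, maxListeners: this.concurrency * 2 }
}

const broker = new Broker({ concurrency: 100 })

// Hypothetical helper: calls the listener the way an event emitter would,
// with a `this` that is not the broker.
function callAsListener (listener) {
  return listener.call({}, {})
}

// Passing the method directly loses the broker instance.
const unbound = callAsListener(broker.handle)
// Either of the two forms from the patch comment keeps it.
const bound = callAsListener(broker.handle.bind(broker))
const arrow = callAsListener((socket) => broker.handle(socket))

console.log(unbound.broker === broker, bound.broker === broker, arrow.broker === broker)
```

With the unbound form, `this.concurrency` is `undefined`, so the computed listener limit is `NaN`; the bound and arrow forms both see the broker.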

mcollina commented 12 months ago

This seems a massive change to do (not opposed).

I think modifying aedes-packet is incorrect, as the transaction should be decoupled in the tracing flow: publishing terminates when mqemitter terminates, and delivery starts as another trace. This is needed to correctly support multi-process systems with Redis or MongoDB.

getlarge commented 12 months ago

> This seems a massive change to do (not opposed).
>
> I think modifying aedes-packet is incorrect, as the transaction should be decoupled in the tracing flow: publishing terminates when mqemitter terminates, and delivery starts as another trace. This is needed to correctly support multi-process systems with Redis or MongoDB.

Indeed, this implies some breaking changes.

Aedes-packet does not need to be modified, BUT mqtt-packet has to be patched at runtime to propagate the context and enable distributed traces. Those traces are composed of multiple spans, which SHOULD be related in order to trace the communication between multiple services. That’s how it is specified in OpenTelemetry.

To reformulate your statement: publishing terminates **a span** when mqemitter terminates, and delivery starts as another **span**. The latter is linked to the former by using its span identifier as the parent. This test illustrates it.

Regarding your concern about multi-process systems: the packets are stored by those systems (in aedes-persistence-X and aedes-emitter-X), right? So as long as the context is correctly serialized into the packet, it should be fine? Am I missing something? BTW, for the serialization of the context, I simply followed what is suggested in this document, as you can see here.

mcollina commented 12 months ago

Changing mqtt-packet is a non-starter, unfortunately :(.

getlarge commented 12 months ago

> Changing mqtt-packet is a non-starter, unfortunately :(.

Maybe I wasn’t clear: no change will be requested in the mqtt-packet source code.

robertsLando commented 12 months ago

@mcollina what we would like to know is mostly if the approach could have some performance implications and/or if there are better alternatives

mcollina commented 12 months ago

I think the changes here are not really needed, as the "in-the-middle" approach would be sufficient. However I think it would make things easier.

As for perf, it should be neutral when no monkeypatching is used.

A better approach would be to design a system that requires no monkeypatching at all. This would definitely be faster.


How do you plan to propagate the trace over the MQTT protocol?

getlarge commented 12 months ago

> A better approach would be to design a system that requires no monkeypatching at all. This would definitely be faster.

How do you imagine this design? Allowing the Aedes consumer to provide some extra functions that can wrap the functions that need to be monitored?
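
One way to picture such a design is the rough sketch below. It is not an existing Aedes option: `createBroker` and the `instrument(name, fn)` hook are hypothetical names, and the broker here is a bare stand-in with a single authorization function.

```javascript
'use strict'

// Hypothetical: the broker accepts an optional `instrument(name, fn)` hook
// and uses whatever function the hook returns. Without the hook, the
// original function is used untouched, so nothing is patched at runtime.
function createBroker (opts = {}) {
  const instrument = opts.instrument || ((name, fn) => fn)
  return {
    authorizePublish: instrument('authorizePublish', function (client, packet, callback) {
      callback(null)
    })
  }
}

// A consumer-provided wrapper that records which functions were called.
const spans = []
const broker = createBroker({
  instrument (name, fn) {
    return function (...args) {
      spans.push(name)
      return fn.apply(this, args)
    }
  }
})

let result
broker.authorizePublish({ id: 'c1' }, { topic: 'a/b' }, (err) => { result = err })
console.log(spans, result)
```

The wrapping happens once, at construction time, and brokers created without the hook call the plain functions directly.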


> How do you plan to propagate the trace over the MQTT protocol?

As said in one of the messages above: there is a document that recommends solutions to store and retrieve the context (trace) to and from the MQTT packet. To summarize it:

  1. For MQTT v5 (which does not yet apply to Aedes), packet.properties.userProperties should be used to store the traceparent and tracestate as properties.
  2. For MQTT v3 with a JSON payload, the traceparent and tracestate should be stored as properties in the payload.
  3. For MQTT v3 in other cases, the approach is vaguer and relies on the binary protocol proposal: it does not recommend a location for the context to be stored, but it suggests serialization/deserialization algorithms that we could apply. In that case I thought we could prepend the packet payload with it.
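
The first two cases can be sketched as follows. This is an illustration under assumptions, not the code of aedes-otel-instrumentation: `injectV5`, `injectJson` and `extractJson` are hypothetical helpers, the context is assumed to be stored as top-level keys of the JSON payload, and the `traceparent` value is just an example in W3C Trace Context format.

```javascript
'use strict'

// Example value in W3C Trace Context format: version-traceId-parentId-flags.
const context = {
  traceparent: '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01'
}

// Case 1 (MQTT v5): carry the context in packet.properties.userProperties.
function injectV5 (packet, ctx) {
  const properties = packet.properties || {}
  const userProperties = { ...properties.userProperties, ...ctx }
  return { ...packet, properties: { ...properties, userProperties } }
}

// Case 2 (MQTT v3, JSON payload): carry the context as payload properties.
// Assumption: top-level keys of the JSON object.
function injectJson (packet, ctx) {
  const body = JSON.parse(packet.payload.toString())
  return { ...packet, payload: Buffer.from(JSON.stringify({ ...body, ...ctx })) }
}

// Reads the context back on the delivery side.
function extractJson (packet) {
  const { traceparent, tracestate } = JSON.parse(packet.payload.toString())
  return { traceparent, tracestate }
}

const v3 = injectJson(
  { cmd: 'publish', topic: 'a/b', payload: Buffer.from('{"temp":21}') },
  context
)
const v5 = injectV5(
  { cmd: 'publish', topic: 'a/b', payload: Buffer.from('21') },
  context
)

console.log(extractJson(v3).traceparent === context.traceparent)
console.log(v5.properties.userProperties.traceparent === context.traceparent)
```

In the v3 case the payload itself is rewritten, so subscribers receive the extra keys; in the v5 case the payload is left untouched.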

mcollina commented 12 months ago

IMHO this makes sense only with MQTT5

getlarge commented 12 months ago

I agree that it would incur less of a performance penalty with MQTT 5. For MQTT 3, if some users truly wish to propagate traces between their systems, I don't see another alternative (except prepending the whole MQTT packet with the trace context, in the same way this is done for the PROXY protocol). It could be an opt-in feature?