Closed erwinkendo closed 7 years ago
Would you like to send a PR against MQTT.js? Use a through2 stream, so the messages are sent slowly and we avoid a thundering herd scenario.
Sounds good, I will work on this on Monday. It is still not clear to me why I should use through2: why does it work better than a plain CreateStream? I thought through2 was already used, at least in the store.js library, via the 'readable-stream' package. Am I wrong or confused in any way?
Oh, no problem about through2, just use a Transform if you like it more. The only catch is: do not use the on('data') event.
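A minimal sketch of the idea above, assuming Node's built-in stream module: the queued packets go through a Transform whose callback is only invoked once the packet has been sent, so the stream itself paces the reads instead of an on('data') handler firing as fast as the store can emit. The names flushStore and sendPacket, and the array standing in for the store's read stream, are hypothetical and not part of MQTT.js.

```javascript
const { Readable, Transform } = require('stream')

// Hypothetical async send; it stands in for whatever actually writes
// the packet to the broker and reports completion via `done`.
function sendPacket (packet, sent, done) {
  setImmediate(() => {
    sent.push(packet.messageId)
    done()
  })
}

// Drain queued packets one at a time. `packets` stands in for the
// store's read stream; `sent` just records the send order.
function flushStore (packets, sent, onFinish) {
  const sender = new Transform({
    objectMode: true,
    transform (packet, _enc, callback) {
      // transform() is not called with the next packet until `callback`
      // runs, which is what paces the reads.
      sendPacket(packet, sent, callback)
    }
  })
  sender.on('finish', onFinish)
  sender.resume() // nothing is pushed downstream; keep the readable side drained
  Readable.from(packets).pipe(sender)
}
```

With on('data') the handler would receive every packet immediately, whether or not the previous send had completed.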
Ok, I think I understand now, we are trying to avoid backpressure when reading the store.
I am going to implement something similar to the callback to read the queued msg when a connection is made.
I am facing a small "problem" though. Because _setupStream is called every time a reconnection attempt is made, the db is read every time this happens, leading to _sendPacket being called that same number of times and the msg being sent several times. The suback msg is handled correctly, and the client discards any repeated suback msg it receives, but I would prefer a cleaner behaviour.
I tried using the inner state this.connected to prevent the process from reading, but this flag is set to true much later in the process, even after I can send msg to the broker. I am currently using a condition to read the db only if the packet isn't already in the queue, checking that.outgoing[packet.messageId], and it seems to work well for qos 1 and 2.
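The guard described above could look roughly like the sketch below. It is only an illustration under stated assumptions: createSketchClient, resendQueued, and the wire array are hypothetical stand-ins, and the real client keeps more state than a plain outgoing map.

```javascript
// Hypothetical, simplified client. `outgoing` maps messageId to the
// callback of a packet that was sent but not yet acknowledged.
function createSketchClient () {
  return {
    outgoing: {},
    wire: [], // records what would go to the broker
    sendPacket (packet) {
      this.outgoing[packet.messageId] = function noop () {}
      this.wire.push(packet.messageId)
    }
  }
}

// Called on every reconnection attempt with the packets read from the db.
function resendQueued (client, storedPackets) {
  for (const packet of storedPackets) {
    if (client.outgoing[packet.messageId]) continue // already in flight, skip
    client.sendPacket(packet)
  }
}
```

Calling resendQueued twice with the same stored packets sends each packet only once, because the second pass finds every messageId already present in outgoing.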
I will have the PR done today :)
The backpressure is needed to avoid overwhelming the broker on reconnect. I am not sure if I got your problem correctly, but I recommend using pump, a module that supports destroying streams correctly. You should destroy this stream on disconnect, so that it restores stuff correctly on the next run.
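As a rough illustration of destroying the stream on disconnect, the sketch below uses Node's built-in stream.pipeline rather than pump itself; pipeline likewise tears down every stream in the chain when one of them fails or is destroyed. startFlush, stop, and the send callback are hypothetical names, not MQTT.js API.

```javascript
const { Readable, Writable, pipeline } = require('stream')

// Start draining `packets` through `send(packet, cb)`. Returns a `stop`
// function meant to be called on disconnect. All names are hypothetical.
function startFlush (packets, send, onDone) {
  const source = Readable.from(packets)
  const sink = new Writable({
    objectMode: true,
    write (packet, _enc, cb) { send(packet, cb) }
  })
  // pipeline destroys every stream in the chain if one fails or is
  // destroyed, then calls onDone once.
  pipeline(source, sink, onDone)
  return function stop () {
    source.destroy(new Error('disconnected'))
  }
}
```

On the next connection a fresh call to startFlush would read the store from the beginning, so no half-consumed stream is left over from the previous attempt.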
I already did a PR :+1: That package seems very interesting and useful, but I am not sure its use is necessary now. Please review the PR in case some change must be made. I tested this new PR under continued operation, forcing disconnect/reconnect while more messages were being queued, and under shut-down and start-up operation; the keys in the db are being put and deleted without error.
I've just released a new version. Feel free to reopen if this is still a bug.
Good day @mcollina, great work. I was going to implement this with nedb, but this seems very interesting.
One small issue: there is a problem when using this package with the current mqtt.js version.
When an in-flight message is loaded in setupStream, the message is indeed loaded, but after the message is sent, when the check for a callback is done in handleAck, no callback has been assigned in the this.outgoing array, so the message is not deleted from the LevelDB storage.
How do you propose to fix this issue cleanly? I managed to do this by assigning a dummy callback before the that._sendPacket(packet) call, but maybe you can propose a bigger change in the client.js library.
Kind regards
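The dummy-callback workaround described above can be sketched as follows. This is a simplified model, not the actual client.js: SketchClient, resendStored, and the Map standing in for the LevelDB store are hypothetical, and the assumption is that the ack handler only cleans up when it finds an entry in outgoing.

```javascript
function noop () {}

// Hypothetical, stripped-down client used only to show the ack path.
class SketchClient {
  constructor (store) {
    this.store = store // stand-in for the persistent store (a Map here)
    this.outgoing = {} // messageId -> callback awaiting an ack
    this.wire = []     // records what would go to the broker
  }

  sendPacket (packet) {
    this.wire.push(packet.messageId)
  }

  // Re-send a packet loaded from the store after a restart or reconnect.
  resendStored (packet) {
    // Register a dummy callback so the ack handler finds an entry.
    if (!this.outgoing[packet.messageId]) {
      this.outgoing[packet.messageId] = noop
    }
    this.sendPacket(packet)
  }

  handleAck (messageId) {
    const cb = this.outgoing[messageId]
    if (!cb) return // assumed behaviour: no callback, so nothing is cleaned up
    delete this.outgoing[messageId]
    this.store.delete(messageId)
    cb()
  }
}
```

Without the noop registration in resendStored, handleAck would return early and the packet would stay in the store.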