mcollina / mqtt-level-store

Store your in-flight MQTT message on Level, for Node
MIT License
24 stars 11 forks source link

Added published order preserving functionality. #10

Closed redboltz closed 6 years ago

redboltz commented 6 years ago

This pull request will add published order preserving functionality.

It requires MQTT.js update. See https://github.com/mqttjs/MQTT.js/pull/858.

Implementation note:

  1. leved-db cannot keep insertion order.
  2. I added the Date + counter to the value. The counter is to order multiple packets in the same millisecond.
  3. When createStream() is called, first read all records(*1) from the level-db, then sort it by Date + counter, finally push the sorted data into my own stream.

*1 level-db provides only asynchronous interface to read records. So I paused my own stream until all asynchronous level-db read is finished. In order to do that, I use pause() and resume() methods.

Unfortunately, pause() and resume() does NOT work with on('readable' ...) but works well with on('data' ...). So I sent the pull request to MQTT.js.

mcollina commented 6 years ago

LevelDB stream returns data in lexicographic order. You should be able to add a date (ISO 8601) as part of the key, and have those returned in order.

redboltz commented 6 years ago

You should be able to add a date (ISO 8601) as part of the key, and have those returned in order.

If I add the date as part of the key, how do I delete it when puback is received? I think that I cannot get the same date as put.

mcollina commented 6 years ago

If I add the date as part of the key, how do I delete it when puback is received? I think that I cannot get the same date as put.

Create a secondary index with the date in the prefix. In practice you should store data in two keys, and then update them accordingly.

packets~ID -> this should include the date within the packet. packet-by-date~DATE~ID -> this returns the packet

redboltz commented 6 years ago

You mean something like this?

key value
packets~123 date1
packets~124 date2
packet-by-date\~date1\~123 packet1
packet-by-date\~date2\~124 packet2

It works well with 'put()', 'del()', and 'get()'. But how do I implement createStream()?

Even if they are sorted by key, I cannot return level-db's stream directly like https://github.com/mcollina/mqtt-level-store/blob/master/mqtt-level-store.js#L53

Because values are mixed dates and packets.

If I get all (or half) records from level-db store, then create my own store, I think that it not a big difference from my approach.

Or is there any way to filter unnecessary records?

mcollina commented 6 years ago

You mean something like this?

No, something like

packet1._date = new Date().toISOString()
key value
packets~123 packet1
packets~124 packet2
packet-by-date\~date1\~123 packet1
packet-by-date\~date2\~124 packet2

It works well with 'put()', 'del()', and 'get()'. But how do I implement createStream()?

You can filter using lt and gt from levelup (see https://github.com/Level/levelup#createReadStream), and put it as { gt: 'packet-by-date~', lt: 'packet-by-date~\xff' }.

redboltz commented 6 years ago

Let's say the table layout https://github.com/mcollina/mqtt-level-store/pull/10#issuecomment-415785274 is approach A, and https://github.com/mcollina/mqtt-level-store/pull/10#issuecomment-415790232 is approach B.

Both use 'gt' 'lt' filter when createStream().

Trigger A B Note
publish put date and value put value twice -
pubrel get date, get value(for compare), compare cmd, conditional update value ??? Maybe B can't be implemented
puback/pubcomp get date, get value(for callback), del 2 records ??? Maybe B can't be implemented
connected create filtered stream create filtered stream -

I implemented the approach A as #11. In order to keep commit log clean, I create the new pull request.