mcollina / mqtt-level-store

Store your in-flight MQTT message on Level, for Node
MIT License

Keys limitation (Offline storage issue) #19

Open mugul opened 4 years ago

mugul commented 4 years ago

Hi there!

context: The MQTT protocol allows packet ids from 1 to 65,535 for in-flight messages, that is, messages in the middle of transmission between the client and the server.

This store uses the messageId as the key under which it stores a timestamp, and then uses that timestamp to store the packet. But there is a small problem.

problem: MQTT's messageId is generated as a random value between 1 and 65,535, which leaves us with only 65,535 possible keys and an increasing risk of losing data.

risks: When the store is empty, the risk of repeating a key is low. As it grows, the chance that a newly generated random value between 1 and 65,535 is already stored gets higher. After 32k packets have been stored (a few days without an internet connection), the chance of losing each new packet is about one half.

proposal: If the local store used another, longer, non-random id for its keys, this would solve the problem.

We would then have to coordinate so that in-flight message ids do not clash, and keep track of which messages MQTT is sending (for QoS > 0), because a send can succeed (and the message has to be deleted from the store) or fail (and the message has to be inserted into the store again, if it was pulled out).

Hope I explained myself...

Best regards!

redboltz commented 4 years ago

> MQTT's messageId is generated as a random value between 1 and 65,535, which leaves us with only 65,535 possible keys and an increasing risk of losing data.

MQTT.js creates the first messageId randomly, but every subsequent messageId in the same session is created incrementally.

See

https://github.com/mqttjs/MQTT.js/blob/2a9b8fbe100f8cf84c0d7b784c5912780a3e54a7/lib/client.js#L189

https://github.com/mqttjs/MQTT.js/blob/2a9b8fbe100f8cf84c0d7b784c5912780a3e54a7/lib/client.js#L1308

So I think there are no random conflicts.

There is a different messageId conflict, though. Consider this scenario.

  1. The client CONNECTs to the broker.
  2. The client PUBLISHes a QoS 1 message, and MQTT.js allocates messageId 10 (randomly).
  3. The broker doesn't send PUBACK for messageId 10. (Some trouble?)
  4. The client PUBLISHes a QoS 1 message, and MQTT.js allocates messageId 11 (incrementally).
  5. The broker sends PUBACK for messageId 11.
  6. Steps 4 and 5 repeat until the last messageId is 9.
  7. The client PUBLISHes a QoS 1 message, and MQTT.js allocates messageId 10 (incrementally). But messageId 10 is still in flight.

This is not directly related to mqtt-level-store; it is an MQTT.js issue. To solve it, I think we need to store the in-flight messageIds in a map and check it in _nextId().

Step 3 assumes some trouble on the broker side. With offline publishing, the same thing could happen without any broker trouble.
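The check suggested above (remember the in-flight messageIds and consult them when picking the next id) could look roughly like the sketch below. It is not MQTT.js code: `MessageIdAllocator`, `allocate`, `release` and `advance` are hypothetical names, and the first id is fixed at 10 to replay the scenario rather than chosen randomly.

```javascript
// Hypothetical allocator (not MQTT.js code): hands out packet ids in the
// range 1..65535 and skips any id that is still in flight.
class MessageIdAllocator {
  constructor(firstId) {
    this.max = 65535;
    this.inFlight = new Set();
    this.next = firstId; // MQTT.js picks this randomly; fixed here for the demo
  }

  // Returns a free id, or null when all 65535 ids are in flight.
  allocate() {
    if (this.inFlight.size >= this.max) return null;
    while (this.inFlight.has(this.next)) this.advance();
    const id = this.next;
    this.inFlight.add(id);
    this.advance();
    return id;
  }

  // Call when PUBACK (QoS 1) or PUBCOMP (QoS 2) arrives for this id.
  release(id) {
    this.inFlight.delete(id);
  }

  // Move to the next id, wrapping from 65535 back to 1.
  advance() {
    this.next = this.next === this.max ? 1 : this.next + 1;
  }
}

// Replay of the scenario: id 10 is never acknowledged, everything else is.
const allocator = new MessageIdAllocator(10);
const stuck = allocator.allocate(); // 10, its PUBACK never arrives
for (let i = 0; i < 65534; i++) {
  allocator.release(allocator.allocate()); // ids 11..65535, then 1..9
}
const afterWrap = allocator.allocate(); // 10 is skipped because it is in flight
console.log(stuck, afterWrap); // 10 11
```

The `null` return is where a real client would have to decide what to do when every id is taken, for example queueing the publish until an acknowledgement frees one.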

@mugul, could you clarify the problem?

mugul commented 4 years ago

Sorry for the confusion about the incremental messageId... I'll try to explain it better:

The problem appears when you have no internet connection for a long time.

  1. The client CONNECTs to the broker.
  2. The client PUBLISHes a QoS 1 message, and MQTT.js allocates messageId 10 (randomly).
  3. The client receives PUBACK for messageId 10.
  4. The client loses its connection to the broker.
  5. The client PUBLISHes a QoS 1 message, and MQTT.js allocates messageId 11 (incrementally).
  6. MQTT is not connected, so it stores the packet in the outgoingStore (mqtt-level-store).
  7. mqtt-level-store saves 2 entries. The first has key "packets~11" (11 is the messageId) and the timestamp as its value. The second has key "packet-by-date~ timestamp ~ 11" (11 is the messageId).
  8. Steps 5, 6 and 7 repeat until the last messageId is 10.
  9. The client PUBLISHes a QoS 1 message, and MQTT.js allocates messageId 11 (incrementally). It is still not connected, so the packet goes to the outgoingStore (mqtt-level-store).
  10. "packets~11" already exists, and 'packet-by-date~' + _date + '~' + 11 also exists. Both packets (new and stored) have the same cmd "publish", so the store returns cb().

At this point the new packet has not been stored, because the key was "taken"...
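The following toy model replays steps 7 to 10. It is only an illustration of the key layout as described in this comment, not the mqtt-level-store source: a `Map` stands in for LevelDB, a zero-padded counter stands in for the timestamp, and `db`, `counter` and `put` are made-up names.

```javascript
// Toy model of the key layout described above. A Map stands in for LevelDB.
const db = new Map();
let counter = 0; // stands in for the timestamp part of the key

function put(packet) {
  const idKey = 'packets~' + packet.messageId;
  if (db.has(idKey)) {
    // Behaviour as described in step 10: the key is taken, nothing is stored.
    return false;
  }
  const date = String(++counter).padStart(8, '0');
  db.set(idKey, date);
  db.set('packet-by-date~' + date + '~' + packet.messageId, packet);
  return true;
}

const first = put({ cmd: 'publish', messageId: 11, payload: 'first' });
// ... 65535 offline publishes later, the id counter has wrapped around ...
const second = put({ cmd: 'publish', messageId: 11, payload: 'second' });

// Only packet bodies are objects; the "packets~" entries hold date strings.
const stored = [...db.values()].filter((v) => typeof v === 'object');
console.log(first, second, stored.length); // true false 1
```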

I think we should be able to store any amount of packets without using MQTT's messageId.

I think this could be the store's problem, because when MQTT connects it just asks for a stream of outgoing messages and doesn't care how they were stored. It only requires that each packet has a messageId (but I don't think that id needs to be the storage key).

The mapping you mention could be useful if we change our storage key, to know which packets are in flight and which have been sent. (We will need a way to relate our internal storage key to the messageId MQTT is using to send the packet to the broker.)

Or maybe I'm misunderstanding something.

Please tell me if it's still not clear enough, and I'll try to find another way to explain it.

tkurki commented 4 years ago

> MQTT.js creates the first messageId randomly

Is this persisted someplace, or is it random per process start? The code links look like it is just in memory, so (if you have written a considerable number of messages to the outgoing store) there is a definite chance of collision there as well. If not directly at start, then some time after, as the advancing id may collide with ids already in the store.

redboltz commented 4 years ago

@mugul,

> I think we should be able to store any amount of packets without using MQTT's messageId.

It would cause a big breaking change. After the user's code calls publish with QoS 1/2, the user expects to be able to get the messageId (MQTT Packet Id) using getLastMessageId. See https://github.com/mqttjs/MQTT.js/blob/master/lib/client.js#L1322.

With the current publish interface, the user cannot know whether the message was stored or sent to the network. So after publish is called, getLastMessageId should return the allocated messageId even if the packet was only stored. That limits the number of stored packets (up to 0xfffe). I think that is reasonable.

If the user wants to store more than the limit, I think the user should store the messages on the user program side.

If you want to change MQTT.js to store more than 0xfffe packets, you need to provide some interface that tells whether the message was stored (and no messageId was allocated) or sent (and a messageId was allocated). I personally think that is too complicated an interface.

tkurki commented 4 years ago

Do you find the current behaviour, where messages may be randomly dropped, especially so in case of restarts, reasonable?

mcollina commented 4 years ago

This might need fixing in MQTT.js.

It seems the obvious mistake is for MQTT.js to issue the same id twice

tkurki commented 4 years ago

Continue in a linked MQTT.js issue?

mugul commented 4 years ago

@redboltz

> If the user wants to store more than the limit, I think the user should store the messages on the user program side.

When MQTT connects, it asks the store for pending messages.

It does this by calling createStream() on the outgoing store, and the store returns a stream with all the remaining packets.

But MQTT assumes that each packet has the "packet.messageId" property, and it uses this messageId to send the packet to the broker. (And this id has to be from 1 to 65,535 by protocol definition.)

This is why I still think the mistake is to use this id as the key in the store.

Even if I store the packet on my own (and I'm doing so with mqtt-level-store), the store requires "messageId" to be used as the key, and that id limits the store to 65,535 packets.

The only way I can see this as an MQTT "fix" is if the messageId is generated when the packet is sent, rather than being required from the store.

redboltz commented 4 years ago

@mugul, if the outgoingStore were only an offline store, you could achieve that goal. But it is also used for re-sending publish messages. (I personally think that is its primary purpose.) And when re-sending, the client needs to use the same messageId as in the original send.

In addition, there is an ordering problem. Consider the following scenario:

  1. Connect to the broker with clean session false.
  2. connack is received.
  3. Send publish QoS 1 with messageId 10.
  4. The connection is closed from the broker side.
  5. (offline) Publish a QoS 1 message (without messageId ??).
  6. Re-connect to the broker (with clean session false).
  7. connack is received.
  8. The client should automatically resend the message from step 3 with messageId 10.
  9. The client should automatically send the message from step 5. Maybe the messageId is allocated internally, but users often want to know the allocated messageId. How?

tkurki commented 4 years ago

@redboltz you have stressed the importance of the user's access to messageId twice in this thread. I don't quite see the importance for this use case. messageId cannot be used for anything meaningful, only for informative purposes like debug logging, because there are no guarantees:

Making messageId available only in the publish callback would make sense to me, except that the publish callback API is inherently broken: it may never be called. Consider the sequence

This could be fixed with a publishComplete event. If the client needs to collate when a given message is actually published successfully it would need to use some inline data, as messageId can not be used for this purpose.

So what if we had

redboltz commented 4 years ago

That is one of the purposes of messageId. See https://github.com/mqttjs/MQTT.js/pull/658

tkurki commented 4 years ago

Thanks. Right, so you need to be able to at least delete messages that have been assigned messageId from the outgoing store. However if we have been disconnected the whole time we could have messages with just storeMessageId, no messageId yet assigned.

redboltz commented 4 years ago

@tkurki, I'm not sure about the following point.

If the MQTT.js client receives puback or pubcomp, it needs to remove the message that has the messageId in the puback or pubcomp packet. If it receives pubrec (and automatically sends back pubrel), it needs to replace the stored publish message with a pubrel message that has the messageId from the pubrec packet. This happens in every publish sequence except QoS 0, so it should be done efficiently. Therefore the outgoingStore should be searchable by messageId.

How to achieve this?

mugul commented 4 years ago

@redboltz MQTT gives you the whole packet to store/update/delete, so you can keep your search id inside the packet and leave the messageId for MQTT only.

If the packet doesn't have the key (let's call it storeMessageId), you create a new storeMessageId because it is a new packet. If the packet already has a storeMessageId, you can use it to find the right packet by key.
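A minimal sketch of that idea, with a `Map` in place of LevelDB. `SketchStore`, `nextStoreId` and the method names are made up for illustration; only the `storeMessageId` name comes from the comment above.

```javascript
// Hypothetical store keyed by its own counter instead of the MQTT messageId.
class SketchStore {
  constructor() {
    this.packets = new Map(); // storeMessageId -> packet, in insertion order
    this.nextStoreId = 1;
  }

  put(packet) {
    if (packet.storeMessageId === undefined) {
      packet.storeMessageId = this.nextStoreId++; // new packet: new key
    }
    // Known packet: Map.set keeps its original position and updates the value.
    this.packets.set(packet.storeMessageId, packet);
    return packet;
  }

  del(packet) {
    return this.packets.delete(packet.storeMessageId);
  }
}

const store = new SketchStore();
const a = store.put({ cmd: 'publish', messageId: 11, payload: 'first' });
const b = store.put({ cmd: 'publish', messageId: 11, payload: 'second' });
a.cmd = 'pubrel'; // e.g. after PUBREC the stored publish becomes a pubrel
store.put(a);
console.log(store.packets.size, [...store.packets.keys()]); // 2 [ 1, 2 ]
```

Two packets that share messageId 11 now coexist. The sketch does not answer the earlier question of how to find a packet when only the messageId from a puback, pubrec or pubcomp is known; that would need a second index from messageId to storeMessageId.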

tkurki commented 4 years ago

I'm shooting (a) from the sidelines (b) from the hip here, so my suggestions may or may not make sense. But here goes:

@redboltz In flight messages would have messageId per my suggestion above. As you say you need to be able to update messages in the store with messageId as the key.

To support your "oops, these messages will never get there" case you should be able to cancelMessage(messageId) or retractMessage(messageId), but imho that belongs in the client API; one should not need to modify the outgoing store directly.

@mugul see the linked MQTT.js issue above and the use case there - you must be able to access (cancel) a message by messageId if you are doing it based on data from the broker, that deals only with messageId.

But messages that are written to the outgoing store when the broker is offline should be assigned messageId when sending is actually tried, not when the client calls publish. And the store should manage the messageId, as it needs to take into account what is in flight.

One way to achieve this would be to assign (and persist) unique messageIds when a message is placed in the stream by the store.

redboltz commented 4 years ago

> @redboltz In flight messages would have messageId per my suggestion above. As you say you need to be able to update messages in the store with messageId as the key.

Yes, that is my point.

leveldb is a key-value store, so each record has a single key. We can use a table~ prefix to emulate multiple tables, and mqtt-level-store already uses this approach. In addition, leveldb can only read data in key order.

The current schema is as follows:

| table | key | value | purpose |
| --- | --- | --- | --- |
| packets | messageId | date | find the packet-by-date record by messageId |
| packet-by-date | date+messageId | packet body | store the packet body in insertion order |

packet-by-date ensures the correct order.

Sending packets from the store is implemented at https://github.com/mcollina/mqtt-level-store/blob/master/mqtt-level-store.js#L111

NOTE: date contains _sameDateCount to keep order.
NOTE: messageId is the same as the MQTT Packet Identifier.

To expand the store beyond 0xffff entries while still working with the existing messageId update mechanism, we need some kind of hack.

This is just an idea, not well considered, but...

| table | key | value |
| --- | --- | --- |
| packets | storeMessageId | date |
| packet-by-date | date+storeMessageId | packet body |

If an online publish happens, storeMessageId is the same as messageId (the MQTT Packet Identifier). If an offline publish happens, storeMessageId is greater than 0xffff, e.g. 0x10000, 0x10001, 0x10002, ... The user can get the storeMessageId using getLastMessageId and can tell whether the message was published online or published offline (stored). We could provide a helper function isPublished(storeMessageId). We could also use cancelMessage(storeMessageId). I think this would minimize breaking existing code.

When the client (the MQTT.js library implementation) receives CONNACK, it sends the packet-by-date records. At this point the client can read each storeMessageId and, if it is an offline one (greater than 0xffff), allocate a messageId and replace the storeMessageId with that messageId.

A different but related topic is that MQTT.js needs to allocate new messageIds without conflicts, as I mentioned in https://github.com/mcollina/mqtt-level-store/issues/19#issuecomment-558042165. And if the client runs out of messageIds, it should stop sending from the outgoingStore, and continue sending once it receives puback/pubcomp.
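To make the idea a bit more concrete, here is a rough sketch of the drain step after CONNACK. Nothing here exists in MQTT.js or mqtt-level-store: `drainOnConnack`, the array-based `queue`, the `inFlight` set and the `send` callback are assumptions for illustration, and `isPublished` is only the helper name proposed above.

```javascript
const MAX_PACKET_ID = 0xffff;

// Proposed helper: store ids above 0xffff mean "stored offline, never sent".
function isPublished(storeMessageId) {
  return storeMessageId <= MAX_PACKET_ID;
}

// queue: packets in insertion order; inFlight: Set of ids awaiting an ack.
// Sends what it can and leaves the rest in the queue.
function drainOnConnack(queue, inFlight, send) {
  let candidate = 1;
  while (queue.length > 0) {
    const packet = queue[0];
    if (!isPublished(packet.storeMessageId)) {
      // Out of ids: stop here and resume after a puback/pubcomp frees one.
      if (inFlight.size >= MAX_PACKET_ID) break;
      while (inFlight.has(candidate)) {
        candidate = candidate === MAX_PACKET_ID ? 1 : candidate + 1;
      }
      packet.messageId = candidate;
      packet.storeMessageId = candidate; // replace the offline id
    }
    inFlight.add(packet.messageId); // a resend keeps its original id
    send(packet);
    queue.shift();
  }
}

const queue = [
  { storeMessageId: 10, messageId: 10, payload: 'sent before disconnect' },
  { storeMessageId: 0x10000, payload: 'offline 1' },
  { storeMessageId: 0x10001, payload: 'offline 2' },
];
const inFlight = new Set([10]);
const sent = [];
drainOnConnack(queue, inFlight, (p) => sent.push(p.messageId));
console.log(sent.join(',')); // 10,1,2
```

The resend keeps messageId 10, and the two offline packets receive the first free ids in their original order.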

tkurki commented 4 years ago

Just an idea, but mqtt-level-store already uses two leveldb databases, outgoing and incoming. You could store outgoing, queued but not yet in flight messages in a third leveldb.

robertsLando commented 1 year ago

I have created a new store that could help with this: https://github.com/robertsLando/mqtt-jsonl-store