vert-x3 / vertx-mqtt

Vert.x MQTT
Apache License 2.0

MQTT client never removes messages from outbound queue if not ACKed #162

Closed sophokles73 closed 4 years ago

sophokles73 commented 4 years ago

Version

In which version(s) did you encounter this bug?

3.9.2

Context

When the MQTT client is used to publish a message using QoS 1 and the server does not send a PUBACK for the message, then the message will remain forever in the client's qos1outbound queue. If this happens repeatedly, the limit set by maxInflightQueue will eventually be exceeded and the client can no longer send any messages.
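To make the failure mode concrete, here is a minimal sketch in plain Java. It is a simplified model, not the vertx-mqtt source: the class `InflightModel` and its methods are made up for illustration, with a map standing in for `qos1outbound` and a constructor argument standing in for `maxInflightQueue`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for the client's QoS 1 bookkeeping (illustrative only).
class InflightModel {
    private final int maxInflight;                                      // plays the role of maxInflightQueue
    private final Map<Integer, String> pending = new LinkedHashMap<>(); // plays the role of qos1outbound
    private int nextId = 1;

    InflightModel(int maxInflight) {
        this.maxInflight = maxInflight;
    }

    // Returns the packet id, or -1 when the in-flight limit is reached.
    int publish(String payload) {
        if (pending.size() >= maxInflight) {
            return -1;
        }
        int id = nextId++;
        pending.put(id, payload);
        return id;
    }

    // In this model only a PUBACK ever removes an entry.
    void onPuback(int id) {
        pending.remove(id);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        InflightModel client = new InflightModel(2);
        int first = client.publish("a");
        client.publish("b");                     // server never acknowledges this one
        client.onPuback(first);                  // frees one slot
        client.publish("c");                     // never acknowledged either
        System.out.println(client.publish("d")); // prints -1: both slots are stuck
    }
}
```

Every message that never receives a PUBACK permanently occupies a slot, so after enough lost acknowledgements no further publish is accepted.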

The client itself (currently) does not keep track of pending PUBACKs by means of timers. Thus, it is the application-level code's responsibility to do so. However, the client does not provide any means to remove a timed-out PUBLISH attempt from the outbound queue once the application determines that it won't wait any longer for a PUBACK. The client seems to have a similar problem with CONNECT, as pointed out in #17.

I can see two options to address this issue:

  1. Add support for timing out a PUBLISH request in the client itself or
  2. Allow application code to declare a PUBLISH request as failed, thus removing it from the outbound queue.

I guess option 2 could be easily added without breaking backwards compatibility. The groundwork done as part of that could later be used to implement option 1 as well.
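As a rough illustration of option 2, the sketch below adds a removal hook to a simplified model of the outbound queue. The method name `failPublish` is purely hypothetical and is not an existing vertx-mqtt API; the class `ManualFailQueue` is likewise made up for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of an outbound queue with an application-facing removal hook.
class ManualFailQueue {
    private final Map<Integer, String> pending = new HashMap<>();
    private final int maxInflight;
    private int nextId = 1;

    ManualFailQueue(int maxInflight) {
        this.maxInflight = maxInflight;
    }

    // Returns the packet id, or -1 when the in-flight limit is reached.
    int publish(String payload) {
        if (pending.size() >= maxInflight) {
            return -1;
        }
        int id = nextId++;
        pending.put(id, payload);
        return id;
    }

    void onPuback(int id) {
        pending.remove(id);
    }

    // Hypothetical option 2 hook: the application declares a PUBLISH as failed.
    // Returns true if the packet was still pending and has been removed.
    boolean failPublish(int id) {
        return pending.remove(id) != null;
    }

    public static void main(String[] args) {
        ManualFailQueue client = new ManualFailQueue(1);
        int id = client.publish("reading-1");
        System.out.println(client.publish("reading-2")); // prints -1: queue is full
        client.failPublish(id);                          // application gave up waiting for the PUBACK
        System.out.println(client.publish("reading-2")); // prints 2: the slot is free again
    }
}
```

The application keeps ownership of the timeout policy here; the client only needs to expose the removal.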

WDYT? I can create a PR for any or all of the options ...

vietj commented 4 years ago

I think the best option would be a timer that removes the pending message and fails the associated future with a TimeoutException.

In addition we should provide two event handlers

Both provide the opportunity to the application to save the state somewhere (like a persistent storage) and later check it with the other handler.
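The timer idea from this comment could look roughly like the following sketch. It uses only the JDK (`ScheduledExecutorService`, `CompletableFuture`) rather than Vert.x timers and futures, and the class `ExpiringPublishTracker` with its methods is an assumption made for illustration, not the eventual implementation.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ExpiringPublishTracker {
    private final Map<Integer, CompletableFuture<Integer>> pending = new ConcurrentHashMap<>();
    // Daemon thread so the timer never keeps the JVM alive.
    private final ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "puback-timer");
        t.setDaemon(true);
        return t;
    });

    // Registers a sent PUBLISH and arms a timer for its PUBACK.
    CompletableFuture<Integer> track(int packetId, long timeoutMillis) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        pending.put(packetId, future);
        ScheduledFuture<?> timer = timers.schedule(() -> {
            // Remove first, then fail: whoever removes the entry decides the outcome.
            if (pending.remove(packetId, future)) {
                future.completeExceptionally(new TimeoutException("no PUBACK for packet " + packetId));
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        // Cancel the timer as soon as the future completes for any reason.
        future.whenComplete((id, err) -> timer.cancel(false));
        return future;
    }

    void onPuback(int packetId) {
        CompletableFuture<Integer> acked = pending.remove(packetId);
        if (acked != null) {
            acked.complete(packetId);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ExpiringPublishTracker tracker = new ExpiringPublishTracker();
        CompletableFuture<Integer> acked = tracker.track(1, 5_000);
        CompletableFuture<Integer> lost = tracker.track(2, 50);
        tracker.onPuback(1);
        System.out.println(acked.join());               // prints 1
        try {
            lost.join();
        } catch (CompletionException e) {
            System.out.println(e.getCause().getClass().getSimpleName()); // prints TimeoutException
        }
        System.out.println(tracker.pendingCount());     // prints 0
    }
}
```

Removing the entry before completing the future means a late PUBACK for the same packet finds nothing pending, which is exactly the case a separate handler would need to report.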

sophokles73 commented 4 years ago

I think the best option would be a timer that removes the pending message and fails the associated future with a TimeoutException.

Sounds good.

In addition we should provide two event handlers

We could do that by means of overloading the publishCompletionHandler method with new/different handlers, e.g.

MqttClient publishCompletionHandler(Handler<AsyncResult<Integer>> publishCompletionHandler, Handler<Integer> phantomAckHandler);

FMPOV we should then deprecate the original one. WDYT?

vietj commented 4 years ago

I think it would be better to have another, distinct handler.

vietj commented 4 years ago

That being said, I think we cannot use AsyncResult here; instead we should have 3 handlers.

Sorry for bugging you with TimeoutException.

sophokles73 commented 4 years ago

I see, we need to convey the message ID in case of an expired PUBLISH. Makes sense. But we still agree that when a PUBLISH expires, we always remove the message from the outbound queue, right? I can create a corresponding PR based on this, ok? @vietj WDYT?
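One possible reading of the three-handler idea is sketched below: one callback for an acknowledged PUBLISH, one for an expired PUBLISH (receiving the message ID), and one for a PUBACK whose ID is no longer pending. The class `PublishOutcomeDispatcher` and the handler names are hypothetical; the thread does not spell out the final names. Expiry is triggered explicitly here instead of by a timer to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

class PublishOutcomeDispatcher {
    private final Set<Integer> pending = new HashSet<>();
    private final Consumer<Integer> onCompleted;  // PUBACK received in time
    private final Consumer<Integer> onExpired;    // gave up waiting; message removed
    private final Consumer<Integer> onUnknownAck; // PUBACK for an id that is not pending

    PublishOutcomeDispatcher(Consumer<Integer> onCompleted,
                             Consumer<Integer> onExpired,
                             Consumer<Integer> onUnknownAck) {
        this.onCompleted = onCompleted;
        this.onExpired = onExpired;
        this.onUnknownAck = onUnknownAck;
    }

    void sent(int packetId) {
        pending.add(packetId);
    }

    void puback(int packetId) {
        if (pending.remove(packetId)) {
            onCompleted.accept(packetId);
        } else {
            onUnknownAck.accept(packetId);
        }
    }

    // Expiry always removes the message from the pending set.
    void expire(int packetId) {
        if (pending.remove(packetId)) {
            onExpired.accept(packetId);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        PublishOutcomeDispatcher d = new PublishOutcomeDispatcher(
            id -> log.add("completed " + id),
            id -> log.add("expired " + id),
            id -> log.add("unknown ack " + id));
        d.sent(1);
        d.sent(2);
        d.puback(1);
        d.expire(2);
        d.puback(2); // late PUBACK after expiry
        System.out.println(log); // prints [completed 1, expired 2, unknown ack 2]
    }
}
```

Because each handler receives only the message ID, an application can persist the ID on expiry and reconcile it later if a late acknowledgement arrives.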

vietj commented 4 years ago

Yes, that's the goal you initially had.

vietj commented 4 years ago

You will get reviewed anyway.

sophokles73 commented 4 years ago

@vietj should I create the PR against master or the 3.9 branch?

vietj commented 4 years ago

You can PR against master; we review it, then we backport.