smallrye / smallrye-reactive-messaging

SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0

MQTT - Add support for Qos=2 for Mqtt Session #2285

Open kahlai opened 1 year ago

kahlai commented 1 year ago

Missing QoS implementation

When I set the following in application.properties:

mp.messaging.incoming.<name>.qos=2

the application fails with this error:

Caused by: java.lang.IllegalArgumentException: SRMSG17001: Invalid QoS value: 2
    at io.smallrye.reactive.messaging.mqtt.session.RequestedQoS.valueOf(RequestedQoS.java:34)
    at io.smallrye.reactive.messaging.mqtt.MqttSource.<init>(MqttSource.java:57)
    at io.smallrye.reactive.messaging.mqtt.MqttConnector.getPublisher(MqttConnector.java:88)

I found the note "This is missing QoS 2, as this mode is not properly supported by the session." in this class: https://github.com/smallrye/smallrye-reactive-messaging/blob/main/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/RequestedQoS.java
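To make the failure mode concrete, here is a minimal sketch of how a QoS lookup that only knows levels 0 and 1 would reject the configured value. It is an illustration, not the connector's source: the names RequestedQoSSketch, fromInt and RequestedQoSSketchDemo are hypothetical, and only the "Invalid QoS value" message text is taken from the stack trace above.

```java
/** Hypothetical stand-in for a QoS enum that deliberately omits QoS 2. */
enum RequestedQoSSketch {
    QOS_0(0),
    QOS_1(1);

    private final int value;

    RequestedQoSSketch(int value) {
        this.value = value;
    }

    int value() {
        return value;
    }

    /** Maps a configured integer to a constant and rejects anything else. */
    static RequestedQoSSketch fromInt(int qos) {
        for (RequestedQoSSketch candidate : values()) {
            if (candidate.value == qos) {
                return candidate;
            }
        }
        // No constant exists for 2, so qos=2 ends up here.
        throw new IllegalArgumentException("Invalid QoS value: " + qos);
    }
}

class RequestedQoSSketchDemo {
    public static void main(String[] args) {
        System.out.println(RequestedQoSSketch.fromInt(1)); // prints QOS_1
        try {
            RequestedQoSSketch.fromInt(2);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints Invalid QoS value: 2
        }
    }
}
```

With a lookup of this shape, the exception is raised while the configuration is being read, which would match the trace failing inside the MqttSource constructor rather than at message time.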

ozangunalp commented 1 year ago

Hi, there are two things preventing us from supporting QoS:2. The first is that the MQTT connector is based on auto-ack. For QoS:2 to make sense, we need to implement acks on MQTTMessages. The second is the MqttSession implementation, which, as you've seen in the comment, may not support QoS:2. I need to check, but I suspect that this is due to re-creating the Mqtt client on each reconnect.

Maybe by handling acks properly, we can get away with it. In any case, we need some more resiliency tests for the MQTT connector.
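As a rough illustration of why re-creating the client could matter for QoS:2, the sketch below models receiver-side bookkeeping in plain Java. It assumes the common approach where the receiver delivers on the first PUBLISH and remembers the packet id until PUBREL arrives. The classes Qos2ReceiverSketch and Qos2ReconnectDemo are hypothetical, and nothing here is taken from MqttSession.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical receiver-side QoS 2 bookkeeping; not the MqttSession code. */
class Qos2ReceiverSketch {
    // Packet ids already delivered but not yet released by PUBREL.
    private final Set<Integer> awaitingRelease = new HashSet<>();
    // Where payloads are handed to the application.
    private final List<String> appInbox;

    Qos2ReceiverSketch(List<String> appInbox) {
        this.appInbox = appInbox;
    }

    /** Handles PUBLISH (possibly a retransmission) and returns the reply packet name. */
    String onPublish(int packetId, String payload) {
        if (awaitingRelease.add(packetId)) {
            appInbox.add(payload); // first sighting of this id: deliver once
        }
        return "PUBREC";
    }

    /** Handles PUBREL: the id may now be forgotten and reused. */
    String onPubrel(int packetId) {
        awaitingRelease.remove(packetId);
        return "PUBCOMP";
    }
}

class Qos2ReconnectDemo {
    /** Replays a dropped connection; returns what the application received. */
    static List<String> run(boolean recreateOnReconnect) {
        List<String> appInbox = new ArrayList<>();
        Qos2ReceiverSketch receiver = new Qos2ReceiverSketch(appInbox);

        receiver.onPublish(7, "reading-42"); // connection drops before PUBREC arrives
        if (recreateOnReconnect) {
            receiver = new Qos2ReceiverSketch(appInbox); // bookkeeping is lost
        }
        receiver.onPublish(7, "reading-42"); // sender retransmits the same packet id
        receiver.onPubrel(7);
        return appInbox;
    }

    public static void main(String[] args) {
        System.out.println(run(false).size()); // prints 1
        System.out.println(run(true).size());  // prints 2
    }
}
```

In this model, the retransmitted PUBLISH is only recognized as a duplicate if the set of pending packet ids survives the reconnect. A freshly created receiver delivers the same payload a second time, which defeats the exactly-once intent of QoS:2.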

kahlai commented 1 year ago

I see, thanks for looking into this. I was not aware of the session and acks implementation. Do you think it makes sense to have connection pooling capabilities instead of re-creating the Mqtt client on each reconnect? I might be wrong, but re-creating the client and establishing a connection for every message sounds like an expensive process.