SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0

Add configuration to outgoing MQTT connector to set retain property #2663

Open diversit opened 3 weeks ago

diversit commented 3 weeks ago

Issue

With the MQTT Connector, it is currently not possible to set the 'retain' property just by configuration for an outgoing MQTT channel.

Currently, instead of just returning or emitting the payload, the method must return an instance of MqttMessage so that the retain property can be set to true.

Example:

MqttMessage.of("a-topic", myValue, MqttQoS.AT_LEAST_ONCE, true)

The topic and qos settings can (need to?) be set in configuration, but the retain property cannot. Unfortunately, this means that whenever a message must be retained, the topic and qos values also have to be set in code.

I assume that the MQTT Connector only uses those configuration values to construct an MqttMessage instance, and that when an MqttMessage is created in code, its values take priority over the configuration values.
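The assumed precedence can be sketched in plain Java. The types below (`ChannelDefaults`, `MessageOverrides`, `Effective`) are hypothetical and not part of the SmallRye API. The sketch only illustrates the assumption that a value set on the message wins over the channel configuration; it does not claim to show how the connector actually resolves these values.

```java
import java.util.Optional;

/** Hypothetical model of per-message values overriding channel configuration. */
public class RetainPrecedenceSketch {

    /** Channel-level values, standing in for configuration. 'retain' is the proposed attribute. */
    record ChannelDefaults(String topic, int qos, boolean retain) {}

    /** Values carried by a message built in code; empty means "not set on the message". */
    record MessageOverrides(Optional<String> topic, Optional<Integer> qos, Optional<Boolean> retain) {}

    /** The settings that would finally be used for the publish call. */
    record Effective(String topic, int qos, boolean retain) {}

    /** Assumed rule: a value set on the message wins, otherwise the channel default applies. */
    static Effective resolve(ChannelDefaults defaults, MessageOverrides overrides) {
        return new Effective(
                overrides.topic().orElse(defaults.topic()),
                overrides.qos().orElse(defaults.qos()),
                overrides.retain().orElse(defaults.retain()));
    }

    public static void main(String[] args) {
        ChannelDefaults defaults = new ChannelDefaults("a-topic", 1, true);

        // A plain payload carries no overrides, so the channel values apply.
        MessageOverrides plainPayload =
                new MessageOverrides(Optional.empty(), Optional.empty(), Optional.empty());
        System.out.println(resolve(defaults, plainPayload));

        // A message built in code overrides only what it sets explicitly.
        MessageOverrides explicit =
                new MessageOverrides(Optional.empty(), Optional.empty(), Optional.of(false));
        System.out.println(resolve(defaults, explicit));
    }
}
```

With a channel-level retain default like this, returning the plain payload would be enough for the common case, and an explicit message would only be needed to deviate from the channel setting.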

Proposal

Add a 'retain' property for outgoing MQTT channels that sets the retain property of each outgoing MQTT message.

Example:

mp.messaging.outgoing.my-channel.connector=smallrye-mqtt
mp.messaging.outgoing.my-channel.retain=true

cescoffier commented 3 weeks ago

Fancy a PR? It should be easy to add, as you need:

  1. Add a connector attribute
  2. Pass the configuration to the MQTT client

Of course, we can guide you.
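As a rough illustration of those two steps, here is a minimal, self-contained sketch. Everything in it is hypothetical: `RecordingClient` stands in for the MQTT client, and the attribute is read from a plain `java.util.Properties` object rather than through the connector's own attribute mechanism. Only the configuration key comes from the proposal above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical sketch: read a 'retain' channel attribute and pass it on to the client. */
public class RetainAttributeSketch {

    /** Stand-in for the MQTT client; it only records what it was asked to publish. */
    static class RecordingClient {
        final List<String> published = new ArrayList<>();

        void publish(String topic, String payload, boolean retain) {
            published.add(topic + "|" + payload + "|retain=" + retain);
        }
    }

    /** Step 1: read the channel attribute, defaulting to false when it is absent. */
    static boolean readRetain(Properties config, String channel) {
        String key = "mp.messaging.outgoing." + channel + ".retain";
        return Boolean.parseBoolean(config.getProperty(key, "false"));
    }

    /** Step 2: hand the configured value to the client on every publish. */
    static void send(RecordingClient client, Properties config,
                     String channel, String topic, String payload) {
        client.publish(topic, payload, readRetain(config, channel));
    }

    /** Helper that parses properties-file text into a Properties object. */
    static Properties parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        Properties config = parse(
                "mp.messaging.outgoing.my-channel.connector=smallrye-mqtt\n"
              + "mp.messaging.outgoing.my-channel.retain=true\n");
        RecordingClient client = new RecordingClient();
        send(client, config, "my-channel", "a-topic", "hello");
        System.out.println(client.published);
    }
}
```

Defaulting to false when the key is absent keeps existing channels unchanged, which is presumably what a backwards-compatible attribute would need.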

diversit commented 2 weeks ago

I made a PR: #2665

I was already looking at it on Friday. The changes were indeed not that difficult. I made the necessary changes, including a test case, but the test case is failing and I can't figure out why. It seems to go wrong in the MQTT client code. While debugging, I can see that the retain property is set when the message is serialised to a byte[], but after deserialisation the retain value seems to have been lost.

Any idea how to fix it? Or is my test case wrong?