IBMStreams / streamsx.messaging

This toolkit is focused on interacting with popular messaging systems such as Kafka, JMS, XMS, and MQTT. After release v5.4.2 the complete toolkit will be deprecated. See the README.md file for hints to alternative toolkits.
http://ibmstreams.github.io/streamsx.messaging/
Apache License 2.0
30 stars 32 forks source link

MQTT reconnectionBound behaviour incorrect. #312

Closed ddebrunner closed 3 years ago

ddebrunner commented 7 years ago

The MQTT operators have a parameter reconnectionBound that limits the number of connection retries.

This optional parameter of type int32 specifies the number of successive connections that are attempted for an operator. Specify 0 for no retry, n for n number of retries, -1 for inifinite retry.

However the behaviour when the connection retry limit is reached is not documented.

The current behaviour is to just stop trying to connect leaving the operator in a healthy but totally pointless state.

I would say the operator should fail (throw an exception) once it cannot reconnect, thus highlighting to any monitoring tools that there is an issue with the application.

Alex-Cook4 commented 7 years ago

+1 to

I would say the operator should fail (throw an exception) once it cannot reconnect, thus highlighting to any monitoring tools that there is an issue with the application.

I would consider it ideal for users to have unlimited reconnection attempts and instead monitor the "isConnected" metric, but I agree that while in general we don't want Streams operators to go unhealthy because of an external system (making it look like a Streams problem), having that option is useful. Especially since once we "cannot reconnect" it has become a Streams problem.

ddebrunner commented 7 years ago

Looking at this line, it would seem the intention is to cause the operator to fail, need to verify what is happening.

https://github.com/IBMStreams/streamsx.messaging/blob/master/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttClientWrapper.java#L220

ddebrunner commented 7 years ago

The code in the async wrapper is slightly different, it throws a RuntimeException: https://github.com/IBMStreams/streamsx.messaging/blob/master/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttAsyncClientWrapper.java#L137

which them will cause different behaviour at:

https://github.com/IBMStreams/streamsx.messaging/blob/master/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSourceOperator.java#L482

ddebrunner commented 7 years ago

Not sure the MqttAsyncClientWrapper is used though.

If it is unused it would be good to remove it, to make understanding the operators easier.

ddebrunner commented 7 years ago

I can reproduce the operator not being connected after 5 retries while remaining healthy.

In this case it was a user authorization error at line 262 of MqttClientWrapper.

schubon commented 7 years ago

+1 to

I would say the operator should fail (throw an exception) once it cannot reconnect, thus highlighting to any monitoring tools that there is an issue with the application.

If the exception message states the connection problem to the peer it should be obvious to the user that it is not a Streams problem, isn't it?

Alex-Cook4 commented 7 years ago

If the exception message states the connection problem to the peer it should be obvious to the user that it is not a Streams problem, isn't it?

Eventually yes, but in the meantime the users just see "failure in Streams". It depends on how proficient the user is before they even see the exception message unfortunately (at least in my experience at customers).

I think this behavior is acceptable, but I think of this parameter as pointless unless I want my operator to fail as warning sign. i.e. Why would I give up trying to connect? The failure is just a clear way to indicate the problem, rather than setting up more complex monitoring.

ddebrunner commented 7 years ago

I did some more investigation and the operator is throwing an exception and expecting to fail (terminate) in this condition, I need to do some more digging as to why it isn't.

(this includes comments indicate the expected behaviour is to fail).

ghost commented 7 years ago

I observed the expected behavior for the MQTTSource. (Expected = retry N times, then restart until relaunch counter expired. After (relaunch-count +1) x (reconnectionBound +1) connection attempts the PE stays unhealthy). For the MQTTSink, I observed that the operator did not tried to reconnect the expected number of retries. It simply stopped retrying and remained healthy. Maybe, the MQTTSink dead-locked. This behavior is not reproducible, however. This issue needs more investigation.

schubon commented 3 years ago

Customers are already using the new toolkit https://github.com/IBMStreams/streamsx.mqtt