Closed aitorhh closed 6 years ago
Hi Aitor, can you add a code example of what you would expect? AFAIK, the MQTT connector fails its stage if the connection drops.
Hi Juanjo,
The following application illustrates the errors I want to be notified of:
package iot.cluster
package processors.test

import akka.actor.ActorSystem
import akka.actor.ActorRef
import akka.stream._
import akka.stream.scaladsl._
import akka.{ NotUsed, Done }
import akka.util.ByteString
// https://developer.lightbend.com/docs/alpakka/current/mqtt.html
import akka.stream.alpakka.mqtt._
import akka.stream.alpakka.mqtt.scaladsl._
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

object MqttConnectionTest {
  def apply(
    subTopic: String,
    inConf: MqttConnectionSettings,
    outConf: MqttConnectionSettings
  ) = {
    new MqttConnectionTest(subTopic, inConf, outConf)
  }

  def main(args: Array[String]): Unit = {
    val urlIn = args(0)
    val idIn = args(1)
    val subTopic = args(2)
    val urlOut = args(3)
    val idOut = args(4)
    println(s"mqtt forwarder > from $urlIn (clientId: $idIn); topic $subTopic; to $urlOut (clientId: $idOut)")
    MqttConnectionTest(
      subTopic,
      MqttConnectionSettings(urlIn, idIn, new MemoryPersistence),
      MqttConnectionSettings(urlOut, idOut, new MemoryPersistence))
  }
}

class MqttConnectionTest(
  subTopic: String,
  inConf: MqttConnectionSettings,
  outConf: MqttConnectionSettings
) {
  import MqttConnectionTest._
  import GraphDSL.Implicits._

  // Print any exception that reaches the supervision strategy, then stop the stream
  val decider: Supervision.Decider = {
    case e =>
      println(s"Error processing: $e. Stopping")
      Supervision.Stop
  }

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer(
    ActorMaterializerSettings(system)
      .withSupervisionStrategy(decider)
  )

  val settings = MqttSourceSettings(inConf, Map(subTopic -> MqttQoS.atLeastOnce))
  val mqttSink = MqttSink(outConf, MqttQoS.atLeastOnce)
  val mqttSource = MqttSource(settings, bufferSize = 8)

  // Forward every message from the source broker to the sink broker, printing it on the way
  mqttSource
    .map(msg => { println(msg); msg })
    .to(mqttSink)
    .run
}

And these are the MQTT brokers running locally with Docker:
docker run -p 1883:1883/tcp -it eclipse-mosquitto
docker run -p 1884:1883/tcp -it eclipse-mosquitto
The following tests were executed:
Broker available
sbt "runMain iot.cluster.processors.test.MqttConnectionTest tcp://localhost:1883 sourceId # tcp://localhost:1884 sinkId"
Messages printed in the terminal.
No broker available
sbt "runMain iot.cluster.processors.test.MqttConnectionTest tcp://localhost:1885 sourceId # tcp://localhost:1886 sinkId"
No message or error is thrown.
The application keeps running, so this looks the same as if no messages had been sent to the topic.
No sink broker available
sbt "runMain iot.cluster.processors.test.MqttConnectionTest tcp://localhost:1883 sourceId # tcp://localhost:1885 sinkId"
No message or error is thrown, and messages from subTopic are not shown.
The application keeps running, so this looks the same as if no messages had been sent to the topic.
No source broker available
sbt "runMain iot.cluster.processors.test.MqttConnectionTest tcp://localhost:1885 sourceId # tcp://localhost:1884 sinkId"
No message or error is thrown.
The application keeps running, so this looks the same as if no messages had been sent to the topic.
Source disconnected
sbt "runMain iot.cluster.processors.test.MqttConnectionTest tcp://localhost:1883 sourceId # tcp://localhost:1884 sinkId"
Messages are printed in the terminal until the source broker is disconnected. Afterwards, no messages are printed.
Re-connection does not work either. (Should I create another issue?)
Sink disconnected
sbt "runMain iot.cluster.processors.test.MqttConnectionTest tcp://localhost:1883 sourceId # tcp://localhost:1884 sinkId"
Messages are printed in the terminal until the sink broker is disconnected. Afterwards, no messages are printed.
Re-connection does not work either.
Same client id
With two applications running with the same clientId:
sbt "runMain iot.cluster.processors.test.MqttConnectionTest tcp://localhost:1883 sourceId # tcp://localhost:1884 sinkId"
sbt "runMain iot.cluster.processors.test.MqttConnectionTest tcp://localhost:1883 sourceId # tcp://localhost:1884 sinkId"
Only the most recently started application works; the first one stops without any message.
The missing error when two consumers use the same clientId is an issue with most MQTT libraries (they don't report when a client is kicked out). I'm following up on the issue in https://github.com/eclipse/paho.mqtt.java/issues/461 and I'll fix it in Alpakka as soon as it is fixed in Paho.
Automatic reconnect is not enabled by default. It has to be enabled.
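As a minimal sketch of what enabling it could look like, assuming the Alpakka MQTT version in use exposes `withAutomaticReconnect` on `MqttConnectionSettings` (the object name `ReconnectSettings` is made up for illustration; the broker URL and client id are the ones from the test commands above):

```scala
import akka.stream.alpakka.mqtt.MqttConnectionSettings
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

// Hypothetical holder object, only here to make the snippet self-contained.
object ReconnectSettings {

  // Same constructor arguments as in the test application, plus the
  // automatic-reconnect flag (assumed to be available in the version in use).
  val sourceSettings: MqttConnectionSettings =
    MqttConnectionSettings("tcp://localhost:1883", "sourceId", new MemoryPersistence)
      .withAutomaticReconnect(true)
}
```

The resulting settings would then be passed to `MqttSourceSettings` or `MqttSink` in place of the plain connection settings.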
Otherwise, maybe I'm misunderstanding, but I don't think that an exception should be thrown from the stream. It just fails the stage, and all the stages upstream and downstream fail with it. Are you expecting something else?
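One way an application can observe such a stage failure is through the stream's materialized value. The sketch below is not Alpakka-specific: it uses a plain `Source` whose `map` throws as a stand-in for a failing connector stage, and the object and method names are made up for illustration. `Sink.ignore` materializes a `Future[Done]` that fails when the stream fails, which gives the application a place to log the error or shut down.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Keep, Sink, Source }

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka or Alpakka.
object StreamFailureNotification {

  // Runs a stream that fails on its second element and reports how it ended.
  def outcome(): String = {
    implicit val system: ActorSystem = ActorSystem("failure-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Stand-in for a connector stage that fails, e.g. on a lost connection.
    val failing = Source(1 to 3).map { i =>
      if (i == 2) throw new RuntimeException("connection lost") else i
    }

    // Keep.right keeps the sink's Future[Done], which fails if the stream fails.
    val done: Future[Done] = failing.toMat(Sink.ignore)(Keep.right).run()

    // Turn both endings into a message the application can log or act on.
    val report: Future[String] =
      done.map(_ => "completed").recover { case e => s"failed: ${e.getMessage}" }

    try Await.result(report, 5.seconds)
    finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(outcome())
}
```

For the forwarder in this thread, the equivalent would presumably be `mqttSource.toMat(mqttSink)(Keep.right).run()`, assuming the `MqttSink` in use materializes a `Future[Done]`.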
Thanks for the information.
When you say that the stream fails, how can the user or system be notified of the failure? That's actually what I'm after. It seems that I am missing something that already exists.
In the previous examples, the application just remains running without any message, log or anything that indicates the error.
In reply to Juanjo Diaz's comment of Mon, 18 Dec 2017, 13:05.
Great to see these features added. I'm closing the issue. Thanks!
Hi,
I am really missing error reporting in the MQTT connector. In my application, I have no clue whether the connector is in a "connected" state or not. Is there any way for the stream to stop working and send an exception to its supervisor? For example, I currently handle other exceptions in my stream by defining the following:
Currently, I am completely blind to error cases with MQTT. I hope there is already a solution that I haven't figured out yet.
Thanks!