akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka/current/

MQTT: Offer RestartSource helper? #1043

Open ennru opened 6 years ago

ennru commented 6 years ago

Would it make sense to add a helper to Alpakka that wraps MQTT sources in a RestartSource and exposes the inner Future[Done], which signals a successful subscription?

cc @huntc

huntc commented 6 years ago

Maybe, but perhaps the better place to fix it is within the existing source. I’m unsure why maintaining an MQTT connection is considered optional (aside from this being the behaviour of the underlying library).

I don’t generally like abstracting over existing Akka stream stages. Perhaps a source code example may help.

aisven commented 5 years ago

@huntc @ennru I am using MqttSource.atLeastOnce from Alpakka 1.0-M1. Because of the warning/hint in the documentation, I want to wrap it in a RestartSource.

Now, the materialized value is a Future[Done] that "completes on successful connection to the MQTT broker". Of course it is good that the source only actually connects to a broker on materialization.

However, how should I deal with this future in terms of the RestartSource?

A side note: I am even considering using RestartSource.withBackoff instead of RestartSource.onFailuresWithBackoff, because I really want an infinite stream that restarts even when the MqttSource completes normally. I can then, when desired, still stop streaming by "the downstream cancelling, or externally by introducing a KillSwitch right after this Source in the graph".

Edit: Quotes are from scaladoc of MqttSource and RestartSource.
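
The setup described in the side note can be sketched roughly as follows. This is only an illustration under assumptions: `InfiniteRestart`, `restartForever` and `createSource` are hypothetical names, the backoff values are arbitrary, and the three-parameter `withBackoff` overload is the one from Akka 2.5/2.6 (newer Akka versions prefer a `RestartSettings` argument).

```scala
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, RestartSource, Source}
import scala.concurrent.duration._

object InfiniteRestart {
  // Hypothetical helper, not part of Alpakka.
  // withBackoff restarts the inner source on failure AND on normal completion,
  // so the resulting stream only ends when downstream cancels or the
  // kill switch is triggered.
  def restartForever[T](createSource: () => Source[T, _]): Source[T, UniqueKillSwitch] =
    RestartSource
      .withBackoff(minBackoff = 1.second, maxBackoff = 30.seconds, randomFactor = 0.2)(createSource)
      // Kill switch placed right after the restarting source; it becomes the
      // materialized value so the caller can stop the stream from outside.
      .viaMat(KillSwitches.single)(Keep.right)
}
```

Calling `shutdown()` on the materialized `UniqueKillSwitch` should then complete the stream even though the wrapped source keeps being restarted.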

aisven commented 5 years ago

@huntc @ennru I am not sure, but would it perhaps be the best solution to change MqttSource so that it no longer provides the Future[Done] on successful connection as materialized value, but instead fails on unsuccessful connection?

This would more directly go along with the RestartSource which wraps "the given Source with a Source that will restart it when it fails using an exponential backoff".

huntc commented 5 years ago

You might want to try the new mqtt-streaming Alpakka project. Its connection management is much more explicit.

@ennru When do you think the nightly docs will be published?

2m commented 5 years ago

@sourcekick Currently, until https://github.com/akka/akka/issues/24771 gets implemented, you will need to get the materialized value from the inner source by calling mapMaterializedValue and then sending that value somewhere else.
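
A minimal sketch of that workaround, assuming the Akka 2.5/2.6 `RestartSource.withBackoff` overload with three backoff parameters. `RestartingMqtt`, `restartingSource`, `createSource` and `onSubscription` are hypothetical names for illustration, and the backoff values are arbitrary.

```scala
import akka.{Done, NotUsed}
import akka.stream.scaladsl.{RestartSource, Source}
import scala.concurrent.Future
import scala.concurrent.duration._

object RestartingMqtt {
  // Hypothetical helper, not part of Alpakka.
  // createSource builds a fresh inner source whose materialized Future[Done]
  // completes on successful connection (as the MQTT sources do).
  // onSubscription is called once per (re)start with that future.
  def restartingSource[T](
      createSource: () => Source[T, Future[Done]],
      onSubscription: Future[Done] => Unit
  ): Source[T, NotUsed] =
    RestartSource.withBackoff(
      minBackoff = 1.second,
      maxBackoff = 30.seconds,
      randomFactor = 0.2
    ) { () =>
      createSource().mapMaterializedValue { subscribed =>
        // The RestartSource discards the inner materialized value,
        // so hand it to the caller here instead.
        onSubscription(subscribed)
        subscribed
      }
    }
}
```

Here `createSource` would wrap a call to `MqttSource.atLeastOnce` (arguments omitted, since they differ between Alpakka versions), and `onSubscription` could, for example, log the result or forward it to an actor. The callback runs again on every restart, so it receives a new future each time.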

@huntc The current snapshot docs are in flux while we improve the docs build process. We'll get back to that rather soon.

aisven commented 5 years ago

@2m That day I actually found mapMaterializedValue, which solved my problem. I like the idea in akka/akka#24771 too.

@huntc I looked into the code of mqtt-streaming for a bit; it is definitely interesting. Looking forward to the docs. On that note, I hope they will include some pictures, such as architecture drawings, and how-tos that motivate and show the scenarios.

2m commented 5 years ago

The snapshot docs have been published:

https://doc.akka.io/docs/alpakka/snapshot/mqtt-streaming.html
https://doc.akka.io/api/alpakka/snapshot/akka/stream/alpakka/mqtt/streaming/index.html