akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka/current/

AMQP: AmqpSource materialized value #1190

Open FXHibon opened 6 years ago

FXHibon commented 6 years ago

Hello,

When using Alpakka to consume AMQP messages, I would like the AmqpSource to emit a Future[Done] as its materialized value. Unfortunately, we can only create a Source[Message, NotUsed].

This Future would behave like the one from the AmqpSink, meaning it would complete successfully as soon as the AMQP connection is up, and would complete with an error if the connection could not be created.

val source: Source[CommittableIncomingMessage, Future[Done]] = AmqpSource.committableSource(settings, bufferSize)
val fConnection: Future[Done] = source.to(Sink.ignore).run()
fConnection.onComplete {
  case Success(_) => logger.info("app is up")
  case Failure(err) => logger.error("cannot connect to amqp", err)
}

If you think it's a good idea, I could open a PR to implement this behavior.

huntc commented 6 years ago

Are you able to elaborate on your use case? It appears as though you want to determine whether an AMQP broker is reachable from looking at your example. If that's the case, then I'd say such an approach would be flawed: you can only ever observe the past. By the time you observe that you have a connection and act on it, the connection may no longer be there.

In general, Alpakka projects provide a Source and Flow with a convenient Sink for Flow. The latter is hardly ever used in my experience but is good to get people started quickly.

FXHibon commented 6 years ago

It appears as though you want to determine whether an AMQP broker is reachable

Not exactly. Obviously, I would not use this Future to monitor the current state of the connection.

But when your application is starting, you may want to consider it fully started (healthy) only when the connection to the broker is up, and not before. Without this materialized value, this is not possible.

You still have to react to losing the connection during the application lifecycle, but in that case the Source completes, so you can easily monitor this.

huntc commented 6 years ago

But when your application is starting, you may want to consider it fully started (healthy) only when the connection to the broker is up, and not before. Without this materialized value, this is not possible.

Personally, I wouldn't treat an established AMQP connection as a criterion for the app being healthy on startup.

I wouldn't recommend extending the AMQP Source to return a Future. Instead, my recommendation is to rely upon the existing composability.

FXHibon commented 6 years ago

I wouldn't recommend extending the AMQP Source to return a Future

And yet it is the current behavior of AmqpSink, and it sounds quite relevant to me.

Instead, my recommendation is to rely upon the existing composability.

I'm not sure I understand how I can get this kind of behavior without having to modify the implementation of the AmqpSourceStage. What do you have in mind?

huntc commented 6 years ago

I'm not sure I understand how I can get this kind of behavior without having to modify the implementation of the AmqpSourceStage. What do you have in mind?

val done = AmqpSource.committableSource(settings, bufferSize)
  .runWith(Sink.head)
  .map(_ => Done)

However, don't you have another problem here? Once you've consumed from the queue then that element is lost.

FXHibon commented 6 years ago

Ok, I think there is a misunderstanding :) By doing

val done = AmqpSource.committableSource(settings, bufferSize)
  .runWith(Sink.head)
  .map(_ => Done)

you just run a one-shot consumer and watch the completion of that process, but this is not the problem I would like to solve.

Here is a use case:

val consumerCompletion: Future[Done] = AmqpSource.committableSource(settings, bufferSize)
  // do stuff with messages, ack/nack it
  // it never stops, unless connection failed
  .runWith(Sink.ignore)

consumerCompletion.onComplete { reason =>
  println(s"amqp connection lost because of $reason")
  // clean some stuff and then STOP the app here
  // because it's not healthy without an amqp connection
}

This is a standard use case: a never-ending consumer. When it stops because there is a problem with AMQP, the whole application must stop.

But you cannot know when your application is up & running, because you have absolutely no visibility into the moment the connection is established.

This would be solved if the AmqpSource emitted a Future[Done] that completes when the connection is up, or fails when it couldn't be created.
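
As a stopgap with the existing API, something close to this signal can be approximated by composition. The sketch below is not part of Alpakka: `StartupSignalSketch` and `withFirstElementSignal` are hypothetical names. The Future it materializes completes on the first element rather than on connection, so a connected but idle queue would leave it pending, and unlike Sink.head it does not consume the element.

```scala
import akka.Done
import akka.stream.scaladsl.Source

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

object StartupSignalSketch {

  /** Hypothetical helper: wraps any source so that it materializes a
    * Future[Done] which succeeds when the first element passes through and
    * fails if the stream terminates before that.
    *
    * The Promise is created when this method is called, so each returned
    * Source is meant to be materialized only once.
    */
  def withFirstElementSignal[T, M](source: Source[T, M])(
      implicit ec: ExecutionContext): Source[T, Future[Done]] = {
    val firstSeen = Promise[Done]()
    source
      .map { elem =>
        // No-op after the first element: the promise is already completed.
        firstSeen.trySuccess(Done)
        elem
      }
      .watchTermination() { (_, terminated) =>
        terminated.onComplete {
          case Success(_) =>
            // Stream ended without ever emitting: report that as a failure.
            firstSeen.tryFailure(
              new NoSuchElementException("stream ended before the first element"))
          case Failure(cause) =>
            firstSeen.tryFailure(cause)
        }
        firstSeen.future
      }
  }
}
```

With the names from my first example, this would be applied as withFirstElementSignal(AmqpSource.committableSource(settings, bufferSize)). It still cannot distinguish "connected, no messages yet" from "not connected", which is why a materialized value from the stage itself would be preferable.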

huntc commented 6 years ago

Do you really want your program to stop if it loses connectivity with AMQP? Losing connectivity is quite normal. Have you considered RestartSource and friends? (https://doc.akka.io/docs/akka/2.5.5/scala/stream/stream-error.html)

Also, how about using the monitor stage?
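
For reference, a minimal sketch of the RestartSource approach, assuming the Akka 2.5-era `RestartSource.withBackoff(minBackoff, maxBackoff, randomFactor)` signature (later Akka versions favor a settings-based overload). `RestartingConsumerSketch`, `restarting` and the backoff values are illustrative choices, not something taken from Alpakka.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{RestartSource, Source}

import scala.concurrent.duration._

object RestartingConsumerSketch {

  /** Hypothetical helper: re-creates the wrapped source with exponential
    * backoff whenever it fails or completes, instead of ending the stream.
    */
  def restarting[T](newSource: () => Source[T, _]): Source[T, NotUsed] =
    RestartSource.withBackoff(
      minBackoff = 1.second,   // delay before the first restart (assumed value)
      maxBackoff = 30.seconds, // upper bound for the delay (assumed value)
      randomFactor = 0.2       // jitter so that consumers do not restart in lockstep
    )(newSource)
}
```

With the names used earlier in this thread it would be called as restarting(() => AmqpSource.committableSource(settings, bufferSize)). Note that the result is a Source[T, NotUsed], so any materialized value of the wrapped source is not exposed to the caller.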

FXHibon commented 6 years ago

Do you really want your program to stop if it loses connectivity with AMQP?

If my service cannot run normally, yes, I want it to stop. If it can't start normally, I don't want a health check telling me that "everything is ok" when it's not even fully started.

Whatever behavior you choose to implement, I think it's entirely relevant to be able to know whether the connection succeeded or failed. Then you implement the behavior that matches your needs.

Losing connectivity is quite normal

Yes, it happens. And that's why you want to handle this safely: don't report that your app is up & running before you know whether you can connect to the broker.

juanjoDiaz commented 6 years ago

I'm with @huntc in that the connection to RabbitMQ will probably come and go. It's sort of expected. So the fact that the client connects once doesn't tell you much about your app health.

However, I also understand @FXHibon's request. It doesn't do any harm, some people might find it useful, and other connectors behave like that (the MQTT one, for example).

In any case, I think it would be good to agree on what the different stages should return and keep it consistent across all connectors. Currently we have connectors returning NotUsed, KillSwitch, Future[Done], etc. @ennru any opinion on this?

ennru commented 6 years ago

I believe this is a relevant suggestion.

In general, I'd say

queimadus commented 1 year ago

I was looking for a way to perform a graceful shutdown of an AMQP Source while draining the elements already pulled from the broker, but I'm not seeing any interface that allows me to do that.

Adding a KillSwitch in front of the source wouldn't guarantee that the source's internal queue gets drained.

I would expect the materialized value of the source to offer an interface that closes the underlying connection/channel, prevents any more messages from being consumed from the broker, and lets the connected stream process the queued messages and terminate successfully.

Is there a way to perform a graceful shutdown with the current interface?