akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka/current/

define Committable #104

Open patriknw opened 7 years ago

patriknw commented 7 years ago

For sources that read from a queue and acknowledge consumption later downstream.

Something like Committable in akka-stream-kafka. https://github.com/akka/reactive-kafka/blob/master/core/src/main/scala/akka/kafka/ConsumerMessage.scala#L37
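To make the idea concrete, here is a minimal sketch of what such an interface might look like. It is only loosely modeled on the akka-stream-kafka type linked above. All names below (`Committable`, `CommittableMessage`, `InMemoryQueue`) are hypothetical, and `Future[Unit]` stands in for whatever result type a real connector would use, so that the example has no Akka dependency.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical minimal interface: acknowledge consumption of one element.
trait Committable {
  def commit(): Future[Unit]
}

// Hypothetical envelope pairing a payload with its acknowledgement handle.
final case class CommittableMessage[A](payload: A, committable: Committable)

// In-memory stand-in for a queue client; it only records which ids were acknowledged.
final class InMemoryQueue(messages: Vector[String]) {
  private val acked = scala.collection.mutable.Set.empty[Int]

  def ackedIds: Set[Int] = acked.synchronized(acked.toSet)

  // Emits every message together with a handle that acknowledges it later.
  def poll(): Vector[CommittableMessage[String]] =
    messages.zipWithIndex.map { case (body, id) =>
      CommittableMessage(body, new Committable {
        def commit(): Future[Unit] =
          Future.successful(acked.synchronized { acked += id; () })
      })
    }
}
```

A downstream stage would process `payload` and call `committable.commit()` only once processing has succeeded, which is the "acknowledge later downstream" behaviour described above.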

johanandren commented 7 years ago

Should it go in Alpakka and not directly in Akka Streams?

ktoso commented 7 years ago

It can start here, kind of as an incubator?

drewhk commented 7 years ago

It should not be just Committable, but Repliable. Commit is a specific use case of in-stream replies.

drewhk commented 7 years ago

Also, an adapter stage is needed that takes a flow of Reply-Request and turns it into a Source[Repliable].
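One way to read "commit is a specific use case of in-stream replies" is sketched below. This is an assumption about the intended shape, not an API from Akka or Alpakka: `Repliable`, `Ack`, `Replies.asCommittable` and `PromiseRepliable` are all hypothetical names, and the stream adapter stage itself is left out because it would need Akka Streams.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical general interface: an element that expects exactly one reply of type R.
trait Repliable[-R] {
  def reply(response: R): Future[Unit]
}

// Hypothetical reply vocabulary for acknowledgement-style sources.
sealed trait Ack
case object Commit extends Ack
case object Rollback extends Ack

trait Committable {
  def commit(): Future[Unit]
}

object Replies {
  // Commit expressed as a special case of reply: always answer with Commit.
  def asCommittable(r: Repliable[Ack]): Committable = new Committable {
    def commit(): Future[Unit] = r.reply(Commit)
  }
}

// Promise-backed Repliable: the requesting side waits on `received`.
final class PromiseRepliable[R] extends Repliable[R] {
  private val promise = Promise[R]()
  val received: Future[R] = promise.future

  // Only the first reply is accepted; later replies fail.
  def reply(response: R): Future[Unit] =
    if (promise.trySuccess(response)) Future.unit
    else Future.failed(new IllegalStateException("already replied"))
}
```

With this shape, a source that needs only acknowledgement can expose `Committable`, while a request/reply source can expose the richer `Repliable[R]` directly.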

patriknw commented 7 years ago

I created the issue here to spark discussion and explore it with concrete examples from the Alpakka connectors. When it has matured, the interfaces should move to akka-stream so that the same interfaces can be used in Alpakka, akka-stream-kafka, and other places.

filosganga commented 7 years ago

I need it as well, for IronMQ.

pspiegel commented 7 years ago

We (SBB) also need this feature in order to manually acknowledge (commit) or discard (roll back) messages from the JMS connector later downstream. Furthermore, after a few unsuccessful retries we want to put the message in some kind of backout queue.
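The commit / roll back / backout decision described here can be sketched as a small policy function. This is an illustration under stated assumptions, not JMS connector code: `Acknowledgeable`, `Envelope`, `BackoutPolicy` and the `attempt` counter are hypothetical, and a real source would have to supply the delivery count itself.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical handle: commit acknowledges, rollback asks the broker to redeliver.
trait Acknowledgeable {
  def commit(): Unit
  def rollback(): Unit
}

// Hypothetical envelope; `attempt` is the 1-based delivery count of this message.
final case class Envelope[A](payload: A, attempt: Int, ack: Acknowledgeable)

sealed trait Outcome
case object Committed extends Outcome
case object RolledBack extends Outcome
case object BackedOut extends Outcome

object BackoutPolicy {
  // Process once; on failure either request redelivery or park the message.
  def handle[A](env: Envelope[A], maxAttempts: Int, backout: A => Unit)(
      process: A => Unit): Outcome =
    Try(process(env.payload)) match {
      case Success(_) =>
        env.ack.commit()
        Committed
      case Failure(_) if env.attempt >= maxAttempts =>
        // Retries exhausted: move to the backout queue, then acknowledge the original.
        backout(env.payload)
        env.ack.commit()
        BackedOut
      case Failure(_) =>
        env.ack.rollback()
        RolledBack
    }
}
```

Acknowledging the original after parking it is one possible choice; it keeps the broker from redelivering a message that has already been moved to the backout queue.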

Falmarri commented 7 years ago

I have a PR out that is a first draft of this functionality, but I didn't see this issue before I wrote it. It's not nearly as safe as the design suggested in this version.

https://github.com/akka/alpakka/pull/292

ssturdivant commented 7 years ago

Where does this ticket lie on the development roadmap? I have a project requiring application-level JMS ack, and need to decide between waiting for this ticket or implementing another solution.

Thanks.

patriknw commented 7 years ago

The Akka team is not working on this, and we don't have it in our plans for the next month.

EmilDafinov commented 6 years ago

Since https://github.com/akka/alpakka/pull/483 went in, would that count as a good example of Committable / Repliable as described here? Would the CommittableIncomingMessage it introduced work as an interface for such things?

I like the initial example as well, where Committable is just an interface with a single 'commit' method. Maybe there should then be a corresponding Rejectable trait, for cases where messages have to be explicitly rejected? I think that would be rather nice, since rejecting makes sense for some queueing systems (AMQP) and not so much for others (Kafka).
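A sketch of that split, with rejection as a separate capability that only some sources mix in, might look like this. The traits and both message classes are hypothetical stand-ins and do not mirror the actual AMQP or Kafka connector types; the `record` callback exists only so the example can be observed without a broker.

```scala
import scala.concurrent.Future

// Hypothetical single-method interfaces, one per capability.
trait Committable {
  def commit(): Future[Unit]
}

trait Rejectable {
  def reject(): Future[Unit]
}

// AMQP-like message: supports both acknowledging and rejecting.
final class AmqpLikeMessage(val body: String, record: String => Unit)
    extends Committable with Rejectable {
  def commit(): Future[Unit] = Future.successful(record(s"ack:$body"))
  def reject(): Future[Unit] = Future.successful(record(s"reject:$body"))
}

// Kafka-like message: only an offset commit, so it does not mix in Rejectable.
final class KafkaLikeMessage(val value: String, val offset: Long, record: String => Unit)
    extends Committable {
  def commit(): Future[Unit] = Future.successful(record(s"commit:$offset"))
}

object Settle {
  // One possible policy for a failed message: reject if the source supports it,
  // otherwise commit and move on.
  def failed(msg: Committable): Future[Unit] = msg match {
    case r: Rejectable => r.reject()
    case other         => other.commit()
  }
}
```

Code that requires rejection can ask for `Committable with Rejectable` in its signature, so passing a Kafka-like message to it fails at compile time rather than at runtime.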

juanjoDiaz commented 6 years ago

I've also added a committable message to MQTT following the same pattern in #515 (still WIP).

I think all the systems are different: some can reject and others cannot, some require certain parameters and some require others, etc.

So I'm not totally convinced of a general committable trait being a good idea.

EmilDafinov commented 6 years ago

@juanjoDiaz, thanks for the update :) Is MQTT one of the ones requiring parameters? I'd love to see an example of that. Other than the need for reject and possibly parameters, can you think of any other possible considerations?

If a general Committable did turn out to be practical though, something occurred to me:

Again, it is not something that I have too much experience with, but given the two points above, it is beginning to smell a bit like a monad to me. Thoughts?

juanjoDiaz commented 6 years ago

MQTT doesn't require parameters (it does, but we figure them out without the user providing them). But AMQP and Kafka do, for example. So I expect other sources to have parameters as well.

It's clear that most sources that are committable will return a wrapper containing the message and the methods for ack/nack. However, the message & the methods will be different for each source.
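The difference in shape can be illustrated with two hypothetical wrappers. The method names and parameters below (`multiple`, `requeue`) are assumptions chosen to resemble typical AMQP and MQTT acknowledgement options, not the connectors' actual signatures.

```scala
import scala.concurrent.Future

// Hypothetical AMQP-like wrapper: acknowledgement takes broker-specific parameters.
final class AmqpLikeMessage(val body: String, record: String => Unit) {
  def ack(multiple: Boolean = false): Future[Unit] =
    Future.successful(record(s"ack multiple=$multiple"))
  def nack(multiple: Boolean = false, requeue: Boolean = true): Future[Unit] =
    Future.successful(record(s"nack multiple=$multiple requeue=$requeue"))
}

// Hypothetical MQTT-like wrapper: no user-supplied parameters and no nack.
final class MqttLikeMessage(val topic: String, val payload: String, record: String => Unit) {
  def ack(): Future[Unit] = Future.successful(record(s"ack topic=$topic"))
}

// A shared trait can expose only what every source supports.
trait Committable {
  def commit(): Future[Unit]
}

object Adapters {
  // The AMQP parameters are pinned to their defaults here and nack is not reachable.
  def fromAmqp(m: AmqpLikeMessage): Committable = new Committable {
    def commit(): Future[Unit] = m.ack()
  }
  def fromMqtt(m: MqttLikeMessage): Committable = new Committable {
    def commit(): Future[Unit] = m.ack()
  }
}
```

The adapters show the trade-off: the common view is easy to write, but callers who need `requeue` or `multiple` still have to work with the connector-specific type.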

I think that to have a common committable we would first need a common message. I'm not sure what the benefit of this would be, considering that they'll be different in all connectors anyway.