zackangelo opened this issue 9 years ago (status: Open)
Hi! I would also require this feature. However, I have no idea how to implement it, as the acknowledgment is also linked to the back pressure of the stream.
Or maybe I am not understanding something. For example, with my current sample code:
Shouldn't the messages stay in the queue due to the back pressure of the flow? Shouldn't messages just be prefetched and put back in the queue if not consumed when the consumer dies?
Thanks in advance for your help.
ACK before delivery was introduced in this commit: fdba020. Looks like this can be easily changed.
Channel.close() cancels the subscription and prevents the broker from sending new messages. There are still messages in the buffer. A message can only be acknowledged on the Channel in which it was received. Delivering buffered messages after cancel is allowed by rule 2.8.
The way cancel() is handled in QueuePublisher.scala causes the Channel to be closed. In consequence, all in-flight messages can't be acknowledged anymore. Just changing the order in which delivery and ACK are performed will lead to a situation where a message is successfully delivered to the subscriber but never acknowledged to the broker. In the current implementation, approximately 1 message may sneak in between.
Possible ways to handle this:
1) Catch (and ignore) AlreadyClosedException from channel.basicAck. That way the subscriber may receive approximately 1 message that will never be acknowledged to the broker.
2) QueueSubscription.cancel() could be implemented with Channel.basicCancel(String) (see the sketch after this list). In-flight messages could still be acknowledged to the broker and the Channel would be closed after QueueSubscription.buffer dries out.
3) Messages in buffer are discarded and the Channel is closed. This is somewhat complicated to implement because we would need to know whether there is a message in delivery.
I'm open to discussion and other solutions.
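For illustration, a rough sketch of what option 2 might look like against the raw RabbitMQ Java client. The class name, the buffer check and the wiring are assumptions for discussion, not the library's actual code:

import com.rabbitmq.client.Channel

// Sketch only: cancel the consumer instead of closing the channel, so that
// in-flight deliveries can still be ACKed on the channel they arrived on.
class CancellableSubscription(channel: Channel, consumerTag: String) {

  def cancel(): Unit =
    channel.basicCancel(consumerTag) // stop new deliveries, keep the channel open

  def acknowledge(deliveryTag: Long): Unit = {
    channel.basicAck(deliveryTag, false)
    if (bufferIsEmpty) channel.close() // close only once the buffer dries out
  }

  private def bufferIsEmpty: Boolean = true // placeholder for real buffer bookkeeping
}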
@vrabeux wrote:
Or maybe I am not understanding something. For example, with my current sample code:
- if the publisher sends, say, 10 messages in less than 1 second,
- the consumer handles 1 message in 10 seconds,
- the consumer dies on the second message.
Under the current implementation: messages 1 and 2 are acknowledged to the broker because they were delivered to the subscriber. The thing under discussion is whether the message should be acknowledged only after the subscriber has returned without an exception.
=> If I look at my queue, all messages are consumed, which is not really the case.
Only 2 messages were acknowledged to the broker. The consumer is allowed to prefetch messages (without acknowledging) to improve throughput. The current implementation sets the prefetch to 20. Doing a 1-by-1 fetch-delivery-ack cycle would cause the system to behave in a synchronous manner and dramatically lower performance. Even Akka Streams does a similar thing: they use 16 as their magic number.
Shouldn't the messages stay in the queue due to the back pressure of the flow?
Once the subscriber dies, the subscription is considered canceled, the Channel
is closed and messages return to the queue.
Shouldn't messages just be prefetched and put back in the queue if not consumed when the consumer dies?
That is the current behaviour.
@vrabeux Indeed there was a bug in the code that handles exceptions from onNext: messages could stay in the buffer for as long as the program is running, even after onError was signalled to the Subscriber. I fixed that here: https://github.com/ScalaConsultants/reactive-rabbit/commit/8d014d99b866b4e819800769652a3277dbf80eaa
Generally speaking, onNext should never throw:
Rule 2.13:
Calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way for a Subscriber to signal failure is by cancelling its Subscription. In the case that this rule is violated, any associated Subscription to the Subscriber MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment.
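To make rule 2.13 concrete, here is a minimal sketch of a Subscriber that never lets onNext throw and instead cancels its Subscription on failure. The Delivery import assumes the library's io.scalac.amqp.Delivery type, and the processing function is a placeholder:

import org.reactivestreams.{Subscriber, Subscription}
import io.scalac.amqp.Delivery

class SafeDeliverySubscriber(process: Delivery => Unit) extends Subscriber[Delivery] {
  private var subscription: Subscription = _

  override def onSubscribe(s: Subscription): Unit = {
    subscription = s
    s.request(1)
  }

  override def onNext(delivery: Delivery): Unit =
    try {
      process(delivery)       // user code that may fail
      subscription.request(1)
    } catch {
      case _: Exception =>
        subscription.cancel() // rule 2.13: signal failure by cancelling, never by throwing
    }

  override def onError(t: Throwable): Unit = () // log in real code
  override def onComplete(): Unit = ()
}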
@mkiedys Thanks for the reply.
I was more talking about an app crash or instance crash. All prefetched messages should be re-queued. However, since they are ACKed before the work is completed, they are not re-queued. Anyway, that may be a particular use case.
I agree that a prefetch of around 20 works in most cases. However, if we have a very slow service (let's say several seconds or, worse, minutes) a prefetch number that is too big can cause problems: a new worker would not be able to get any work if most of the work is prefetched by another instance. On the contrary, if the work is very fast, then 20 may even be a bit small.
All of these prefetch numbers, distributed workloads, and streams are really connected together, and it really seems that akka-streams can be a powerful tool to implement this. For now, I implemented my own "Source" that corresponds to my use case and I use reactive-rabbit later in the flow. It works so far, but it is way too "naive" to really use.
The action of ACKing must be exposed to end-users of the library, since only the user of the stream knows exactly when an element should be considered as handled.
Imagine Source().map().map(NOW_HANDLED).map(): as a library author you won't know that after the second map the user considers the element "handled", and the rest is maybe just logging etc.
Perhaps it would be possible to offer .mapAck(); however, I'm not completely familiar with the rabbit APIs, so I don't know if that would be doable. Still, I think it should be exposed to users when to ack.
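Purely for discussion, this is how such a hypothetical mapAck stage could read from the user's point of view. Neither the stage nor the helper functions exist in the library, and queue stands for a consumer Publisher as elsewhere in this thread:

// Hypothetical: nothing is ACKed until the element passes through mapAck,
// i.e. until the user considers it handled.
Source(queue)
  .map(parseInvoice)     // may still fail; the delivery is not ACKed yet
  .map(storeInDatabase)  // after this the user considers the element handled
  .mapAck(identity)      // hypothetical stage: ACK the corresponding delivery here
  .map(logResult)        // purely informational, runs after the ACK
  .to(Sink.ignore)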
@ktoso Yes, that is why I decided to implement my own source: to be able to ack the message down the stream when I want it. The way I do it is however very "ugly" and "naive", since I need to wrap the rabbit Channel and delivery tag within the object I pass down the stream (roughly like the sketch after this comment)... I don't even handle whether the connection is closed, whether the channel is still alive, etc.
The .mapAck() could be a way of ACKing the message, but I am too much of a noob with akka-streams so I can't really tell. If we consider actors, we could definitely chain (as in a responsibility chain) an Ack(tag) message to the original actor or one able to ack it. But for streams I don't think that would work: a stream seems to be one way.
After all, I am wondering whether akka-streams is even the correct way to go for my needs.
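For reference, the envelope-wrapping approach described above might look roughly like this. All names are illustrative, and (as noted) connection and channel lifecycle handling is ignored:

import com.rabbitmq.client.Channel

// Illustrative envelope: carry the channel and delivery tag alongside the payload
// so a later stage in the stream can ACK once the work is actually done.
final case class AckableMessage(channel: Channel, deliveryTag: Long, body: Array[Byte])

def ackAfterProcessing(msg: AckableMessage): AckableMessage = {
  // ... the real work happens in earlier stages ...
  msg.channel.basicAck(msg.deliveryTag, false) // ACK only after the work completed
  msg
}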
@vrabeux wrote:
I was more talking about an app crash or instance crash. All prefetched messages should be re-queued. However, since they are ACKed before the work is completed, they are not re-queued. Anyway, that may be a particular use case.
All messages are returned to the broker once the subscription is canceled. Messages are always acknowledged one-by-one after delivery to the stream. A message that sits in the buffer is not acknowledged.
I agree that a prefetch of around 20 works in most cases. However, if we have a very slow service (let's say several seconds or, worse, minutes) a prefetch number that is too big can cause problems: a new worker would not be able to get any work if most of the work is prefetched by another instance. On the contrary, if the work is very fast, then 20 may even be a bit small.
If there are multiple consumers attached to the queue, then the broker tries to do its best to share messages between them, also taking into consideration whether a particular consumer is slow. So that should not be an issue. Have you tried coding an example like this for your case and checking whether one or all consumers get messages evenly?
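A small experiment along those lines could look like the sketch below: two subscribers on the same queue, one artificially slow, to see how the broker spreads deliveries. It assumes reactive-rabbit's Connection.consume as shown in the project README and the Akka Streams 1.0-era API used elsewhere in this thread (the queue name and the sleep are only there for the experiment):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import io.scalac.amqp.Connection

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val connection = Connection()

// Fast consumer: handles messages immediately.
Source(connection.consume(queue = "invoices"))
  .to(Sink.foreach(d => println(s"fast: ${d.deliveryTag}")))
  .run()

// Slow consumer: simulates several seconds of work per message.
Source(connection.consume(queue = "invoices"))
  .map { d => Thread.sleep(5000); d }
  .to(Sink.foreach(d => println(s"slow: ${d.deliveryTag}")))
  .run()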
@ktoso wrote:
Perhaps it would be possible to offer .mapAck(); however, I'm not completely familiar with the rabbit APIs, so I don't know if that would be doable. Still, I think it should be exposed to users when to ack.
The usual workflow is:
1) the broker delivers a message to the consumer,
2) the consumer processes it,
3) the consumer acknowledges (or rejects) it.
Prefetch stands for how many unacknowledged messages can be delivered to the consumer before the broker stops delivering. It's essentially the basis on which the reactive component is built.
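To make the workflow and the prefetch limit concrete, here is a bare-bones sketch against the plain RabbitMQ Java client; the queue name and processing function are placeholders:

import com.rabbitmq.client.{ConnectionFactory, DefaultConsumer, Envelope}
import com.rabbitmq.client.AMQP.BasicProperties

def process(body: Array[Byte]): Unit = () // placeholder for the real work

val channel = new ConnectionFactory().newConnection().createChannel()

// At most 20 unacknowledged messages may be in flight to this consumer;
// the broker stops delivering until some of them are ACKed.
channel.basicQos(20)

channel.basicConsume("invoices", false, new DefaultConsumer(channel) {
  // 1. the broker delivers a message to the consumer
  override def handleDelivery(consumerTag: String, envelope: Envelope,
                              properties: BasicProperties, body: Array[Byte]): Unit = {
    process(body)                                    // 2. the consumer processes it
    channel.basicAck(envelope.getDeliveryTag, false) // 3. the consumer acknowledges it
  }
})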
I agree that it would be nice to have an option to ACK a message deeper in the processing pipeline.
The risk is that this may cause the stream to stall if the user decides not to ACK a number of messages equal to the prefetch. In this case the subscriber would always have to either ACK or reject a particular message. The mapAck stage would need companions like mapReject and mapRequeue.
My original plan for this scenario was to provide a pair of streams: one emitting deliveries coming from the broker, and one accepting confirmations going back to it. It's essentially a Processor under the current design of the Reactive Streams API. I have plans for this only if the user base of the library grows to a point where it is reasonable to do the additional work.
@mkiedys Sorry, maybe I am doing something wrong then, or maybe I just don't understand how it works.
If we look at QueueSubscription.scala#L40:

if(channel.isOpen()) {
  channel.basicAck(delivery.deliveryTag.underlying, false)
  subscriber.onNext(delivery)
}
This gave me the impression that the delivery is sent after being acked. My tests yesterday were confirming this, but maybe it was due to a bug on my side.
I'll try to get a small sample working this weekend.
This gave me the impression that the delivery is sent after being acked. My tests yesterday were confirming this, but maybe it was due to a bug on my side.
The message is delivered to the consumer after it has been acknowledged to the broker, one by one. An improvement can be made to acknowledge after delivery, not before.
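In terms of the snippet quoted above, that improvement would essentially amount to swapping the two calls. A sketch, not an actual patch:

if (channel.isOpen()) {
  subscriber.onNext(delivery)                               // deliver to the stream first
  channel.basicAck(delivery.deliveryTag.underlying, false)  // then acknowledge to the broker
}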
OK, I am confused here. Didn't you say that:
Messages are always acknowledged one-by-one after delivery to the stream.
So is it: 1) acknowledged before delivery, or 2) delivered before acknowledgement?
If it is 2, then I definitely have an issue on my side. If it is 1, then the flow I'm using does not work in my use case, as messages are not re-queued.
Usage example for a Processor:

val connection = Connection()
val processor: Processor[Confirm, Delivery] = connection.processor(queue = "invoices")

Source(processor)
  .map(delivery ⇒ // decide whether to acknowledge or requeue
    if (Random.nextBoolean())
      Ack(delivery.deliveryTag)
    else
      Requeue(delivery.deliveryTag))
  .to(Sink(processor)) // tell the broker
@vrabeux wrote:
Does not work in my use case as messages are not re-queued.
It might be that Akka Streams does additional buffering. In this case more than one message might get lost.
The message is acknowledged before onNext. If the stream decides to take more than one message before even delivering something to your code, then all of them might get lost.
@mkiedys Ok. I'll really need to experiment a bit more and try to understand more deeply all of this.
The Processor seems nice. But from what I understood, a runnable flow is 1 Source to 1 Sink. How would this handle cases like:
Source(inputQueue).map().map(ACK).map(...).to(aCompletelyDifferentOtherQueue)
Anyway, thank you so much for your help and the discussion.
You would have to split your stream into two: one going to the Processor
sink and another to your exchange*).
@ktoso Is there a better solution?
*) In AMQP, messages are never published to queues directly. You always publish them via an exchange.
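A rough sketch of the split suggested above, reusing the hypothetical processor, Ack and Confirm types from the earlier example, the connection.publish subscriber from the README, and the alsoTo combinator from later Akka Streams versions; the function names and the exchange are placeholders:

import akka.stream.scaladsl.{Flow, Sink, Source}

// One branch confirms each delivery back to the broker via the Processor sink,
// the other publishes a derived message to a completely different exchange.
Source(processor)
  .map(handle)                                              // placeholder for the real work
  .alsoTo(Flow[Delivery].map(d => Ack(d.deliveryTag)).to(Sink(processor)))
  .map(toOutgoingMessage)                                   // placeholder conversion
  .to(Sink(connection.publish(exchange = "other", routingKey = "out")))
  .run()                                                    // assumes an implicit materializer in scope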
Yes, makes sense. Thanks.
+1 I would really like to see this feature
I recommend reading about Akka Streams buffering: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC4/scala/stream-rate.html
If you depend on the driver to send acknowledgments one-by-one, you may want to reduce the buffer size from the default 16 to 1:
.withAttributes(Attributes.inputBuffer(initial = 1, max = 1))
Sadly the smallest buffer is 1 message.
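For example, the attribute can be attached to the consuming part of the graph like this. The processing function is a placeholder, and the connection and materializer setup from the earlier sketch is assumed to be in scope; Attributes.inputBuffer itself is the Akka Streams API:

import akka.stream.Attributes
import akka.stream.scaladsl.{Sink, Source}

Source(connection.consume(queue = "invoices"))
  .map(process)                                                  // placeholder for the real work
  .withAttributes(Attributes.inputBuffer(initial = 1, max = 1))  // keep at most 1 element buffered
  .to(Sink.ignore)
  .run()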
For what it's worth, I ended up creating my own Reactive Streams implementation that is in pure Java and does not auto-ACK (also, because it's in Java it is consequently far less code than this project, as I just used the RabbitMQ driver directly).
public interface DeliveryAckSubscriber extends Subscriber<DeliveryResult> {
    Delivery getDelivery();
}

public interface ReactiveAMQPOperations {
    // rpc is for direct-reply
    Publisher<Delivery> rpc(final Message m);
    Publisher<DeliveryAckSubscriber> toPublisher(String queue, int prefetchCount);
}

public class DeliveryResult {
    private final DeliveryResponse response; // for rpc
    private final Throwable error;           // if null, no error
    private final boolean requeue;
    // constructor and getters snipped for brevity
}
Basically the idea is you get a Publisher whose elements are essentially a tuple of a Subscriber and a Delivery object (a delivery object being the body, envelope, etc.). That is, every message that comes from the queue gets a special subscriber that awaits a single onNext of a DeliveryResult, which signifies whether the message is to be acknowledged or not.
As nifty and cute as it would be to have request(int n) change the basicQos prefetch, we do not do this but instead buffer internally. That is, the prefetch is set only once. The Publisher also only allows one subscriber. If another one subscribes, we drop the other subscriber (honestly this hasn't been tested enough, so it's probably a bad idea). There has been some thought, though, on allowing request(int n) to signify ack of previous messages, but this would not allow any way of nack-ing messages.
The rpc method allows direct-reply RPC (see the RabbitMQ docs) and provides a Publisher that is a Promise. A Publisher that emits a single object and is cold is essentially a promise. Both of the above Publishers are cold, with the exception of replacing the subscriber.
If folks are interested, I was going to open source it. I also have a pure RxJava implementation that is easier to work with as well.
Thanks for the inspiration, Adam. I recommend running the tests from reactive-streams-tck against your implementation to check whether the specification isn't violated somewhere.
Feel free to send a pull request here if you see ways of simplifying things.
It will take me a little time to get it open-source ready as it's in our proprietary repository. That being said, it is unfortunately Java, so I'm not sure if it will help, but it might.
As for the TCK, we do pass it (well, the required tests) and my secret was to shamefully steal parts of the RxJava SerializedObserver. This is because the RabbitMQ Java Consumer's handleDelivery has overlapping calls. I believe reactive-rabbit uses Scala's STM to do this. I'm not sure which is faster, but I would not be surprised if Scala STM beat the RxJava synchronized version under very heavy loads. (I did have my own implementation using a ConcurrentLinkedQueue and some nasty r/w locks, but the RxJava version was much faster.)
Other implementations will often use a queue (blocking or non-blocking) to keep the RabbitMQ threads from overlapping and/or blocking up, at the cost of a context switch. Thus, for both of our implementations you really have to either not block or dispatch downstream on a separate thread (in RxJava this would be observeOn), or else you might block up too many RabbitMQ threads (the ExecutorService that is passed into the connection).
@agentgt Please let us know when the time comes; a Reactive Streams Java version would be nice.
Is this enhancement available? In my case, the RabbitMQ ACK should only be sent after the sink (a DB insert) has completed successfully. A code sample (in Java) would be really helpful. Thanks.
Currently, messages seem to be auto-ACKed after they are pulled off the queue and submitted downstream (https://github.com/ScalaConsultants/reactive-rabbit/blob/master/src/main/scala/io/scalac/amqp/impl/QueueSubscription.scala#L39).
Have you given any thought to incorporating a way to ACK messages after they've been fully processed by the stream? If it's something you think would be worthwhile and you have some ideas on how to implement, I can take a crack at a PR.