knative-extensions / eventing-rabbitmq

RabbitMQ eventing components. Knative Source and Broker.
Apache License 2.0
90 stars 67 forks source link

Ensure Broker Ingress uses Publisher-Confirm. #334

Closed vaikas closed 2 years ago

vaikas commented 3 years ago

The channel must be configured to use Publisher Confirm, since the acks are asynchronous. As per this article here: https://www.rabbitmq.com/tutorials/tutorial-seven-java.html#:~:text=Publisher%20confirms%20are%20a%20RabbitMQ,of%20on%20the%20server%20side.

xtreme-sameer-vohra commented 3 years ago

Open Questions

gvmw commented 2 years ago

This started as a recommendation from @marcialrosales.

Publisher Confirms is a great idea as it enables RabbitMQ Server to exert back-pressure on publishers and makes things faster and more efficient, as captured in Real-world RabbitMQ deployments - Gerhard Lazu & Wayne Lund at RabbitMQ Summit 2018.

At a high-level, the simplest way of passing on that back-pressure is to not respond to HTTP requests until the Broker Ingress has not received the publisher confirm from rabbitmq-server. We expect the HTTP clients that deliver events to be OK with this, and know what to do (e.g. wait for the response before proceeding with the next event, retry on timeout etc.). Even though I don't know what those HTTP clients are yet, the expectation is that if the tests pass, and the benchmarks in https://github.com/knative-sandbox/eventing-rabbitmq/pull/445 remain just as performant, this will be a good change.

@benmoss was suggesting that I dig into https://github.com/knative/specs/tree/main/specs/eventing to understand the expectations placed on the clients.

benmoss commented 2 years ago

I think https://github.com/knative/eventing/pull/5285 is also relevant

MarcialRosales commented 2 years ago

@gvmw Indeed it is a good way to exert back-pressure, but it is also the only way to ensure guarantee of delivery of messages for senders. Otherwise, it is a fire-and-forget model which is not what the majority of customers expect.

As you propose, the simplest solution is to force a FIFO model by blocking further HTTP requests until the previous send has received its publisher-confirm. But I guess you can configure the HTTP Sink to accept up to N outstanding HTTP request (or send operations). That way you have extra concurrency and yet ensure delivery.

gvmw commented 2 years ago

/assign

gvmw commented 2 years ago

/unassign

ikavgo commented 2 years ago

/assign

ikavgo commented 2 years ago

This is what I have in my head so far:

RabbitMQ

  1. The main goal for publisher-confirms is to make publishing reliable. That is it: give publisher good amount of guarantees. For example if message is a persistent message it will be acked only after storing on disk (note the expected latency - "hundreds of milliseconds" as per https://www.rabbitmq.com/confirms.html#publisher-confirms-latency)
  2. As a side-effect publisher-confirms can be used to throttle publishers. Again, persistent message are the obvious example here - if disk is overloaded the confirms will arrive later and iff publisher has some sort of a limit regarding unacknowledged messages count, at some point it will have to stop publishing and wait.
  3. In the RabbitMQ world publisher-confirms are opt-in and can be used only for a subset of messages but a publisher. Even inside same connection only some channels can be in confirm mode.

Knative

Knative wants to help its users to focus on important things. Providing good defaults, safety, flexibility and so on. It will be good to not overload broker.

On safety and Knative guarantees.

We ca use publisher-confirms to make publishing synchronous and say to users that if sink returned 200 broker accepted the event

I personally don't see much value in this yet, since having routable message doesn't imply or guarantee something useful will come out. The receiving end can be down or not existing or overloaded and so on. In my view apps should be ready to process lost events regardless of the reason.

Using publisher-confirms for throttling and safety

My concerns:

gvmw commented 2 years ago

It's great seeing the context that you've been building @ikvmw in the open.

I know for a fact that @embano1 cares a lot about events not being lost between the Source and the Broker. Maybe he can share more of the real-world use-case that requires those guarantees, since it is difficult to empathise with an end-user when we don't understand the context in which they operate this architecture.

Maybe I am over-simplifying this because I do not have the context that you have @ikvmw, but a mutex does sound like the simplest implementation to me, and I would like to see it proven wrong by benchmarks and real-world use-cases. This is the image that I have in mind:

image

Queues Don't Fix Overload is the source for this image - thank you @ferd πŸ™ŒπŸ» - and it explains why this simple implementation goes far. While the irony is not lost on me - we are putting an HTTP back-pressure mechanism in front of RabbitMQ queues πŸ€” - I think that it captures the context of this issue well. There is a really good related article that you may enjoy even more: Handling Overload.

When all is said and done, I fall-back on Make it work. Make it right. Make it fast. What this means is that I would do the simplest thing - 🐢 Mutex, come here - and then figure out what better looks like. Not having publisher confirms at all is no better than what we are proposing here, and I know for sure that whatever we come up with first will not be as good as the follow-up, so keep calm and iterate πŸšΆπŸ»β€β™‚οΈ πŸ˜€

I would love to read what others think on the subject. @MarcialRosales @lukebakken πŸ‘€

ikavgo commented 2 years ago

Thanks Gerhard,

Do you recall, what is our default regarding message persistence here?

gvmw commented 2 years ago

In Knative Eventing RabbitMQ messages are persistent by default. Did you mean something else?

From the perspective of messages actually reaching one or more queues / streams in the broker, I would recommend looking at setting the mandatory flag too, since messages with no destination (no queue or stream to be routed too), they will be confirmed as received to the publisher, but they will be discarded within RabbitMQ. That's what I remember, it's worth double-checking what is implied in that scenario here: RabbitMQ Data Safety on the Publisher Side

ikavgo commented 2 years ago

Thanks @xtreme-sameer-vohra for this link: https://github.com/knative/specs/blob/main/specs/eventing/control-plane.md

My interpretation of the spec leads to this conclusion:

We have to use publisher confirms first and foremost to satisfy this spec passage:

A Broker MUST publish a URL at status.address.url when it is able to receive events. This URL MUST implement the receiver requirements of event delivery. Before acknowledging an event, the Broker MUST durably enqueue the event (where durability means that the Broker can retry event delivery beyond the duration of receiving the event).

and here are the requirements of event delivery: https://github.com/knative/specs/blob/main/specs/eventing/data-plane.md#event-delivery. From here I deriver that responding 200 is valid only if broker durably enqueued.

There is also one more interesting find: 429 | Too Many Requests / Overloaded. We can limit the number of unacked messages and shoot 429 right away if too many in flight.

embano1 commented 2 years ago

I personally don't see much value in this yet, since having routable message doesn't imply or guarantee something useful will come out. The receiving end can be down or not existing or overloaded and so on. In my view apps should be ready to process lost events regardless of the reason.

As you already quoted from the Knative SPEC, this would violate the safety guarantees of the SPEC. In the VEBA project the most common question we receive is "do you guarantee reliable delivery of an event". Let's not debate on what a "guarantee" here is for now. Talking to our users, they want at-least-once semantics from the Knative source, e.g. vSphere events, to the trigger/dispatcher (sink). Currently we only provide weaker at-most-once semantics, see related issues.

In the Knative world, RabbitMQ is mostly used for fire-and-forget events and so standard messaging patterns, e.g. channels, retries, correlation IDs, etc. don't apply. Once the sender (source) gets a 200 from the ingress, it considers this event delivered in compliance with the SPEC (which currently is not the case).

If an event is accepted by the broker and there is a configured dispatcher (trigger), the event will remain in the queue until the dispatcher is either deleted or comes back online. So I don't fully understand why "nothing useful" would come out of this setup. Also, not sure how one can "process lost events"? Perhaps I'm not fully understanding your context :)

The simplest way - make all requests synchronous inside sink instance by using mutex. Do not process next HTTP request till this one not acked. I think it kills parallelism. Maintain concurrent counter and let users configure the batch size - how many unacknowledged messages one sink instance allowed to have. Question here - what if we don't control how many sink instances we will have? can we still overload Broker by multiplication?

I am not sure why we need this? Are you referring to the dispatcher/trigger as sink here?

We have support for inbound rate limiting in the Go CloudEvents SDK, which was exactly built for this reason (protect ingress and sinks from overload by applying 429 back-pressure signals).

ikavgo commented 2 years ago

Thanks for the reply.

Ticket

This ticket started without any spec reference and was about back-pressure. Once Sameer pointed to the relevant spec part the discussion comes much simplified for me.

Context

So I don't fully understand why "nothing useful" would come out of this setup.

What I meant is that if it's possible that event won't be processed in-time from sender PoV (think RPC-style) it doesn't matter for sender why it was like that - because broker failed or receiver.

VEBA

I totally get VEBA use case and we always have it in mind during our discussions.

200

We can produce 200 in spec sense only we we make sure RabbitMQ accepted the message. Hence we need publisher-confirms. So we send the message and wait till it acked or nacked by RabbitMQ. I'm not set on mutex btw since I don't like killing parallelism.

429

I love 429! Question is - how we generate it. I.e. how we on ingress side can detect that RabbitMQ doesn't feel well. We can track time-to-ack and unacked count. For both we need publisher confirms.

I'm actually thinking creating a new issue or renaming this one.

ikavgo commented 2 years ago

We have support for inbound rate limiting in the Go CloudEvents SDK, which was exactly built for this reason (protect ingress and sinks from overload by applying 429 back-pressure signals).

@xtreme-sameer-vohra @benmoss can we expect the same from CE SDKs in other languages?

ikavgo commented 2 years ago

This is how I want to approach the implementation:

  1. Get 200 working - for this we have to send response only after message acked. Since on ingress side we dial new connection for each message (why not reuse though?) and create a new channel, I think mutex not needed. We don't share confirm go channel here and buffer size can be just set to 1.

  2. Get 429 working - Introduce a new setting UnackedLimit and keep global atomic counter. Increase when sending, decrease when acked. If counter approached UnackedLimit return 429 immediately.

ikavgo commented 2 years ago

Update to the post above - Ingress reuses single connection and channel for all messages

embano1 commented 2 years ago

Get 200 working - for this we have to send response only after message acked.

ACK

Ingress reuses single connection and channel for all messages

Thx for confirmation, that was also my understanding. And any form of additional locking/synchronization should also not be needed, correct?

Get 429 working - Introduce a new setting UnackedLimit and keep global atomic counter. Increase when sending, decrease when acked. If counter approached UnackedLimit return 429 immediately

This sounds like it can be done with the CE Go SDK RateLimiter, no?

ikavgo commented 2 years ago

And any form of additional locking/synchronization should also not be needed, correct?

not quite like this. When we put channel in confirm mode each message has associated delivery tag and amqp091-go sends confirmations to the single go channel. So publishers have to pick up "their" confirmations. IIUC this usually approached with concurrent hashmaps. So say having separate go thread that just consumes confirmations and then consults hash maps and resends confirmation to exact publisher thread via delivery tag mapping. What's also interesting - rabbitmq can confirm multiple messages at once. Delivery tag is basically an uint and RabbitMQ can say "I confirm up to this Tag".

This sounds like it can be done with the CE Go SDK RateLimiter, no?

Thank you for the hint, very useful for someone like me who's just scratching golang surface.

embano1 commented 2 years ago

Quick update after our discussion yesterday:

Question: will RMQ only ever ack a particular message or send batched responses, e.g. "I ack up to ID X (incl. other previous in-flight requests)? This would complicate the goroutine logic a bit IMHO if we don't have a 1:1 mapping between HTTP request -> publish to RMQ.

ikavgo commented 2 years ago

We will have 1:1 mapping between HTTP request -> publish to RMQ. But because RMQ can batch acks, exactly like you describe "I ack up to ID X", we won't have 1:1 between acks/nacks and publishing. Therefore I'm search for a ?good? thread-safe map implementation.

Looking at this post https://medium.com/@deckarep/the-new-kid-in-town-gos-sync-map-de24a6bf7c2c I understand that I maybe have three options:

  1. Use regular map and RWLock
  2. Use sync.Map
  3. Use third-part skip-list implementation, a-la https://github.com/sean-public/fast-skiplist
embano1 commented 2 years ago

Question: can you please describe how the delivery confirmation works in RMQ? I hope we can find a way to simplify parallel in-flight request handling without heavy locking.

Say I have three messages in flight (sent concurrently via separate goroutines calling channel.Publish()), message 1, 2 and 3. RMQ successfully processed 1 and 2. Will I only receive one confirmation? What will the deliveryTag look like, i.e. how can I correlate message 1 or 2 to the corresponding int returned in the deliveryTag?

ikavgo commented 2 years ago

Here is how it works:

The go client will always synchronize publishes via mutex. It won't return deliveryTag. When channel was put in confirm mode channel user and server independently track it by incrementing for each message.

For us this means that as soon we are in confirm mode we initialize the counter and wrap channel.Publish, storage and increment in mutex. So we reduce time mutex held by removing confirm waiting.

Regarding number of confirmations. when message 1 2 3 sent any combination possible - three consecutive confirms or one with deliveryTag 3 and multiple-true. Or deliveryTag 2 and multiple=true and then deliveryTag 3 and multiple=false.

What can't happen though is when confirmation for message 3 multiple=false comes before confirmation for message 2.

ikavgo commented 2 years ago

What's new:

amqp-go client resequences confirms. https://github.com/rabbitmq/amqp091-go/blob/main/confirms.go Without client rewrite "out-of-bound" confirms will be buffered and sent to listener channel only after all confirms with DeliveryTag lower are sent to the listener. On plus side, looks like we don't have to deal with mutliple:true.

On scaling side, while it's expected that ingresses will be replicated at some point we want to provide adequate level of parallelism for single-node ingress scenario (capacity planning, failure modes, etc). We want to use up to N rabbitmq channels and round-robin them for sends.