bytebeamio / rumqtt

The MQTT ecosystem in rust
Apache License 2.0

rumqttc: Throttling of pending requests can cause starvation on the request side #814

Open flxo opened 8 months ago

flxo commented 8 months ago

Current Behavior

The delay between pending requests is awaited in parallel with incoming frames. An incoming frame cancels a potentially pending delay, and the delay is started from scratch in the next call to poll. Cyclic incoming frames with a period shorter than the throttle duration therefore lead to starvation on the request side.
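The effect described above can be illustrated with a small discrete-time model. This is only a sketch and not rumqttc code: the function name, the 1 ms tick, and the rule that an incoming packet always wins over the delay are assumptions made for illustration.

```rust
/// Discrete-time model of a poll loop whose throttle delay is restarted
/// whenever an incoming packet is handled (hypothetical, not rumqttc code).
///
/// Time advances in 1 ms ticks and an incoming packet arrives every
/// `rx_period_ms` (must be non-zero). Returns how many pending requests
/// were sent within `run_ms`.
pub fn sent_with_restarting_delay(throttle_ms: u64, rx_period_ms: u64, run_ms: u64) -> u64 {
    let mut sent = 0;
    // The first call to poll starts the delay at time 0.
    let mut delay_started_at: u64 = 0;
    for now in 1..=run_ms {
        if now % rx_period_ms == 0 {
            // An incoming packet completes first: the delay future is dropped
            // and the next call to poll starts a fresh delay.
            delay_started_at = now;
        } else if now - delay_started_at >= throttle_ms {
            // The delay elapsed without interruption: send one pending request
            // and start the delay for the next one.
            sent += 1;
            delay_started_at = now;
        }
    }
    sent
}
```

With a throttle of 10 ms and a packet every 5 ms, `sent_with_restarting_delay(10, 5, 1000)` returns 0: the delay never runs to completion. With a packet only every 50 ms, the same call makes progress between packets.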

Failure Information (for bugs)

This is a bug. I have no traces - found by review.

swanandx commented 8 months ago

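Hey, please check if i understood this correctly:

  • if incoming packet comes before the pending throttle ( in case if there were some pending requests ), the next_request future will be cancelled.
  • if the incoming packet keeps coming at higher frequency, it might not get chance to complete the next_request, thus starve. ( i mentioned packet not frame because readb is resolved only when whole packet is read ).

my thoughts / questions:

  • pending_throttle is 0ms by default and is configurable.
  • correct me if wrong, if we give priority to pending requests, that would mean we are starving on incoming packets right?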

due to this, and my limited context, i think this might be a trade-off rather than an issue. If you think otherwise, can you please elaborate the context more?

thank you so much!

flxo commented 8 months ago

Hey, please check if i understood this correctly:

  • if incoming packet comes before the pending throttle ( in case if there were some pending requests ), the next_request future will be cancelled.
  • if the incoming packet keeps coming at higher frequency, it might not get chance to complete the next_request, thus starve. ( i mentioned packet not frame because readb is resolved only when whole packet is read ).

Exactly.

my thoughts / questions:

  • pending_throttle is 0ms by default and is configurable.
  • correct me if wrong, if we give priority to pending requests, that would mean we are starving on incoming packets right?

Both the rx and the sleep are polled in the select!, so we cannot starve on the rx path, but incoming packets can lead (as you wrote above) to starvation of the pending requests. Only the requests in the VecDeque are affected, not the requests from the channel.

A scenario where this can happen is when there are pending requests and a configured throttle. The client connects with clean_session false and instantly gets publications from the broker.

due to this, and my limited context, i think this might be a trade-off rather than an issue. If you think otherwise, can you please elaborate the context more?

I think it's just a bug in the implementation. The poll fn doesn't store any context regarding the throttle, so the sleep delay is always started from scratch. To solve this I'd create a Stream of Requests like this and hold that stream in EventLoop. The code behind the link has the same bug as main: the throttled stream is created anew upon each call of poll.

Getting that stream into EventLoop is probably hard because of lifetime issues. To tackle this, an option could be to spawn a task that loops on poll and communicates the events via a channel. The channel is fed directly from State. This would also avoid the nasty Ok(self.state.events.pop_front().unwrap()) that feels odd in the current implementation. Such a task would also allow interleaving rx and tx, similar to this.

All of this would be a rather big refactoring with a lot of room for errors.

de-sh commented 8 months ago

Loved the Codec implementation, let's start by implementing that in the first PR? I'll take this up if you are ok with it. We can go about this by dividing up the work.

flxo commented 8 months ago

Loved the Codec implementation, let's start by implementing that in the first PR? I'll take this up if you are ok with it. We can go about this by dividing up the work.

I'm absolutely fine with that but think this is a general and architectural decision which must be approved within the rumqtt team.

Some notes:

The branch is probably a good starting point but needs something more:

de-sh commented 8 months ago

I'm absolutely fine with that but think this is a general and architectural decision which must be approved within the rumqtt team.

Noted, we will discuss this and get back to you.

Alternatively reuse the Framed instance from Network.

That sounds like a great option, we could just as well re-establish a connection and use the buffers as they were from the previous connection, dropping corrupted/incomplete packet bytes.

Moving the loop into a task would require even more: introduce a channel for the events (best use a Sink interface with explicit flushing to get fewer task switches). poll would just asynchronously receive on the channel (even better: implement Stream for EventLoop).

Forgive me if my understanding is naive, but I believe it is better to not spawn a separate task and instead use something like a timer that doesn't get cancelled to handle this. We could ignore it and set it to never when nothing is pending?

flxo commented 8 months ago

Alternatively reuse the Framed instance from Network.

That sounds like a great option, we could just as well re-establish a connection and use the buffers as they were from the previous connection, dropping corrupted/incomplete packet bytes.

Hm. The tx buffer should be fine to keep in its entirety because it's flushed. The rx buffer is likely to contain incomplete packets. I don't know the MQTT standard by heart, but it would probably be fine to process the complete frames and drop a possible trailing incomplete one. The implementation today discards an incomplete received frame upon network errors. Nothing you can do here...
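As a rough sketch of "process the complete frames and drop the incomplete tail": every MQTT packet starts with a one-byte type/flags field followed by a variable-length "remaining length" of one to four bytes, so complete packets can be delimited without decoding them. The function names below are hypothetical and this is not how rumqttc's codec is written.

```rust
/// Returns the total length of the first MQTT packet in `buf`,
/// or `None` if the packet is not fully contained in `buf`.
pub fn complete_packet_len(buf: &[u8]) -> Option<usize> {
    // Byte 0 is packet type and flags. Bytes 1.. hold the remaining length:
    // 7 bits per byte, least significant group first, top bit = "more follows".
    let mut remaining: usize = 0;
    for (i, &byte) in buf.iter().skip(1).take(4).enumerate() {
        remaining |= ((byte & 0x7F) as usize) << (7 * i);
        if byte & 0x80 == 0 {
            let total = 1 + (i + 1) + remaining;
            return if buf.len() >= total { Some(total) } else { None };
        }
    }
    // Header incomplete, or malformed (more than four length bytes).
    None
}

/// Splits `buf` into complete packets and reports how many trailing bytes
/// belong to an incomplete packet and would be dropped.
pub fn split_complete(buf: &[u8]) -> (Vec<&[u8]>, usize) {
    let mut packets = Vec::new();
    let mut offset = 0;
    while let Some(len) = complete_packet_len(&buf[offset..]) {
        packets.push(&buf[offset..offset + len]);
        offset += len;
    }
    (packets, buf.len() - offset)
}
```

For a buffer holding a PINGRESP (`D0 00`), a PUBACK (`40 02 00 01`) and the first three bytes of a PUBLISH (`30 05 00`), `split_complete` yields the two complete packets and reports three dropped bytes.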

Moving the loop into a task would require even more: introduce a channel for the events (best use a Sink interface with explicit flushing to get fewer task switches). poll would just asynchronously receive on the channel (even better: implement Stream for EventLoop).

Forgive me if my understanding is naive, but I believe it is better to not spawn a separate task and instead use something like a timer that doesn't get cancelled to handle this. We could ignore it and set it to never when nothing is pending?

Also possible and less invasive. This would be another select! branch. The request_rx branch is guarded with self.pending.is_empty().
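A minimal sketch of the state such a non-cancelled timer would need, assuming the deadline is stored in the event loop and survives across calls to poll. `PendingThrottle` and its methods are hypothetical names, not part of rumqttc; in async code the stored deadline would presumably feed the extra select! branch.

```rust
use std::time::{Duration, Instant};

/// Hypothetical throttle state kept across calls to poll.
pub struct PendingThrottle {
    delay: Duration,
    /// `None` means "never": nothing is pending, so the timer is ignored.
    deadline: Option<Instant>,
}

impl PendingThrottle {
    pub fn new(delay: Duration) -> Self {
        Self { delay, deadline: None }
    }

    /// Called at the start of every poll. Arms the timer only if it is not
    /// already armed, so handling an incoming packet does not restart it.
    pub fn arm_if_needed(&mut self, has_pending: bool, now: Instant) {
        if !has_pending {
            self.deadline = None;
        } else if self.deadline.is_none() {
            self.deadline = Some(now + self.delay);
        }
    }

    /// Returns true once the stored deadline has passed, and disarms the
    /// timer so the next pending request gets a fresh delay.
    pub fn try_fire(&mut self, now: Instant) -> bool {
        match self.deadline {
            Some(deadline) if now >= deadline => {
                self.deadline = None;
                true
            }
            _ => false,
        }
    }
}
```

Because the deadline is only set when the timer is unarmed, calling `arm_if_needed` again after each incoming packet leaves it untouched, and the pending request is released once the original deadline passes regardless of how often the rx branch fires.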