AtherEnergy / rumqtt

Pure rust mqtt cilent
The Unlicense
202 stars 71 forks source link

WIP: Better story about in flight messages and rate limit #141

Closed flxo closed 5 years ago

flxo commented 5 years ago

Hi.

the way rumqtt handles kind of overload situations seems not optimal to me. The current behaviour just blocks for a configured amount of time when the request channel is full. If the application sends any command during this delay, rumqtt will block again when polling next time. This leeds to a rate of 1/configured delay. If the clients keeps sending you will stay at this rate.

I changed this implementation and removed the fixed delay with a stream interception when the publication queue is at a configured level (MqttOptions::in_flight). The essential part is here:

    // Apply outgoing queue limit (in flights) by answering stream poll with not ready if queue is full
    // by returning NotReady.
    fn limit_in_flight_request_stream(&self, requests: impl RequestStream) -> impl RequestStream {
        let mqtt_state = self.mqtt_state.clone();
        let in_flight = self.mqttoptions.in_flight();
        let mut stream = requests.peekable();
        poll_fn(move || -> Poll<Option<Request>, NetworkError> {
            if mqtt_state.borrow().publish_queue_len() >= in_flight {
                match stream.peek() {
                    Err(_) => stream.poll(),
                    _ => Ok(Async::NotReady)
                }
            } else {
                stream.poll()
            }
        })
    }

This affects any command on the request queue. I'm not sure if there is a definition for the in flight window and if it should contain e.g Subscribes or others. I personally think it doesn't matter if they're included.

Next the stream throttling implementation is also a little bit misleading. I replaced the sleep time that happen when a configured rate is reached with a simple StreamExt::throttle that is a prefect match.

This patch changes the client API.

What do you think? I consider this as work in progress and appreciate any feedback!

cheers!

tekjar commented 5 years ago

If the application sends any command during this delay, rumqtt will block again when polling next time.

I didn't completely understand what you mean here but overall you solution looks much elegant and clean than what I did :).

You seem to have cleaned up Prepend stream as well. Can we make that a separate PR? That just makes testing and visiting back this merge in future easier

tekjar commented 5 years ago

Or never mind. Prepend isn't a big change. We'll leave it there :)

flxo commented 5 years ago

If the application sends any command during this delay, rumqtt will block again when polling next time.

I didn't completely understand what you mean here but overall you solution looks much elegant and clean than what I did :).

Ok. Imagine a client continuously publishing messages and the connection or broker isn't fast enough to handle them the rumqtt internal queue MqttState::outgoing_pub reaches MqttOptions::outgoing_queuelimit.0. The currently implementation detects this and waits for MqttOptions::outgoing_queuelimit.. If during this period of time the client pushes another Publish, rumqtt will do the same in the next iteration, since MqttState:: outgoing_pub is (again) full. This results in processing one element per MqttOptions::outgoing_queuelimit.1. Not very helpful to resolve the overload... Exactly the same happens for MqttOptions::outgoing_ratelimit.

You seem to have cleaned up Prepend stream as well. Can we make that a separate PR? That just makes testing and visiting back this merge in future easier

Yeah. During this refactoring I removed prepend and used Stream::chain but later on realised that we need some kind of buffer over reconnects. Then I renamed prepend::StreamExt since it collides with futures::stream::StreamExt I wanted to use for Stream::throttle. I renamed Prepend::merge_session to insert since Prepend doesn't care about sessions and inserts item in between the buffer and the wrapped stream. I will put this into a dedicated commit.

flxo commented 5 years ago

The most crucial point to me is if I made something stupid when returning NotReady or not. Could this lead to a deadlock? I'm not sure but think not because if the broker stops replying to our Publishes the Ping will timeout as well and we're reconnecting or just failing.

tekjar commented 5 years ago

Could this lead to a deadlock

One way I think deadlock could happen is, if publish request queue is full (sender is blocked now) and there is no one to notify the RequestStream to pull out an item when publish_queue_len is less than in_flight length. Select should take care of this as long as there is someone to wake up the event loop. I think it makes sense to write unit tests to test these scenarios.

tekjar commented 5 years ago

These changes already looks good to me. I'll merge this in a different branch and experiment with some test cases that I already wrote. Thanks a lot for the contribution :)

flxo commented 5 years ago

Ok. Thanks. I didn't have time to work on this the last days. Last week I ran a couple of load tests with the patch without any issues but the tricky part are definitely reconnections and connections flaws. Let me know if there's anything I can do.

tekjar commented 5 years ago

I didn't have time to work on this the last days

No problem. I'll finish this :)

Let me know if there's anything I can do.

I just have a few doubts about stream progress when the request channel is full and stream NotReady return during rate limiting. I'll confirm those and raise a PR against the master. A review during that time would help

flxo commented 5 years ago

I just have a few doubts about stream progress when the request channel is full and stream NotReady return during rate limiting. I'll confirm those and raise a PR against the master. A review during that time would help

Yes. This is legitime. I played today evening and and learnt that mixing crossbeam and futures channels isn't a good idea. Something needs to wake the reactor but I don't have a deeper understanding of what happens inside.

Looking forward to your final patch!