AtherEnergy / rumqtt

Pure Rust MQTT client
The Unlicense

Client reconnects after calling pause #134

Open denizs opened 5 years ago

denizs commented 5 years ago

Hey there! I'm using v0.30.1 of rumqtt. After some testing, I found that calling resume does not behave as expected. When I call pause on the client, it automatically reconnects.

tekjar commented 5 years ago

Is your title supposed to be Client reconnects after calling pause? That shouldn't happen. I'll check :)

denizs commented 5 years ago

@tekjar Sorry - yeah 😁 I've verified this behaviour with the v0.30.1 tag. The problem is that after executing mqtt_io, receiving a UserDisconnect and setting the is_network_enabled flag to false, the reconnection loop continues and executes connect_timeout, which in turn reconnects the client to the server. I'm still in the phase of understanding the logic behind it, so I think this issue should be easy for you to grasp.
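
A minimal sketch of the loop behaviour described above, assuming a heavily simplified model. The names `IoExit` and `count_connects` are hypothetical and are not part of rumqtt; the counter stands in for connect_timeout and each slice element stands in for one return from mqtt_io. It only illustrates why a loop that never consults is_network_enabled connects again after a pause.

```rust
// Hypothetical, simplified model of the reconnection loop; not rumqtt's actual code.
#[derive(Debug, PartialEq)]
enum IoExit {
    UserDisconnect, // the user called pause()
    NetworkError,   // the connection dropped for another reason
}

// Counts how often the loop "connects" for a given sequence of I/O exits.
// `honor_pause` selects whether the loop checks the flag before connecting.
fn count_connects(exits: &[IoExit], honor_pause: bool) -> u32 {
    let mut is_network_enabled = true;
    let mut connects = 0;
    for exit in exits {
        // The behaviour reported in this issue corresponds to skipping this check.
        if honor_pause && !is_network_enabled {
            break;
        }
        connects += 1; // stands in for connect_timeout
        if *exit == IoExit::UserDisconnect {
            is_network_enabled = false; // stands in for handling UserDisconnect
        }
    }
    connects
}

fn main() {
    let exits = [IoExit::UserDisconnect, IoExit::NetworkError];
    println!("flag ignored: {} connects", count_connects(&exits, false));
    println!("flag checked: {} connects", count_connects(&exits, true));
}
```

With the flag ignored, the model connects a second time right after the pause; with the flag checked, it stops after the first connection.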

The entire reason I'm trying to achieve this was to implement a reconnect_with_opts method to allow updating a password (as necessary for GcloudIoT and such), so if you're already working on this, let me know 👍

tekjar commented 5 years ago

@denizs Can you send me an example to reproduce this? It's working as it should in my examples.

denizs commented 5 years ago

Yeah for sure. The code below does it for me. init_logging simply configures env logger to see what's going on in rumqtt. I'm building rumqtt from source after resetting to v.0.30.1 at hash c5bcd9f. I'm running mosquitto locally and see an immediate reconnection after the pause call, even though I'm blocking on my thread before calling resume again.

use log::error;
use rumqtt::{MqttClient, MqttOptions, ReconnectOptions};
use std::thread;
use std::time::Duration;

fn main() {
    // init_logging (not shown) configures env_logger.
    init_logging();
    let broker = "localhost";
    let port = 1883;

    let reconnection_options = ReconnectOptions::Always(10);
    let mqtt_options = MqttOptions::new("test", broker, port)
        .set_keep_alive(10)
        .set_reconnect_opts(reconnection_options)
        .set_clean_session(false);

    // Keep the notifications receiver bound so it is not dropped.
    let (mut client, _notifications) = MqttClient::start(mqtt_options).unwrap();
    loop {
        thread::sleep(Duration::from_secs(7));
        error!("Pausing!");
        client.pause();
        thread::sleep(Duration::from_secs(7));
        error!("Resuming!");
        client.resume();
    }
}

tekjar commented 5 years ago

Oh yeah. It's connecting immediately. I'll fix this. But if it is gcloud, there is inbuilt support. You just need to provide the key to sign the generated JWT. rumqtt takes care of generating the JWT periodically.

tekjar commented 5 years ago

    let client_id = "projects/".to_owned()
        + &config.project
        + "/locations/us-central1/registries/"
        + &config.registry
        + "/devices/"
        + &config.id;

    let security_options = SecurityOptions::GcloudIot(config.project, include_bytes!("../../certs/rsa_private.der").to_vec(), 60);

    let ca = include_bytes!("../../certs/roots.pem").to_vec();
    let connection_method = ConnectionMethod::Tls(ca, None);

    let mqtt_options = MqttOptions::new(client_id, "mqtt.googleapis.com", 8883)
        .set_keep_alive(10)
        .set_connection_method(connection_method)
        .set_security_opts(security_options);

Why don't you do this?

denizs commented 5 years ago

In case you're busy right now, I'd be happy to open a PR, if you share your approach! I've been hitting my head against the circular dependency for the past 6 hours 😄

denizs commented 5 years ago

To be honest, I thought it would be nice to contribute to this project, as I've been using it quite a bit and thought that a reconnect_with_opts method would generally be a nice thing for other users. Another reason was that I simply did not think that refreshing the JWT would be taken care of (awesome of you to think of this).

tekjar commented 5 years ago

> To be honest, I thought it would be nice to contribute to this project

Awesome :). Please do!

denizs commented 5 years ago

I just finished a long-running test and it seems like the token does not get refreshed. The last error message was also the last publish message, and it says "token expired". I think it's due to the fact that the username and password fields are set in the mqtt state, which is not affected by the reconnect loop (correct me if I'm wrong).

Edit: Ok never mind, the connect packet should actually handle this properly. I'll have a look at this tomorrow.

Edit 2: The problem is that if you're publishing and receiving a NetworkError::ConnectionClosed, the eventloop fails. So this is really simple to fix. I'll post a PR tomorrow.

tekjar commented 5 years ago

> The entire reason I'm trying to achieve this was to implement a reconnect_with_opts

Looks like this is functionally implemented, but I haven't tested it.

tekjar commented 5 years ago

> The problem is that if you're publishing and receiving a NetworkError::ConnectionClosed, the eventloop fails

Sorry I didn't understand this completely. The eventloop is supposed to fail and then reconnect again. Can you give me an example which isn't working as expected?

denizs commented 5 years ago

When the JWT expires and you try to publish, mqtt_io returns a NetworkError::NetworkStreamClosed error, which is returned with Err(false). After that, the client cannot recover.

rumqtt::client::connection - Event loop returned. Error = NetworkStreamClosed
ERROR - Publishing error: Failed sending request to connection thread. Error = send failed because receiver is gone

denizs commented 5 years ago

That was simply because my reconnection options were set to never. Never mind!
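
A minimal sketch of why a "never reconnect" policy leads to the "receiver is gone" publishing error, assuming the connection thread owns the receiving end of a request channel and simply exits when it is not allowed to reconnect. The `Reconnect` enum and `publish_after_error` function are hypothetical stand-ins, and std's mpsc channel is used here in place of whatever channel type rumqtt uses internally.

```rust
use std::sync::mpsc::channel;
use std::thread;

// Hypothetical stand-in for the client's reconnect policy.
#[derive(Clone, Copy)]
enum Reconnect {
    Never,
    Always,
}

// Simulates a connection thread whose event loop has just returned with an
// error, then reports whether a later publish request can still be sent.
fn publish_after_error(policy: Reconnect) -> bool {
    let (tx, rx) = channel::<&'static str>();
    let worker = thread::spawn(move || match policy {
        // No reconnect: the thread ends and the receiver is dropped.
        Reconnect::Never => drop(rx),
        // Reconnect: keep serving requests until every sender is gone.
        Reconnect::Always => {
            for _request in rx.iter() {}
        }
    });
    if let Reconnect::Never = policy {
        // Wait for the thread to exit so the outcome is deterministic.
        worker.join().unwrap();
        return tx.send("publish").is_ok();
    }
    let sent = tx.send("publish").is_ok();
    drop(tx); // lets the worker's loop finish
    worker.join().unwrap();
    sent
}

fn main() {
    println!("Never:  publish ok = {}", publish_after_error(Reconnect::Never));
    println!("Always: publish ok = {}", publish_after_error(Reconnect::Always));
}
```

In this model the send fails only in the `Never` case, because nothing is left to receive the request once the thread has exited.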

denizs commented 5 years ago

I opened a PR concerning graceful shutdown, as I noticed that on reconnect, the client would simply close the connection. That said, I'm still trying to wrap my head around a solution for the automatic reconnect after client.pause, but it seems like it would require an entire rewrite of mqtt_eventloop 😂

To achieve this, we need to skip mqtt_connect_timeout and go right to mqtt_io. However, to call mqtt_io, we need to call mqtt_connect_timeout first 😁
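
One way around this circular dependency can be sketched as an explicit paused state that waits for commands without touching the network. This is only an illustration under stated assumptions, not the approach taken in rumqtt: `Command`, `State`, `step` and `trace` are hypothetical names, and the `Connecting` and `Running` states merely stand in for mqtt_connect_timeout and mqtt_io.

```rust
use std::sync::mpsc::{channel, Receiver};

#[derive(Debug, PartialEq, Clone, Copy)]
enum Command {
    Pause,
    Resume,
    Shutdown,
}

#[derive(Debug, PartialEq, Clone, Copy)]
enum State {
    Connecting, // stands in for mqtt_connect_timeout
    Running,    // stands in for mqtt_io
    Paused,     // waits for a command, no network activity
    Stopped,
}

// Performs one transition of the hypothetical event loop.
fn step(state: State, commands: &Receiver<Command>) -> State {
    match state {
        // Assume the connection attempt succeeds.
        State::Connecting => State::Running,
        State::Running => match commands.recv() {
            Ok(Command::Pause) => State::Paused,
            Ok(Command::Resume) => State::Running,
            Ok(Command::Shutdown) | Err(_) => State::Stopped,
        },
        // Only an explicit resume leads back to a connection attempt.
        State::Paused => match commands.recv() {
            Ok(Command::Resume) => State::Connecting,
            Ok(Command::Pause) => State::Paused,
            Ok(Command::Shutdown) | Err(_) => State::Stopped,
        },
        State::Stopped => State::Stopped,
    }
}

// Feeds a fixed list of commands through the loop and records each state.
fn trace(cmds: &[Command]) -> Vec<State> {
    let (tx, rx) = channel();
    for cmd in cmds {
        tx.send(*cmd).unwrap();
    }
    drop(tx); // buffered commands stay readable; afterwards recv() errors
    let mut state = State::Connecting;
    let mut seen = vec![state];
    while state != State::Stopped {
        state = step(state, &rx);
        seen.push(state);
    }
    seen
}

fn main() {
    let states = trace(&[Command::Pause, Command::Resume, Command::Shutdown]);
    println!("{:?}", states);
}
```

In this model a pause never leads straight back to `Connecting`; the loop reaches it again only after a resume command arrives.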

tekjar commented 5 years ago

This will be fixed in 0.31. I know it's super late, but would you like to take a stab at this when I'm done with 0.31 (in a week)?

tekjar commented 5 years ago

Hi @denizs. This is implemented in the develop branch. Can you check if this is working as expected?