AtherEnergy / rumqtt

Pure rust mqtt client
The Unlicense

Re-subscribe after reconnect #85

Open dbrgn opened 6 years ago

dbrgn commented 6 years ago

A few times now, I've seen the client reconnect successfully but receive no messages afterwards.

(reverse log)

Apr 10 01:50:26 sepp smartmail[8637]: INFO 2018-04-09T23:50:26Z: rumqtt::client::connection: mqtt connection successful
Apr 10 01:50:26 sepp smartmail[8637]: INFO 2018-04-09T23:50:26Z: rumqtt::client::connection: Address resolved to Some(V4(52.169.76.255:1883))
Apr 10 01:50:23 sepp smartmail[8637]: INFO 2018-04-09T23:50:23Z: rumqtt::client: Will sleep for Duration { secs: 3, nanos: 0 } seconds before reconnecting
Apr 10 01:50:23 sepp smartmail[8637]: ERROR 2018-04-09T23:50:23Z: rumqtt::client: Network connection failed. Error = Outgoing, Connection count = ConnectedBefo
Apr 10 01:50:23 sepp smartmail[8637]: ERROR 2018-04-09T23:50:23Z: rumqtt::client::connection: Reactor stopped. e = Outgoing
Apr 10 01:50:23 sepp smartmail[8637]: ERROR 2018-04-09T23:50:23Z: rumqtt::client::connection: Handling outgoing packet failed. Error = AwaitPingResp
Apr 10 01:50:23 sepp smartmail[8637]: ERROR 2018-04-09T23:50:23Z: rumqtt::client::state: Error awaiting for last ping response
Apr 10 01:50:23 sepp smartmail[8637]: DEBUG 2018-04-09T23:50:23Z: rumqtt::client::state: Last outgoing before 9 seconds. Last incoming packet before 69 seconds
Apr 10 01:50:13 sepp smartmail[8637]: DEBUG 2018-04-09T23:50:13Z: rumqtt::client::connection: Sending packet. Pingreq

Afterwards, ping messages are sent and received, but no messages from the server arrive. This would suggest that the subscriptions are not re-established.

tekjar commented 6 years ago

@dbrgn Yeah. Resubscribe is not implemented for clean sessions. I'll add that feature when I start working on rumqtt again.

dbrgn commented 6 years ago

Cool! Is there a workaround I can implement in the meantime?

(I guess I'll just restart the service every 15 min :slightly_smiling_face:)

By the way, that's probably an important limitation that should be mentioned in the README.

tekjar commented 6 years ago

How about using clean_session=false?

dbrgn commented 6 years ago

Ah, I didn't realize that this was a configurable option. Thanks!

hmvp commented 6 years ago

I am not sure if this feature request is actually wanted. I would say that documenting this behavior and giving a way to detect reconnects should be enough. If it would be implemented it should be configurable.

This is also because it gives a false sense of security: with a clean session, the subscribe always takes some time, so any message the broker receives between the disconnect and the subscribe is lost for the client. Thus, if you need to be sure that you receive everything, you need clean_session=false. If you need clean_session=true, there is a reasonable chance that you want to change your subscriptions anyway.

tekjar commented 6 years ago

@dbrgn I kinda agree with @hmvp here. Automatic resubscription during reconnections might create confusion when people want their subscriptions to be reset during reconnections. Having the user resubscribe manually on receiving a Disconnect notification might be a better idea here.

fooker commented 5 years ago

Usually, libraries provide some kind of connected-callback which can be used to (re)subscribe. AFAIK currently there is no way of getting those events or am I missing something?

tekjar commented 5 years ago

@fooker I'll make changes to enable disconnect and reconnect event notifications. Next week most probably

bbigras commented 5 years ago

I'm using set_reconnect_opts(ReconnectOptions::Always(10)) and set_clean_session(false).

When I kill the broker (mosquitto), it reconnects and I see that the keepalives are working but I don't receive data anymore.

I'm using 3ecd3fecf3f9284d9d8a93274d6ad61066e91a43

[2019-02-05T18:09:12Z DEBUG rumqtt::client::mqttstate] Ping = Ping. keep alive = 10,
            last incoming packet before 9 secs,
            last outgoing packet before 15 secs
[2019-02-05T18:09:12Z DEBUG tokio_reactor] dropping I/O source: 0
[2019-02-05T18:09:12Z ERROR rumqtt::client::connection] Event loop returned. Error = NetworkStreamClosed

[2019-02-05T18:09:22Z INFO  rumqtt::client::connection] Mqtt connect response = Some(Connack(Connack { session_present: false, code: Accepted }))
[2019-02-05T18:09:22Z INFO  rumqtt::client::connection] Mqtt connection successful!!
[2019-02-05T18:09:32Z DEBUG rumqtt::client::mqttstate] Ping = Ping. keep alive = 10,
            last incoming packet before 10 secs,
            last outgoing packet before 10 secs
[2019-02-05T18:09:32Z DEBUG rumqtt::client::connection] Incoming packet = "Pingresp"
[2019-02-05T18:09:42Z DEBUG rumqtt::client::mqttstate] Ping = Ping. keep alive = 10,
            last incoming packet before 10 secs,
            last outgoing packet before 10 secs
[2019-02-05T18:09:42Z DEBUG rumqtt::client::connection] Incoming packet = "Pingresp"
[2019-02-05T18:09:52Z DEBUG rumqtt::client::mqttstate] Ping = Ping. keep alive = 10,
            last incoming packet before 9 secs,
            last outgoing packet before 10 secs
[2019-02-05T18:09:52Z DEBUG rumqtt::client::connection] Incoming packet = "Pingresp"
[2019-02-05T18:10:02Z DEBUG rumqtt::client::mqttstate] Ping = Ping. keep alive = 10,
            last incoming packet before 9 secs,
            last outgoing packet before 10 secs
[2019-02-05T18:10:02Z DEBUG rumqtt::client::connection] Incoming packet = "Pingresp"
[2019-02-05T18:10:12Z DEBUG rumqtt::client::mqttstate] Ping = Ping. keep alive = 10,
            last incoming packet before 10 secs,
            last outgoing packet before 10 secs
[2019-02-05T18:10:12Z DEBUG rumqtt::client::connection] Incoming packet = "Pingresp"

tekjar commented 5 years ago

I'm assuming you are restarting the broker. Unless the broker writes the session's subscriptions to disk and reloads them on the next start, you won't see data. clean_session is about network disconnections, not broker crashes.

bbigras commented 5 years ago

You mean for the retain stuff right?

I mean that after I restart the broker, I publish again but I don't receive it on the subscribed side.

tekjar commented 5 years ago

Unless you resubscribe after the broker restarts, it won't work.

bbigras commented 5 years ago

Oh. I totally misread your last comment. Sorry. and thanks!

I guess it's a bad idea for me to use clean_session=false then.

tekjar commented 5 years ago

clean_session=false is the way to make the broker remember subscriptions across network reconnections. Otherwise, the broker will clean all the client state.
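To make the distinction between a network reconnect and a broker restart concrete, here is a minimal sketch of a toy in-memory broker. Everything in it (`ToyBroker`, `Session`, the method names) is hypothetical and exists only for illustration; it is not rumqtt or mosquitto code, and it only models the session behaviour discussed in this thread.

```rust
use std::collections::{HashMap, HashSet};

/// Per-client state kept by the toy broker.
struct Session {
    clean: bool,
    topics: HashSet<String>,
}

/// Toy in-memory broker (hypothetical model, not rumqtt or mosquitto code).
#[derive(Default)]
struct ToyBroker {
    sessions: HashMap<String, Session>,
}

impl ToyBroker {
    /// Handles a CONNECT and returns the CONNACK `session_present` flag.
    fn connect(&mut self, client_id: &str, clean_session: bool) -> bool {
        // Resume only if the client asks for a persistent session and the
        // stored session was itself persistent.
        let resumable = !clean_session
            && self.sessions.get(client_id).map_or(false, |s| !s.clean);
        if !resumable {
            // Start from scratch: any previous subscriptions are dropped.
            self.sessions.insert(
                client_id.to_string(),
                Session { clean: clean_session, topics: HashSet::new() },
            );
        }
        resumable
    }

    fn subscribe(&mut self, client_id: &str, topic: &str) {
        if let Some(session) = self.sessions.get_mut(client_id) {
            session.topics.insert(topic.to_string());
        }
    }

    fn is_subscribed(&self, client_id: &str, topic: &str) -> bool {
        self.sessions
            .get(client_id)
            .map_or(false, |s| s.topics.contains(topic))
    }

    /// Simulates a broker restart with no on-disk session persistence.
    fn restart_without_persistence(&mut self) {
        self.sessions.clear();
    }
}

fn main() {
    let mut broker = ToyBroker::default();

    // Persistent session: subscriptions survive a network reconnect.
    broker.connect("client-a", false);
    broker.subscribe("client-a", "sensors/temp");
    let session_present = broker.connect("client-a", false);
    println!("reconnect: session_present = {}", session_present);

    // Broker restart without persistence: the session is gone.
    broker.restart_without_persistence();
    let session_present = broker.connect("client-a", false);
    println!("after restart: session_present = {}", session_present);
    println!(
        "still subscribed = {}",
        broker.is_subscribed("client-a", "sensors/temp")
    );
}
```

In this model a reconnect with clean_session=false finds the stored subscriptions, while a restart that clears the in-memory map leaves the client connected but subscribed to nothing, which matches the "pings work, no data" symptom above.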

bbigras commented 5 years ago

Is there a way to ensure being resubscribed after a broker crash? I think maybe ReconnectOptions::never and opening a new connection in a loop.
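A rough sketch of that "open a new connection in a loop" idea, assuming the client's own reconnect logic is turned off. The `Connection` trait, `supervise` function and `FakeConnection` type are all hypothetical stand-ins, not rumqtt APIs; a real loop would probably run forever, and `max_attempts` is only there to keep the example finite.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for one client connection; not a rumqtt type.
trait Connection {
    fn subscribe(&mut self, topic: &str);
    /// Blocks until the connection is lost and returns the reason.
    fn run_until_closed(&mut self) -> String;
}

/// Opens a fresh connection per attempt and subscribes from scratch each time.
/// Returns how many attempts produced a connection.
fn supervise<C: Connection>(
    mut open: impl FnMut() -> Option<C>,
    topics: &[&str],
    retry_delay: Duration,
    max_attempts: usize,
) -> usize {
    let mut connected = 0;
    for attempt in 1..=max_attempts {
        if let Some(mut conn) = open() {
            connected += 1;
            // Every new connection starts with no subscriptions assumed.
            for topic in topics {
                conn.subscribe(topic);
            }
            let reason = conn.run_until_closed();
            eprintln!("attempt {}: connection closed: {}", attempt, reason);
        } else {
            eprintln!("attempt {}: broker unreachable", attempt);
        }
        thread::sleep(retry_delay);
    }
    connected
}

/// Fake connection that logs subscriptions and closes immediately.
struct FakeConnection {
    log: Rc<RefCell<Vec<String>>>,
}

impl Connection for FakeConnection {
    fn subscribe(&mut self, topic: &str) {
        self.log.borrow_mut().push(topic.to_string());
    }

    fn run_until_closed(&mut self) -> String {
        "NetworkStreamClosed".to_string()
    }
}

fn main() {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut attempts = 0;
    let open = || {
        attempts += 1;
        // Pretend the broker is down on the second attempt.
        if attempts == 2 {
            None
        } else {
            Some(FakeConnection { log: Rc::clone(&log) })
        }
    };
    let connected = supervise(open, &["sensors/temp"], Duration::from_millis(10), 3);
    println!("{} connections, {} subscribes", connected, log.borrow().len());
}
```

Because each connection subscribes from scratch, this approach does not depend on the broker remembering anything, at the cost of missing messages published while the client is down.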

tekjar commented 5 years ago

This isn't usually a problem with gcloud/aws/azure SaaS brokers.

I'm not sure if rumqtt is sending Disconnect/Reconnect events on the notification channel. The idea is to leverage those.

opensourcegeek commented 5 years ago

Just wanted to check, is this behavior only applicable for subscribe/resubscribe? I've noticed that in poor connections publish doesn't work until the process is restarted but it could be anything in our network. Since this issue is so close to what I'm seeing just want to make sure that it's not related to publish. I only publish data from clients at the moment.

tekjar commented 5 years ago

@opensourcegeek Yeah. Just applicable to resubscriptions. Can you open a new issue and provide more details on the problem? I might not be able to work on it immediately, but I'll be spending a lot more time on this crate from June.

TotalKrill commented 5 years ago

I will base my opinion on the assumption that MQTT systems are long running message brokers/notifications systems, thus from a user perspective, when subscribed to a topic, I will be getting messages on that topic until such time I manually unsubscribe from them.

Being able to react to a network disconnect manually and then resubscribe seems like a workaround. Personally, since the default for clean_session is "false" (which is fine, as otherwise it would have an effect and a memory footprint on the broker side), I think an option for automatic resubscription should be included and should default to true.

That said, being able to react to disconnects would be great anyway, since there are brokers that communicate permission errors by disconnecting the client. I would say this is application specific though, so if the library allows for a reaction, that would be fine.

opensourcegeek commented 5 years ago

@tekjar Thanks for getting back - I don't think it's a problem with this library, or at least I haven't got enough evidence to say this library is the issue. In my case I don't think the broker restarted; instead, the connectivity from client to server is pretty bad, and once connectivity is re-established at the network level the library doesn't seem to publish data. However, there are various other parts to that system, and any of those moving parts could be the reason the data isn't getting pushed. All I wanted to check was whether the conversation in this thread about bad connectivity would apply to publishing data too. If so, I'll give it more attention and perhaps raise another ticket; otherwise I'll rule this out for now in my investigation.

michaelmarconi commented 5 years ago

I'm struggling with the issue of resubscription on network disconnects. I've read through this issue and issue 143 and I'm none the wiser!

In my case, after an initial successful connection, I see this message:

[2019-09-05T07:56:18Z ERROR rumqtt::client::connection] Event loop returned. Error = NetworkStreamClosed
[2019-09-05T07:56:28Z INFO  rumqtt::client::connection] Mqtt connect response = Some(Connack(Connack { session_present: false, code: Accepted }))
[2019-09-05T07:56:28Z INFO  rumqtt::client::connection] Mqtt connection successful!!

And that appears to drop all of my subscriptions, as I never hear from the MQTT broker again (Google IOT Core in my case). Note that my client is setting .set_clean_session(false).

@tekjar says:

Reconnection notifications and notification drop handling are part of master now.

I can't see any documentation about how to receive these however, so I don't know how to handle the disconnects myself.

Can anyone point me in the right direction please?

TotalKrill commented 5 years ago

Which version are you using?

michaelmarconi commented 5 years ago

@TotalKrill I'm using 0.30.1, which is what cargo search turned up.

tekjar commented 5 years ago

@michaelmarconi Automatic resubscription isn't implemented yet. But I'm not sure if that's the problem you are facing. Are you connecting with clean_session = false so that broker remembers the client after disconnection?

michaelmarconi commented 5 years ago

Hi @tekjar, I believe I'm setting clean_session to false:

    // MQTT options
    let mqtt_broker_host = env::var("MQTT_BROKER_HOST").expect("The MQTT broker host environment setting is missing!");
    let mqtt_broker_port: u16 = env::var("MQTT_BROKER_PORT").expect("The MQTT broker port environment setting is missing!").parse().unwrap();
    let mqtt_heartbeat_interval: u16 = env::var("MQTT_HEARTBEAT_INTERVAL").expect("The MQTT heartbeat interval environment setting is missing!").parse().unwrap();
    let mqtt_reconnect_interval: u64 = env::var("MQTT_RECONNECT_INTERVAL").expect("The MQTT reconnect interval environment setting is missing!").parse().unwrap();
    let mqtt_options = MqttOptions::new(client_id, mqtt_broker_host, mqtt_broker_port)
        .set_connection_method(ConnectionMethod::Tls(root_cert, None))
        .set_keep_alive(mqtt_heartbeat_interval)
        .set_clean_session(false) // <-- IS THIS THE CORRECT SETTING?
        .set_reconnect_opts(ReconnectOptions::Always(mqtt_reconnect_interval))
        .set_security_opts(SecurityOptions::GcloudIot(
            String::from(project_id),
            private_key,
            60,
        ));

    // Start MQTT client
    let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).expect("Failed to connect to MQTT broker!");

Here's a recent set of logs:

[2019-09-05T12:16:52Z INFO  rumqtt::client::connection] Mqtt connect response = Some(Connack(Connack { session_present: false, code: Accepted }))
[2019-09-05T12:16:52Z INFO  rumqtt::client::connection] Mqtt connection successful!!
[2019-09-05T12:16:52Z INFO  rumqtt::client::mqttstate] Subscribe. Topics = [SubscribeTopic { topic_path: "/devices/a7137d512884f41c/config", qos: AtLeastOnce }], Pkid = PacketIdentifier(1)
[2019-09-05T12:16:52Z DEBUG rumqtt::client::connection] Outgoing request = "Subscribe(Subscribe { pkid: PacketIdentifier(1), topics: [SubscribeTopic { topic_path: \"/devices/a7137d512884f41c/config\", qos: AtLeastOnce }] })"
[2019-09-05T12:16:52Z INFO  rumqtt::client::mqttstate] Subscribe. Topics = [SubscribeTopic { topic_path: "/devices/a7137d512884f41c/commands/#", qos: AtLeastOnce }], Pkid = PacketIdentifier(2)
[2019-09-05T12:16:52Z DEBUG rumqtt::client::connection] Outgoing request = "Subscribe(Subscribe { pkid: PacketIdentifier(2), topics: [SubscribeTopic { topic_path: \"/devices/a7137d512884f41c/commands/#\", qos: AtLeastOnce }] })"
[2019-09-05T12:16:52Z DEBUG rumqtt::client::connection] Incoming packet = "Suback(Suback { pkid: PacketIdentifier(1), return_codes: [Success(AtLeastOnce)] })"
[2019-09-05T12:16:52Z DEBUG rumqtt::client::connection] Incoming packet = "Suback(Suback { pkid: PacketIdentifier(2), return_codes: [Success(AtLeastOnce)] })"
[2019-09-05T12:16:52Z DEBUG rumqtt::client::connection] Incoming packet = "topic = /devices/a7137d512884f41c/config, qos = AtLeastOnce, pkid = Some(PacketIdentifier(1)), payload size = 137 bytes"
[2019-09-05T12:16:52Z DEBUG rumqtt::client::connection] Incoming packet = "topic = /devices/a7137d512884f41c/config, qos = AtLeastOnce, pkid = Some(PacketIdentifier(2)), payload size = 137 bytes"
[2019-09-05T12:16:52Z DEBUG rumqtt::client::connection] Outgoing request = "topic = /devices/a7137d512884f41c/state, qos = AtLeastOnce, pkid = Some(PacketIdentifier(3)), payload size = 170 bytes"
[2019-09-05T12:16:52Z DEBUG rumqtt::client::connection] Incoming packet = "Puback(PacketIdentifier(3))"
[2019-09-05T12:17:02Z DEBUG rumqtt::client::mqttstate] Ping = None. keep alive = 10,
            last incoming packet before 9 secs,
            last outgoing packet before 9 secs
[2019-09-05T12:17:08Z DEBUG rumqtt::client::mqttstate] Ping = Ping. keep alive = 10,
            last incoming packet before 15 secs,
            last outgoing packet before 5 secs
[2019-09-05T12:17:08Z ERROR rumqtt::client::connection] Event loop returned. Error = NetworkStreamClosed
[2019-09-05T12:17:18Z INFO  rumqtt::client::connection] Mqtt connect response = Some(Connack(Connack { session_present: false, code: Accepted }))
[2019-09-05T12:17:18Z INFO  rumqtt::client::connection] Mqtt connection successful!!
[2019-09-05T12:17:28Z DEBUG rumqtt::client::mqttstate] Ping = Ping. keep alive = 10,
            last incoming packet before 10 secs,
            last outgoing packet before 10 secs

I'm cutting the network connection to my development machine (see the error at 2019-09-05T12:17:08Z) and then plugging it in again. The client reconnects to the Google IOT Core MQTT broker but the state doesn't appear to be restored, as I never receive any further notifications from the broker, which I am sending manually to be sure.

Looking at the logs at the reconnect event (Some(Connack(Connack { session_present: false, code: Accepted }))), it looks to me like the Google MQTT broker isn't persisting session data. Indeed, it seems that their documentation indicates that persistent sessions are not supported.
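As a sketch of how the `session_present` flag could drive that decision on the client side: the application remembers its own subscriptions and replays them whenever the broker reports no stored session. The `Connack` struct here is a hypothetical mirror of the field printed in the logs, and `SubscriptionTracker` is an illustrative helper, not something rumqtt provides.

```rust
/// Hypothetical mirror of the CONNACK field visible in the logs above.
struct Connack {
    session_present: bool,
}

/// Remembers what the application subscribed to, so it can be replayed.
#[derive(Default)]
struct SubscriptionTracker {
    topics: Vec<String>,
}

impl SubscriptionTracker {
    /// Records a topic once; repeated calls with the same topic are ignored.
    fn remember(&mut self, topic: &str) {
        if !self.topics.iter().any(|t| t == topic) {
            self.topics.push(topic.to_string());
        }
    }

    /// Topics to subscribe to again after a (re)connect: none if the broker
    /// resumed the session, all of them otherwise.
    fn to_resubscribe(&self, connack: &Connack) -> &[String] {
        if connack.session_present {
            &[]
        } else {
            &self.topics[..]
        }
    }
}

fn main() {
    let mut tracker = SubscriptionTracker::default();
    tracker.remember("/devices/a7137d512884f41c/config");
    tracker.remember("/devices/a7137d512884f41c/commands/#");

    // The broker in the logs answered with session_present: false.
    let connack = Connack { session_present: false };
    for topic in tracker.to_resubscribe(&connack) {
        println!("would resubscribe to {}", topic);
    }
}
```

With a broker that never persists sessions, this always replays the full list, so setting clean_session to false makes no practical difference there.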

Automatic resubscription isn't implemented yet.

Is there a way I can register for disconnect notifications so I can resubscribe to the topics manually instead?

michaelmarconi commented 5 years ago

@tekjar, I just noticed your comment that states that the disconnect handling is implemented on master.

I've just tried pulling the master branch into my cargo dependencies but it won't compile:

 Compiling rumqtt v0.31.0 (https://github.com/AtherEnergy/rumqtt#eadd783a)
error[E0308]: mismatched types
   --> .../.cargo/git/checkouts/rumqtt-97ac1717aa98c207/eadd783/src/client/network.rs:130:34
    |
130 |             config.set_protocols(&self.alpn_protocols);
    |                                  ^^^^^^^^^^^^^^^^^^^^ expected slice, found struct `std::vec::Vec`
    |
    = note: expected type `&[std::string::String]`
               found type `&std::vec::Vec<std::vec::Vec<u8>>`

Any chance you could release a new version with an example of how to register for disconnection notices?

tekjar commented 5 years ago

Looking at the logs at the reconnect event (Some(Connack(Connack { session_present: false, code: Accepted }))), it looks to me like the Google MQTT broker isn't persisting session data. Indeed, it seems that their documentation indicates that persistent sessions are not supported.

Oh wow. I didn't know of this

I've just tried pulling the master branch into my cargo dependencies but it won't compile:

Remove your target directory and Cargo.lock, then rebuild.

michaelmarconi commented 5 years ago

Thanks @tekjar, that did the trick. I'm receiving the Reconnection notification now and resubscribing to topics when that happens.
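For anyone landing here later, the pattern looks roughly like the sketch below. It uses a standard-library channel and a hypothetical `Notification` enum and `Subscriber` trait as stand-ins, so the names and variants are assumptions for illustration; check the crate source for the actual notification type and client methods.

```rust
use std::sync::mpsc;

/// Hypothetical stand-in for the client's notification type; the real
/// rumqtt enum may have different variants and payloads.
enum Notification {
    Publish(String),
    Disconnection,
    Reconnection,
}

/// Hypothetical abstraction over "something that can send SUBSCRIBE".
trait Subscriber {
    fn subscribe(&mut self, topic: &str);
}

/// Test double that records the SUBSCRIBE requests it was asked to send.
#[derive(Default)]
struct RecordingClient {
    subscribed: Vec<String>,
}

impl Subscriber for RecordingClient {
    fn subscribe(&mut self, topic: &str) {
        self.subscribed.push(topic.to_string());
    }
}

/// Drains notifications until the channel closes, resubscribing on every
/// reconnection. Returns the number of publishes seen.
fn handle_notifications<S: Subscriber>(
    client: &mut S,
    topics: &[&str],
    notifications: mpsc::Receiver<Notification>,
) -> usize {
    let mut publishes = 0;
    for notification in notifications {
        match notification {
            Notification::Reconnection => {
                for topic in topics {
                    client.subscribe(topic);
                }
            }
            Notification::Disconnection => {
                eprintln!("disconnected, waiting for reconnect");
            }
            Notification::Publish(payload) => {
                publishes += 1;
                println!("received: {}", payload);
            }
        }
    }
    publishes
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // Simulated event sequence: the link drops, comes back, then data flows.
    tx.send(Notification::Disconnection).unwrap();
    tx.send(Notification::Reconnection).unwrap();
    tx.send(Notification::Publish("config v2".to_string())).unwrap();
    drop(tx); // closing the channel ends the loop

    let mut client = RecordingClient::default();
    let topics = [
        "/devices/a7137d512884f41c/config",
        "/devices/a7137d512884f41c/commands/#",
    ];
    let publishes = handle_notifications(&mut client, &topics, rx);
    println!(
        "{} publish(es), {} resubscription(s)",
        publishes,
        client.subscribed.len()
    );
}
```

Messages published between the disconnect and the resubscribe are still lost with this approach, as noted earlier in the thread.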