AtherEnergy / rumqtt

Pure rust mqtt cilent
The Unlicense
202 stars 71 forks source link

Reactor Stopped #73

Closed dbrgn closed 6 years ago

dbrgn commented 6 years ago

I'm trying the new tokio2 branch with a simple subscriber.

I'm trying to subscribe to two topics:

let (mut client, receiver) = MqttClient::start(client_options);
let topics = vec![
    ("+/devices/+/activations", QoS::AtMostOnce),
    ("+/devices/+/up", QoS::AtMostOnce),
];
client.subscribe(topics).expect("Subcription failure");

thread::spawn(move || {
    println!("--> Listening!");
    for packet in receiver {
        println!("received {:?}", packet);
    }
}).join().unwrap();

When I start the client:

INFO 2018-02-03T01:04:16Z: rumqtt::client::connection: mqtt connection successful
DEBUG 2018-02-03T01:04:16Z: rumqtt::client::connection: Sending packet. Subscribe(Subscribe { pid: PacketIdentifier(1), topics: [SubscribeTopic { topic_path: "+/devices/+/activations", qos: AtMostOnce }, SubscribeTopic { topic_path: "+/devices/+/up", qos: AtMostOnce }] })
DEBUG 2018-02-03T01:04:16Z: rumqtt::client::connection: Received packet. "Suback(Suback { pid: PacketIdentifier(1), return_codes: [Success(AtMostOnce), Success(AtMostOnce)] })"
DEBUG 2018-02-03T01:04:16Z: rumqtt::client::connection: Sending packet. Suback(Suback { pid: PacketIdentifier(1), return_codes: [Success(AtMostOnce), Success(AtMostOnce)] })
ERROR 2018-02-03T01:04:16Z: rumqtt::client::connection: Reactor stopped. v = ()

Why is the reactor stopped?

And why is the thread not terminated (= the packet receiver closed) if the reactor is stopped?

(Sorry if this is a stupid question, I'm not too familiar with MQTT 🙂)

tekjar commented 6 years ago

Sorry if this is a stupid question

Definitely not a stupid question. You have discovered a bug. The client is sending wrong suback 2nd time. Mosquitto seems to be lenient with this. Which broker are you using?

I've pushed a fix. Can you test now?

Why is the reactor stopped?

Broker has closed the connection because of the duplicate suback.

dbrgn commented 6 years ago

Which broker are you using?

I don't know, I'm communicating with TTN servers :)

I've pushed a fix. Can you test now?

Will try, hopefully tonight.

dbrgn commented 6 years ago

The double suback seems fixed, but I still lose connection...

 INFO 2018-02-03T19:37:29Z: rumqtt::client::connection: Address resolved to Some(V4(52.169.76.255:1883))
 INFO 2018-02-03T19:37:29Z: rumqtt::client::connection: mqtt connection successful
DEBUG 2018-02-03T19:37:29Z: rumqtt::client::connection: Sending packet. Subscribe(Subscribe { pid: PacketIdentifier(1), topics: [SubscribeTopic { topic_path: "+/devices/+/activations", qos: AtMostOnce }, SubscribeTopic { topic_path: "+/devices/+/up", qos: AtMostOnce }] })
DEBUG 2018-02-03T19:37:29Z: rumqtt::client::connection: Received packet. "Suback(Suback { pid: PacketIdentifier(1), return_codes: [Success(AtMostOnce), Success(AtMostOnce)] })"
received Suback(Suback { pid: PacketIdentifier(1), return_codes: [Success(AtMostOnce), Success(AtMostOnce)] })
ERROR 2018-02-03T19:37:32Z: rumqtt::client::connection: Reactor stopped. v = ()
 INFO 2018-02-03T19:37:32Z: rumqtt::client: Will sleep for Duration { secs: 10, nanos: 0 } seconds before reconnecting

 INFO 2018-02-03T19:37:42Z: rumqtt::client::connection: Address resolved to Some(V4(52.169.76.255:1883))
 INFO 2018-02-03T19:37:42Z: rumqtt::client::connection: mqtt connection successful
ERROR 2018-02-03T19:37:45Z: rumqtt::client::connection: Reactor stopped. v = ()
 INFO 2018-02-03T19:37:45Z: rumqtt::client: Will sleep for Duration { secs: 10, nanos: 0 } seconds before reconnecting

Can I debug something about this without access to server logs?

tekjar commented 6 years ago

@dbrgn Broker is disconnecting the client for some reason. It's difficult to debug this with out logs. One way to cross check is to connect using some other mqtt client and see if it works.

dbrgn commented 6 years ago

It seems to work with paho-mqtt (python). There I'm connecting via TLS though. Maybe that's the issue? (I would have expected that subscriptions wouldn't even be acked if TLS is required by the server.)

So, how would I set up TLS? This is all I can find in the docs:

screenshot

The code has a comment saying "ca and, optionally, a pair of client cert and client key". So, what is ca? Is it a string to a path to some kind of certificate? If yes, what format? PEM/DER? Or is it a string of the PEM itself?

tekjar commented 6 years ago

@dbrgn Can you paste your MqttOptions here

If your broker expects tls connection, you need to use Tls with pem type ca. But from your logs, connection was successful. Can you try with out subscribe to narrow down the problem?

dbrgn commented 6 years ago

Sure!

let client_options = MqttOptions::new("smartmail", "eu.thethings.network:1883")
        .unwrap_or_else(|e| {
            println!("Could not initialize MqttOptions: {}", e);
            exit(3);
        })
        .set_keep_alive(10)
        .set_reconnect_opts(ReconnectOptions::AfterFirstSuccess(10))
        .set_connection_method(ConnectionMethod::Tcp)
        .set_security_opts(SecurityOptions::UsernamePassword((
            conf.ttn_app_id.clone(),
            conf.ttn_access_key.clone(),
        )));

Even if commenting out the subscription and everything except the basic auth:

let client_options = MqttOptions::new("smartmail", "eu.thethings.network:1883")
        .unwrap_or_else(|e| {
            println!("Could not initialize MqttOptions: {}", e);
            exit(3);
        })
        .set_security_opts(SecurityOptions::UsernamePassword((
            conf.ttn_app_id.clone(),
            conf.ttn_access_key.clone(),
        )));

...it still fails.

This is not about the authentication though, if I change the basic auth password to "foo", connecting fails:

 INFO 2018-02-04T12:00:44Z: rumqtt::client::connection: Address resolved to Some(V4(52.169.76.255:1883))
ERROR 2018-02-04T12:00:49Z: rumqtt::client: Network connection failed. Error = MqttConnectionRefused(5), Connection count = InitialConnect

I tried TLS:

.set_connection_method(ConnectionMethod::Tls("mqtt-ca.pem".into(), None))

...where mqtt-ca.pem is this file in the current directory. (This is also used successfully for the connection through Python.)

When running this, connecting fails:

 INFO 2018-02-04T12:05:01Z: rumqtt::client::connection: Address resolved to Some(V4(52.169.76.255:1883))
ERROR 2018-02-04T12:05:01Z: rumqtt::client: Network connection failed. Error = Io(Custom { kind: Other, error: Error { code: ErrorCode(5), cause: Some(Io(Os { code: 104, kind: ConnectionReset, message: "Connection reset by peer" })) } }), Connection count = InitialConnect

I asked the server maintainers for logs, but I don't know yet if I'm going to get any.

tekjar commented 6 years ago

@dbrgn If you are sure that the username and password that you are reading from config is right, this should've worked. I'll check if basic username password auth works with mosquitto broker

dbrgn commented 6 years ago

Here's a network trace:

screenshot

The TCP connection is closed after 3 seconds by the server. I guess this is probably something on the server side.

tekjar commented 6 years ago

@dbrgn Seems so. Local mosquitto broker with username password is working for me. Unfortunately I can't do much without your full setup :(

dbrgn commented 6 years ago

Could the protocol name difference be relevant?

comparison

tekjar commented 6 years ago

@dbrgn Ahhh yeahh..This library only supports the latest mqtt 3.1.1

dbrgn commented 6 years ago

@tekjar can we do something about it to add support for older versions? and if not, is there a way to show a better error message?

tekjar commented 6 years ago

@dbrgn

can we do something about it to add support for older versions

I'm not very keen about this. Most of the brokers support latest version. Can you check if your broker allows latest mqtt as well. It's been very long (2014 to be precise) since 3.1.1 got released

is there a way to show a better error message

Client can't do much as it is the one initiating the connection. Broker should send connack with proper error return code. I'll add a debug print and see the return code in connack

dbrgn commented 6 years ago

It's been very long (2014 to be precise) since 3.1.1 got released

Ah, I wasn't aware that it's that long. I'll ask the backend folks about it :)

tekjar commented 6 years ago

@dbrgn Is your python client failing with 311 option ?

dbrgn commented 6 years ago

The awesome @htdvisser pointed out to me that only one MQTT client with the same ID is allowed to be connected to the server at the same time, and I used a static client ID ("smartmail"). I had another version of the code (based on the non-async branch) running on the server. I assume that the local connection caused the server to drop my other instance, which then automatically reconnected, resulting in a connection drop of the async client a few seconds later 🙈

Maybe a note could be added to the docs about this. Otherwise, the issue can be closed. Thanks @tekjar for the debugging help! :)

tekjar commented 6 years ago

@dbrgn

only one MQTT client with the same ID is allowed to be connected to the server at the same time Ahh yeah, broker should do this to recover from halfopen connections. Would've been easier if you had access to broker logs (: