eclipse / paho.mqtt.rust

Cannot reconnect using the `async_subscribe.rs` official example #168

Closed: Ks89 closed this issue 1 year ago.

Ks89 commented 1 year ago

Hi, I'm running the official async_subscribe.rs example from this repo (master branch).

I'm running mosquitto locally via its official Docker container, like this:

docker run -it --name mosquitto -p 1883:1883 \
  --rm -v $PWD/mosquitto/mosquitto-no-security.conf:/mosquitto/config/mosquitto.conf \
  -v /mosquitto/data -v /mosquitto/log eclipse-mosquitto

with this config file:

persistence false
allow_anonymous true
connection_messages true
log_type all
listener 1883

Connection

I'm able to connect to the server and these are my logs:

async_subscribe.rs:

cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.24s
     Running `target/debug/rust-mosquitto`
Connecting to the MQTT server...
Subscribing to topics: ["test", "hello"]
Waiting for messages...

mosquitto:

1662814092: mosquitto version 2.0.15 starting
1662814092: Config loaded from /mosquitto/config/mosquitto.conf.
1662814092: Opening ipv4 listen socket on port 1883.
1662814092: Opening ipv6 listen socket on port 1883.
1662814092: mosquitto version 2.0.15 running
1662814097: New connection from 172.17.0.1:64556 on port 1883.
1662814097: New client connected from 172.17.0.1:64556 as rust_async_subscribe (p2, c0, k30).
1662814097: Will message specified (32 bytes) (r0, q1).
1662814097:     test
1662814097: Sending CONNACK to rust_async_subscribe (0, 0)
1662814097: Received SUBSCRIBE from rust_async_subscribe
1662814097:     test (QoS 1)
1662814097: rust_async_subscribe 1 test
1662814097:     hello (QoS 1)
1662814097: rust_async_subscribe 1 hello
1662814097: Sending SUBACK to rust_async_subscribe

Send message

When I send a message via

mosquitto_pub -m "text message payload" -t test

I get these logs:

async_subscribe.rs:

...
test: text message payload

mosquitto:

...
1662814128: Received PINGREQ from rust_async_subscribe
1662814128: Sending PINGRESP to rust_async_subscribe
1662814159: Received PINGREQ from rust_async_subscribe
1662814159: Sending PINGRESP to rust_async_subscribe
1662814190: Received PINGREQ from rust_async_subscribe
1662814190: Sending PINGRESP to rust_async_subscribe
1662814213: New connection from 172.17.0.1:64558 on port 1883.
1662814213: New client connected from 172.17.0.1:64558 as auto-3483FE90-944C-5B0B-9C65-6E53BA3260F8 (p2, c1, k60).
1662814213: No will message specified.
1662814213: Sending CONNACK to auto-3483FE90-944C-5B0B-9C65-6E53BA3260F8 (0, 0)
1662814213: Received PUBLISH from auto-3483FE90-944C-5B0B-9C65-6E53BA3260F8 (d0, q0, r0, m0, 'test', ... (20 bytes))
1662814213: Sending PUBLISH to rust_async_subscribe (d0, q0, r0, m0, 'test', ... (20 bytes))
1662814213: Received DISCONNECT from auto-3483FE90-944C-5B0B-9C65-6E53BA3260F8
1662814213: Client auto-3483FE90-944C-5B0B-9C65-6E53BA3260F8 disconnected.
1662814220: Received PINGREQ from rust_async_subscribe
1662814220: Sending PINGRESP to rust_async_subscribe

Reconnect

However, when I kill and restart the mosquitto container, async_subscribe.rs no longer receives messages.

async_subscribe.rs:

...
test: Async subscriber lost connection
Lost connection. Attempting reconnect.
Error reconnecting: PahoDescr(-1, "TCP connect completion failure")
Error reconnecting: PahoDescr(-1, "TCP/TLS connect failure")
Error reconnecting: PahoDescr(-1, "TCP/TLS connect failure")
Error reconnecting: PahoDescr(-1, "TCP/TLS connect failure")
Error reconnecting: PahoDescr(-1, "TCP/TLS connect failure")
...

mosquitto:

1662814436: mosquitto version 2.0.15 starting
1662814436: Config loaded from /mosquitto/config/mosquitto.conf.
1662814436: Opening ipv4 listen socket on port 1883.
1662814436: Opening ipv6 listen socket on port 1883.
1662814436: mosquitto version 2.0.15 running
1662814437: New connection from 172.17.0.1:64560 on port 1883.
1662814437: New client connected from 172.17.0.1:64560 as rust_async_subscribe (p2, c0, k30).
1662814437: Will message specified (32 bytes) (r0, q1).
1662814437:     test
1662814437: Sending CONNACK to rust_async_subscribe (0, 0)

Mosquitto never logs "Received SUBSCRIBE from rust_async_subscribe".

Why isn't this working? Is there a bug in the official example, or am I missing something?

Thanks.

vicat47 commented 1 year ago

Same question here.

paho-mqtt

2022-11-22T09:21:51 [ERROR] - Error reconnecting: [-1] TCP connect timeout
2022-11-22T09:22:54 [ERROR] - Error reconnecting: [-1] TCP connect timeout
2022-11-22T09:23:57 [ERROR] - Error reconnecting: [-1] TCP connect timeout
2022-11-22T09:25:02 [ERROR] - Error reconnecting: [-1] TCP connect timeout
2022-11-22T09:26:05 [ERROR] - Error reconnecting: [-1] TCP connect timeout
2022-11-22T09:27:09 [ERROR] - Error reconnecting: [-1] TCP connect timeout
2022-11-22T09:28:12 [ERROR] - Error reconnecting: [-1] TCP connect timeout
# reconnected
# nothing else is logged

mosquitto

1669080717: New connection from 10.0.0.1:52240 on port 8883.
1669080717: New client connected from 10.0.0.1:52240 as ***********-service (p2, c1, k30).

vicat47 commented 1 year ago

You need to re-subscribe to the topics after reconnecting.

// Assumes `cli` and `strm` are in scope, along with
// `use futures::stream::StreamExt;` and `use std::time::Duration;`.
while let Some(msg_opt) = strm.next().await {
    if let Some(msg) = msg_opt {
        log::info!("received message from topic: {}", msg.topic());
        log::info!("received message content: {}", msg.payload_str());
    }
    else {
        // A "None" means we were disconnected. Try to reconnect...
        log::error!("Lost connection. Attempting reconnect.");
        while let Err(err) = cli.reconnect().await {
            log::error!("Error reconnecting: {}", err);
            // With tokio 1.x use: tokio::time::sleep() (delay_for() in tokio 0.2)
            async_std::task::sleep(Duration::from_millis(1000)).await;
        }
        log::info!("Reconnect successful, continue listening...");
        // Without a stored session on the broker, the subscriptions are gone,
        // so register them again.
        log::info!("Resubscribing to topics: {:?}", "test/#");
        cli.subscribe("test/#", 1).await?;
    }
}
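The same reconnect-then-resubscribe flow can be modeled without a broker. This is a minimal std-only sketch, not the library's API: `reconnect_and_resubscribe` is a hypothetical helper, and its `reconnect` and `subscribe` closures stand in for `cli.reconnect()` and `cli.subscribe()`. It blocks the thread with `std::thread::sleep` instead of awaiting, which is only appropriate outside an async runtime.

```rust
use std::thread;
use std::time::Duration;

/// Keep calling `reconnect` until it succeeds, then re-issue every subscription.
/// `reconnect` and `subscribe` are hypothetical stand-ins for the client calls.
/// Returns the number of connection attempts that were made.
fn reconnect_and_resubscribe<R, S>(
    mut reconnect: R,
    mut subscribe: S,
    topics: &[(&str, i32)],
    delay: Duration,
) -> Result<u32, String>
where
    R: FnMut() -> Result<(), String>,
    S: FnMut(&str, i32) -> Result<(), String>,
{
    let mut attempts = 0;
    loop {
        attempts += 1;
        match reconnect() {
            Ok(()) => break,
            Err(err) => {
                eprintln!("Error reconnecting: {}", err);
                thread::sleep(delay); // fixed delay between attempts
            }
        }
    }
    // Restore every subscription; the first failure is returned to the caller.
    for &(topic, qos) in topics {
        subscribe(topic, qos)?;
    }
    Ok(attempts)
}

fn main() {
    let mut failures_left = 2;
    let mut subscribed: Vec<(String, i32)> = Vec::new();
    let attempts = reconnect_and_resubscribe(
        || {
            // Simulate the broker being down for the first two attempts.
            if failures_left > 0 {
                failures_left -= 1;
                Err("TCP/TLS connect failure".to_string())
            } else {
                Ok(())
            }
        },
        |topic, qos| {
            subscribed.push((topic.to_string(), qos));
            Ok(())
        },
        &[("test", 1), ("hello", 1)],
        Duration::from_millis(10),
    )
    .expect("resubscribe failed");
    println!("connected after {} attempts, resubscribed {:?}", attempts, subscribed);
}
```

Keeping the topic list in one place means the initial subscribe and the resubscribe after a reconnect cannot drift apart.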
msdrigg commented 1 year ago

@vicat47 's solution fixed it for me :). Thank you

I would recommend updating the official example so that it re-subscribes upon reconnection.

tobdub-snce commented 1 year ago

Maybe the "persistence false" option in the mosquitto config is the problem? I think the subscriptions should be restored if clean_session is false.

It could be nice to have an option in the library to automatically re-subscribe upon reconnection even when using clean_session = true.
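An automatic re-subscribe option like this would need the client to remember what it subscribed to. Below is a minimal sketch of a hypothetical `SubscriptionRegistry` (not an existing paho.mqtt.rust type) that records topic filters with their QoS and returns them as parallel vectors, assuming a `subscribe_many(topics, qos)`-style call like the one the examples use, so that a reconnect handler could replay them.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper (not part of paho.mqtt.rust) that remembers every
/// subscription so it can be replayed after a clean-session reconnect.
#[derive(Default)]
struct SubscriptionRegistry {
    // topic filter -> QoS; BTreeMap keeps the replay order deterministic
    topics: BTreeMap<String, i32>,
}

impl SubscriptionRegistry {
    fn add(&mut self, filter: &str, qos: i32) {
        // Subscribing to the same filter again just replaces its QoS.
        self.topics.insert(filter.to_string(), qos);
    }

    fn remove(&mut self, filter: &str) {
        self.topics.remove(filter);
    }

    /// Split into parallel vectors of topic filters and QoS levels.
    fn replay(&self) -> (Vec<String>, Vec<i32>) {
        self.topics.iter().map(|(t, q)| (t.clone(), *q)).unzip()
    }
}

fn main() {
    let mut reg = SubscriptionRegistry::default();
    reg.add("test", 1);
    reg.add("hello", 1);
    let (topics, qos) = reg.replay();
    // After a clean-session reconnect, a handler could pass these to the client.
    println!("resubscribe {:?} with QoS {:?}", topics, qos);
}
```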

fpagliughi commented 1 year ago

Apologies for the slow response.

The async_subscribe examples are explicitly meant to show that you don't need to resubscribe to topics when you are using persistent sessions via clean_session(false) for v3.x and clean_start(false) for v5.

With a persistent session, if you are disconnected from the server - either purposely with disconnect() or accidentally with a network drop - the server will remember all your subscriptions and keep a copy of any matching messages for your client while you're disconnected. When you reconnect, it will blast all of those messages at you immediately. (You will likely start getting message callbacks before the connect() even returns, so be careful!)

The problem with your test is that rather than yanking the network cable/connection between your client and the broker you are killing the broker - and discarding all of the persistent session information that it's holding. That's not "normal" in the real world.

In a normal production environment, the broker should not go down, and even if it did, it would likely have some type of persistence so that when it came back up, it would remember the existing sessions and the messages that it had saved for them.
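The difference between a dropped connection and a killed broker can be sketched as a toy model of broker-side session state. Everything here is hypothetical and simplified (exact topic matching, no QoS; mosquitto, for example, does not queue QoS 0 messages for offline clients by default): `persistence` plays the role of mosquitto's `persistence` option, `drop_client` models a network drop, and `restart` models killing and restarting the container.

```rust
use std::collections::HashMap;

/// Toy model of broker-side session state; an illustration only,
/// not how mosquitto is implemented.
#[derive(Default)]
struct Session {
    clean: bool,
    online: bool,
    subscriptions: Vec<String>,
    queued: Vec<String>, // messages held while the client is offline
}

struct Broker {
    persistence: bool, // plays the role of mosquitto's `persistence true|false`
    sessions: HashMap<String, Session>,
}

impl Broker {
    fn new(persistence: bool) -> Self {
        Broker { persistence, sessions: HashMap::new() }
    }

    /// Returns (session_present, messages delivered right away).
    fn connect(&mut self, id: &str, clean: bool) -> (bool, Vec<String>) {
        if clean {
            self.sessions.remove(id); // clean session: start from scratch
        }
        let present = self.sessions.contains_key(id);
        let s = self.sessions.entry(id.to_string()).or_default();
        s.clean = clean;
        s.online = true;
        (present, std::mem::take(&mut s.queued))
    }

    fn subscribe(&mut self, id: &str, topic: &str) {
        if let Some(s) = self.sessions.get_mut(id) {
            s.subscriptions.push(topic.to_string());
        }
    }

    /// Network drop: a persistent session stays, a clean one is discarded.
    fn drop_client(&mut self, id: &str) {
        let clean = match self.sessions.get_mut(id) {
            Some(s) => {
                s.online = false;
                s.clean
            }
            None => return,
        };
        if clean {
            self.sessions.remove(id);
        }
    }

    /// Queue for offline subscribers (delivery to online clients omitted).
    fn publish(&mut self, topic: &str, payload: &str) {
        for s in self.sessions.values_mut() {
            if !s.online && s.subscriptions.iter().any(|t| t == topic) {
                s.queued.push(payload.to_string());
            }
        }
    }

    /// Kill and restart: persistent sessions survive only if saved to disk.
    fn restart(&mut self) {
        let keep = self.persistence;
        self.sessions.retain(|_, s| keep && !s.clean);
        for s in self.sessions.values_mut() {
            s.online = false;
        }
    }
}

fn main() {
    for persistence in [false, true] {
        let mut broker = Broker::new(persistence);
        let id = "rust_async_subscribe";
        let (present, _) = broker.connect(id, false);
        if !present {
            broker.subscribe(id, "test"); // first run: create the session
        }
        broker.drop_client(id); // network drop, broker stays up
        broker.publish("test", "sent while offline");
        let (present, msgs) = broker.connect(id, false);
        println!("drop:    present={} msgs={:?}", present, msgs);

        broker.restart(); // like killing the container
        broker.publish("test", "sent after restart");
        let (present, msgs) = broker.connect(id, false);
        println!("restart: persistence={} present={} msgs={:?}", persistence, present, msgs);
    }
}
```

In this model the session survives a network drop in both configurations, but after a restart `session_present` is true and the queued message is delivered only when `persistence` is enabled.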

That said, we only need to subscribe on the initial connect if this is the first time we're running the example against a particular broker and need to establish the persistent session. On subsequent runs we are just reconnecting, and the server should remember us.

So, how do you know?

When you connect, the server sends back a "connection response" which contains a session_present flag. If true it means that the server remembers you and you don't need to subscribe. If false, the server does not remember you and you do need to subscribe. The sync_consume example demonstrates this:

https://github.com/eclipse/paho.mqtt.rust/blob/eee12c62aaba6309f95fb15ba1ebe1a2eb721e6e/examples/sync_consume.rs#L108-L129
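The decision described above fits in a few lines. This is a minimal sketch, not the code from sync_consume: `ConnectResponse` here is a local stand-in that models only the `session_present` flag, and `subscribe_if_needed` is a hypothetical helper whose `subscribe` closure stands in for the client's subscribe call.

```rust
/// Local stand-in for the connect response; only the `session_present`
/// flag discussed above is modeled.
struct ConnectResponse {
    session_present: bool,
}

/// Subscribe only if the broker has no stored session for this client.
/// Returns how many subscribe calls were made.
fn subscribe_if_needed<F>(rsp: &ConnectResponse, topics: &[&str], mut subscribe: F) -> usize
where
    F: FnMut(&str),
{
    if rsp.session_present {
        // The broker still holds our subscriptions (and any queued messages).
        return 0;
    }
    for &topic in topics {
        subscribe(topic);
    }
    topics.len()
}

fn main() {
    let topics = ["test", "hello"];
    // First run against this broker: no session yet.
    let first = subscribe_if_needed(&ConnectResponse { session_present: false }, &topics, |t| {
        println!("subscribing to {}", t)
    });
    // Later runs: the broker remembers the persistent session.
    let later = subscribe_if_needed(&ConnectResponse { session_present: true }, &topics, |t| {
        println!("subscribing to {}", t)
    });
    println!("first run: {} subscriptions, later run: {}", first, later);
}
```

With a clean session, MQTT 3.1.1 requires the broker to report `session_present = false`, so the same check also resubscribes on every connect in that case.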

Also note, though, that the concept of "reconnect" is not part of the MQTT protocol; it's a convenience function in the library. It means "connect again with the connection parameters already specified". So even on a reconnect, you might want to check the session_present flag to be 100% sure. Honestly, I don't remember whether the library gives the server response back to you via reconnect(), but if it doesn't, it should. And if it doesn't, instead of calling reconnect(), you can always just call connect() again with a clone of the connection parameters.
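To illustrate that last point, here is a minimal model, assuming reconnect() simply replays the stored options as described above. `ConnectOptions`, `ConnectResponse`, `FakeBroker`, and `Client` are hypothetical types for this sketch, not the library's. Because `reconnect()` goes through the same `connect()` path, it can hand back the same response, so the `session_present` flag is available after every reconnect.

```rust
use std::collections::HashSet;

#[derive(Clone, Debug)]
struct ConnectOptions {
    client_id: String,
    clean_session: bool,
}

/// Stand-in for the broker's connect response.
#[derive(Debug, PartialEq)]
struct ConnectResponse {
    session_present: bool,
}

/// Hypothetical broker that remembers clients using persistent sessions.
#[derive(Default)]
struct FakeBroker {
    sessions: HashSet<String>,
}

impl FakeBroker {
    fn accept(&mut self, opts: &ConnectOptions) -> ConnectResponse {
        if opts.clean_session {
            self.sessions.remove(&opts.client_id);
            return ConnectResponse { session_present: false };
        }
        // insert() returns false if the id was already stored.
        let present = !self.sessions.insert(opts.client_id.clone());
        ConnectResponse { session_present: present }
    }
}

/// Hypothetical client that keeps the last options it connected with.
struct Client {
    last_opts: Option<ConnectOptions>,
}

impl Client {
    fn connect(&mut self, broker: &mut FakeBroker, opts: ConnectOptions) -> ConnectResponse {
        let rsp = broker.accept(&opts);
        self.last_opts = Some(opts);
        rsp
    }

    /// "Connect again with the parameters already specified."
    /// Returns None if connect() was never called.
    fn reconnect(&mut self, broker: &mut FakeBroker) -> Option<ConnectResponse> {
        let opts = self.last_opts.clone()?;
        Some(self.connect(broker, opts))
    }
}

fn main() {
    let mut broker = FakeBroker::default();
    let mut cli = Client { last_opts: None };
    let opts = ConnectOptions { client_id: "rust_async_subscribe".to_string(), clean_session: false };
    println!("connect:   {:?}", cli.connect(&mut broker, opts));
    // After a network drop, reconnect replays the same options.
    if let Some(rsp) = cli.reconnect(&mut broker) {
        println!("reconnect: {:?}", rsp);
    }
}
```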

Ks89 commented 1 year ago

@fpagliughi finally I found some time to work on this again. Thank you for the amazing explanation, it's very clear and detailed. Using persistent sessions I solved my problem.

Ks89 commented 1 year ago

@fpagliughi

With the async_subscribe.rs example, everything works as expected using:

persistence true
persistence_location /mosquitto/data/

However, the official legacy_async_subscribe.rs example fails with:

...
test - Async subscriber lost connection
Connection lost. Attempting reconnect.
Connection attempt failed with error code -1.

Connection attempt failed with error code -1.

asyncsubscribelegacy(42831,0x700003cc8000) malloc: *** error for object 0x6000035b4060: pointer being freed was not allocated
asyncsubscribelegacy(42831,0x700003cc8000) malloc: *** set a breakpoint in malloc_error_break to debug
[1]    42831 abort      cargo run

What is the correct configuration for testing legacy_async_subscribe.rs reconnection without errors?