eclipse / paho.mqtt.rust

.unwrap() inside the library causes whole program to crash #152

Closed bekircanagaoglu closed 2 years ago

bekircanagaoglu commented 2 years ago

Hi,

Initially I used auto_reconnect in the connection options with (initial_wait = 1, max_wait = 5), but when I disconnected from WiFi for 15-20 minutes and then reconnected, I didn't receive new messages in the following code.

Note: I'm using the sync client.

let rx = mqtt_client.start_consuming();
for msg in rx.iter() {
    if let Some(message) = msg {
        process(message);
    } else {
        // Wait for client to reconnect
    }
}

Then I disabled auto reconnect and changed the code as follows:

let rx = mqtt_client.start_consuming();
for msg in rx.iter() {
    if let Some(message) = msg {
        process(message);
    } else {
        // Wait for client to reconnect
        let mut sleep_secs = 1;
        let max_sleep = 6;
        while !mqtt_client.is_connected() {
            if mqtt_client.reconnect().is_ok() {
                log::info!("MQTT client reconnected");
                break;
            }
            sleep(Duration::from_secs(sleep_secs));
            sleep_secs = sleep_secs * 2 % max_sleep;
        }
    }
}

The host running the MQTT broker was under load, and my client disconnected and reconnected a few times, then crashed on the unwrap() call at async_client.rs, line 1022, at tx.send(msg).unwrap();

    pub fn start_consuming(&mut self) -> Receiver<Option<Message>> {
        let (tx, rx) = channel::unbounded::<Option<Message>>();

        // Make sure at least the low-level connection_lost handler is in
        // place to notify us when the connection is lost (sends a 'None' to
        // the receiver).
        let ctx: &InnerAsyncClient = &self.inner;
        unsafe {
            ffi::MQTTAsync_setConnectionLostCallback(
                self.inner.handle,
                ctx as *const _ as *mut c_void,
                Some(AsyncClient::on_connection_lost),
            );
        }

        // Message callback just queues incoming messages.
        self.set_message_callback(move |_, msg| {
            tx.send(msg).unwrap();
        });

        rx
    }

Here is the stacktrace:


thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: "SendError(..)"', /home/msi/.cargo/registry/src/github.com-1ecc6299db9ec823/paho-mqtt-0.10.0/src/async_client.rs:1022:26
stack backtrace:
   0:     0x55cd5edc7e0c - std::backtrace_rs::backtrace::libunwind::trace::h09f7e4e089375279
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/../../backtrace/src/backtrace/libunwind.rs:93:5
   1:     0x55cd5edc7e0c - std::backtrace_rs::backtrace::trace_unsynchronized::h1ec96f1c7087094e
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/../../backtrace/src/backtrace/mod.rs:66:5
   2:     0x55cd5edc7e0c - std::sys_common::backtrace::_print_fmt::h317b71fc9a5cf964
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sys_common/backtrace.rs:67:5
   3:     0x55cd5edc7e0c - <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::he3555b48e7dfe7f0
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sys_common/backtrace.rs:46:22
   4:     0x55cd5ede968c - core::fmt::write::h513b07ca38f4fb1b
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/fmt/mod.rs:1149:17
   5:     0x55cd5edc2685 - std::io::Write::write_fmt::haf8c932b52111354
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/io/mod.rs:1697:15
   6:     0x55cd5edc99e0 - std::sys_common::backtrace::_print::h195c38364780a303
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sys_common/backtrace.rs:49:5
   7:     0x55cd5edc99e0 - std::sys_common::backtrace::print::hc09dfdea923b6730
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sys_common/backtrace.rs:36:9
   8:     0x55cd5edc99e0 - std::panicking::default_hook::{{closure}}::hb2e38ec0d91046a3
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/panicking.rs:211:50
   9:     0x55cd5edc9595 - std::panicking::default_hook::h60284635b0ad54a8
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/panicking.rs:228:9
  10:     0x55cd5edca094 - std::panicking::rust_panic_with_hook::ha677a669fb275654
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/panicking.rs:606:17
  11:     0x55cd5edc9b70 - std::panicking::begin_panic_handler::{{closure}}::h976246fb95d93c31
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/panicking.rs:502:13
  12:     0x55cd5edc82b4 - std::sys_common::backtrace::__rust_end_short_backtrace::h38077ee5b7b9f99a
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sys_common/backtrace.rs:139:18
  13:     0x55cd5edc9ad9 - rust_begin_unwind
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/panicking.rs:498:5
  14:     0x55cd5e931441 - core::panicking::panic_fmt::h35f3a62252ba0fd2
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/panicking.rs:107:14
  15:     0x55cd5e931533 - core::result::unwrap_failed::hb53671404b9e33c2
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/result.rs:1613:5
  16:     0x55cd5eae856d - paho_mqtt::async_client::AsyncClient::start_consuming::{{closure}}::h2cf75e50990fa7bf
  17:     0x55cd5eae7558 - paho_mqtt::async_client::AsyncClient::on_message_arrived::h1e59fd8ee9e52a67
  18:     0x55cd5eb08104 - Protocol_processPublication
  19:     0x55cd5eb0a5f7 - MQTTProtocol_handlePublishes
  20:     0x55cd5eb06d8e - MQTTAsync_receiveThread
  21:     0x7fb318211947 - start_thread
                               at ./nptl/./nptl/pthread_create.c:435:8
  22:     0x7fb3182a1a44 - __GI___clone
                               at ./misc/../sysdeps/unix/sysv/linux/x86_64/clone.S:100
  23:                0x0 - <unknown>

fpagliughi commented 2 years ago

Yeah, none of that is good!

First, the unwrap() on the channel send doesn't belong there. That will crash out any time the application drops the receiver. It should probably ignore the error. But really the app should stop consuming when it drops the receiver. I still need to implement stop_consuming().
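To illustrate the point about the send error, here is a minimal sketch of a forwarding helper that reports a dropped receiver instead of panicking. It uses `std::sync::mpsc` as a stand-in for the channel type inside the library, and `forward` and `demo_forward` are hypothetical names, not part of paho-mqtt.

```rust
use std::sync::mpsc::{channel, Sender};

/// Forwards a message to the consumer, reporting failure instead of panicking.
/// Returns `true` if the message was queued, `false` if the receiver is gone.
/// (Hypothetical helper; std's mpsc channel stands in for the library's channel.)
fn forward<T>(tx: &Sender<T>, msg: T) -> bool {
    match tx.send(msg) {
        Ok(()) => true,
        Err(_) => {
            // The receiver was dropped; a real callback might log and carry on.
            eprintln!("consumer receiver dropped; discarding message");
            false
        }
    }
}

/// Exercises both outcomes: a live receiver, then a dropped one.
fn demo_forward() -> (bool, bool) {
    let (tx, rx) = channel::<Option<String>>();
    let while_alive = forward(&tx, Some("hello".to_string()));
    drop(rx); // simulate the application's consume loop exiting
    let after_drop = forward(&tx, Some("late".to_string()));
    (while_alive, after_drop)
}
```

With `unwrap()` in place of the `match`, the second call would panic on the thread that delivers messages, which is the behavior shown in the stack trace.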

This should probably happen automatically. Maybe instead of a raw channel Receiver, the start_consuming() call should return a guard that stops consuming when it gets dropped.
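One way the guard idea could look, sketched against a toy client rather than the real one: `FakeClient`, `ConsumerGuard`, and `deliver` are all hypothetical names invented for this illustration, and the actual library may choose a different design.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};

type Callback = Box<dyn FnMut(Option<String>) + Send>;

/// Hypothetical stand-in for the client: it only stores a message callback.
#[derive(Clone, Default)]
struct FakeClient {
    callback: Arc<Mutex<Option<Callback>>>,
}

impl FakeClient {
    /// Installs a queuing callback and returns a guard that owns the receiver.
    fn start_consuming(&self) -> ConsumerGuard {
        let (tx, rx) = channel::<Option<String>>();
        let cb: Callback = Box::new(move |msg: Option<String>| {
            // Ignore send errors rather than unwrapping.
            let _ = tx.send(msg);
        });
        *self.callback.lock().unwrap() = Some(cb);
        ConsumerGuard { rx, client: self.clone() }
    }

    /// Simulates an incoming message; returns whether a callback handled it.
    fn deliver(&self, msg: Option<String>) -> bool {
        let mut slot = self.callback.lock().unwrap();
        let handled = if let Some(cb) = slot.as_mut() {
            cb(msg);
            true
        } else {
            false
        };
        handled
    }
}

/// Removes the callback (and with it the sender) when dropped.
struct ConsumerGuard {
    rx: Receiver<Option<String>>,
    client: FakeClient,
}

impl Drop for ConsumerGuard {
    fn drop(&mut self) {
        *self.client.callback.lock().unwrap() = None;
    }
}

/// Delivers one message while the guard lives and one after it is dropped.
fn demo_guard() -> (bool, Option<String>, bool) {
    let client = FakeClient::default();
    let guard = client.start_consuming();
    let delivered = client.deliver(Some("hello".to_string()));
    let received = guard.rx.recv().unwrap();
    drop(guard);
    let after_drop = client.deliver(Some("late".to_string()));
    (delivered, received, after_drop)
}
```

Because the guard clears the callback in `Drop`, a message arriving after the consumer goes away has no closure left to run, so there is no send that could fail.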

But.... I'm not seeing where the receiver is being dropped in your code snippet. I'll have to test it out. The purpose of the double option channel was so that the receiver could be held during disconnects and continue when the connection was re-established.

And all of this wouldn't have been necessary if the auto-reconnect worked. I did receive a report that reconnect didn't work if it was down for a while (15-20 min sounds familiar!)

I have a vague memory of suspecting the SSL/TLS layer. Can you share your create & connect options? (Omit any user names, passwords, private URLs, etc, but show if you're using them)

Also, what platform are you using? Linux/Windows/Mac, etc

bekircanagaoglu commented 2 years ago

> First, the unwrap() on the channel send doesn't belong there. That will crash out any time the application drops the receiver. It should probably ignore the error. But really the app should stop consuming when it drops the receiver. I still need to implement stop_consuming().

> This should probably happen automatically. Maybe instead of a raw channel Receiver, the start_consuming() call should return a guard that stops consuming when it gets dropped.

> But.... I'm not seeing where the receiver is being dropped in your code snippet. I'll have to test it out. The purpose of the double option channel was so that the receiver could be held during disconnects and continue when the connection was re-established.

It does get dropped; it seems I left that part out. Also, I think this happened when I ran two instances of the program by mistake. Since they had the same client ID, they kept disconnecting each other, so the consumer was dropped over and over again within a short period of time.

Here is the full snippet:

loop {
    let resp = mqtt_client.subscribe_many(&["topic1", "topic2"], &[1, 1]);

    if resp.is_err() {
        sleep(Duration::from_secs(1));
        continue;
    }

    let rx = mqtt_client.start_consuming();
    for msg in rx.iter() {
        if let Some(message) = msg {
            process(message);
        } else {
            // Wait for client to reconnect
            let mut sleep_secs = 1;
            let max_sleep = 6;
            while !mqtt_client.is_connected() {
                if mqtt_client.reconnect().is_ok() {
                    log::info!("MQTT client reconnected");
                    break;
                }
                sleep(Duration::from_secs(sleep_secs));
                sleep_secs = sleep_secs * 2 % max_sleep;
            }
        }
    }
}

On reconnect, I just re-subscribe and start consuming again. Maybe I shouldn't?

And I've just realized that if the client disconnects between reconnect and subscribe, this might get me into an infinite loop. I guess I should add more reconnect logic there (for when subscribe_many returns an error).

> And all of this wouldn't have been necessary if the auto-reconnect worked. I did receive a report that reconnect didn't work if it was down for a while (15-20 min sounds familiar!)

> I have a vague memory of suspecting the SSL/TLS layer. Can you share your create & connect options? (Omit any user names, passwords, private URLs, etc, but show if you're using them)

> Also, what platform are you using? Linux/Windows/Mac, etc

The broker (mosquitto) runs on a Raspberry Pi on my local network; there is no TLS.

I'm using Linux (Ubuntu) and connect as follows:

pub fn create_mqtt_client(mqtt_details: MqttDetails) -> Result<paho_mqtt::Client, Error> {
    let create_options = paho_mqtt::CreateOptionsBuilder::new()
        .server_uri(mqtt_details.url /* this is IP of the raspberry pi in local network */)
        .client_id("some_client_id")
        .finalize();

    let cli = paho_mqtt::Client::new(create_options)?;

    let mut conn_opts = paho_mqtt::ConnectOptionsBuilder::new();
    conn_opts
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(mqtt_details.clean_session /* this is true */);

    cli.connect(conn_opts.finalize())?;
    Ok(cli)
}

fpagliughi commented 2 years ago

Cool. Thanks for the additional information.

What I meant with "I don't see rx being dropped", was that the receiver loop should not exit just because the connection closes:

for msg in rx.iter() {
    // Should keep running until you manually break out
}

The receiver's iterator should only exit from the loop when the transmitter is closed, but the transmitter is still there and gets an error when it tries to send a message to a receiver that has been dropped. So, somehow, the rx loop exited, rx went out of scope, and then a message arrived.
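The behavior described here can be checked in isolation. The following sketch uses `std::sync::mpsc` (an assumption; the library uses its own channel type) and a hypothetical function name to show that the consume loop keeps running across a `None` item and ends only once the sender is dropped.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Runs a consume loop and returns (messages seen, disconnect notices seen).
/// The loop ends only when the sending side has been dropped.
fn consume_until_sender_dropped() -> (usize, usize) {
    let (tx, rx) = channel::<Option<&'static str>>();

    let producer = thread::spawn(move || {
        // `None` models a connection-lost notification between two messages.
        for item in [Some("first"), None, Some("second")] {
            if tx.send(item).is_err() {
                break; // receiver gone; stop quietly instead of panicking
            }
        }
        // `tx` is dropped here, which closes the channel.
    });

    let mut messages = 0;
    let mut disconnects = 0;
    for msg in rx.iter() {
        match msg {
            Some(_) => messages += 1,
            None => disconnects += 1, // the loop keeps going after a disconnect
        }
    }
    producer.join().unwrap();
    (messages, disconnects)
}
```

Both messages are counted even though a `None` arrived in between, which matches the intent of holding the receiver across disconnects.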

fpagliughi commented 2 years ago

I'm going to push a commit today that will at least prevent the program from exiting if this happens. Instead, it will log the error and keep running.

In addition, the commit contains new code to stop consuming (or streaming if you're using async/await), and functions to remove callbacks. If you were to call stop_consuming() from another thread, then I think the rx loop should exit, because that should drive the tx out of scope. I'm about to test that out.

If that all looks good, I'll move on to trying to figure out why the auto-reconnect wasn't working. But the new Rust release will also sit atop the new C v1.3.10 release, so that's bringing in a few bug fixes.

bekircanagaoglu commented 2 years ago

> I'm going to push a commit today that will at least prevent the program from exiting if this happens. Instead, it will log the error and keep running.

> In addition, the commit contains new code to stop consuming (or streaming if you're using async/await), and functions to remove callbacks. If you were to call stop_consuming() from another thread, then I think the rx loop should exit, because that should drive the tx out of scope. I'm about to test that out.

> If that all looks good, I'll move on to trying to figure out why the auto-reconnect wasn't working. But the new Rust release will also sit atop the new C v1.3.10 release, so that's bringing in a few bug fixes.

Thank you!

fpagliughi commented 2 years ago

OK. I pushed an intermediate fix to master. It will, at least, not panic if the receiver disappears while the consumer is still running.

There is also the new stop_consuming() call in the clients. This removes the callback, thus dropping the closure and the transmitter it holds. When that happens, the consumer's rx loop should definitely end.

To test that, I added a Ctrl-C handler to the sync_consume and sync_consume_v5 examples, with the handler calling the stop_consuming() function. So now, if you hit ^C, the program should disconnect and exit cleanly.

https://github.com/eclipse/paho.mqtt.rust/blob/dc061e158c1b2f271736ef78de8c1f66d814aad5/examples/sync_consume.rs#L138-L156

fpagliughi commented 2 years ago

The fix for the crash (remove the unwrap()) went out in v0.11. I'm going to keep this open for a while to remember to address the additional, underlying issues, although I may create separate issues to deal with them individually.

bekircanagaoglu commented 2 years ago

> The fix for the crash (remove the unwrap()) went out in v0.11. I'm going to keep this open for a while to remember to address the additional, underlying issues, although I may create separate issues to deal with them individually.

I've just realized that there is a break after the reconnect. It breaks out of the rx.iter() loop once the reconnect happens. This is why rx was dropped.

fpagliughi commented 2 years ago

From what you posted, it appears that the break; should be getting you out of the inner while loop, not the outer for loop.
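The distinction matters because an unlabeled `break` in Rust leaves only the innermost enclosing loop. A small sketch with hypothetical function names, using a slice of `Option`s in place of the message receiver:

```rust
/// A `break` inside the nested `while` leaves only that `while`;
/// the outer `for` still visits every item.
fn inner_break_only(items: &[Option<u32>]) -> usize {
    let mut visited = 0;
    for item in items {
        visited += 1;
        if item.is_none() {
            let mut attempts = 0;
            while attempts < 3 {
                attempts += 1;
                if attempts == 2 {
                    break; // exits the `while`, not the `for`
                }
            }
        }
    }
    visited
}

/// A `break` placed directly in the `for` body ends the consume loop itself.
fn break_outer(items: &[Option<u32>]) -> usize {
    let mut visited = 0;
    for item in items {
        visited += 1;
        if item.is_none() {
            break; // exits the `for`; later items are never seen
        }
    }
    visited
}
```

In the second shape, anything that owns the iterated receiver goes out of scope soon after the loop ends, which is the situation where a late message would hit a dropped receiver.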

In some of the examples, I do use a break to get out of the for loop if the reconnect doesn't work after some amount of time. Depending on the application you may want to restart the app or keep trying reconnect() forever. It's up to you. But either way, you may have thrown some copypasta into your test that you didn't mean to.

The code is a little obfuscated. It could be like:

    for msg in rx.iter() {
        if let Some(msg) = msg {
            process(msg);
        }
        else {
            let mut delay = Duration::from_secs(1);
            const MAX_DELAY: Duration = Duration::from_secs(6);

            println!("Disconnected. Trying to reconnect...");
            while cli.reconnect().is_err() {
                thread::sleep(delay);
                delay = std::cmp::min(2 * delay, MAX_DELAY);
            }
            println!("Reconnected");
        }
    }
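As a side note on the delay arithmetic, the two update rules seen in this thread behave differently. The sketch below (function names are hypothetical) lists the delays each rule produces: the `% max` form from the earlier snippet wraps around, while the `min` form doubles and then stays at the cap.

```rust
use std::time::Duration;

/// Delays (in seconds) produced by `secs = secs * 2 % max`, starting at 1.
fn modulo_delays(max: u64, steps: usize) -> Vec<u64> {
    let mut secs: u64 = 1;
    let mut out = Vec::with_capacity(steps);
    for _ in 0..steps {
        out.push(secs);
        secs = secs * 2 % max; // wraps around instead of capping
    }
    out
}

/// Delays produced by doubling and clamping with `min`, starting at 1 second.
fn capped_delays(max: Duration, steps: usize) -> Vec<Duration> {
    let mut delay = Duration::from_secs(1);
    let mut out = Vec::with_capacity(steps);
    for _ in 0..steps {
        out.push(delay);
        delay = std::cmp::min(2 * delay, max); // stays at `max` once reached
    }
    out
}
```

With a maximum of 6, the modulo rule yields 1, 2, 4, 2, 4, 2 seconds, whereas the clamped rule yields 1, 2, 4, 6, 6 seconds.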

bekircanagaoglu commented 2 years ago

> From what you posted, it appears that the break; should be getting you out of the inner while loop, not the outer for loop.

> In some of the examples, I do use a break to get out of the for loop if the reconnect doesn't work after some amount of time. Depending on the application you may want to restart the app or keep trying reconnect() forever. It's up to you. But either way, you may have thrown some copypasta into your test that you didn't mean to.

What I meant was that there were 2 breaks in the original code, and I missed one while retyping it here. More specifically, there is a function called block_until_reconnect(), which looks like this:

fn block_until_reconnect(client: &Client) {
    let mut sleep_secs = 1;
    let max_sleep = 6;
    while !client.is_connected() {
        if client.reconnect().is_ok() {
            log::info!("MQTT client reconnected");
            break;
        }
        sleep(Duration::from_secs(sleep_secs));
        sleep_secs = sleep_secs * 2 % max_sleep;
    }
}

And the original loop looks like this:

let rx = mqtt_client.start_consuming();
for msg in rx.iter() {
    if let Some(message) = msg {
        process(message);
    } else {
        if !mqtt_client.is_connected() {
            block_until_reconnect(&mqtt_client);
        }
        break;
    }
}

I merged them for the sake of brevity but missed that part. Sorry about that.

fpagliughi commented 2 years ago

Oh, OK. Great; thanks for letting me know so I didn’t waste time chasing a ghost bug.

Next I’ll start testing issues with auto reconnect.

fpagliughi commented 2 years ago

The partial fix went out with v0.11.0. I'm going to close this and open a new issue concerning auto-reconnect. Feel free to re-open this if there are still problems.