redboltz / mqtt_cpp

Boost Software License 1.0

Odd infrequent error on reconnect #751

Closed Radagan closed 3 years ago

Radagan commented 3 years ago

I'm seeing a strange error every once in a while after an attempt to connect to AWS IoT Core using mqtt_cpp. This is the error:

terminate called after throwing an instance of 'boost::wrapexcept<boost::system::system_error>'
  what():  handshake: End of file
Aborted

I'm hoping someone here might have an idea what might be going on. I need my mqtt client to run for a very long time with reconnects, so this is a problem I have to find the cause of.

Radagan commented 3 years ago

I'm running with mqtt_cpp from the master branch

kleunen commented 3 years ago

Radagan,

it seems that you are not handling the exceptions. I looked at the examples of mqtt_cpp, and actually, the exceptions are not handled. An End of file can always occur. I believe the exception you mention just means that the connection was closed during the ssl handshake operation.

Can you, instead of running the main event loop without an exception handler as shown in the example:

ioc.run()

Try handling the exceptions from the event loop, as follows:

for (;;)
{
  try
  {
    ioc.run();
    break; // run() exited normally
  }
  catch (std::exception& e)
  {
    // Deal with exception as appropriate.
  }
}
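The same retry pattern can be tried out without a broker. The following is only a sketch under stated assumptions: `run_until_clean_exit`, `flaky_loop`, and the `max_restarts` limit are hypothetical names made up for illustration, not part of mqtt_cpp or Boost.Asio. The `run` callable stands in for `ioc.run()`, and `recover` is where `ioc.restart()` and any client re-initialization would go.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical helper: calls run() until it returns normally.
// After each exception it records what() and calls recover().
// Rethrows once more than max_restarts exceptions have been caught.
// Returns the messages of all exceptions that were handled.
inline std::vector<std::string> run_until_clean_exit(
    const std::function<void()>& run,
    const std::function<void()>& recover,
    int max_restarts)
{
    assert(max_restarts >= 0);
    std::vector<std::string> errors;
    for (;;) {
        try {
            run();
            return errors; // run() exited normally
        } catch (const std::exception& e) {
            errors.emplace_back(e.what());
            if (static_cast<int>(errors.size()) > max_restarts)
                throw; // give up and let the caller decide
            recover();
        }
    }
}

// Stand-in for an event loop that fails twice before finishing cleanly.
struct flaky_loop {
    int failures_left = 2;
    int recoveries = 0;

    void run() {
        if (failures_left > 0) {
            --failures_left;
            throw std::runtime_error("handshake: End of file");
        }
    }
    void recover() { ++recoveries; }
};
```

Calling `run_until_clean_exit([&]{ loop.run(); }, [&]{ loop.recover(); }, 5)` on a `flaky_loop` handles both simulated failures and then returns. The restart limit is a design choice of this sketch; the loop above retries forever.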

You will have to perform reconnecting yourself. In the on_error handler of your connection, perform a reconnect:

c->set_error_handler([](boost::system::error_code const& ec){
    if(ec == boost::asio::error::operation_aborted)
        return; // Don't reconnect if operation was aborted

    // Perform reconnect after 5 seconds
    auto mqtt_reconnect_timer = std::make_shared<boost::asio::deadline_timer>(ioc, boost::posix_time::seconds(5));
    mqtt_reconnect_timer->async_wait([mqtt_reconnect_timer](boost::system::error_code error) {
        if(error == boost::asio::error::operation_aborted)
            return;

        // Attempt to reconnect here
        ....
    });
});

Please let us know if this helps you.

I think the mqtt_cpp examples could use an update.

redboltz commented 3 years ago

When you use the sync client, an exception can be thrown. See https://github.com/redboltz/mqtt_cpp/wiki/Sync-vs-Async

Radagan commented 3 years ago

Thank you both for your replies and assistance. I think I'm now properly guarding for an exception in connect as I am using the sync client, and I've added exception handling around ioc.run() to restart it and reinitialize the client.

@kleunen, I agree about the examples. They are a great start, but it would be very valuable to see best practices incorporated. Not many people use MQTT on a casual basis. ;^) I missed both of these issues by following the examples too literally.

Radagan commented 3 years ago

I'll leave the issue open for a little longer while I test...

kleunen commented 3 years ago

Thank you for your feedback, and please let us know whether the proposed approach is helpful in the long term.

Radagan commented 3 years ago

Your feedback has helped me greatly, but I would appreciate a moment more of your time. My code is running and I'm recovering gracefully from that periodic "End of file" error, though I'm still not sure what's causing it.

I may still not be approaching this in the best way. My requirements are to have a separate "refresh" thread that closes and reconnects the mqtt client every so many minutes. When used with GCP, this will enable regenerating the authentication token as required. To keep this thread and the error handling situation from interfering with each other, I use an atomic bool and long (for time to reconnect). When it is time, the "refresh" thread calls client_mqtt->disconnect() if it is connected, which causes ioc.run() to exit and, as per this code, restart.

std::cout << "Starting MQTT Service" << std::endl;
config_client();
while (!stop_threads) {
    try {
        start_client();
        ioc.run();
        refresh = utils::minutes_from_now(5);
        if (!stop_threads) std::cout << "\033[34;1m\nMQTT Service Restarting. (SK_SIG:" << skip_signals << ")\033[0m" << std::endl;
        ioc.restart();
    }
    catch (const std::exception& ex) {
        refresh = utils::minutes_from_now(5);
        ioc.restart();
        config_client();
        std::cout << "\033[31;1m\nError: " << ex.what() << " -> Setting reinitializing mqtt_client.\033[0m" << std::endl;
        skip_signals = true;
    }
}

The data is coming in from another thread that listens to a DBus channel and publishes. I use skip_signals to skip over those signals when the mqtt_client is not configured, but I don't want to block that on a normal reconnect from the "refresh" thread.
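The skip_signals idea can be isolated into a small gate shared by the two threads. This is a minimal sketch, assuming the flag is a `std::atomic<bool>`; the `publish_gate` class, its member names, and the dropped-message counter are hypothetical and only illustrate the pattern, they are not taken from the actual code base.

```cpp
#include <atomic>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical gate between the DBus listener thread and the MQTT thread.
class publish_gate {
public:
    // Called by the MQTT thread while the client is being re-initialized.
    void set_skip(bool skip) {
        skip_signals_.store(skip, std::memory_order_release);
    }

    // Called by the producer thread. Forwards the payload to publish()
    // unless signals are currently being skipped.
    // Returns true if the payload was forwarded, false if it was dropped.
    template <typename Publish>
    bool offer(const std::string& payload, Publish&& publish) {
        if (skip_signals_.load(std::memory_order_acquire)) {
            dropped_.fetch_add(1, std::memory_order_relaxed);
            return false;
        }
        publish(payload);
        return true;
    }

    // Number of payloads dropped so far.
    int dropped() const { return dropped_.load(std::memory_order_relaxed); }

private:
    std::atomic<bool> skip_signals_{false};
    std::atomic<int> dropped_{0};
};
```

The gate only decides whether a payload is handed on. It does not make the publish call itself thread-safe, so in this sketch the `publish` callable would still need to hand the payload to the thread that runs the io_context.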

start_client() and config_client() do pretty much only what their name states. Here is the part of config_client() that creates the sync client:

...
        client_mqtt = MQTT_NS::make_tls_sync_client(ioc, broker_url, broker_port, mqtt::protocol_version::v3_1_1);
        client_mqtt->set_keep_alive_sec(30);

        client_mqtt->set_client_id(client_id);
        client_mqtt->get_ssl_context().load_verify_file(root_ca);
        client_mqtt->get_ssl_context().use_certificate_chain_file(cert);
        client_mqtt->get_ssl_context().use_private_key_file(private_key, boost::asio::ssl::context::file_format::pem);

#if OPENSSL_VERSION_NUMBER >= 0x10101000L
        SSL_CTX_set_keylog_callback(
            client_mqtt->get_ssl_context().native_handle(),
            [](SSL const*, char const* line) {
                std::cout << line << std::endl;
            }
        );
#endif // OPENSSL_VERSION_NUMBER >= 0x10101000L

        // Setup handlers
...

Any comment on this design would be appreciated.

Radagan commented 3 years ago

Hum... I'm also now getting a Segmentation fault very rarely right after the "refresh" thread disconnects the connection. So, I'm definitely missing something further.

I noticed that the mqtt_client closed handler had not yet been called, so I added a three second sleep before calling ioc.restart(). Will keep testing.

kleunen commented 3 years ago

To be honest, I do not completely understand what you are doing. But I think you do not need a separate thread to reconnect the client on a timeout. You can start a deadline timer on the same thread / io_service as the mqtt client, and request a disconnect on the mqtt client after this timeout. In the close handler of the mqtt_client, you should then restart the connection instead of shutting the client down.


bool system_shutdown = false;

client_mqtt->set_close_handler([](){
    if(!system_shutdown)
        config_client();
});

auto mqtt_reconnect_timer = std::make_shared<boost::asio::steady_timer>(ioc, std::chrono::minutes(5));
mqtt_reconnect_timer->async_wait([mqtt_reconnect_timer](boost::system::error_code error) {
    if(error == boost::asio::error::operation_aborted)
        return;

    // Disconnect here; the close handler above then restarts the connection
    client_mqtt->disconnect();
});

// Some time later, perform the actual shutdown:
system_shutdown = true;
client_mqtt->disconnect();
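
The control flow of this approach (periodic refresh versus final shutdown) can be checked in isolation. The following is only a sketch with stand-ins: `fake_client` and `refresh_policy` are hypothetical classes that model just `disconnect()` and a close handler, and they do not reproduce the mqtt_cpp API or its asynchronous behavior.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical stand-in for an MQTT client. It only tracks whether it is
// connected and invokes the close handler synchronously on disconnect().
class fake_client {
public:
    void set_close_handler(std::function<void()> handler) {
        on_close_ = std::move(handler);
    }
    void connect() {
        connected_ = true;
        ++connect_count_;
    }
    void disconnect() {
        if (!connected_) return;
        connected_ = false;
        if (on_close_) on_close_();
    }
    bool connected() const { return connected_; }
    int connect_count() const { return connect_count_; }

private:
    std::function<void()> on_close_;
    bool connected_ = false;
    int connect_count_ = 0;
};

// Reconnect whenever the connection closes, unless a shutdown was requested.
class refresh_policy {
public:
    explicit refresh_policy(fake_client& client) : client_(client) {
        client_.set_close_handler([this] {
            if (!system_shutdown_) client_.connect();
        });
    }
    // What the periodic timer would call.
    void refresh() { client_.disconnect(); }
    // What the application would call once, at exit.
    void shutdown() {
        system_shutdown_ = true;
        client_.disconnect();
    }

private:
    fake_client& client_;
    bool system_shutdown_ = false;
};
```

After `refresh()` the fake client is connected again, while after `shutdown()` it stays disconnected. In a real client the close handler runs asynchronously on the io_context, so the ordering guarantees of this synchronous model are an assumption.
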

Radagan commented 3 years ago

What I had is actually similar to this, but your approach is better. I replaced my refresh thread with your solution above. It works well except for the need to expire it if there is an error. I made the mqtt_reconnect_timer a member of my class and have this in my error handler:

        client_mqtt->set_error_handler(
            [this](MQTT_NS::error_code ec) {
                if(ec == boost::asio::error::operation_aborted)
                    return; // Don't reconnect if operation was aborted

                std::cout << "\033[31;1m[MQTT Error: (" << ec.value() << ") " << ec.message() << ". Attempting reconnect.\033[0m" << std::endl;
                skip_signals = true;
                mqtt_reconnect_timer->expires_from_now(std::chrono::seconds(1));
       });

This causes my ioc.run() to exit and the client is restarted there in the normal loop--not the exception:

std::cout << "Starting MQTT Service" << std::endl;
config_client();
while (!stop_threads) {
    try {
        start_client();
        ioc.run();
        if (!stop_threads) std::cout << "\033[34;1m\nMQTT Service Restarting. (SK_SIG:" << skip_signals << ")\033[0m" << std::endl;
        ioc.restart();
    }
    catch (const std::exception& ex) {
        std::cout << "\033[31;1m\nError: " << ex.what() << " -> Setting reinitializing mqtt_client.\033[0m" << std::endl;
        skip_signals = true;
        ioc.restart();
        config_client();
    }
}

Does this make sense?

Before this, I tried just calling client->disconnect(), but that didn't work. The client remained disconnected until the timer expired, at which point the connection was restored. Since all the reconnect timer does is call client_mqtt->disconnect(), I'm not sure why this doesn't work.

Radagan commented 3 years ago

Btw, my reconnect time is five minutes to facilitate testing. In production it will be 20 minutes for GCP, or longer for other brokers.

Radagan commented 3 years ago

This is a fantastic library, but an example that shows best practices for a long running client would be very valuable as boost::asio takes a little getting used to.

kleunen commented 3 years ago

Yes. This looks good. I agree about the examples. I will look at how they can be improved.

redboltz commented 3 years ago

There is a tutorial on the wiki. See https://github.com/redboltz/mqtt_cpp/wiki

Radagan commented 3 years ago

Thank you for your help. I am still getting a seg fault after about eight hours of run-time, but I believe that's coming from some internal libraries that I'm using. Thank you again for a great library to work with. In time, perhaps I can help contribute to it. It is certainly now a critical part of our code base.

Radagan commented 3 years ago

> There is a tutorial on the wiki. See https://github.com/redboltz/mqtt_cpp/wiki

Yes, and I used it. But the wiki does not cover the issues uncovered here at all. The examples are minimal and do not represent important topics like:

They are good examples, but I believe most users of your library would be interested in how best to use it for an actual application. Boost ASIO is not that common a framework for many modern C++ programmers.

redboltz commented 3 years ago

Thank you for the feedback. Unfortunately I don't have much time. Boost.Asio is a very popular networking framework. The next C++ standard and the existing TS are based on Boost.Asio, IIUC (see https://github.com/cplusplus/networking-ts), so it is almost a standard library. mqtt_cpp is based on Boost.Asio. Maybe I should document: "First, read the Boost.Asio documentation. Then use mqtt_cpp."

But you are welcome to write a good example or tutorial on the wiki.

Radagan commented 3 years ago

I understand. I'm not saying I shouldn't know it, just that very many don't. A good link to a Boost.Asio primer would be a good thing to add to the docs. I wish I had gone and done that first instead of diving in thinking that standard C++ would be sufficient. Many of us work in companies that are still stuck in C++14 (due to compiler limitations in required toolchains). I'm lucky to be C++17 with my project.

I almost suggested that I might contribute to the wiki, but before one teaches, one should know. ;^)

Radagan commented 3 years ago

If I did write an example it would need review. How would it be best for me to go about that? I can think of a few examples that I would like to have vetted by you and your team. Candidly, that would ensure I was doing things in the best possible manner.

redboltz commented 3 years ago

Thanks. It is very difficult to decide what kind of example is good. Users have various kinds of issues, and an example that is too focused on a specific case is usually misleading. From my experience, I usually type my problem (e.g. exception) into the top-left search box and then click Issues or Pull requests, or sometimes Code: https://github.com/redboltz/mqtt_cpp/search?q=exception&type=issues

Now this issue, #751, shows up as a hit. So our conversation should help someone in the future. It is a small but important improvement.

I know a summarized document would be more helpful, but I need to prioritize many feature requests ;)

redboltz commented 3 years ago

> If I did write an example it would need review. How would it be best for me to go about that? I can think of a few examples that I would like to have vetted by you and your team. Candidly, that would ensure I was doing things in the best possible manner.

I think that creating a new pull request that adds the example is a good way. We can use the GitHub pull request review (or comment) interface. Once the review is approved, your example would be merged into master.

Radagan commented 3 years ago

Thank you. I will make an effort to do so as soon as I have this current code in testing.

kleunen commented 3 years ago

Please have a look at #785