eclipse / paho.mqtt.cpp


Destructor Hangs When Actions are in Process #204

Open kmiller15211 opened 5 years ago

kmiller15211 commented 5 years ago

Background: I have a question about shutdown and object destruction (using the SSL version of the SDK). We are using the C++ SDK on an embedded Linux mobile device that has no power button, only a small capacitor to keep the processor alive while we shut down. This means that our code must react to a shutdown almost immediately and terminate as fast as possible.

Issue: In our use case, a background upload thread connects and then pushes data to AWS IoT. This thread calls connect(), gets the connect token back, and polls for either a shutdown or a successful wait_for() on the connect token. While this thread is still waiting for connect() to complete, the main thread detects a system shutdown and sets our shutdown flag, which the polling upload thread detects. The upload thread then destroys the mqtt::token_ptr object it was waiting on and returns.

So far so good, but when the mqtt::async_client destructor is then called from the main thread that's shutting down, it hangs for approximately a minute before returning. I've also tried calling stop_consuming() during shutdown, to no avail. My guess is that the async_client object is still processing the connect() call in the background when the destructor is called, so it blocks. Is there another way to use the API that would let it respond quickly and cancel the current action? I can't block on shutdown under any circumstances.
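For readers who want the polling pattern in isolation, here is a minimal sketch under stated assumptions. It does not use the Paho API: std::future stands in for the mqtt::token_ptr, and the names WaitResult and wait_or_shutdown are hypothetical. It only shows how short wait slices let a thread notice a shutdown flag quickly. It does not address the destructor hang described above.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <future>

// Hypothetical result type for the sketch.
enum class WaitResult { Completed, Shutdown, TimedOut };

// Waits on `action` in short slices so that a shutdown request is noticed
// within roughly one slice. std::future is a stand-in for the MQTT token.
WaitResult wait_or_shutdown(std::future<void>& action,
                            const std::atomic<bool>& shutdown,
                            std::chrono::milliseconds limit,
                            std::chrono::milliseconds slice = std::chrono::milliseconds(100))
{
    assert(slice.count() > 0);
    const auto deadline = std::chrono::steady_clock::now() + limit;

    while (!shutdown.load()) {
        // Block for at most one slice waiting for the action to finish.
        if (action.wait_for(slice) == std::future_status::ready)
            return WaitResult::Completed;

        // Give up once the overall limit has passed.
        if (std::chrono::steady_clock::now() >= deadline)
            return WaitResult::TimedOut;
    }
    return WaitResult::Shutdown;
}
```

The caller decides what to do with WaitResult::Shutdown. In the scenario above, the upload thread simply drops its token and returns.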

To test this more easily, I degrade the network connection using netem (tc qdisc add dev eth0 root netem loss 95%) and attempt to kill the app as soon as connect() is called.

Summary: Does anyone have tips for using the SDK API calls so that we can interrupt MQTT actions such as connect/publish/disconnect and shut down and destruct promptly?

Note that I've tried both the master and develop branches of Paho C++ SDK, and this hang appears in both branches, from what I can tell. I'm using the master branch of the Paho C SDK.

kmiller15211 commented 5 years ago

Note: I also tried the develop branch of the Paho C library and saw the same issue, so this doesn't seem to have been fixed there.

Other possibly interesting notes:

fpagliughi commented 5 years ago

That's an interesting one. If you look at the async_client destructor, you'll see it's a single-line call into the C lib: MQTTAsync_destroy(&cli_). So it's getting hung up in the C lib, and it seems most proper to fix it there. We should probably reopen this:

https://github.com/eclipse/paho.mqtt.c/issues/690#event-2512755655

I guess there could be a hack if you're going into a hard shutdown. We could maybe skip the destruction. Certainly it doesn't matter if the app leaks memory. The only problem might be with persistence, if you're using it, as the file system could be in the middle of an operation.
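As a rough illustration of the "skip the destruction" idea, here is a minimal sketch that assumes a hard shutdown where a leak is acceptable. SlowClient and shutdown_client are hypothetical names. SlowClient is only a stand-in for an object whose destructor may block, and the code does not use mqtt::async_client itself. The persistence caveat mentioned above still applies to any real client.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <memory>
#include <thread>

// Hypothetical stand-in for a client whose destructor may block.
struct SlowClient {
    static std::atomic<int> destroyed;
    ~SlowClient() {
        // Simulate a destructor that takes a while to return.
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        ++destroyed;
    }
};
std::atomic<int> SlowClient::destroyed{0};

// On a hard shutdown, give up ownership without running the destructor.
// Otherwise destroy the client normally.
void shutdown_client(std::unique_ptr<SlowClient> client, bool hard)
{
    assert(client);
    if (hard) {
        // Intentional leak: the process is about to die anyway.
        (void)client.release();
        return;
    }
    client.reset();  // Normal path: the destructor runs and may block.
}
```

Holding the client in a std::unique_ptr makes the leak an explicit decision at one call site, whereas a stack-allocated client is always destroyed when it goes out of scope.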

But let's move this over to the C issue and look for a proper resolution first.

kmiller15211 commented 5 years ago

In case it would be useful, here's a hacked up version of ssl_publish that exhibits the same issue. I used AWS IoT for this, and you'll want to update the server address and client ID fields that I named "PUT_YOUR...".

Prior to running this test, I was severely limiting the network by using these two modules:

opkg install iproute2-tc
opkg install kernel-module-sch-netem

And then calling this to do the limiting: tc qdisc add dev eth0 root netem loss 95%

And call this to bring it back: tc qdisc del dev eth0 root

Here's the code (note that I also had to link against "Threads::Threads" in the CMakeLists.txt file):

// ssl_publish.cpp
//
// This is a Paho MQTT C++ client, sample application.
//
// It's an example of how to connect to an MQTT broker securely, and then
// send messages as an MQTT publisher using the C++ asynchronous client
// interface.
//
// The sample demonstrates:
//  - Connecting to an MQTT server/broker securely
//  - Setting SSL/TLS options
//  - Last will and testament
//  - Publishing messages
//  - Using asynchronous tokens
//  - Implementing callbacks and action listeners
//
// We can test this using mosquitto configured with certificates in the
// Paho C library. The C library has an SSL/TLS test suite, and we can use
// that to test:
//     mosquitto -c paho.mqtt.c/test/tls-testing/mosquitto.conf
//
// Then use the file "test-root-ca.crt" from that directory
// (paho.mqtt.c/test/tls-testing) for the trust store for this program.
//

/*******************************************************************************
 * Copyright (c) 2013-2017 Frank Pagliughi <fpagliughi@mindspring.com>
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v10.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Frank Pagliughi - initial implementation and documentation
 *******************************************************************************/

#include <iostream>
#include <cstdlib>
#include <string>
#include <chrono>
#include <cstring>
#include <atomic>       // std::atomic
#include <stdexcept>    // std::runtime_error
#include <thread>       // std::thread
#include "mqtt/async_client.h"
#include <signal.h>

const std::string DFLT_SERVER_ADDRESS   { "ssl://PUT_YOUR_AWS_URL_HERE:8883" };
const std::string DFLT_CLIENT_ID        { "PUT_YOUR_THING_NAME_HERE" };

const std::string TOPIC { "test" };

const char* PAYLOAD1 = "Hello World!";
const char* PAYLOAD2 = "Hi there!";

const char* LWT_PAYLOAD = "Last will and testament.";

const int  QOS = 1;
const auto TIMEOUT = std::chrono::seconds(10);

// Set from the signal handler and polled by the worker thread, so it is atomic.
std::atomic<bool> g_shutdown { false };

/////////////////////////////////////////////////////////////////////////////

/**
 * A callback class for use with the main MQTT client.
 */
class callback : public virtual mqtt::callback
{
public:
    void connection_lost(const std::string& cause) override {
        std::cout << "\nConnection lost" << std::endl;
        if (!cause.empty())
            std::cout << "\tcause: " << cause << std::endl;
    }

    void delivery_complete(mqtt::delivery_token_ptr tok) override {
        std::cout << "\tDelivery complete for token: "
            << (tok ? tok->get_message_id() : -1) << std::endl;
    }
};

/////////////////////////////////////////////////////////////////////////////

using namespace std;

bool WaitForActionCompletion(const mqtt::token_ptr& waitToken,
                             const char* actionName,
                             int secsToWait)
{
    // Monotonic clock, so wall-clock adjustments don't distort the elapsed time.
    using clock = std::chrono::steady_clock;
    using sec = std::chrono::duration<double>;

    auto start = clock::now();
    while (!g_shutdown)
    {
        cout << "About to wait for token for: " << actionName << endl;
        auto actionCompleted = waitToken->wait_for(100);
        cout << "Wait_for completed for: " << actionName << endl;

        // The action has completed.
        if (actionCompleted)
        {
            if (0 == waitToken->get_return_code())
            {
                cout << actionName << " completed successfully" << endl;
                return true;
            }
            else
            {
                cout << "MQTT failed with err: " << waitToken->get_return_code() << " while waiting for: " << actionName << endl;
                return false;
            }
        }

        const sec duration = clock::now() - start;
        if (duration.count() > secsToWait)
        {
            cout << "MQTT timed out waiting for " << actionName << " to complete" << endl;
            return false;
        }
    }

    cout << "Throwing g_shutdown exception from action: " << actionName << endl;
    string errStr = "Shutdown signal received while waiting for: ";
    errStr += actionName;
    throw std::runtime_error(errStr.c_str());
}

bool Connect(mqtt::async_client& client, mqtt::connect_options& connopts)
{
    try
    {
        cout << "About to connect" << endl;
        auto spConnectionToken = client.connect(connopts);
        cout << "Connect call completed, waiting for action completion" << endl;

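        // Note: the third argument is in seconds, so this allows up to 10,000 s.
        // In this test the wait normally ends on completion, failure, or g_shutdown.
        return WaitForActionCompletion(spConnectionToken, "Connect", 10 * 1000);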
    }
    catch(const std::exception& exc)
    {
        cout << "Caught exception while connecting: " << exc.what() << endl;
    }

    return false;
}

void ConnectAndPublish(int argc, char* argv[])
{
    string  address = (argc > 1) ? string(argv[1]) : DFLT_SERVER_ADDRESS,
        clientID = (argc > 2) ? string(argv[2]) : DFLT_CLIENT_ID;

    cout << "Initializing for server '" << address << "'..." << endl;
    mqtt::async_client client(address, clientID);

    callback cb;
    client.set_callback(cb);

    //mqtt::connect_options connopts("testuser", "testpassword");
    mqtt::connect_options connopts;

    mqtt::ssl_options sslopts;
    //sslopts.set_trust_store("test-root-ca.crt");

    //--------------------------------------------------
    // KCM add
    connopts.set_mqtt_version(MQTTVERSION_3_1_1);

    // The keep alive interval will ping the server. We'll ping halfway through our
    // timeout, if we're not getting data.
    constexpr const unsigned g_inactivityTimeoutSecs = 5 * 60;
    constexpr unsigned g_connectTimeoutSecs = 5;

    connopts.set_keep_alive_interval(std::chrono::seconds(g_inactivityTimeoutSecs / 2));
    connopts.set_connect_timeout(g_connectTimeoutSecs);

    // Don't remember any information from the previous session; start clean.
    connopts.set_clean_session(true);

    // Our class will handle reconnections; don't let the SDK reconnect automatically.
    connopts.set_automatic_reconnect(false);

    constexpr const char* iotRootCAFileName = "IotRootCA.pem";
    constexpr const char* deviceCertificateFileName = "DeviceCertificate.crt";
    constexpr const char* devicePrivateKeyFileName = "DevicePrivateKey.key";

    // The trust store is simply a path to the CRT file
    sslopts.set_trust_store(iotRootCAFileName); //< This is the root CA, what we trust.
    sslopts.set_key_store(deviceCertificateFileName); //< Our own cert, per device
    sslopts.set_private_key(devicePrivateKeyFileName);
    //--------------------------------------------------

    mqtt::message willmsg(TOPIC, LWT_PAYLOAD, 1, true);
    mqtt::will_options will(willmsg);

    connopts.set_will(will);
    connopts.set_ssl(sslopts);

    cout << "  ...OK" << endl;

    try
    {
        Connect(client, connopts);
    }
    catch (const mqtt::exception& exc) {
        cerr << exc.what() << endl;
        return;
    }
}

void sig_handler(int signo)
{
    // Only set the flag here: iostream calls are not async-signal-safe.
    if (signo == SIGINT)
    {
        g_shutdown = true;
    }
}

int main(int argc, char* argv[])
{
    if (signal(SIGINT, sig_handler) == SIG_ERR)
    {
        throw std::runtime_error("Failed to set up signal handler");
    }

    std::thread workerThread(ConnectAndPublish, argc, argv);

    cout << "Joining worker thread." << endl;
    workerThread.join();
    cout << "Joined worker thread." << endl;
    return 0;
}