aws / aws-iot-device-sdk-cpp-v2

Next generation AWS IoT Client SDK for C++ using the AWS Common Runtime
Apache License 2.0

Crash on s_process_write_message #173

Closed: anatolyaliev closed this issue 3 years ago

anatolyaliev commented 4 years ago

Hi, we have a crash on macOS. It happens randomly. SDK version 1.8.0. Stack:

    Thread 12 Crashed:
    0   libsystem_kernel.dylib   0x00007fff203554f2 __pthread_kill + 10
    1   libsystem_pthread.dylib  0x00007fff20383661 pthread_kill + 263
    2   libsystem_c.dylib        0x00007fff202d6720 abort + 120
    3   libsystem_malloc.dylib   0x00007fff201b7530 malloc_vreport + 548
    4   libsystem_malloc.dylib   0x00007fff201ba508 malloc_report + 151
    5   com.apple.security       0x00007fff224a7f10 SSLRecordServiceWriteQueueInternal + 122
    6   com.apple.security       0x00007fff222833d7 SSLWrite + 292
    7   fpneocommond             0x000000010cbdd582 s_process_write_message + 82
    8   fpneocommond             0x000000010cbb721c s_handler_process_write_message + 268
    9   fpneocommond             0x000000010cbd2f15 s_try_write_outgoing_frames + 1461
    10  fpneocommond             0x000000010cbd0768 s_move_synced_data_to_thread_task + 376
    11  fpneocommond             0x000000010cbaf0cc s_run_all + 348
    12  fpneocommond             0x000000010cbd6387 s_event_thread_main + 1863
    13  fpneocommond             0x000000010cbad3f8 thread_fn + 88
    14  libsystem_pthread.dylib  0x00007fff203839a4 _pthread_start + 224
    15  libsystem_pthread.dylib  0x00007fff2037f4c7 thread_start + 15

I have logs from the time of the crash and will send them privately by email. Thanks!

jmklix commented 4 years ago

Can you please include the following information so we can better help you.

Confirm by changing [ ] to [x] below to ensure that it's a bug:

Known Issue

Describe the bug: A clear and concise description of what the bug is.

Platform/OS/Device: What are you running the SDK on?

SDK version number (example: v1.5.2)

To Reproduce (observed behavior): Steps to reproduce the behavior (please share code).

Expected behavior: A clear and concise description of what you expected to happen.

Logs/output: If applicable, add logs or error output.

REMEMBER TO SANITIZE YOUR PERSONAL INFO

ApiHandle::InitializeLogging(...)

Additional context: Add any other context about the problem here.

anatolyaliev commented 4 years ago

Can you please include the following information so we can better help you.

Confirm by changing [ ] to [x] below to ensure that it's a bug:

[x] I've searched for previous similar issues and didn't find any solution

Describe the bug: Hi, we have a crash on macOS. It happens randomly. SDK version 1.8.0. Stack:

Platform/OS/Device: macOS Catalina 10.15.6

SDK version number: v1.8.0

To Reproduce (observed behavior): Random crash; we cannot reproduce it. Below is the stack dump:

    Thread 12 Crashed:
    0   libsystem_kernel.dylib   0x00007fff203554f2 __pthread_kill + 10
    1   libsystem_pthread.dylib  0x00007fff20383661 pthread_kill + 263
    2   libsystem_c.dylib        0x00007fff202d6720 abort + 120
    3   libsystem_malloc.dylib   0x00007fff201b7530 malloc_vreport + 548
    4   libsystem_malloc.dylib   0x00007fff201ba508 malloc_report + 151
    5   com.apple.security       0x00007fff224a7f10 SSLRecordServiceWriteQueueInternal + 122
    6   com.apple.security       0x00007fff222833d7 SSLWrite + 292
    7   fpneocommond             0x000000010cbdd582 s_process_write_message + 82
    8   fpneocommond             0x000000010cbb721c s_handler_process_write_message + 268
    9   fpneocommond             0x000000010cbd2f15 s_try_write_outgoing_frames + 1461
    10  fpneocommond             0x000000010cbd0768 s_move_synced_data_to_thread_task + 376
    11  fpneocommond             0x000000010cbaf0cc s_run_all + 348
    12  fpneocommond             0x000000010cbd6387 s_event_thread_main + 1863
    13  fpneocommond             0x000000010cbad3f8 thread_fn + 88
    14  libsystem_pthread.dylib  0x00007fff203839a4 _pthread_start + 224
    15  libsystem_pthread.dylib  0x00007fff2037f4c7 thread_start + 15

Expected behavior: I expect it to work stably, without crashing.

Logs/output: If applicable, add logs or error output.

REMEMBER TO SANITIZE YOUR PERSONAL INFO. Can I send the logs by email?

anatolyaliev commented 4 years ago

The issue is related to https://github.com/aws/aws-iot-device-sdk-cpp-v2/issues/156; it happens in the same project.

jmklix commented 4 years ago

Can you email the logs to jkl@amazon.com? Make sure to still redact any personal info and all keys.

anatolyaliev commented 4 years ago

The logs have been sent.

anatolyaliev commented 3 years ago

Are there any updates on the investigation? There is a similar crash that may provide more info. Here is an extract from the dump:

    Thread 11 Crashed:
    0   libsystem_kernel.dylib    0x00007fff203884f2 __pthread_kill + 10
    1   libsystem_pthread.dylib   0x00007fff203b6661 pthread_kill + 263
    2   libsystem_c.dylib         0x00007fff20309720 abort + 120
    3   libsystem_malloc.dylib    0x00007fff201ea530 malloc_vreport + 548
    4   libsystem_malloc.dylib    0x00007fff201fe742 malloc_zone_error + 183
    5   libsystem_malloc.dylib    0x00007fff201e31a5 small_free_list_remove_ptr_no_clear + 1243
    6   libsystem_malloc.dylib    0x00007fff201e016b free_small + 749
    7   libcoretls.dylib          0x00007fff29dce562 tls_free_buffer_list + 32
    8   libcoretls.dylib          0x00007fff29dd2787 tls_handshake_destroy + 577
    9   com.apple.security        0x00007fff224d820f SSLContextDestroy + 23
    10  com.apple.CoreFoundation  0x00007fff20579a12 _CFRelease + 244
    11  fpneocommond              0x000000010226e96f s_destroy + 31
    12  fpneocommond              0x0000000102268206 s_final_channel_deletion_task + 86
    13  fpneocommond              0x000000010226c212 s_on_client_channel_on_shutdown + 194
    14  fpneocommond              0x00000001022400cc s_run_all + 348
    15  fpneocommond              0x0000000102267387 s_event_thread_main + 1863
    16  fpneocommond              0x000000010223e3f8 thread_fn + 88
    17  libsystem_pthread.dylib   0x00007fff203b69a4 _pthread_start + 224
    18  libsystem_pthread.dylib   0x00007fff203b24c7 thread_start + 15

anatolyaliev commented 3 years ago

Our product crashes several times a day because of this specific issue, so the code cannot go to production. Could you update us on the status of the investigation? Is more data required from our side? Is there a workaround for the issue? Is there an estimate for the fix?

rccarper commented 3 years ago

Based on where the crash happens, it looks like the process might be running out of memory. I'm still looking for additional clues in the log. When these crashes occur, do you see any signs of high memory usage on the machine or in the process?

rccarper commented 3 years ago

I apologize: I misread "malloc_report" in the call stack as "malloc". This is likely not an out-of-memory condition; it could instead be the result of memory corruption. We'll continue investigating and respond when we know more.

rccarper commented 3 years ago

Quick update on this: unfortunately, we haven't been able to isolate the issue yet. There are some errors in the log, but nothing that looks like a smoking gun. As I mentioned before, this does look like possible memory corruption. Would it be possible to send us a crash dump? It may give us more clues as to what's going on; at a minimum, it may point to what, if anything, is getting corrupted.

anatolyaliev commented 3 years ago

The crash dump was sent to jkl@amazon.com

anatolyaliev commented 3 years ago

@rccarper, have you received the email with the crash dump?

jmklix commented 3 years ago

Are you still encountering this crash in s_process_write_message?

anatolyaliev commented 3 years ago

The memory corruption issue was reproduced in the basic-pub-sub sample (with minor changes). AWS IoT SDK version: 1.10.9. Platform: macOS. Test execution:

jmklix commented 3 years ago

You can email them to me at jkl@amazon.com

anatolyaliev commented 3 years ago

All the relevant data was sent to jkl@amazon.com.

anatolyaliev commented 3 years ago

Is there any progress on the issue? Is any additional information required?

anatolyaliev commented 3 years ago

Are there any updates regarding the issue?

jmklix commented 3 years ago

Sorry for the long wait. At least in the example you gave me, you are not waiting for the messages to finish publishing. This can be resolved by adding sleep(1) anywhere in the while loop. Does this remedy the crash in your original program?

anatolyaliev commented 3 years ago

Thank you for the answer. Unfortunately, this is not our case. We've implemented a throttling mechanism that limits access to AWS to at most once every 200 ms. With this implementation, we still get the crash described above 2-3 times a day. Could you please continue the investigation and update us on your findings?
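The throttling code itself is not shown in this thread, so the following is only a hypothetical sketch of an "at most once per interval" gate (`PublishThrottle` and `TryAcquire` are illustrative names). The current time is passed in explicitly to keep the logic testable. Note that a rate limit controls how often a publish is started, but on its own it does not guarantee that a payload buffer outlives its in-flight publish.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical throttle: allows at most one publish per `minInterval`.
class PublishThrottle
{
  public:
    using Clock = std::chrono::steady_clock;

    explicit PublishThrottle(Clock::duration minInterval) : minInterval_(minInterval) {}

    // Returns true (and records the attempt) if a publish is allowed at `now`.
    bool TryAcquire(Clock::time_point now)
    {
        if (hasLast_ && now - last_ < minInterval_)
        {
            return false; // too soon after the previous publish
        }
        last_ = now;
        hasLast_ = true;
        return true;
    }

  private:
    Clock::duration minInterval_;
    Clock::time_point last_{};
    bool hasLast_ = false;
};
```

In a real loop the caller would check `TryAcquire(PublishThrottle::Clock::now())` before each publish and skip or delay the publish when it returns false.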

jmklix commented 3 years ago

Can you try using a promise to prevent a use-after-free error? Add the following promise, publishFinishedPromise, and wait on it after each publish:

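        while (true)
        {
            std::promise<void> publishFinishedPromise;
            // ... print the prompt and fill `input` as in the full sample ...
            ByteBuf payload = ByteBufNewCopy(DefaultAllocator(), (const uint8_t *)input.data(), input.length());
            ByteBuf *payloadPtr = &payload;

            auto onPublishComplete =
                [payloadPtr, &publishFinishedPromise](Mqtt::MqttConnection &, uint16_t packetId, int errorCode) {
                    aws_byte_buf_clean_up(payloadPtr);
                    if (packetId)
                    {
                        fprintf(stdout, "Operation on packetId %d Succeeded\n", packetId);
                    }
                    else
                    {
                        fprintf(stdout, "Operation failed with error %s\n", aws_error_debug_str(errorCode));
                    }
                    publishFinishedPromise.set_value();
                };
            connection->Publish(topic.c_str(), AWS_MQTT_QOS_AT_LEAST_ONCE, false, payload, onPublishComplete);
            // Block until the callback has run, so `payload` is still alive when it is cleaned up.
            publishFinishedPromise.get_future().wait();
        }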
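To illustrate why the wait matters, here is a minimal, self-contained sketch in standard C++. `FakeAsyncConnection` is a hypothetical stand-in, not the SDK's `MqttConnection`: like an event-loop thread, it reads the caller's buffer some time after `Publish()` has returned. Waiting on a per-iteration promise keeps the per-iteration `payload` alive until the completion callback has run, which is the same idea as `publishFinishedPromise` in the snippet above.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <future>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for an asynchronous connection (NOT the SDK's API).
// Publish() returns immediately; a worker thread reads the caller's buffer
// later and then invokes the completion callback, much like an event loop.
class FakeAsyncConnection
{
  public:
    void Publish(const std::string *payload, std::function<void(const std::string &)> onComplete)
    {
        workers_.emplace_back([payload, onComplete]() {
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
            onComplete(*payload); // reads caller-owned memory after Publish() returned
        });
    }

    ~FakeAsyncConnection()
    {
        for (auto &worker : workers_)
        {
            worker.join();
        }
    }

  private:
    std::vector<std::thread> workers_;
};

// Publishes each message from a per-iteration buffer, waiting on a promise
// so the buffer stays alive until its completion callback has run.
std::vector<std::string> PublishAllAndWait(const std::vector<std::string> &messages)
{
    std::vector<std::string> delivered;
    FakeAsyncConnection connection;
    for (const auto &message : messages)
    {
        std::string payload = message; // destroyed at the end of this iteration
        std::promise<void> publishFinished;
        connection.Publish(&payload, [&](const std::string &seen) {
            delivered.push_back(seen);
            publishFinished.set_value();
        });
        // Without this wait, the worker could read `payload` after it is destroyed.
        publishFinished.get_future().wait();
    }
    return delivered;
}
```

Unlike a fixed sleep(1), the promise waits exactly as long as each operation takes instead of guessing at a delay.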
anatolyaliev commented 3 years ago

I will try. The code snippet is not complete. Could you please post it again?

jmklix commented 3 years ago
/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */
#include <aws/crt/Api.h>
#include <aws/crt/StlAllocator.h>
#include <aws/crt/auth/Credentials.h>
#include <aws/crt/io/TlsOptions.h>

#include <aws/iot/MqttClient.h>

#include <algorithm>
#include <aws/crt/UUID.h>
#include <condition_variable>
#include <iostream>
#include <future>
#include <mutex>
#include <ctime>
#include <unistd.h>

using namespace Aws::Crt;

static void s_printHelp()
{
    fprintf(stdout, "Usage:\n");
    fprintf(
        stdout,
        "basic-pub-sub --endpoint <endpoint> --cert <path to cert>"
        " --key <path to key> --topic <topic> --ca_file <optional: path to custom ca>"
        " --use_websocket --signing_region <region> --proxy_host <host> --proxy_port <port>"
        " --x509 --x509_role_alias <role_alias> --x509_endpoint <endpoint> --x509_thing <thing_name>"
        " --x509_cert <path to cert> --x509_key <path to key> --x509_rootca <path to root ca>\n\n");
    fprintf(stdout, "endpoint: the endpoint of the mqtt server not including a port\n");
    fprintf(
        stdout,
        "cert: path to your client certificate in PEM format. If this is not set you must specify use_websocket\n");
    fprintf(stdout, "key: path to your key in PEM format. If this is not set you must specify use_websocket\n");
    fprintf(stdout, "topic: topic to publish, subscribe to. (optional)\n");
    fprintf(stdout, "client_id: client id to use (optional)\n");
    fprintf(
        stdout,
        "ca_file: Optional, if the mqtt server uses a certificate that's not already"
        " in your trust store, set this.\n");
    fprintf(stdout, "\tIt's the path to a CA file in PEM format\n");
    fprintf(stdout, "use_websocket: if specified, uses a websocket over https (optional)\n");
    fprintf(
        stdout,
        "signing_region: used for the websocket signer; it should only be specified if websockets are used. (required "
        "for websockets)\n");
    fprintf(stdout, "proxy_host: if you want to use a proxy with websockets, specify the host here (optional).\n");
    fprintf(
        stdout, "proxy_port: defaults to 8080 if proxy_host is set. Set this to any value you'd like (optional).\n");

    fprintf(stdout, "  x509: Use the x509 credentials provider while using websockets (optional)\n");
    fprintf(stdout, "  x509_role_alias: Role alias to use with the x509 credentials provider (required for x509)\n");
    fprintf(stdout, "  x509_endpoint: Endpoint to fetch x509 credentials from (required for x509)\n");
    fprintf(stdout, "  x509_thing: Thing name to fetch x509 credentials on behalf of (required for x509)\n");
    fprintf(
        stdout,
        "  x509_cert: Path to the IoT thing certificate used in fetching x509 credentials (required for x509)\n");
    fprintf(
        stdout,
        "  x509_key: Path to the IoT thing private key used in fetching x509 credentials (required for x509)\n");
    fprintf(
        stdout,
        "  x509_rootca: Path to the root certificate used in fetching x509 credentials (required for x509)\n\n");
}

bool s_cmdOptionExists(char **begin, char **end, const String &option)
{
    return std::find(begin, end, option) != end;
}

char *s_getCmdOption(char **begin, char **end, const String &option)
{
    char **itr = std::find(begin, end, option);
    if (itr != end && ++itr != end)
    {
        return *itr;
    }
    return 0;
}

int main(int argc, char *argv[])
{

    /************************ Setup the Lib ****************************/
    /*
     * Do the global initialization for the API.
     */
    ApiHandle apiHandle;

    String endpoint;
    String certificatePath;
    String keyPath;
    String caFile;
    String topic("test/topic");
    String clientId(String("test-") + Aws::Crt::UUID().ToString());
    String signingRegion;
    String proxyHost;
    uint16_t proxyPort(8080);

    String x509Endpoint;
    String x509ThingName;
    String x509RoleAlias;
    String x509CertificatePath;
    String x509KeyPath;
    String x509RootCAFile;

    bool useWebSocket = false;
    bool useX509 = false;

    /*********************** Parse Arguments ***************************/
    if (!s_cmdOptionExists(argv, argv + argc, "--endpoint"))
    {
        s_printHelp();
        return 1;
    }

    endpoint = s_getCmdOption(argv, argv + argc, "--endpoint");

    if (s_cmdOptionExists(argv, argv + argc, "--key"))
    {
        keyPath = s_getCmdOption(argv, argv + argc, "--key");
    }

    if (s_cmdOptionExists(argv, argv + argc, "--cert"))
    {
        certificatePath = s_getCmdOption(argv, argv + argc, "--cert");
    }

    if (keyPath.empty() != certificatePath.empty())
    {
        fprintf(stdout, "Using mtls (cert and key) requires both the certificate and the private key\n");
        s_printHelp();
        return 1;
    }
    if (s_getCmdOption(argv, argv + argc, "--topic"))
    {
        topic = s_getCmdOption(argv, argv + argc, "--topic");
    }
    if (s_cmdOptionExists(argv, argv + argc, "--ca_file"))
    {
        caFile = s_getCmdOption(argv, argv + argc, "--ca_file");
    }
    if (s_cmdOptionExists(argv, argv + argc, "--client_id"))
    {
        clientId = s_getCmdOption(argv, argv + argc, "--client_id");
    }

    if (s_cmdOptionExists(argv, argv + argc, "--use_websocket"))
    {
        if (!s_cmdOptionExists(argv, argv + argc, "--signing_region"))
        {
            fprintf(stdout, "Websockets require a signing region to be specified.\n");
            s_printHelp();
            return 1;
        }
        useWebSocket = true;
        signingRegion = s_getCmdOption(argv, argv + argc, "--signing_region");

        if (s_cmdOptionExists(argv, argv + argc, "--proxy_host"))
        {
            proxyHost = s_getCmdOption(argv, argv + argc, "--proxy_host");
        }

        if (s_cmdOptionExists(argv, argv + argc, "--proxy_port"))
        {
            proxyPort = static_cast<uint16_t>(atoi(s_getCmdOption(argv, argv + argc, "--proxy_port")));
        }
    }

    bool usingMtls = !certificatePath.empty() && !keyPath.empty();

    /* one or the other, but not both nor neither */
    if (useWebSocket == usingMtls)
    {
        if (useWebSocket && usingMtls)
        {
            fprintf(stdout, "You must use either websockets or mtls for authentication, but not both.\n");
        }
        else
        {
            fprintf(stdout, "You must use either websockets or mtls for authentication.\n");
        }

        s_printHelp();
        return 1;
    }

    // x509 credentials provider configuration
    if (s_cmdOptionExists(argv, argv + argc, "--x509"))
    {
        if (!useWebSocket)
        {
            fprintf(stdout, "X509 credentials sourcing requires websockets to be enabled and configured.\n");
            s_printHelp();
            return 1;
        }

        if (!s_cmdOptionExists(argv, argv + argc, "--x509_role_alias"))
        {
            fprintf(stdout, "X509 credentials sourcing requires an x509 role alias to be specified.\n");
            s_printHelp();
            return 1;
        }
        x509RoleAlias = s_getCmdOption(argv, argv + argc, "--x509_role_alias");

        if (!s_cmdOptionExists(argv, argv + argc, "--x509_endpoint"))
        {
            fprintf(stdout, "X509 credentials sourcing requires an x509 endpoint to be specified.\n");
            s_printHelp();
            return 1;
        }
        x509Endpoint = s_getCmdOption(argv, argv + argc, "--x509_endpoint");

        if (!s_cmdOptionExists(argv, argv + argc, "--x509_thing"))
        {
            fprintf(stdout, "X509 credentials sourcing requires an x509 thing name to be specified.\n");
            s_printHelp();
            return 1;
        }
        x509ThingName = s_getCmdOption(argv, argv + argc, "--x509_thing");

        if (!s_cmdOptionExists(argv, argv + argc, "--x509_cert"))
        {
            fprintf(stdout, "X509 credentials sourcing requires an Iot thing certificate to be specified.\n");
            s_printHelp();
            return 1;
        }
        x509CertificatePath = s_getCmdOption(argv, argv + argc, "--x509_cert");

        if (!s_cmdOptionExists(argv, argv + argc, "--x509_key"))
        {
            fprintf(stdout, "X509 credentials sourcing requires an Iot thing private key to be specified.\n");
            s_printHelp();
            return 1;
        }
        x509KeyPath = s_getCmdOption(argv, argv + argc, "--x509_key");

        if (s_cmdOptionExists(argv, argv + argc, "--x509_rootca"))
        {
            x509RootCAFile = s_getCmdOption(argv, argv + argc, "--x509_rootca");
        }

        useX509 = true;
    }

    /********************** Now Setup an Mqtt Client ******************/
    /*
     * You need an event loop group to process IO events.
     * If you only have a few connections, 1 thread is ideal
     */
    Io::EventLoopGroup eventLoopGroup(1);
    if (!eventLoopGroup)
    {
        fprintf(
            stderr, "Event Loop Group Creation failed with error %s\n", ErrorDebugString(eventLoopGroup.LastError()));
        exit(-1);
    }

    Aws::Crt::Io::DefaultHostResolver defaultHostResolver(eventLoopGroup, 1, 5);
    Io::ClientBootstrap bootstrap(eventLoopGroup, defaultHostResolver);

    if (!bootstrap)
    {
        fprintf(stderr, "ClientBootstrap failed with error %s\n", ErrorDebugString(bootstrap.LastError()));
        exit(-1);
    }

    Aws::Crt::Io::TlsContext x509TlsCtx;
    Aws::Iot::MqttClientConnectionConfigBuilder builder;

    if (!certificatePath.empty() && !keyPath.empty())
    {
        builder = Aws::Iot::MqttClientConnectionConfigBuilder(certificatePath.c_str(), keyPath.c_str());
    }
    else if (useWebSocket)
    {
        std::shared_ptr<Aws::Crt::Auth::ICredentialsProvider> provider = nullptr;

        Aws::Crt::Http::HttpClientConnectionProxyOptions proxyOptions;
        if (!proxyHost.empty())
        {
            proxyOptions.HostName = proxyHost;
            proxyOptions.Port = proxyPort;
            proxyOptions.AuthType = Aws::Crt::Http::AwsHttpProxyAuthenticationType::None;
        }

        if (useX509)
        {
            Aws::Crt::Io::TlsContextOptions tlsCtxOptions =
                Aws::Crt::Io::TlsContextOptions::InitClientWithMtls(x509CertificatePath.c_str(), x509KeyPath.c_str());
            if (!tlsCtxOptions)
            {
                fprintf(
                    stderr,
                    "Unable to initialize tls context options, error: %s!\n",
                    ErrorDebugString(tlsCtxOptions.LastError()));
                return -1;
            }

            if (!x509RootCAFile.empty())
            {
                tlsCtxOptions.OverrideDefaultTrustStore(nullptr, x509RootCAFile.c_str());
            }

            x509TlsCtx = Aws::Crt::Io::TlsContext(tlsCtxOptions, Aws::Crt::Io::TlsMode::CLIENT);
            if (!x509TlsCtx)
            {
                fprintf(
                    stderr,
                    "Unable to create tls context, error: %s!\n",
                    ErrorDebugString(x509TlsCtx.GetInitializationError()));
                return -1;
            }

            Aws::Crt::Auth::CredentialsProviderX509Config x509Config;

            x509Config.TlsOptions = x509TlsCtx.NewConnectionOptions();
            if (!x509Config.TlsOptions)
            {
                fprintf(
                    stderr,
                    "Unable to create tls options from tls context, error: %s!\n",
                    ErrorDebugString(x509Config.TlsOptions.LastError()));
                return -1;
            }

            x509Config.Bootstrap = &bootstrap;
            x509Config.Endpoint = x509Endpoint;
            x509Config.RoleAlias = x509RoleAlias;
            x509Config.ThingName = x509ThingName;

            if (!proxyHost.empty())
            {
                x509Config.ProxyOptions = proxyOptions;
            }

            provider = Aws::Crt::Auth::CredentialsProvider::CreateCredentialsProviderX509(x509Config);
        }
        else
        {
            Aws::Crt::Auth::CredentialsProviderChainDefaultConfig defaultConfig;
            defaultConfig.Bootstrap = &bootstrap;

            provider = Aws::Crt::Auth::CredentialsProvider::CreateCredentialsProviderChainDefault(defaultConfig);
        }

        if (!provider)
        {
            fprintf(stderr, "Failure to create credentials provider!\n");
            return -1;
        }

        Aws::Iot::WebsocketConfig config(signingRegion, provider);
        if (!proxyHost.empty())
        {
            config.ProxyOptions = proxyOptions;
        }

        builder = Aws::Iot::MqttClientConnectionConfigBuilder(config);
    }
    else
    {
        s_printHelp();
        return 1;
    }

    if (!caFile.empty())
    {
        builder.WithCertificateAuthority(caFile.c_str());
    }

    builder.WithEndpoint(endpoint);

    auto clientConfig = builder.Build();

    if (!clientConfig)
    {
        fprintf(
            stderr,
            "Client Configuration initialization failed with error %s\n",
            ErrorDebugString(clientConfig.LastError()));
        exit(-1);
    }

    Aws::Iot::MqttClient mqttClient(bootstrap);
    /*
     * Since no exceptions are used, always check the bool operator
     * when an error could have occurred.
     */
    if (!mqttClient)
    {
        fprintf(stderr, "MQTT Client Creation failed with error %s\n", ErrorDebugString(mqttClient.LastError()));
        exit(-1);
    }

    /*
     * Now create a connection object. Note: This type is move only
     * and its underlying memory is managed by the client.
     */
    auto connection = mqttClient.NewConnection(clientConfig);

    if (!connection)
    {
        fprintf(stderr, "MQTT Connection Creation failed with error %s\n", ErrorDebugString(mqttClient.LastError()));
        exit(-1);
    }

    /*
     * In a real world application you probably don't want to enforce synchronous behavior
     * but this is a sample console application, so we'll just do that with promises.
     */
    std::promise<bool> connectionCompletedPromise;
    std::promise<void> connectionClosedPromise;

    /*
     * This will execute when an mqtt connect has completed or failed.
     */
    auto onConnectionCompleted = [&](Mqtt::MqttConnection &, int errorCode, Mqtt::ReturnCode returnCode, bool) {
        if (errorCode)
        {
            fprintf(stdout, "Connection failed with error %s\n", ErrorDebugString(errorCode));
            connectionCompletedPromise.set_value(false);
        }
        else
        {
            if (returnCode != AWS_MQTT_CONNECT_ACCEPTED)
            {
                fprintf(stdout, "Connection failed with mqtt return code %d\n", (int)returnCode);
                connectionCompletedPromise.set_value(false);
            }
            else
            {
                fprintf(stdout, "Connection completed successfully.\n");
                connectionCompletedPromise.set_value(true);
            }
        }
    };

    auto onInterrupted = [&](Mqtt::MqttConnection &, int error) {
        fprintf(stdout, "Connection interrupted with error %s\n", ErrorDebugString(error));
    };

    auto onResumed = [&](Mqtt::MqttConnection &, Mqtt::ReturnCode, bool) { fprintf(stdout, "Connection resumed\n"); };

    /*
     * Invoked when a disconnect message has completed.
     */
    auto onDisconnect = [&](Mqtt::MqttConnection &) {
        {
            fprintf(stdout, "Disconnect completed\n");
            connectionClosedPromise.set_value();
        }
    };

    connection->OnConnectionCompleted = std::move(onConnectionCompleted);
    connection->OnDisconnect = std::move(onDisconnect);
    connection->OnConnectionInterrupted = std::move(onInterrupted);
    connection->OnConnectionResumed = std::move(onResumed);

    connection->SetOnMessageHandler([](Mqtt::MqttConnection &,
                                       const String &topic,
                                       const ByteBuf &payload,
                                       bool /*dup*/,
                                       Mqtt::QOS /*qos*/,
                                       bool /*retain*/) {
        fprintf(stdout, "Generic Publish received on topic %s, payload:\n", topic.c_str());
        fwrite(payload.buffer, 1, payload.len, stdout);
        fprintf(stdout, "\n");
    });

    /*
     * Actually perform the connect dance.
     * This will use default ping behavior of 1 hour and 3 second timeouts.
     * If you want different behavior, those arguments go into slots 3 & 4.
     */
    fprintf(stdout, "Connecting...\n");
    if (!connection->Connect(clientId.c_str(), false, 1000))
    {
        fprintf(stderr, "MQTT Connection failed with error %s\n", ErrorDebugString(connection->LastError()));
        exit(-1);
    }

    if (connectionCompletedPromise.get_future().get())
    {
        /*
         * This is invoked upon the receipt of a Publish on a subscribed topic.
         */
        auto onMessage = [&](Mqtt::MqttConnection &,
                             const String &topic,
                             const ByteBuf &byteBuf,
                             bool /*dup*/,
                             Mqtt::QOS /*qos*/,
                             bool /*retain*/) {
            fprintf(stdout, "Publish received on topic %s\n", topic.c_str());
            fprintf(stdout, "\n Message:\n");
            fwrite(byteBuf.buffer, 1, byteBuf.len, stdout);
            fprintf(stdout, "\n");
        };

        /*
         * Subscribe for incoming publish messages on topic.
         */
        std::promise<void> subscribeFinishedPromise;
        auto onSubAck =
            [&](Mqtt::MqttConnection &, uint16_t packetId, const String &topic, Mqtt::QOS QoS, int errorCode) {
                if (errorCode)
                {
                    fprintf(stderr, "Subscribe failed with error %s\n", aws_error_debug_str(errorCode));
                    exit(-1);
                }
                else
                {
                    if (!packetId || QoS == AWS_MQTT_QOS_FAILURE)
                    {
                        fprintf(stderr, "Subscribe rejected by the broker.\n");
                        exit(-1);
                    }
                    else
                    {
                        fprintf(stdout, "Subscribe on topic %s on packetId %d Succeeded\n", topic.c_str(), packetId);
                    }
                }
                subscribeFinishedPromise.set_value();
            };

        connection->Subscribe(topic.c_str(), AWS_MQTT_QOS_AT_LEAST_ONCE, onMessage, onSubAck);
        subscribeFinishedPromise.get_future().wait();

        String input;
        int iCount = 0;
        clock_t begin = clock();

        while (true)
        {
            std::promise<void> publishFinishedPromise;
            fprintf(
                stdout,
                "Enter the message you want to publish to topic %s and press enter. Enter 'exit' to exit this "
                "program.\n",
                topic.c_str());
            //std::getline(std::cin, input);

            //if (input == "exit")
            //{
              //  break;
            //}
            iCount++;
            input.clear();
            for (int i = 0; i < 100; ++i) {
                input.append("v");
            }
            fprintf(stdout, "Size to send: %zu\n", input.length());
            fprintf(stdout, "Total messages sent so far: %d\n", iCount);
            clock_t end = clock();
            // Note: clock() measures processor time used by the program, not wall-clock time.
            double elapsed_secs = double(end - begin) / CLOCKS_PER_SEC;
            fprintf(stdout, "Total processor time elapsed so far: %f s\n", elapsed_secs);
            //sleep(1);

            ByteBuf payload = ByteBufNewCopy(DefaultAllocator(), (const uint8_t *)input.data(), input.length());
            ByteBuf *payloadPtr = &payload;

            auto onPublishComplete = [payloadPtr, &publishFinishedPromise](Mqtt::MqttConnection &, uint16_t packetId, int errorCode) {
                aws_byte_buf_clean_up(payloadPtr);

                if (packetId)
                {
                    fprintf(stdout, "Operation on packetId %d Succeeded\n", packetId);
                }
                else
                {
                    fprintf(stdout, "Operation failed with error %s\n", aws_error_debug_str(errorCode));
                }
                publishFinishedPromise.set_value();
            };
            connection->Publish(topic.c_str(), AWS_MQTT_QOS_AT_LEAST_ONCE, false, payload, onPublishComplete);
            publishFinishedPromise.get_future().wait();
        }

        /*
         * Unsubscribe from the topic.
         */
        std::promise<void> unsubscribeFinishedPromise;
        connection->Unsubscribe(
            topic.c_str(), [&](Mqtt::MqttConnection &, uint16_t, int) { unsubscribeFinishedPromise.set_value(); });
        unsubscribeFinishedPromise.get_future().wait();
    }

    /* Disconnect */
    if (connection->Disconnect())
    {
        connectionClosedPromise.get_future().wait();
    }
    return 0;
}
anatolyaliev commented 3 years ago

Thank you. I will try it and update you with the results.

TingDaoK commented 3 years ago

We just published a new version (https://github.com/aws/aws-iot-device-sdk-cpp-v2/tree/v0.12.1) in which our library takes care of the payload for you, so with the latest version you don't need to wait for the previous publish to finish. Closing the issue; feel free to reopen if anything pops up.
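The general idea behind a library that "takes care of the payload" is that each in-flight operation owns its own copy of the bytes. The sketch below is hypothetical and is not the SDK's implementation: `OwningPublisher::Publish()` copies the payload before returning, so the caller's per-iteration buffer can be destroyed immediately, without waiting for completion.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical publisher (NOT the SDK's implementation) whose in-flight
// operation owns a private copy of the payload bytes.
class OwningPublisher
{
  public:
    void Publish(const std::uint8_t *data, std::size_t len, std::function<void(std::string)> onComplete)
    {
        // Copy before returning, so the caller may destroy its buffer right away.
        std::vector<std::uint8_t> owned(data, data + len);
        workers_.emplace_back([owned = std::move(owned), onComplete]() {
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
            onComplete(std::string(owned.begin(), owned.end()));
        });
    }

    ~OwningPublisher()
    {
        for (auto &worker : workers_)
        {
            worker.join(); // wait for all in-flight operations
        }
    }

  private:
    std::vector<std::thread> workers_;
};

// Publishes from per-iteration buffers without waiting between publishes.
std::vector<std::string> PublishWithoutWaiting(const std::vector<std::string> &messages)
{
    std::mutex deliveredMutex;
    std::vector<std::string> delivered;
    {
        OwningPublisher publisher;
        for (const auto &message : messages)
        {
            std::string payload = message; // destroyed at the end of this iteration
            publisher.Publish(
                reinterpret_cast<const std::uint8_t *>(payload.data()), payload.size(), [&](std::string seen) {
                    std::lock_guard<std::mutex> lock(deliveredMutex);
                    delivered.push_back(std::move(seen));
                });
        }
    } // the publisher's destructor joins every worker here
    std::sort(delivered.begin(), delivered.end()); // completion order is not guaranteed
    return delivered;
}
```

The trade-off is one extra copy per publish in exchange for not tying the caller's buffer lifetime to the completion callback.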
