Azure / azure-iot-sdk-c

A C99 SDK for connecting devices to Microsoft Azure IoT services
https://azure.github.io/azure-iot-sdk-c

Flooding messages causes a crash (LTS_07_2022, any OS, MQTT protocol, Low Level API) #2449

Closed: BillyTheFrog closed this issue 1 year ago.

BillyTheFrog commented 1 year ago

Development Machine, OS, Compiler (and Other Relevant Toolchain Info): Bug reproduced on Arch Linux, Debian, Ubuntu and Yocto, on both x86 and ARM architectures.

SDK Version (Please Give Commit SHA if Manually Compiling): LTS_07_2022_Ref02

Protocol: MQTT

Describe the Bug: After a period of intensive sending (typically between 1 and 2 minutes), a crash occurs. It appears to be a doubly linked list issue: a recently freed area is accessed. The crash traces back to IoTHubDeviceClient_LL_DoWork. The thread sanitizer also reported some errors. To investigate this issue, I used Valgrind and Google's sanitizers (AddressSanitizer and ThreadSanitizer). See the attached logs below for details.

Here is a minimal example that reproduces the issue for me. I checked the documentation and hopefully I am not misusing the SDK.

Please replace '[ YOUR CONNECTION STRING GOES HERE ]' with your own connection string. Once compiled, running the program should reproduce the issue.

To call 'IoTHubDeviceClient_LL_DoWork' less frequently, an argument can be given (arg. #1 of the compiled binary) so that the function is called once every X messages passed to the SDK (e.g. with 5 as the first argument, 5 messages are passed to the SDK between two calls). By default, the function is called after every message.
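The DoWork cadence described above comes from the condition `!periodicSending || !(count++ % periodicSending)` in `SendLoop()`. The sketch below isolates that condition so you can check which messages trigger a call. The helper names `ShouldDoWork` and `DoWorkCalls` are hypothetical; they are not part of the repro or the SDK.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical helper (not part of the SDK) mirroring SendLoop():
 *     if (!periodicSending || !(count++ % periodicSending))
 *         IoTHubDeviceClient_LL_DoWork(link->_deviceHandle);
 * `count` is the value before the post-increment, i.e. the number of
 * messages already handed to the SDK before the current one.
 */
bool ShouldDoWork(uint32_t count, long periodicSending) {
    assert(periodicSending >= 0); /* this sketch rejects negative arguments */
    return !periodicSending || !(count % (unsigned long)periodicSending);
}

/* Hypothetical helper: DoWork calls made while sending the first `messages` messages. */
uint32_t DoWorkCalls(uint32_t messages, long periodicSending) {
    uint32_t calls = 0;
    for (uint32_t count = 0; count < messages; ++count)
        if (ShouldDoWork(count, periodicSending))
            ++calls;
    return calls;
}
```

`count` starts at 0, and `0 % X == 0`, so the very first message also triggers a DoWork call. After that, X messages are queued between calls. An argument of 0 (the default) and an argument of 1 behave identically.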

The second and third arguments of the binary are also optional; they set the key/value pair of the message sent. They default to "Alive" for the key (arg. #2) and "[$count]" for the value (arg. #3). With "[$count]", the message contains an increasing value that serves as a message ID (to keep track of messages).
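The payload described above can be produced with a single `snprintf`, instead of the `strcat`/`sprintf` chain used in the repro. The sketch below uses a hypothetical `BuildPayload` helper. Like the original code, it does not JSON-escape the key or the value.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper (not part of the SDK). Writes the repro's payload
 * into `out` (capacity `size`):
 *   value == "[$count]"  ->  {"key":<count>}   e.g. {"Alive":1}
 *   any other value      ->  {"key":"value"}
 * Returns the payload length, or -1 if `out` is too small.
 * As in the original code, key and value are not JSON-escaped.
 */
int BuildPayload(char *out, size_t size, const char *key, const char *value, unsigned count) {
    int n;
    assert(out && key && value);
    if (!strcmp(value, "[$count]"))
        n = snprintf(out, size, "{\"%s\":%u}", key, count);
    else
        n = snprintf(out, size, "{\"%s\":\"%s\"}", key, value);
    return (n < 0 || (size_t)n >= size) ? -1 : n;
}
```

With the defaults, the repro's counter starts at 1, so its first message is `{"Alive":1}`.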

#if !defined(unix) && !defined(__unix) && !defined(__unix__)
#warning Please compile under any unix-like OS ; e.g. Linux
#endif

#include <semaphore.h>
#include <sys/time.h>
#include <pthread.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
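#include <stdio.h>
#include <string.h> /* strdup, strstr, strcpy, strcat, strcmp, strerror_r */
#include <errno.h>  /* errno, EAGAIN */
#include <time.h>   /* tzset, localtime_r, strftime */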

#include "iothub_device_client_ll.h"
#include "iothub_client_options.h"
#include "iothubtransportmqtt.h"
#include "iothub.h"

#define CONNSTR "[ YOUR CONNECTION STRING GOES HERE ]"

#define COLOURED
#ifdef COLOURED
#define RED "\033[1;31m"
#define GREEN "\033[1;32m"
#define YELLOW "\033[1;33m"
#define BLUE "\033[1;34m"
#define MAGENTA "\033[1;35m"
#define CYAN "\033[1;36m"
#define WHITE "\033[1;37m"
#define STANDARD "\033[0;37m"
#else
#define RED ""
#define GREEN ""
#define YELLOW ""
#define BLUE ""
#define MAGENTA ""
#define CYAN ""
#define WHITE ""
#define STANDARD ""
#endif

////////////////////////////////////////////////////////////////////////////////
// Structs

enum StatsTables {
    E_ST_CONNECTED,
    E_ST_DISCONNECTED,
    E_ST_SENT = 0,
    E_ST_TIMEOUT,
    E_ST_DESTROYED,
    E_ST_DISCARDED
};

typedef struct {
    uint16_t _connected[2];
    uint16_t _sent[4];
} Stats;

typedef struct {
    Stats _stats;
    char const *_connStr;
    char *_msg;
    uint16_t _count;
    IOTHUB_DEVICE_CLIENT_LL_HANDLE _deviceHandle;
    sem_t _semaphore;
    pthread_t _sender;
    struct sigaction _handlerStruct;
} IoTHubLink;

typedef struct {
    IoTHubLink *_link;
    char *_msg;
} MessageCbData;

////////////////////////////////////////////////////////////////////////////////
// Tools

#define FPRINT(F, ...) do {if (0 >= fprintf(F, __VA_ARGS__)) {abort();}} while (0)
#define PRINT(...) FPRINT(stdout, __VA_ARGS__)
#define LOG(V, W, X, Y, Z, ...) do {char *const __LOG__ts = GetTimeStamp(), *const __LOG__str = V ? GetStrError() : NULL; FPRINT(Y, "[%s%s%s] - %s"W"%s : "X"%s%s%s\n", WHITE, __LOG__ts ? __LOG__ts : "YYYY-mm-ddTHH:MM:SS.uuuZ", STANDARD, Z, STANDARD, __VA_ARGS__, (V ? " (" : ""), (V ? __LOG__str ? __LOG__str : "strerror failed" : ""), (V ? ")" : "")); free(__LOG__ts); free(__LOG__str);} while (0)
#define INFO(X, ...) LOG(0, "INFO", X, stdout, BLUE, __VA_ARGS__)
#define STATUS(X, ...) LOG(0, "STATUS", X, stdout, GREEN, __VA_ARGS__)
#define WARNING(X, ...) LOG(0, "WARNING", X, stdout, YELLOW, __VA_ARGS__)
#define ERROR(W, X, ...) LOG(W, "ERROR", X" (%s:%d)", stderr, RED, __VA_ARGS__, __FILE__, __LINE__)
#define CRITICAL(W, X, ...) LOG(W, "CRITICAL", X" (%s:%d)", stderr, MAGENTA, __VA_ARGS__, __FILE__, __LINE__)

static char *GetStrError(void) {
    int const err = errno; /* capture first: malloc may modify errno even on success */
    char *const buf = malloc(1024);
    if (!buf) {
        FPRINT(stderr, "[%s%s%s] - %sCRITICAL%s : %s (%s:%d)\n", WHITE, "YYYY-mm-ddTHH:MM:SS.uuuZ", STANDARD, MAGENTA, STANDARD, "malloc failed", __FILE__, __LINE__);
        return (NULL);
    }
    /* Assumes the XSI-compliant strerror_r, which returns 0 on success */
    if (strerror_r(err, buf, 1024)) {
        FPRINT(stderr, "[%s%s%s] - %sERROR%s : %s (%s:%d)\n", WHITE, "YYYY-mm-ddTHH:MM:SS.uuuZ", STANDARD, RED, STANDARD, "strerror_r failed", __FILE__, __LINE__);
        free(buf);
        return (NULL);
    }
    return (buf);
}

static char *GetTimeStamp(void) {
    char *const ts = strdup("YYYY-mm-ddTHH:MM:SS.uuuZ__"), *str;
    if (!ts) {
        str = GetStrError();
        FPRINT(stderr, "[%s%s%s] - %sERROR%s : %s (%s:%d) (%s)\n", WHITE, "YYYY-mm-ddTHH:MM:SS.uuuZ", STANDARD, RED, STANDARD, "strdup failed ; defaulting to NULL timestamp", __FILE__, __LINE__, str ? str : "strerror failed");
        free(str);
        return (NULL);
    }
    struct timeval tv;
    if (gettimeofday(&tv, NULL)) {
        FPRINT(stderr, "[%s%s%s] - %sERROR%s : %s (%s:%d)\n", WHITE, ts, STANDARD, RED, STANDARD, "gettimeofday failed ; defaulting to undefined timestamp", __FILE__, __LINE__);
        24[ts] = 0;
        return (ts);
    }
    struct tm local;
    localtime_r(&tv.tv_sec, &local);
    if (!strftime(ts, 26, "%Y-%m-%dT%H:%M:%S.", &local)) {
        str = GetStrError();
        FPRINT(stderr, "[%s%s%s] - %sERROR%s : %s (%s:%d) (%s)\n", WHITE, ts, STANDARD, RED, STANDARD, "strftime failed ; defaulting to undefined timestamp", __FILE__, __LINE__, str ? str : "strerror failed");
        24[ts] = 0;
        free(str);
        return (ts);
    }
    /* Milliseconds, zero-padded to exactly three digits */
    if (0 >= sprintf(ts + 20, "%03ld", (long)(tv.tv_usec / 1000))) {
        str = GetStrError();
        FPRINT(stderr, "[%s%s%s] - %sERROR%s : %s (%s:%d) (%s)\n", WHITE, ts, STANDARD, RED, STANDARD, "sprintf failed ; defaulting to undefined timestamp", __FILE__, __LINE__, str ? str : "strerror failed");
        24[ts] = 0;
        free(str);
        return (ts);
    }
    23[ts] = 'Z';
    24[ts] = 0;
    return (ts);
}

static void PrintStats(IoTHubLink *link) {
    PRINT("___----=== STATISTICS ===---___\n");
    PRINT("\n [[[ MESSAGES ]]]\n");
    PRINT("%sSent%s        ~ %s%d%s\n", WHITE, STANDARD, GREEN, E_ST_SENT[link->_stats._sent], STANDARD);
    PRINT("%sTimed out%s   ~ %s%d%s\n", WHITE, STANDARD, YELLOW, E_ST_TIMEOUT[link->_stats._sent], STANDARD);
    PRINT("%sDestroyed%s   ~ %s%d%s\n", WHITE, STANDARD, RED, E_ST_DESTROYED[link->_stats._sent], STANDARD);
    PRINT("%sDiscarded%s   ~ %s%d%s\n", WHITE, STANDARD, MAGENTA, E_ST_DISCARDED[link->_stats._sent], STANDARD);
    PRINT("\n [[[ CONNECTIONS ]]]\n");
    PRINT("%sConnected%s   ~ %s%d%s\n", WHITE, STANDARD, GREEN, E_ST_CONNECTED[link->_stats._connected], STANDARD);
    PRINT("%sDisconnected%s ~ %s%d%s\n", WHITE, STANDARD, RED, E_ST_DISCONNECTED[link->_stats._connected], STANDARD);
}

////////////////////////////////////////////////////////////////////////////////
// Signal handling in thread

volatile sig_atomic_t __sigintHandlerStatus__ = 0; /* sig_atomic_t: safe to write from a signal handler */

static void SigintHandler(int const received) {
    if (SIGINT == received)
        __sigintHandlerStatus__ = 1;
    else
        abort();
}

static void *SigintWaiter(void *vLink) {
    IoTHubLink *link = vLink;
    sigset_t unblockSignals;
    int lockStatus = 0;
    if (sigemptyset(&unblockSignals)) {
        ERROR(1, "%s", "sigemptyset failed");
        if (sem_post(&link->_semaphore))
            ERROR(1, "%s", "sem_post failed");
        pthread_exit(NULL);
    }
    if (sigaddset(&unblockSignals, SIGINT)) {
        ERROR(1, "%s", "sigaddset failed");
        if (sem_post(&link->_semaphore))
            ERROR(1, "%s", "sem_post failed");
        pthread_exit(NULL);
    }
    if (sigprocmask(SIG_UNBLOCK, &unblockSignals, NULL)) {
        ERROR(1, "%s", "sigprocmask failed");
        if (sem_post(&link->_semaphore))
            ERROR(1, "%s", "sem_post failed");
        pthread_exit(NULL);
    }
    if (pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL)) {
        ERROR(0, "%s", "pthread_setcancelstate failed");
        if (sem_post(&link->_semaphore))
            ERROR(1, "%s", "sem_post failed");
        pthread_exit(NULL);
    }
    while (!__sigintHandlerStatus__ && ((lockStatus = sem_trywait(&link->_semaphore)) && EAGAIN == errno))
        usleep(10000);
    INFO("%s", "SIGINT signal received ; stopping");
    if (__sigintHandlerStatus__ && sem_post(&link->_semaphore)) {
        ERROR(1, "%s", "sem_post failed");
        pthread_exit(NULL);
    }
    pthread_exit(NULL);
}

////////////////////////////////////////////////////////////////////////////////
// IoTHub client callbacks

static void IoTHubConnStatusCb(IOTHUB_CLIENT_CONNECTION_STATUS const status, IOTHUB_CLIENT_CONNECTION_STATUS_REASON const reason, void *vLink) {
    IoTHubLink *link = vLink;
    char const *reasonStr = NULL;
    uint8_t level = 0;
    switch (reason) {
    case IOTHUB_CLIENT_CONNECTION_EXPIRED_SAS_TOKEN:
        reasonStr = "SAS token expired";
        level = 2;
        break;
    case IOTHUB_CLIENT_CONNECTION_DEVICE_DISABLED:
        reasonStr = "device disabled";
        level = 2;
        break;
    case IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL:
        reasonStr = "bad credential";
        level = 2;
        break;
    case IOTHUB_CLIENT_CONNECTION_RETRY_EXPIRED:
        reasonStr = "retry expired";
        level = 1;
        break;
    case IOTHUB_CLIENT_CONNECTION_NO_NETWORK:
        reasonStr = "no network";
        level = 1;
        break;
    case IOTHUB_CLIENT_CONNECTION_COMMUNICATION_ERROR:
        reasonStr = "communication error";
        level = 2;
        break;
    case IOTHUB_CLIENT_CONNECTION_OK:
        reasonStr = "everything is fine";
        break;
    case IOTHUB_CLIENT_CONNECTION_NO_PING_RESPONSE:
        reasonStr = "no answer to ping";
        level = 1;
        break;
    case IOTHUB_CLIENT_CONNECTION_QUOTA_EXCEEDED:
        reasonStr = "quota exceeded";
        level = 2;
        break;
    default:
        CRITICAL(0, "%s", "IOTHUB_CLIENT_CONNECTION_STATUS_REASON error (unexpected value) ; aborting");
        abort();
    }
    switch (status) {
    case IOTHUB_CLIENT_CONNECTION_AUTHENTICATED:
        ++E_ST_CONNECTED[link->_stats._connected];
        if (!level)
            STATUS("IoTHub client authenticated successfully (%s)", reasonStr);
        else
            ERROR(0, "IoTHub client authenticated successfully but status does not match (%s)", reasonStr);
        break;
    case IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED:
        ++E_ST_DISCONNECTED[link->_stats._connected];
        if (!level)
            STATUS("IoTHub client disconnected (%s)", reasonStr);
        else if (1 == level)
            WARNING("IoTHub client disconnected (%s)", reasonStr);
        else
            ERROR(0, "IoTHub client disconnected (%s)", reasonStr);
        break;
    default:
        CRITICAL(0, "%s", "IOTHUB_CLIENT_CONNECTION_STATUS error (unexpected value) ; aborting");
        abort();
    }
}

static void IoTHubMsgStatusCb(IOTHUB_CLIENT_CONFIRMATION_RESULT const status, void *vData) {
    MessageCbData *data = (MessageCbData *)vData;
    switch (status) {
    case IOTHUB_CLIENT_CONFIRMATION_OK:
        STATUS("Sent message \"%s\" (everything is fine)", data->_msg);
        ++E_ST_SENT[data->_link->_stats._sent];
        break;
    case IOTHUB_CLIENT_CONFIRMATION_BECAUSE_DESTROY:
        WARNING("Discarded message \"%s\" (destroyed)", data->_msg);
        ++E_ST_DESTROYED[data->_link->_stats._sent];
        break;
    case IOTHUB_CLIENT_CONFIRMATION_MESSAGE_TIMEOUT:
        WARNING("Discarded message \"%s\" (timed out)", data->_msg);
        ++E_ST_TIMEOUT[data->_link->_stats._sent];
        break;
    case IOTHUB_CLIENT_CONFIRMATION_ERROR:
        ERROR(0, "Discarded message \"%s\" (error)", data->_msg);
        ++E_ST_DISCARDED[data->_link->_stats._sent];
        break;
    default:
        CRITICAL(0, "%s", "IOTHUB_CLIENT_CONFIRMATION_RESULT error (unexpected value) ; aborting");
        abort();
    }
    free(data->_msg);
    free(data);
}

////////////////////////////////////////////////////////////////////////////////
// Send process

static int Send(IoTHubLink *link) {
    IOTHUB_MESSAGE_HANDLE messageHandle;
    char *index, *ts, *msg = strdup(link->_msg);
    if (!msg) {
        ERROR(1, "%s", "strdup failed");
        return (1);
    }
    if (link->_count) {
        if (!(index = strstr(msg, ":"))) {
            ERROR(0, "%s", "strstr failed");
            free(msg);
            return (1);
        }
        if (0 >= sprintf(index + 1, "%d}", link->_count++)) {
            ERROR(1, "%s", "sprintf failed");
            free(msg);
            return (1);
        }
    }
    if (!(messageHandle = IoTHubMessage_CreateFromString(msg))) {
        ERROR(0, "%s", "IoTHubMessage_CreateFromString failed");
        free(msg);
        IoTHubDeviceClient_LL_DoWork(link->_deviceHandle);
        return (1);
    }
    if (IOTHUB_CLIENT_OK != IoTHubMessage_SetContentTypeSystemProperty(messageHandle, "application%2fjson")) {
        ERROR(0, "%s", "IoTHubMessage_SetContentTypeSystemProperty failed");
        IoTHubMessage_Destroy(messageHandle);
        free(msg);
        IoTHubDeviceClient_LL_DoWork(link->_deviceHandle);
        return (1);
    }
    if (IOTHUB_CLIENT_OK != IoTHubMessage_SetContentEncodingSystemProperty(messageHandle, "utf-8")) {
        ERROR(0, "%s", "IoTHubMessage_SetContentEncodingSystemProperty failed");
        IoTHubMessage_Destroy(messageHandle);
        free(msg);
        IoTHubDeviceClient_LL_DoWork(link->_deviceHandle);
        return (1);
    }
    ts = GetTimeStamp();
    if (IOTHUB_CLIENT_OK != IoTHubMessage_SetMessageCreationTimeUtcSystemProperty(messageHandle, ts ? ts : "YYYY-mm-ddTHH:MM:SS.uuuZ")) {
        ERROR(0, "%s", "IoTHubMessage_SetMessageCreationTimeUtcSystemProperty failed");
        IoTHubMessage_Destroy(messageHandle);
        free(ts);
        free(msg);
        IoTHubDeviceClient_LL_DoWork(link->_deviceHandle);
        return (1);
    }
    MessageCbData *cbData = malloc(sizeof(MessageCbData));
    if (!cbData) {
        ERROR(1, "%s", "malloc failed");
        IoTHubMessage_Destroy(messageHandle);
        free(ts);
        free(msg);
        IoTHubDeviceClient_LL_DoWork(link->_deviceHandle);
        return (1);
    }
    cbData->_link = link;
    cbData->_msg = msg;
    if (IOTHUB_CLIENT_OK != IoTHubDeviceClient_LL_SendEventAsync(link->_deviceHandle, messageHandle, &IoTHubMsgStatusCb, cbData)) {
        ERROR(0, "%s", "IoTHubDeviceClient_LL_SendEventAsync failed");
        IoTHubMessage_Destroy(messageHandle);
        free(ts);
        free(msg);
        free(cbData); /* the SDK did not take ownership, since the call failed */
        IoTHubDeviceClient_LL_DoWork(link->_deviceHandle);
        return (1);
    }
    IoTHubMessage_Destroy(messageHandle);
    free(ts);
    return (0);
}

static int SendLoop(IoTHubLink *link, long int const periodicSending) {
    int lockStatus, ret = 0;
    uint32_t count = 0;
    while (!ret && ((lockStatus = sem_trywait(&link->_semaphore)) && EAGAIN == errno)) {
        usleep(1000000);
        ret = Send(link);
        if (!periodicSending || !(count++ % periodicSending))
            IoTHubDeviceClient_LL_DoWork(link->_deviceHandle);
    }
    if (!ret && lockStatus) {
        ERROR(1, "%s", "sem_trywait failed");
        return (1);
    }
    if (ret && sem_post(&link->_semaphore)) {
        ERROR(1, "%s", "sem_post failed");
        return (1);
    }
    return (ret);
}

////////////////////////////////////////////////////////////////////////////////
// IoTHubLink constructor and destructor

static uint8_t IoTHubLinkConstructSettings(void) {
    tzset();
    sigset_t blockSignals;
    if (sigemptyset(&blockSignals)) {
        ERROR(1, "%s", "sigemptyset failed");
        return (1);
    }
    if (sigaddset(&blockSignals, SIGINT)) {
        ERROR(1, "%s", "sigaddset failed");
        return (1);
    }
    if (pthread_sigmask(SIG_BLOCK, &blockSignals, NULL)) {
        ERROR(1, "%s", "pthread_sigmask failed");
        return (1);
    }
    if (IoTHub_Init()) {
        ERROR(0, "%s", "IoTHub_Init failed");
        return (1);
    }
    return (0);
}

static IoTHubLink *IoTHubLinkConstructStats(IoTHubLink *const link) {
    for (uint8_t i = 0; 2 > i; ++i)
        i[link->_stats._connected] = 0;
    for (uint8_t i = 0; 4 > i; ++i)
        i[link->_stats._sent] = 0;
    return (link);
}

static IoTHubLink *IoTHubLinkConstructStrings(char const *const key, char const *const value, IoTHubLink *const link) {
    link->_connStr = CONNSTR;
    if (!(link->_msg = malloc(strlen(key) + strlen(value) + 8))) {
        ERROR(1, "%s", "malloc failed");
        free(link);
        IoTHub_Deinit();
        return (NULL);
    }
    strcat(strcat(strcat(strcat(strcpy(link->_msg, "{\""), key), "\":\""), value), "\"}");
    link->_count = strcmp(value, "[$count]") ? 0 : 1;
    return (link);
}

static IoTHubLink *IoTHubLinkConstructIoTClient(IoTHubLink *const link) {
    bool urlEncode = true;
    if (!(link->_deviceHandle = IoTHubDeviceClient_LL_CreateFromConnectionString(link->_connStr, MQTT_Protocol))) {
        ERROR(0, "%s", "IoTHubDeviceClient_LL_CreateFromConnectionString failed");
        free(link->_msg);
        free(link);
        IoTHub_Deinit();
        return (NULL);
    }
    if (IOTHUB_CLIENT_OK != IoTHubDeviceClient_LL_SetOption(link->_deviceHandle, OPTION_AUTO_URL_ENCODE_DECODE, &urlEncode)) {
        ERROR(0, "%s", "IoTHubDeviceClient_LL_SetOption failed");
        IoTHubDeviceClient_LL_Destroy(link->_deviceHandle);
        free(link->_msg);
        free(link);
        IoTHub_Deinit();
        return (NULL);
    }
    if (IOTHUB_CLIENT_OK != IoTHubDeviceClient_LL_SetConnectionStatusCallback(link->_deviceHandle, &IoTHubConnStatusCb, link)) {
        ERROR(0, "%s", "IoTHubDeviceClient_LL_SetConnectionStatusCallback failed");
        IoTHubDeviceClient_LL_Destroy(link->_deviceHandle);
        free(link->_msg);
        free(link);
        IoTHub_Deinit();
        return (NULL);
    }
    return (link);
}

static IoTHubLink *IoTHubLinkConstructSemaphore(IoTHubLink *const link) {
    if (sem_init(&link->_semaphore, 0, 0)) {
        ERROR(1, "%s", "sem_init failed");
        IoTHubDeviceClient_LL_Destroy(link->_deviceHandle);
        free(link->_msg);
        free(link);
        IoTHub_Deinit();
        return (NULL);
    }
    return (link);
}

static IoTHubLink *IoTHubLinkConstructThread(IoTHubLink *const link) {
    if (pthread_create(&link->_sender, NULL, &SigintWaiter, link)) {
        ERROR(1, "%s", "pthread_create failed");
        if (sem_destroy(&link->_semaphore))
            ERROR(1, "%s", "sem_destroy failed");
        IoTHubDeviceClient_LL_Destroy(link->_deviceHandle);
        free(link->_msg);
        free(link);
        IoTHub_Deinit();
        return (NULL);
    }
    return (link);
}

static IoTHubLink *IoTHubLinkConstructSigCatcher(IoTHubLink *const link) {
    link->_handlerStruct.sa_handler = &SigintHandler;
    if (sigemptyset(&link->_handlerStruct.sa_mask)) {
        ERROR(1, "%s", "sigemptyset failed");
        if (sem_post(&link->_semaphore))
            ERROR(1, "%s", "sem_post failed");
        if (pthread_join(link->_sender, NULL))
            ERROR(0, "%s", "pthread_join failed");
        if (sem_destroy(&link->_semaphore))
            ERROR(1, "%s", "sem_destroy failed");
        IoTHubDeviceClient_LL_Destroy(link->_deviceHandle);
        free(link->_msg);
        free(link);
        IoTHub_Deinit();
        return (NULL);
    }
    link->_handlerStruct.sa_flags = SA_RESTART;
    if (sigaction(SIGINT, &link->_handlerStruct, NULL)) {
        ERROR(1, "%s", "sigaction failed");
        if (sem_post(&link->_semaphore))
            ERROR(1, "%s", "sem_post failed");
        if (pthread_join(link->_sender, NULL))
            ERROR(0, "%s", "pthread_join failed");
        if (sem_destroy(&link->_semaphore))
            ERROR(1, "%s", "sem_destroy failed");
        IoTHubDeviceClient_LL_Destroy(link->_deviceHandle);
        free(link->_msg);
        free(link);
        IoTHub_Deinit();
        return (NULL);
    }
    return (link);
}

static IoTHubLink *IoTHubLinkConstruct(char const *const key, char const *const value) {
    IoTHubLink *link;
    if (IoTHubLinkConstructSettings())
        return (NULL);
    if (!(link = malloc(sizeof(IoTHubLink)))) {
        ERROR(1, "%s", "malloc failed");
        IoTHub_Deinit();
        return (NULL);
    }
    if (!IoTHubLinkConstructStats(link) || !IoTHubLinkConstructStrings(key, value, link) || !IoTHubLinkConstructIoTClient(link) || !IoTHubLinkConstructSemaphore(link) || !IoTHubLinkConstructThread(link) || !IoTHubLinkConstructSigCatcher(link))
        return (NULL);
    return (link);
}

static void IoTHubLinkDestruct(IoTHubLink *link) {
    if (pthread_join(link->_sender, NULL))
        ERROR(0, "%s", "pthread_join failed");
    if (sem_destroy(&link->_semaphore))
        ERROR(1, "%s", "sem_destroy failed");
    IoTHubDeviceClient_LL_Destroy(link->_deviceHandle);
    free(link->_msg);
    PrintStats(link);
    free(link);
    IoTHub_Deinit();
}

////////////////////////////////////////////////////////////////////////////////
// Main

int main(int const ac, char const *const *const av) {
    int ret = 0;
    IoTHubLink *link = IoTHubLinkConstruct((2 < ac ? 2[av] : "Alive"), 3 < ac ? 3[av] : "[$count]");
    if (!link) {
        ERROR(0, "%s", "IoTHubLinkConstruct failed ; exiting");
        return (1);
    }
    ret = SendLoop(link, atol(1 < ac ? 1[av] : "0"));
    IoTHubLinkDestruct(link);
    return (ret);
}

////////////////////////////////////////////////////////////////////////////////

Console Logs

Thread sanitizer log: [screenshot attached to the original issue]

Memory sanitizer log: [screenshot attached to the original issue]

ericwolz commented 1 year ago

The LL APIs are not thread safe. I would avoid using multithreading when calling these APIs. Consider using the non-LL APIs instead, as these are thread safe and implement locking in the SDK.

https://github.com/Azure/azure-iot-sdk-c/blob/c894993e8cb4fc5171f54b0ed4ab9122824bd6dd/doc/threading_notes.md
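To rule the helper thread out completely, one common single-threaded alternative is a SIGINT handler that only sets a `volatile sig_atomic_t` flag, which the send loop polls between iterations. This approach was not proposed in this thread. The sketch below assumes a POSIX system, and `InstallSigintFlag` and `StopRequested` are hypothetical names.

```c
#define _POSIX_C_SOURCE 200809L /* exposes sigaction under strict -std=c99 */
#include <assert.h>
#include <signal.h>
#include <string.h>

/* Written only by the handler; sig_atomic_t is the type C guarantees is safe here. */
static volatile sig_atomic_t g_stopRequested = 0;

/* Async-signal-safe: the handler does nothing but set the flag. */
static void OnSigint(int sig) {
    (void)sig;
    g_stopRequested = 1;
}

/* Hypothetical helper: installs OnSigint for SIGINT; 0 on success, -1 on failure. */
int InstallSigintFlag(void) {
    struct sigaction sa;
    memset(&sa, 0, sizeof sa); /* sa_flags = 0: interrupted syscalls return EINTR */
    sa.sa_handler = OnSigint;
    if (sigemptyset(&sa.sa_mask))
        return -1;
    return sigaction(SIGINT, &sa, NULL);
}

/* Hypothetical helper: polled by the single-threaded send loop between iterations. */
int StopRequested(void) {
    return g_stopRequested != 0;
}
```

The send loop would then test `!StopRequested()` instead of polling a semaphore. This removes the `sem_t`, the `pthread_t` and the signal masking from the program.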

Also, for some reason you are calling IoTHubDeviceClient_LL_DoWork in multiple locations. This is very unusual and should not be required.

The expected pseudo-implementation for the LL APIs is as follows:

main()
{
  while (true)
  {
    run_application_code();
    send_any_iot_message();
    IoTHubDeviceClient_LL_DoWork();
    sleep(100ms);
  }
}
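A compilable version of this loop might look like the sketch below. It assumes POSIX `nanosleep`. The three functions called in the loop are hypothetical stubs that only count calls. In a real client, they would be replaced by application code, `IoTHubDeviceClient_LL_SendEventAsync` and `IoTHubDeviceClient_LL_DoWork`.

```c
#define _POSIX_C_SOURCE 200809L /* exposes nanosleep under strict -std=c99 */
#include <assert.h>
#include <errno.h>
#include <time.h>

/* Hypothetical stubs that only count calls; a real client would run its
 * application logic, IoTHubDeviceClient_LL_SendEventAsync and
 * IoTHubDeviceClient_LL_DoWork here instead. */
static unsigned g_appRuns, g_sends, g_doWorks;
static void run_application_code(void) { ++g_appRuns; }
static void send_any_iot_message(void) { ++g_sends; }
static void do_work(void) { ++g_doWorks; }

/* Sleeps for `ms` milliseconds, resuming after signal interruptions. */
static void SleepMs(long ms) {
    struct timespec req = { .tv_sec = ms / 1000, .tv_nsec = (ms % 1000) * 1000000L };
    while (nanosleep(&req, &req) == -1 && errno == EINTR)
        continue; /* req now holds the remaining time */
}

/* The pseudo-code loop, bounded to `iterations` passes so that it terminates. */
void RunLoop(unsigned iterations, long periodMs) {
    assert(periodMs >= 0);
    for (unsigned i = 0; i < iterations; ++i) {
        run_application_code();
        send_any_iot_message();
        do_work();         /* exactly one DoWork call per pass */
        SleepMs(periodMs); /* 100 ms in the pseudo-code */
    }
}
```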

Samples are located here: https://github.com/Azure/azure-iot-sdk-c/tree/main/iothub_client/samples

BillyTheFrog commented 1 year ago

I had already read this advice, in the documentation and in the IoT Hub samples.

To address the first point: my use of a thread is ONLY for signal handling, so it has no impact on anything happening in the IoT Hub C SDK (no SDK function is ever called from that thread, and no shared resources are accessed).

Regarding the multiple DoWork calls: in the normal path, the function is called only once per loop iteration, as in your pseudo-implementation. However, on error paths, before returning, I force extra calls in an attempt to send all pending messages.

This code can be reduced to your pseudo-implementation, and the problem persists for me.

Flooding messages seems, after some time, to trigger a heap-use-after-free.

BillyTheFrog commented 1 year ago

Any news?

ericwolz commented 1 year ago

As stated above, this is probably a reentrancy issue caused by calling non-locking APIs from another thread. If possible, please provide a C99, single-threaded sample that reproduces this issue.

As always, we welcome external contributions to this open source repo!

BillyTheFrog commented 1 year ago
#if !defined(unix) && !defined(__unix) && !defined(__unix__)
#warning Please compile under any unix-like OS ; e.g. Linux
#endif

#include <sys/time.h>
#include <unistd.h>
#include <stdlib.h>
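#include <stdio.h>
#include <string.h> /* strdup, strstr, strcpy, strcat, strcmp */
#include <time.h>   /* localtime, strftime */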

#include "iothub_device_client_ll.h"
#include "iothub_client_options.h"
#include "iothubtransportmqtt.h"
#include "iothub.h"

#define CONNSTR "[ YOUR CONNECTION STRING GOES HERE ]"

typedef struct {
    char const *_connStr;
    char *_msg;
    uint16_t _count;
    IOTHUB_DEVICE_CLIENT_LL_HANDLE _deviceHandle;
} IoTHubLink;

typedef struct {
    IoTHubLink *_link;
    char *_msg;
} MessageCbData;

static char *GetTimeStamp(void) {
    char *const ts = strdup("YYYY-mm-ddTHH:MM:SS.uuuZ__");
    if (!ts)
        return (NULL);
    struct timeval tv;
    struct tm *local;
    /* Read the clock once so that seconds and milliseconds are consistent */
    if (gettimeofday(&tv, NULL) || !(local = localtime(&tv.tv_sec)) || !strftime(ts, 26, "%Y-%m-%dT%H:%M:%S.", local) || 0 >= sprintf(ts + 20, "%03ld", (long)(tv.tv_usec / 1000))) {
        24[ts] = 0;
        return (ts);
    }
    23[ts] = 'Z';
    24[ts] = 0;
    return (ts);
}

static void IoTHubConnStatusCb(IOTHUB_CLIENT_CONNECTION_STATUS const status, IOTHUB_CLIENT_CONNECTION_STATUS_REASON const reason, void *vLink) {
    IoTHubLink *link = vLink;
    char const *reasonStr = NULL;
    uint8_t ok = 0;
    switch (reason) {
    case IOTHUB_CLIENT_CONNECTION_EXPIRED_SAS_TOKEN:
        reasonStr = "SAS token expired";
        break;
    case IOTHUB_CLIENT_CONNECTION_DEVICE_DISABLED:
        reasonStr = "device disabled";
        break;
    case IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL:
        reasonStr = "bad credential";
        break;
    case IOTHUB_CLIENT_CONNECTION_RETRY_EXPIRED:
        reasonStr = "retry expired";
        break;
    case IOTHUB_CLIENT_CONNECTION_NO_NETWORK:
        reasonStr = "no network";
        break;
    case IOTHUB_CLIENT_CONNECTION_COMMUNICATION_ERROR:
        reasonStr = "communication error";
        break;
    case IOTHUB_CLIENT_CONNECTION_OK:
        reasonStr = "everything is fine";
        ok = 1;
        break;
    case IOTHUB_CLIENT_CONNECTION_NO_PING_RESPONSE:
        reasonStr = "no answer to ping";
        break;
    case IOTHUB_CLIENT_CONNECTION_QUOTA_EXCEEDED:
        reasonStr = "quota exceeded";
        break;
    default:
        abort();
    }
    switch (status) {
    case IOTHUB_CLIENT_CONNECTION_AUTHENTICATED:
        if (ok)
            printf("IoTHub client authenticated successfully (%s)\n", reasonStr);
        else
            fprintf(stderr, "IoTHub client authenticated successfully but status does not match (%s)\n", reasonStr);
        break;
    case IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED:
        fprintf(stderr, "IoTHub client disconnected (%s)\n", reasonStr);
        break;
    default:
        abort();
    }
}

static void IoTHubMsgStatusCb(IOTHUB_CLIENT_CONFIRMATION_RESULT const status, void *vData) {
    MessageCbData *data = (MessageCbData *)vData;
    switch (status) {
    case IOTHUB_CLIENT_CONFIRMATION_OK:
        printf("Sent message \"%s\" (everything is fine)\n", data->_msg);
        break;
    case IOTHUB_CLIENT_CONFIRMATION_BECAUSE_DESTROY:
        fprintf(stderr, "Discarded message \"%s\" (destroyed)\n", data->_msg);
        break;
    case IOTHUB_CLIENT_CONFIRMATION_MESSAGE_TIMEOUT:
        fprintf(stderr, "Discarded message \"%s\" (timed out)\n", data->_msg);
        break;
    case IOTHUB_CLIENT_CONFIRMATION_ERROR:
        fprintf(stderr, "Discarded message \"%s\" (error)\n", data->_msg);
        break;
    default:
        abort();
    }
    free(data->_msg);
    free(data);
}

static int Send(IoTHubLink *link) {
    IOTHUB_MESSAGE_HANDLE messageHandle;
    char *index, *ts, *msg = strdup(link->_msg);
    MessageCbData *cbData;
    if (!msg)
        return (1);
    if (link->_count && (!(index = strstr(msg, ":")) || 0 >= sprintf(index + 1, "%d}", link->_count++))) {
        free(msg);
        return (1);
    }
    if (!(messageHandle = IoTHubMessage_CreateFromString(msg))) {
        free(msg);
        return (1);
    }
    if (IOTHUB_CLIENT_OK != IoTHubMessage_SetContentTypeSystemProperty(messageHandle, "application%2fjson") || IOTHUB_CLIENT_OK != IoTHubMessage_SetContentEncodingSystemProperty(messageHandle, "utf-8")) {
        IoTHubMessage_Destroy(messageHandle);
        free(msg);
        return (1);
    }
    ts = GetTimeStamp();
    if (IOTHUB_CLIENT_OK != IoTHubMessage_SetMessageCreationTimeUtcSystemProperty(messageHandle, ts ? ts : "YYYY-mm-ddTHH:MM:SS.uuuZ") || !(cbData = malloc(sizeof(MessageCbData)))) {
        IoTHubMessage_Destroy(messageHandle);
        free(ts);
        free(msg);
        return (1);
    }
    cbData->_link = link;
    cbData->_msg = msg;
    if (IOTHUB_CLIENT_OK != IoTHubDeviceClient_LL_SendEventAsync(link->_deviceHandle, messageHandle, &IoTHubMsgStatusCb, cbData)) {
        IoTHubMessage_Destroy(messageHandle);
        free(ts);
        free(msg);
        free(cbData); /* the SDK did not take ownership, since the call failed */
        return (1);
    }
    IoTHubMessage_Destroy(messageHandle);
    free(ts);
    return (0);
}

static int SendLoop(IoTHubLink *link, long int const periodicSending) {
    int ret = 0;
    uint32_t count = 0;
    while (!ret) {
        ret = Send(link);
        if (!periodicSending || !(count++ % periodicSending))
            IoTHubDeviceClient_LL_DoWork(link->_deviceHandle);
    }
    return (ret);
}

static IoTHubLink *IoTHubLinkConstructStrings(char const *const key, char const *const value, IoTHubLink *const link) {
    link->_connStr = CONNSTR;
    if (!(link->_msg = malloc(strlen(key) + strlen(value) + 8))) {
        free(link);
        IoTHub_Deinit();
        return (NULL);
    }
    strcat(strcat(strcat(strcat(strcpy(link->_msg, "{\""), key), "\":\""), value), "\"}");
    link->_count = strcmp(value, "[$count]") ? 0 : 1;
    return (link);
}

static IoTHubLink *IoTHubLinkConstructIoTClient(IoTHubLink *const link) {
    bool urlEncode = true;
    if (!(link->_deviceHandle = IoTHubDeviceClient_LL_CreateFromConnectionString(link->_connStr, MQTT_Protocol))) {
        free(link->_msg);
        free(link);
        IoTHub_Deinit();
        return (NULL);
    }
    if (IOTHUB_CLIENT_OK != IoTHubDeviceClient_LL_SetOption(link->_deviceHandle, OPTION_AUTO_URL_ENCODE_DECODE, &urlEncode)) {
        IoTHubDeviceClient_LL_Destroy(link->_deviceHandle);
        free(link->_msg);
        free(link);
        IoTHub_Deinit();
        return (NULL);
    }
    if (IOTHUB_CLIENT_OK != IoTHubDeviceClient_LL_SetConnectionStatusCallback(link->_deviceHandle, &IoTHubConnStatusCb, link)) {
        IoTHubDeviceClient_LL_Destroy(link->_deviceHandle);
        free(link->_msg);
        free(link);
        IoTHub_Deinit();
        return (NULL);
    }
    return (link);
}

static IoTHubLink *IoTHubLinkConstruct(char const *const key, char const *const value) {
    IoTHubLink *link;
    if (IoTHub_Init())
        return (NULL);
    if (!(link = malloc(sizeof(IoTHubLink)))) {
        IoTHub_Deinit();
        return (NULL);
    }
    if (!IoTHubLinkConstructStrings(key, value, link) || !IoTHubLinkConstructIoTClient(link))
        return (NULL);
    return (link);
}

static void IoTHubLinkDestruct(IoTHubLink *link) {
    IoTHubDeviceClient_LL_Destroy(link->_deviceHandle);
    free(link->_msg);
    free(link);
    IoTHub_Deinit();
}

int main(int const ac, char const *const *const av) {
    int ret = 0;
    IoTHubLink *link = IoTHubLinkConstruct((2 < ac ? 2[av] : "Alive"), 3 < ac ? 3[av] : "[$count]");
    if (!link)
        return (1);
    ret = SendLoop(link, atol(1 < ac ? 1[av] : "0"));
    IoTHubLinkDestruct(link);
    return (ret);
}

The above example is C99 and single-threaded, and it calls IoTHubDeviceClient_LL_DoWork from only one place. On my machines, it reproduces the issue in under 10 minutes on average (it sometimes takes a few tries).

BillyTheFrog commented 1 year ago

crash1.zip

BillyTheFrog commented 1 year ago

crash2.zip

BillyTheFrog commented 1 year ago

crash3.zip

BillyTheFrog commented 1 year ago

Logs uploaded:

OPTION_LOG_TRACE enabled on the LL API

Crash 1: compiled in debug
Crash 2: compiled in debug with AddressSanitizer
Crash 3: compiled in debug with ThreadSanitizer

ewertons commented 1 year ago

Hi @BillyTheFrog, after looking into your (latest) sample, I can tell you that the way you are using the Azure IoT C SDK is not the expected design for a production environment. Three things stand out:

  1. You are running both IoTHubDeviceClient_LL_DoWork and IoTHubDeviceClient_LL_SendEventAsync in the same loop, at the same frequency;
  2. IoTHubDeviceClient_LL_DoWork is being called way too fast (there is no sleep in your loop; the standard DoWork frequency is every 100 milliseconds);
  3. You are sending too many telemetry messages at once, hitting your hub's throttling limit. That is not expected. Please see https://learn.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-quotas-throttling#basic-and-standard-tier-operations
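Point 3 can be handled on the client side by capping how many messages are queued per second, independently of the 100 ms DoWork cadence. The sketch below uses a standard token bucket, a technique that does not come from the SDK. `TokenBucket` and its functions are hypothetical helpers. The rate and burst values are placeholders to be chosen from the quota page linked above; they are not actual IoT Hub limits.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical token bucket (not part of the SDK): `ratePerSec` tokens are
 * added per second, up to `burst`; each message costs one token. Time is
 * passed in explicitly (milliseconds) so the limiter is deterministic. */
typedef struct {
    double tokens;
    double burst;
    double ratePerSec;
    long long lastMs;
} TokenBucket;

void TokenBucketInit(TokenBucket *tb, double ratePerSec, double burst, long long nowMs) {
    assert(tb && ratePerSec > 0 && burst >= 1);
    tb->tokens = burst; /* start full, allowing an initial burst */
    tb->burst = burst;
    tb->ratePerSec = ratePerSec;
    tb->lastMs = nowMs;
}

/* Returns true (and consumes a token) if one more message may be queued now. */
bool TokenBucketTryTake(TokenBucket *tb, long long nowMs) {
    if (nowMs > tb->lastMs) { /* refill in proportion to the elapsed time */
        tb->tokens += (double)(nowMs - tb->lastMs) * tb->ratePerSec / 1000.0;
        if (tb->tokens > tb->burst)
            tb->tokens = tb->burst;
        tb->lastMs = nowMs;
    }
    if (tb->tokens >= 1.0) {
        tb->tokens -= 1.0;
        return true;
    }
    return false;
}
```

In the send loop, `IoTHubDeviceClient_LL_SendEventAsync` would only be called when `TokenBucketTryTake` returns true. `IoTHubDeviceClient_LL_DoWork` would keep running every 100 ms regardless.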

I have modified one of our samples to have both functions above run in the same loop with no delay, and I'm running it under valgrind to verify if there are any crashes. I'll share details if we get any repro.

BillyTheFrog commented 1 year ago
  1. The variable "periodicSending" modulates the DoWork frequency: IoTHubDeviceClient_LL_DoWork is called once every X messages sent. From my comment in this issue: "To call 'IoTHubDeviceClient_LL_DoWork' less frequently, an argument can be given (arg. #1 of the compiled binary) to call the function every X messages passed to the SDK (putting 5 as first argument will pass 5 messages to the SDK before calling the function). By default the function is called after every message passed."

  2. See 1.

  3. I do not care, as I do not expect this to succeed. This test checks the robustness of C code that should not crash (on what seems to be a freed node in a linked list), and it should only crash if I am doing something wrong with the library flow.

"I have modified one of our samples to have both functions above run in the same loop with no delay": why not use my sample? Does anything in it look wrong? If so, my sample could be faulty. But if everything seems compliant with both the library and ISO/IEC 9899 TC3, then I think the sample I provided is worth investigating.