eclipse-mosquitto / mosquitto

Eclipse Mosquitto - An open source MQTT broker
https://mosquitto.org
Other
9.1k stars 2.4k forks source link

Using TLS in external event loop #1507

Open zevv opened 4 years ago

zevv commented 4 years ago

Hello Mosquitto people,

I'm not sure if Github issues is the right place to get user support, but I suspect I might be treading one of the fewer used paths with libmosquitto which maybe does not behave as advertized:

I have a problem understanding the semantics of the various mosquitto API functions for integrating with an external event loop - things work fine for me with plain MQTT, but I am not able to setup and use a connection using TLS. Here is what I do and what I see:

This is what I see, inspection is done by running the app under strace:

socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) = 8
fcntl(8, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
connect(8, {sa_family=AF_INET, sin_port=htons(8883), sin_addr=inet_addr("xx.xx.xx.xx")}, 16) = -1 EINPROGRESS (Operation now in progress)

The socket is correctly set to nonblocking mode and connect returns EINPROGRESS, as it should.

The next thing that happens is that mosquitto tries to send TLS data over the not-yet connected socket:

write(8, "\26\3\1\1'\1\0\1#\3\......, 300) = -1 EAGAIN (Resource temporarily unavailable)

This of course fails, but there is nothing I can do about this from my application. At this time mosquitto_connect_async() returns and I enter my main event loop. I manually check if mosquitto wants me to watch the socket for POLLOUT events, and indeed it wants to so I put it in the poll set.

poll([{fd=3, events=POLLIN}, {fd=8, events=POLLIN}, {fd=8, events=POLLOUT}], 3, 999) = 1 ([{fd=8, revents=POLLOUT}])

So mosquitto wants to write, I honour this request and call mosquitto_loop_write(), but nothing happens here - the call just returns 1 but no action is performed to further establish the TLS TCP connection.

I'm kind of lost here, and can only guess about workarounds to fix this. So some open questions:

Thank you!

zevv commented 4 years ago

Sorry for bumping, but can anyone shed some light on this to see if I'm doing something obviously wrong?

michaeliu commented 4 years ago

Hi there, I am using lib mosq with libevent as external loop, And a new PR #1588 raised for this, please help review it and let me know if any comments.

michaeliu commented 4 years ago

@zevv Hi there, I think the new PR #1626 will truly resolve the issue you metioned.

And per my understanding and when you use code with PR merged:

  1. call mosquitto_connect_async(): if you got ERR_DNS_RESOLVE_PEDING, you should call mosquitto_reconnect() after a while. move to step 2 when it success(connect or reconnect).

  2. Add mosquitto_socket() fd to your event loop and set read notification, if mosquitto_want_write() == true, set write notification too.

  3. Setup event loop and set a small timeout value e.g. 1 second for handle mosquitto_loop_misc() [it will handle heartbeat ],

    • call mosquitto_loop_read() when read is ready;
    • call mosquitto_loop_write() when write is ready.
    • call mosquitto_loop_misc() when read is ready or write is ready or timeout.
    • call mosquitto_want_write() to decide if you should keep write notification or not.
  4. check each call return value for each API call trigger reconnect if you got return code not SUCCESS or disconnect callback invoked. each call means:

    • mosquitto_loop_read()
    • mosquitto_loop_write()
    • mosquitto_loop_misc()
    • mosquitto_pub..
    • mosquitto_sub.. Any API call of mosquitto lib.
  5. Note this: clean up mosq_sock related external event loop items on disconnect callback(if you use epoll or wrapper of epoll), I got issue on some platform epoll will continue report EPOLLHUP if you do NOT clean up it here(before socket closed). since epoll will run into unknown behavior when you close socket before call CTRL_DEL... It will make your program use 100% CPU resource : -/

Hope it's clear for you and let me know if you have any comment.

detly commented 4 years ago

@michaeliu This is extremely useful, thanks. When/how often do you typically call mosquitto_want_write() though?

michaeliu commented 4 years ago

@michaeliu This is extremely useful, thanks. When/how often do you typically call mosquitto_want_write() though?

Maybe go through the code will be more clear ;-)

mqtt.h:

/**
    Copyright(C) 2019 Michael Liu(michael.liu.point@gmail.com). All right reserved.
    @file mqtt.h
    @brief MQTT related defines.
    @author Michael
    @date 2019-07-16
    @warning None.
    @details None.
**/

#ifndef __GMF_PLUGIN_MQTT_H__
#define __GMF_PLUGIN_MQTT_H__

#ifdef __cplusplus
extern "C" {
#endif              /* __cplusplus */

/* Include files. */

#include "common.h"
#include "mosquitto.h"
#include "gmf.h"
#include "json-c/json.h"
#include <netdb.h>

/* Macro constant definitions. */

#define MQTT_COMMAND_STR_MAX_SIZE 32

/* Type definitions. */

    typedef e_ret(*cmd_handler) (t_gmf_cfg * cfg, int ver, const char *topic, const json_object * data);
    typedef void (*mqtt_connect_connect_state_change) (void *obj, int status);

    typedef struct {
        char *command;
        cmd_handler handler;
    } t_mqtt_cmd_handler;

    typedef struct {
        void *cfg;
        struct event *rdevent;
        struct event *wrevent;
        struct mosquitto *mosq;
        unsigned char keep_connect;
        unsigned char connected;
        char *client_id;
        char *host;
        int port;
        int keepalive;
        char *ca_file;
        char *username;
        char *password;
        char *def_sub_topic;
        char *will_topic;
        char *will_msg;
        t_mqtt_cmd_handler *handlers;
        struct evdns_getaddrinfo_request *dns_req;
        mqtt_connect_connect_state_change connect_state_change;
    } t_mqtt_cfg;

/* External function declarations. */
    extern e_ret mqtt_init(t_mqtt_cfg ** mqtt, struct event_base *evbase, void *obj, t_mqtt_cmd_handler * handlers,
                   mqtt_connect_connect_state_change connect_state_change, const char *client_id,
                   const char *host, const int port, const int keepalive, const char *cafile,
                   const char *username, const char *password, const char *def_sub_topic,
                   const char *will_topic, const char *will_msg);
    extern e_ret mqtt_start(t_mqtt_cfg * mqtt);
    extern void mqtt_stop(t_mqtt_cfg * mqtt);
    extern void mqtt_run_once(t_mqtt_cfg * mqtt);
    extern void mqtt_cleanup(t_mqtt_cfg * mqtt);
    extern unsigned char mqtt_connected(t_mqtt_cfg * mqtt);

    extern void mqtt_publish(t_mqtt_cfg * mqtt, int *mid, const char *topic, int payloadlen, const void *payload,
                 int qos, bool retain);
    extern void mqtt_subscribe(t_mqtt_cfg * mqtt, int *mid, const char *sub, int qos);
    extern void mqtt_unsubscribe(t_mqtt_cfg * mqtt, int *mid, const char *sub);
    extern int mqtt_compare(t_mqtt_cfg * mqtt, const char *host, const int port, const int keepalive,
                const char *username, const char *password);

/* Macro API definitions. */

/* Global variable declarations. */

#ifdef __cplusplus
}               /* extern "C" */
#endif              /* __cplusplus */
#endif              /* __GMF_PLUGIN_MQTT_H__ */

mqtt.c

/*
**  Copyright (c) 2019 Michael Liu(michael.liu.point@gmail.com).
**
**  Project: Gateway Unified Management Platform
**  File:    mqtt.c
**  Author:  Michael
**  Date:    07/14/2019
**
**  Purpose:
**    mqtt related implement.
*/

/* Include files. */

#include "common.h"
#include "mqtt.h"

/* Macro constant definitions. */

#define DEFAULT_RECONNECT_MIN_TIME 5
#define DEFAULT_RECONNECT_MAX_TIME 60

/* Type definitions. */

/* Local function declarations. */

static void mqtt_static_init(void);
static void mqtt_proc_loop_read_misc(evutil_socket_t fd, short event, void *arg);
static void mqtt_proc_loop_write_misc(evutil_socket_t fd, short event, void *arg);

static void mqtt_log_callback_default(struct mosquitto *mosq, void *obj, int level, const char *str);
static void mqtt_connect_callback_default(struct mosquitto *mosq, void *obj, int rc);
static void mqtt_disconnect_callback_default(struct mosquitto *mosq, void *obj, int rc);
static void mqtt_message_callback_default(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message);
static void mqtt_subscribe_callback_default(struct mosquitto *mosq, void *obj, int mid, int qos_count,
                        const int *granted_qos);
static void mqtt_unsubscribe_callback_default(struct mosquitto *mosq, void *obj, int mid);
static void mqtt_publish_callback_default(struct mosquitto *mosq, void *obj, int mid);

static void mqtt_trigger_reconnect(t_mqtt_cfg * mqtt, e_bool firsttime);
static void mqtt_trigger_reconnect_callback(evutil_socket_t fd, short event, void *arg);

static e_ret mqtt_proc_reconnect(t_mqtt_cfg * mqtt);
static e_ret mqtt_assign_evloop(t_mqtt_cfg * mqtt);

static void mqtt_process_req_command(struct mosquitto *mosq, const char *topic, const char *data, void *usr_data);

static char *mqtt_get_command_ver(const char *topic, char *command, int *ver);

static void mqtt_handle_rc(t_mqtt_cfg * mqtt, int rc, const char *function, const char *file, unsigned long line);

static void mqtt_start_host_resovle(t_mqtt_cfg * mqtt);
static void mqtt_host_resovle_cb(int errcode, struct evutil_addrinfo *addr, void *arg);

/* Macro API definitions. */

#define mqtt_prepare_write(mqtt) \
    do { \
        if (mqtt->mosq && mosquitto_want_write(mqtt->mosq) && (mqtt->wrevent != NULL)) { \
            int rc = event_add(mqtt->wrevent, NULL); \
            if (rc != 0) { \
                gmf_err("event_add(fd:%d, type:%d) failed.\n", mosquitto_socket(mqtt->mosq), EV_WRITE); \
            } \
        } \
    } while(0)

#define mqtt_handle_return(mqtt, retval)                                                                               \
    do {                                                                                                           \
        mqtt_handle_rc((mqtt), (rc), __FUNCTION__, __FILE__, __LINE__);                                        \
    } while (0)

/* Global variable declarations. */

static int mqtt_initialed;

void mqtt_publish(t_mqtt_cfg * mqtt, int *mid, const char *topic, int payloadlen, const void *payload, int qos,
          bool retain)
{
    int rc = mosquitto_publish(mqtt->mosq, mid, topic, payloadlen, payload, qos, retain);
    mqtt_handle_return(mqtt, rc);
    if (MOSQ_ERR_SUCCESS == rc) {
        mqtt_prepare_write(mqtt);
    }
}

void mqtt_subscribe(t_mqtt_cfg * mqtt, int *mid, const char *sub, int qos)
{
    int rc = mosquitto_subscribe(mqtt->mosq, mid, sub, qos);
    mqtt_handle_return(mqtt, rc);
    if (MOSQ_ERR_SUCCESS == rc) {
        mqtt_prepare_write(mqtt);
    }
}

void mqtt_unsubscribe(t_mqtt_cfg * mqtt, int *mid, const char *sub)
{
    int rc = mosquitto_unsubscribe(mqtt->mosq, mid, sub);
    mqtt_handle_return(mqtt, rc);
    if (MOSQ_ERR_SUCCESS == rc) {
        mqtt_prepare_write(mqtt);
    }
}

int mqtt_compare(t_mqtt_cfg * mqtt, const char *host, const int port, const int keepalive, const char *username,
         const char *password)
{
    if (NULL == mqtt) {
        return -1;
    }

    if (mqtt->keepalive != keepalive) {
        return -1;
    }
    if (mqtt->port != port) {
        return -1;
    }
    if (safe_str_cmp(mqtt->host, host)) {
        return -1;
    }
    if (safe_str_cmp(mqtt->username, username)) {
        return -1;
    }
    if (safe_str_cmp(mqtt->password, password)) {
        return -1;
    }
    return 0;
}

e_ret mqtt_init(t_mqtt_cfg ** mqtt, struct event_base * evbase, void *cfg, t_mqtt_cmd_handler * handlers,
        mqtt_connect_connect_state_change connect_state_change, const char *client_id, const char *host,
        const int port, const int keepalive, const char *cafile, const char *username, const char *password,
        const char *def_sub_topic, const char *will_topic, const char *will_msg)
{
    int rc;
    mqtt_static_init();

    t_mqtt_cfg *new_mqtt = calloc(1, sizeof(t_mqtt_cfg));
    if (NULL == new_mqtt) {
        gmf_err("calloc(1, %d) failed.\n", sizeof(t_mqtt_cfg));
        goto out;
    }

    new_mqtt->cfg = cfg;
    new_mqtt->mosq = NULL;
    new_mqtt->rdevent = NULL;
    new_mqtt->wrevent = NULL;
    new_mqtt->keep_connect = 0;
    new_mqtt->client_id = safe_strdup(client_id);
    new_mqtt->host = safe_strdup(host);
    new_mqtt->port = port;
    new_mqtt->keepalive = keepalive;
    new_mqtt->ca_file = safe_strdup(cafile);
    new_mqtt->username = safe_strdup(username);
    new_mqtt->password = safe_strdup(password);
    new_mqtt->def_sub_topic = safe_strdup(def_sub_topic);
    new_mqtt->will_topic = safe_strdup(will_topic);
    new_mqtt->will_msg = safe_strdup(will_msg);
    new_mqtt->handlers = handlers;
    new_mqtt->connect_state_change = connect_state_change;

    new_mqtt->mosq = mosquitto_new(new_mqtt->client_id, true, new_mqtt);
    if (NULL == new_mqtt->mosq) {
        switch (errno) {
        case ENOMEM:
            gmf_err("mosquitto_new(), Error: Out of memory.\n");
            break;
        case EINVAL:
            gmf_err("mosquitto_new(), Error: Invalid id and/or clean_session.\n");
            break;
        default:
            gmf_err("mosquitto_new(), errno:%d.\n", errno);
        }
        goto out;
    }

    rc = mosquitto_username_pw_set(new_mqtt->mosq, new_mqtt->username, new_mqtt->password);
    if (MOSQ_ERR_SUCCESS != rc) {
        gmf_err("mosquitto_username_pw_set(), Error[%d][%s]\n", rc, mosquitto_strerror(rc));
        goto out;
    }

    rc = mosquitto_tls_set(new_mqtt->mosq, new_mqtt->ca_file, NULL, NULL, NULL, NULL);
    if (MOSQ_ERR_SUCCESS != rc) {
        gmf_err("mosquitto_tls_set(), Error[%d][%s]\n", rc, mosquitto_strerror(rc));
        goto out;
    }

    rc = mosquitto_threaded_set(new_mqtt->mosq, 0);
    if (MOSQ_ERR_SUCCESS != rc) {
        gmf_err("mosquitto_threaded_set(), Error[%d][%s]\n", rc, mosquitto_strerror(rc));
        goto out;
    }
#ifdef INSECURE_TLS_ENV
    rc = mosquitto_tls_insecure_set(mosq, true);
    if (MOSQ_ERR_SUCCESS != rc) {
        gmf_err("mosquitto_tls_insecure_set(), Error[%d][%s]\n", rc, mosquitto_strerror(rc));
        goto out;
    }
#endif /* INSECURE_TLS_ENV */

    mosquitto_log_callback_set(new_mqtt->mosq, mqtt_log_callback_default);
    mosquitto_connect_callback_set(new_mqtt->mosq, mqtt_connect_callback_default);
    mosquitto_disconnect_callback_set(new_mqtt->mosq, mqtt_disconnect_callback_default);
    mosquitto_subscribe_callback_set(new_mqtt->mosq, mqtt_subscribe_callback_default);
    mosquitto_unsubscribe_callback_set(new_mqtt->mosq, mqtt_unsubscribe_callback_default);
    mosquitto_publish_callback_set(new_mqtt->mosq, mqtt_publish_callback_default);
    mosquitto_message_callback_set(new_mqtt->mosq, mqtt_message_callback_default);

    mosquitto_int_option(new_mqtt->mosq, MOSQ_OPT_TCP_NODELAY, 1);

    if (new_mqtt->will_topic && new_mqtt->will_msg) {
        rc = mosquitto_will_set(new_mqtt->mosq, new_mqtt->will_topic, (int)strlen(new_mqtt->will_msg),
                    new_mqtt->will_msg, 0, 1);
        if (MOSQ_ERR_SUCCESS != rc) {
            gmf_err("mosquitto_will_set(), Error[%d][%s]\n", rc, mosquitto_strerror(rc));
            goto out;
        }
    }

    *mqtt = new_mqtt;
    return eRET_SUCCESS;

out:
    mqtt_cleanup(new_mqtt);
    return eRET_FAILURE;
}

e_ret mqtt_start(t_mqtt_cfg * mqtt)
{
    mqtt->keep_connect = 1;
    mqtt_trigger_reconnect(mqtt, eBOOL_TRUE);
    return eRET_SUCCESS;
}

void mqtt_run_once(t_mqtt_cfg * mqtt)
{
    if (mqtt->rdevent) {
        event_active(mqtt->rdevent, EV_READ | EV_TIMEOUT, 0);
    }
    if (mqtt->wrevent) {
        event_active(mqtt->wrevent, EV_WRITE, 0);
    }
}

void mqtt_stop(t_mqtt_cfg * mqtt)
{
    mqtt->keep_connect = 0;
    mosquitto_disconnect(mqtt->mosq);
    mqtt_run_once(mqtt);
}

void mqtt_cleanup(t_mqtt_cfg * mqtt)
{
    if (mqtt) {
        if (mqtt->rdevent != NULL) {
            event_free(mqtt->rdevent);
            mqtt->rdevent = NULL;
        }

        if (mqtt->wrevent != NULL) {
            event_free(mqtt->wrevent);
            mqtt->wrevent = NULL;
        }

        if (mqtt->mosq != NULL) {
            mosquitto_destroy(mqtt->mosq);
            mqtt->mosq = NULL;
        }

        if (mqtt->dns_req) {
            evdns_getaddrinfo_cancel(mqtt->dns_req);
            mqtt->dns_req = NULL;
        }

        safe_free_strdup(mqtt->client_id);
        safe_free_strdup(mqtt->host);
        safe_free_strdup(mqtt->ca_file);
        safe_free_strdup(mqtt->username);
        safe_free_strdup(mqtt->password);
        safe_free_strdup(mqtt->def_sub_topic);
        safe_free_strdup(mqtt->will_topic);
        safe_free_strdup(mqtt->will_msg);

        free(mqtt);

        gmf_dbg("mqtt_initialed:%d\n", mqtt_initialed);
        if (--mqtt_initialed == 0) {
            gmf_dbg("call mosquitto_lib_cleanup()\n");
            mosquitto_lib_cleanup();
        }
    }
}

unsigned char mqtt_connected(t_mqtt_cfg * mqtt)
{
    if (mqtt) {
        return mqtt->connected;
    }
    return 0;
}

void mqtt_static_init(void)
{
    gmf_dbg("mqtt_initialed:%d\n", mqtt_initialed);
    if (mqtt_initialed++ == 0) {
        /* Init mosquitto library */
        gmf_dbg("call mosquitto_lib_init()\n");
        mosquitto_lib_init();

        /* Check mosquitto version. */
        {
            int major = 0, minor = 0, revision = 0;
            mosquitto_lib_version(&major, &minor, &revision);
            gmf_dbg("Mosquitto library version : %d.%d.%d\n", major, minor, revision);
        }
    }
}

void mqtt_start_host_resovle(t_mqtt_cfg * mqtt)
{
    struct evutil_addrinfo hints;
    char port[16];
    t_gmf_cfg *cfg = mqtt->cfg;

    gmf_trace_enter();
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;

    if (mqtt->dns_req) {
        evdns_getaddrinfo_cancel(mqtt->dns_req);
        mqtt->dns_req = NULL;
    }

    snprintf(port, sizeof(port), "%d", mqtt->port);

    mqtt->dns_req = evdns_getaddrinfo(cfg->dnsbase, mqtt->host, port, &hints, mqtt_host_resovle_cb, mqtt);
    if (mqtt->dns_req == NULL) {
        gmf_dbg("[request for %s returned immediately]\n", mqtt->host);
    }
    gmf_trace_exit();
}

void mqtt_host_resovle_cb(int errcode, struct evutil_addrinfo *addr, void *arg)
{
    t_mqtt_cfg *mqtt = arg;
    gmf_trace_enter();
    mqtt->dns_req = NULL;

    if (!errcode && addr) {
        gmf_dbg("MQTT[%s] Host[%s] lookup done, err: %d, addr:%p.\n", mqtt->client_id, mqtt->host, errcode,
            addr);
        mosquitto_void_option(mqtt->mosq, MOSQ_OPT_FREE_AINFO_CALLBACK, evutil_freeaddrinfo);
        mosquitto_void_option(mqtt->mosq, MOSQ_OPT_HOST_AINFO, addr);
        mqtt_proc_reconnect(mqtt);
    } else {
        gmf_err("MQTT[%s] Host[%s] lookup failed, err: %d, addr:%p.\n", mqtt->client_id, mqtt->host, errcode,
            addr);
        mqtt_trigger_reconnect(mqtt, eBOOL_FALSE);
    }

    gmf_trace_exit();
}

e_ret mqtt_assign_evloop(t_mqtt_cfg * mqtt)
{
    int rc;
    t_gmf_cfg *cfg = mqtt->cfg;

    int mosq_sock = mosquitto_socket(mqtt->mosq);
    if (mosq_sock <= 0) {
        gmf_err("mosquitto_socket(), ret %d\n", mosq_sock);
        goto out;
    }

    evutil_make_socket_nonblocking(mosq_sock);

    if (mqtt->rdevent) {
        event_free(mqtt->rdevent);
        mqtt->rdevent = NULL;
    }
    if (mqtt->wrevent) {
        event_free(mqtt->wrevent);
        mqtt->wrevent = NULL;
    }

    struct timeval tv;
    tv.tv_usec = 0;
    tv.tv_sec = 1;
    mqtt->rdevent =
        event_new(cfg->evbase, mosq_sock, EV_READ | EV_TIMEOUT | EV_PERSIST, mqtt_proc_loop_read_misc, mqtt);
    if (mqtt->rdevent == NULL) {
        gmf_err("event_new(fd:%d, type:%d) failed.\n", mosq_sock, EV_READ | EV_TIMEOUT | EV_PERSIST);
        goto out;
    }

    rc = event_add(mqtt->rdevent, &tv);
    if (rc != 0) {
        gmf_err("event_add(fd:%d, type:%d) failed.\n", mosq_sock, EV_READ | EV_TIMEOUT | EV_PERSIST);
        goto out;
    }

    mqtt->wrevent = event_new(cfg->evbase, mosq_sock, EV_WRITE, mqtt_proc_loop_write_misc, mqtt);
    if (mqtt->wrevent == NULL) {
        gmf_err("event_new(fd:%d, type:%d) failed.\n", mosq_sock, EV_WRITE);
        goto out;
    }

    mqtt_prepare_write(mqtt);

    return eRET_SUCCESS;

out:
    if (mqtt->rdevent) {
        event_free(mqtt->rdevent);
        mqtt->rdevent = NULL;
    }
    if (mqtt->wrevent) {
        event_free(mqtt->wrevent);
        mqtt->wrevent = NULL;
    }
    return eRET_FAILURE;
}

e_ret mqtt_proc_reconnect(t_mqtt_cfg * mqtt)
{
    int rc = 0;
    e_ret ret;

    gmf_trace_enter();

    rc = mosquitto_reconnect_async(mqtt->mosq);
    if (MOSQ_ERR_INVAL == rc) {
        /* Not initialed  try connect again */
        rc = mosquitto_connect_async(mqtt->mosq, mqtt->host, mqtt->port, mqtt->keepalive);
    }
    mqtt_handle_return(mqtt, rc);

    if (rc <= MOSQ_ERR_SUCCESS) {
        ret = mqtt_assign_evloop(mqtt);
        if (eRET_SUCCESS != ret) {
            gmf_err("mqtt_assign_evloop(), failed: %d\n", ret);
            mqtt_trigger_reconnect(mqtt, eBOOL_FALSE);
        }
    }
    gmf_trace_exit();
    return (eRET_SUCCESS);
}

void mqtt_trigger_reconnect_callback(evutil_socket_t fd, short event, void *arg)
{
    t_mqtt_cfg *mqtt = arg;
    os_dep_notused(fd);

    gmf_trace_enter();
    if (event & EV_TIMEOUT) {
        mqtt_start_host_resovle(mqtt);
    }
    gmf_trace_exit();
}

void mqtt_trigger_reconnect(t_mqtt_cfg * mqtt, e_bool firsttime)
{
    t_gmf_cfg *cfg = mqtt->cfg;
    gmf_trace_enter();
    if (mqtt->rdevent) {
        event_free(mqtt->rdevent);
        mqtt->rdevent = NULL;
    }

    if (mqtt->wrevent) {
        event_free(mqtt->wrevent);
        mqtt->wrevent = NULL;
    }

    struct timeval tv;
    if (firsttime) {
        tv.tv_sec = 0;
        tv.tv_usec = 50;
    } else {
        int rand =
            (os_dep_rand() % (DEFAULT_RECONNECT_MAX_TIME - DEFAULT_RECONNECT_MIN_TIME)) +
            DEFAULT_RECONNECT_MIN_TIME;
        tv.tv_sec = rand;
        tv.tv_usec = 0;
    }
    mqtt->rdevent = evtimer_new(cfg->evbase, mqtt_trigger_reconnect_callback, mqtt);
    if (mqtt->rdevent == NULL) {
        gmf_cannot_handle("evtimer_new() failed.\n");
    } else {
        evtimer_add(mqtt->rdevent, &tv);
    }
    gmf_trace_exit();
}

void mqtt_handle_rc(t_mqtt_cfg * mqtt, int rc, const char *function, const char *file, unsigned long line)
{
    //debug_log_print(gmf_log, eLOG_LEVEL_DEBUG, function, file, line,
    //              "MQTT[%s] handle rc:[%d][%s]\n", mqtt->client_id, rc, mosquitto_strerror(rc));
    int mqtt_sock = mosquitto_socket(mqtt->mosq);
    if ((rc == MOSQ_ERR_SUCCESS) && (mqtt_sock >= 0)) {
        return;
    } else if (rc < MOSQ_ERR_SUCCESS) {
        debug_log_print(gmf_log, eLOG_LEVEL_INFOR, function, file, line,
                "MQTT[%s] handle rc:[%d][%s] errno:[%d], sock:%d\n", mqtt->client_id, rc,
                mosquitto_strerror(rc), errno, mqtt_sock);
    } else {
        debug_log_print(gmf_log, eLOG_LEVEL_ERROR, function, file, line,
                "MQTT[%s] handle rc:[%d][%s] errno:[%d], sock:%d\n", mqtt->client_id, rc,
                mosquitto_strerror(rc), errno, mqtt_sock);
    }

    if (mqtt->keep_connect) {
        mqtt->connected = eBOOL_FALSE;
        if (mqtt->connect_state_change) {
            mqtt->connect_state_change(mqtt->cfg, eBOOL_FALSE);
        }
        mqtt_trigger_reconnect(mqtt, eBOOL_FALSE);
    } else {
        mqtt->connected = eBOOL_FALSE;
        if (mqtt->rdevent) {
            event_free(mqtt->rdevent);
            mqtt->rdevent = NULL;
        }
        if (mqtt->wrevent) {
            event_free(mqtt->wrevent);
            mqtt->wrevent = NULL;
        }
    }
}

void mqtt_proc_loop_read_misc(evutil_socket_t fd, short event, void *arg)
{
    int rc = 0;
    t_mqtt_cfg *mqtt = arg;
    os_dep_notused(fd);

    gmf_trace_enter();
    gmf_dbg("MQTT[%s] fd[%d] event: %d\n", safe_str_ptr(mqtt->client_id), fd, event);

    rc = mosquitto_loop_misc(mqtt->mosq);
    mqtt_handle_return(mqtt, rc);
    if (MOSQ_ERR_SUCCESS != rc) {
        gmf_trace_exit();
        return;
    }

    /* Handle want_connect when use ssl connection */
    if (event & EV_READ) {
        rc = mosquitto_loop_read(mqtt->mosq, 8);
        mqtt_handle_return(mqtt, rc);
        if (MOSQ_ERR_SUCCESS != rc) {
            gmf_trace_exit();
            return;
        }
    }

    mqtt_prepare_write(mqtt);

    gmf_trace_exit();
}

void mqtt_proc_loop_write_misc(evutil_socket_t fd, short event, void *arg)
{
    int rc = 0;
    t_mqtt_cfg *mqtt = arg;
    os_dep_notused(fd);

    gmf_trace_enter();
    gmf_dbg("MQTT[%s] fd[%d] event: %d\n", safe_str_ptr(mqtt->client_id), fd, event);

    if (event & EV_WRITE) {
        rc = mosquitto_loop_write(mqtt->mosq, 8);
        mqtt_handle_return(mqtt, rc);
        if (MOSQ_ERR_SUCCESS != rc) {
            gmf_trace_exit();
            return;
        }
    }

    mqtt_prepare_write(mqtt);

    gmf_trace_exit();
}

void mqtt_log_callback_default(struct mosquitto *mosq, void *obj, int level, const char *str)
{
    os_dep_notused(mosq);
    t_mqtt_cfg *mqtt = obj;

    int log_level = eLOG_LEVEL_DEBUG;

    switch (level) {
    case MOSQ_LOG_INFO:
        log_level = eLOG_LEVEL_INFOR;
        break;

    case MOSQ_LOG_NOTICE:
        log_level = eLOG_LEVEL_WARNING;
        break;

    case MOSQ_LOG_WARNING:
        log_level = eLOG_LEVEL_WARNING;
        break;

    case MOSQ_LOG_ERR:
        log_level = eLOG_LEVEL_ERROR;
        break;

    default:
        log_level = eLOG_LEVEL_DEBUG;
    }

    debug_log_print(gmf_log, log_level, __FUNCTION__, __FILE__, __LINE__, "MQTT[%s] %s\n", mqtt->client_id, str);
}

void mqtt_connect_callback_default(struct mosquitto *mosq, void *obj, int rc)
{
    t_mqtt_cfg *mqtt = obj;

    gmf_dbg("MQTT[%s] Connected rc:[%d][%s].\n", mqtt->client_id, rc, mosquitto_strerror(rc));

    if (rc == 0) {
        if (mqtt->def_sub_topic) {
            rc = mosquitto_subscribe(mosq, NULL, mqtt->def_sub_topic, 0);
            mqtt_handle_return(mqtt, rc);
            if (MOSQ_ERR_SUCCESS == rc) {
                mqtt_prepare_write(mqtt);
            } else {
                return;
            }
        }

        if (mqtt->connect_state_change) {
            mqtt->connected = eBOOL_TRUE;
            mqtt->connect_state_change(mqtt->cfg, eBOOL_TRUE);
        }
    } else {
        mqtt_trigger_reconnect(mqtt, eBOOL_FALSE);
    }
}

void mqtt_disconnect_callback_default(struct mosquitto *mosq, void *obj, int rc)
{
    os_dep_notused(mosq);
    os_dep_notused(rc);
    t_mqtt_cfg *mqtt = obj;
    gmf_dbg("MQTT[%s] Disconnected rc:[%d][%s].\n", mqtt->client_id, rc, mosquitto_strerror(rc));
    mqtt->connected = eBOOL_FALSE;
    /* Free read and write event to avoid epoll issue(continue report EPOLLHUP on closed fd) */
    if (mqtt->rdevent) {
        event_free(mqtt->rdevent);
        mqtt->rdevent = NULL;
    }
    if (mqtt->wrevent) {
        event_free(mqtt->wrevent);
        mqtt->wrevent = NULL;
    }
}

void mqtt_message_callback_default(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
    os_dep_notused(mosq);
    os_dep_notused(message);

    t_mqtt_cfg *mqtt = obj;
    if (message->payloadlen) {
        gmf_dbg("MQTT[%s] topic is [%s], data is [%s]\n", mqtt->client_id, message->topic, message->payload);
        mqtt_process_req_command(mosq, message->topic, message->payload, obj);
    }
}

void mqtt_subscribe_callback_default(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
{
    os_dep_notused(mosq);
    os_dep_notused(granted_qos);

    t_mqtt_cfg *mqtt = obj;
    gmf_dbg("MQTT[%s] Subscribe Message MID[%d] QoS Count[%d] Done.\n", mqtt->client_id, mid, qos_count);
}

void mqtt_unsubscribe_callback_default(struct mosquitto *mosq, void *obj, int mid)
{
    os_dep_notused(mosq);
    t_mqtt_cfg *mqtt = obj;
    gmf_dbg("MQTT[%s] Unsubscribe Message MID[%d] Done.\n", mqtt->client_id, mid);
}

void mqtt_publish_callback_default(struct mosquitto *mosq, void *obj, int mid)
{
    os_dep_notused(mosq);
    t_mqtt_cfg *mqtt = obj;
    gmf_dbg("MQTT[%s] Publish Message MID[%d] Done.\n", mqtt->client_id, mid);
}

char *mqtt_get_command_ver(const char *topic, char *command, int *ver)
{
    int cmd_ver = 0;
    char *cmd_pos = rindex(topic, '/');
    if (!cmd_pos || strlen(cmd_pos++) < 2) {
        return NULL;
    }

    safe_strcpy(command, cmd_pos, MQTT_COMMAND_STR_MAX_SIZE);

    char *cmd_ver_pos = rindex(command, '_');
    if (cmd_ver_pos) {
        *cmd_ver_pos = '\0';
        cmd_ver_pos++;
        if (strlen(cmd_ver_pos) > 0) {
            cmd_ver = (int)strtol(cmd_ver_pos, NULL, 10);
            *ver = cmd_ver;
        } else {
            *ver = 0;
        }
    } else {
        *ver = 0;
    }

    return command;
}

void mqtt_process_req_command(struct mosquitto *mosq, const char *topic, const char *data, void *usr_data)
{
    int i;
    e_ret ret;
    int cmd_ver = 0;
    char cmd[MQTT_COMMAND_STR_MAX_SIZE];
    t_mqtt_cmd_handler *handlers;

    os_dep_notused(mosq);

    t_mqtt_cfg *mqtt = usr_data;

    if (mqtt_get_command_ver(topic, cmd, &cmd_ver) == NULL) {
        gmf_info("Cannot get command and version.\n");
        return;
    }

    gmf_dbg("Get Msg: topic[%s], cmd[%s], ver[%d]\n", topic, cmd, cmd_ver);

    handlers = mqtt->handlers;
    if (handlers == NULL) {
        gmf_info("No handler registed.\n");
        return;
    }

    json_object *json_request = json_tokener_parse(data);
    if (NULL == json_request) {
        gmf_info("User request is not valid: json parse failed.\n");
        return;
    }

    for (i = 0; handlers[i].command != NULL; i++) {
        if (evutil_ascii_strcasecmp(cmd, handlers[i].command) == 0) {
            if (handlers[i].handler != NULL) {
                ret = handlers[i].handler(mqtt->cfg, cmd_ver, topic, json_request);
                gmf_dbg("Handler: topic[%s] done, ret:%d\n", topic, ret);
            } else {
                gmf_dbg("Null-Handler found: command[%s] topic[%s], ignore it.\n", cmd, topic);
            }
            goto done;
        }
    }

    gmf_info("Handler not found: topic[%s], data[%s] failed\n", topic, data);

done:
    json_object_put(json_request);
}
michaeliu commented 4 years ago

@michaeliu This is extremely useful, thanks. When/how often do you typically call mosquitto_want_write() though?

@detly

The code is part from a project of mine, you can just use it as open license code.

Feel free to let me know if you get any question, hope that's useful.