xHasKx / luamqtt

luamqtt - Pure-lua MQTT v3.1.1 and v5.0 client
https://xhaskx.github.io/luamqtt/
MIT License
154 stars 41 forks source link

luamqtt on OpenResty #13

Closed irimiab closed 5 years ago

irimiab commented 5 years ago

Hello

I was using with success the release v1.4 with OpenResty, by providing my own connector which uses ngx.socket.tcp. But now, with the latest version (v3.0), I can't make it work due to the new ioloop implementation, which apparently depends on luasocket.

Is it possible to run this latest version on OpenResty? Maybe by providing the necessary settings (sleep function, maybe) to ioloop or to use a different implementation for ioloop (which is suited for OpenResty)?

Thank you.

xHasKx commented 5 years ago

Hi @irimiab,

Can you please provide a shortest example of your OpenResty usage of luamqtt v1.4? I just need to run something to see how it worked in that release.

For now I can say connectors should work without ioloop, if not - I'll fix

irimiab commented 5 years ago

Sure, here it is:

    local client = mqtt.client{ uri = topic.broker.hostname, clean = true, connector = require "mqtt.ngxsocket" }
    if client:connect() then
        if client:subscribe{ topic = topic.topic } then
            client:on("message", function(msg)
                -- do something
            end)

            client:on("error", function(err)
                log(ERR, err)
            end)

            client:on("close", function()
                log(INFO, "MQTT connection closed")
                -- restart connection
            end)

            client:receive_loop()
        else
            log(INFO, "MQTT failed to subscribe")
            -- restart connection
        end
    else
        log(INFO, "MQTT connection failed")
        -- restart connection
    end

This runs in a timer context. Should you want the timer functions to be included, let me know and I will post them here too.

The module mqtt.ngxsocket is this:

-- module table
local luasocket = {}

-- Open network connection to .host and .port in conn table
-- Store opened socket to conn table
-- Returns true on success, or false and error text on failure
function luasocket.connect(conn)
    local socket = ngx.socket.tcp()
    socket:settimeout(2147483647) -- timeout in ~24 days
    local sock, err = socket:connect(conn.host, conn.port)
    if not sock then
        return false, "socket.connect failed: "..err
    end

    conn.sock = socket
    return true
end

-- Shutdown network connection
function luasocket.shutdown(conn)
    local ok, err = conn.sock:close()
end

-- Send data to network connection
function luasocket.send(conn, data, i, j)
    return conn.sock:send(data)
end

-- Receive given amount of data from network connection
function luasocket.receive(conn, size)
    local ok, err = conn.sock:receive(size)
    return ok, err
end

-- export module table
return luasocket
xHasKx commented 5 years ago

This runs in a timer context. Should you want the timer functions to be included, let me know and I will post them here too.

Yes, post it please

irimiab commented 5 years ago

There you go:


local mqtt = require "mqtt"

local log = ngx.log
local timer_at = ngx.timer.at

local ERR = ngx.ERR
local WARN = ngx.WARN
local INFO = ngx.INFO

local start_mqtt_client, run_mqtt_timer

run_mqtt_timer = function(topic)
    local ok, err = timer_at(1, start_mqtt_client, topic)
    if not ok then
        log(ERR, 'Failed to start the MQTT client: ', err)
    end
end

start_mqtt_client = function(premature, topic)    
    log(INFO, "Starting MQTT client for topic ", topic.topic, " of broker ", topic.broker.id)

    local client = mqtt.client{ uri = topic.broker.hostname, clean = true, connector = require "mqtt.ngxsocket" }
    if client:connect() then
        if client:subscribe{ topic = topic.topic } then
            client:on("message", function(msg)
                -- do some processing here
            end)

            client:on("error", function(err)
                log(ERR, err)
            end)

            client:on("close", function()
                log(INFO, "MQTT connection closed")
                run_mqtt_timer(topic)
            end)

            client:receive_loop()
        else
            log(INFO, "MQTT failed to subscribe")
            run_mqtt_timer(topic)
        end
    else
        log(INFO, "MQTT connection failed")
        run_mqtt_timer(topic)
    end
end

for _, topic in ipairs(config.topics) do
    run_mqtt_timer(topic)
end

The structure config.topics is a table with topics information (we are spawning a timer for each topic we need to subscribe). For testing, you should define it manually (we are generating the data structure from some JSON files during the init phase).

This is an example that should work, even though I didn't test it:

local config = { topics = { 
  {
    broker = {
      hostname = "test.mosquitto.org",
      id = "broker_id_1",
      port = 1883
    },
    topic = "some/topic"
  }, {
    broker = {
      hostname = "broker.hivemq.com",
      id = "broker_id_2",
      port = 1883
    },
    topic = "another/topic"
  } 
} }
irimiab commented 5 years ago

This is run with an NginX configuration like this:

worker_processes  1;
error_log logs/error.log;

events {
    use kqueue;
    worker_connections 1024;
    multi_accept on;
}

stream {
    lua_package_path ";;${prefix}../app/?.lua;";
    lua_code_cache on;

    resolver 8.8.8.8;

    init_worker_by_lua_file "../app/main.lua";
}

The file main.lua is the one I posted above. Please note that we are using only one worker, so there is no need to use a semaphore to prevent duplicate operations.

This is just a proof of concept, of course.

xHasKx commented 5 years ago

@irimiab, I've reproduced your runtime. I have to think how to adapt new version for using with your ngx connector - it may take all weekends, but I hope to get success here

irimiab commented 5 years ago

Ok, great. If we can make it work next week, it's great. Thanks.

xHasKx commented 5 years ago

@irimiab, I've implemented the new version to be usable in openresty again. Please see an examples here: https://github.com/xHasKx/luamqtt/tree/master/examples/openresty

Also I've included your ngxsocket connector into the sources, I hope you will not be offended

Please leave any feedback here

irimiab commented 5 years ago

Thank you very much. We'll give feedback as soon as we integrate this new version.

irimiab commented 5 years ago

I'm sorry for getting back to you so late, but we've had a week off from work. During this time, the MQTT lib has been left in tests and I can say it worked exactly as expected.

Good work and thank you for your prompt implementation.

xHasKx commented 5 years ago

@irimiab, thanks for feedback!