xHasKx / luamqtt

luamqtt - Pure-lua MQTT v3.1.1 and v5.0 client
https://xhaskx.github.io/luamqtt/
MIT License
154 stars 41 forks source link

In OpenResty error #19

Closed miaovictor closed 4 years ago

miaovictor commented 4 years ago

I wrap luamqtt like this:

local require = require

local mqtt = require("mqtt")
local ioloop = require("mqtt.ioloop")

local log = require("core.log")

local setmetatable = setmetatable
local timer_at = ngx.timer.at
local str_format = string.format

--- Timer callback that drives the MQTT client for this worker.
-- Builds an ioloop around `ctx.cli`, starts connecting, and blocks in the loop
-- until it returns; afterwards the callback schedules itself again in 1 second.
-- @param premature flag supplied by ngx.timer.at; when truthy the callback
--                  only logs a warning and returns
-- @param ctx       table holding the mqtt client in its `cli` field
local function on_timer(premature, ctx)
  if not premature then
    log.info("starting mqtt client...")

    local event_loop = ioloop.create({
      timeout = ctx.cli.args.keep_alive,
      sleep_function = ngx.sleep,
    })
    event_loop:add(ctx.cli)
    ctx.cli:start_connecting()
    event_loop:run_until_clients()

    -- the loop has returned: try again from a fresh timer
    timer_at(1, on_timer, ctx)
    return
  end

  log.warn("exit mqtt timer!")
end

local _M = {}
local mt = { __index = _M }

--- Create a wrapper object around a new mqtt client.
-- The client id is derived from the nginx worker id, and the connect, message,
-- error and close handlers are attached here, before the client is started.
-- @param conf table providing `broker`, `username` and `password`
-- @return table with fields `conf` and `cli`, using `_M` as its method table
function _M:new(conf)
  local client = mqtt.client({
    id = str_format("server/center/%d", ngx.worker.id()),
    uri = conf.broker,
    username = conf.username,
    password = conf.password,
    clean = true,
    connector = require("mqtt.ngxsocket")
  })

  -- CONNACK received: publish a test message when the broker accepted us
  local function on_connect(connack)
    if connack.rc == 0 then
      log.info("mqtt client connected!")

      client:publish({
        topic = "luamqtt/simpletest",
        payload = "hello",
        qos = 0
      })
    else
      log.error("connect mqtt broker failed!")
    end
  end

  -- incoming message: acknowledge first, then log it
  local function on_message(msg)
    client:acknowledge(msg)
    log.info("receive mqtt message! ", msg)
  end

  local function on_error(err)
    log.error("mqtt client error: ", err)
  end

  local function on_close(conn)
    log.warn("mqtt conn closed: ", conn.close_reason)
  end

  client:on({
    connect = on_connect,
    message = on_message,
    error = on_error,
    close = on_close
  })

  local instance = {
    conf = conf,
    cli = client
  }
  return setmetatable(instance, mt)
end

--- Schedule the MQTT timer callback for this wrapper.
-- @return whatever ngx.timer.at returns for the scheduling request
function _M:start()
  local delay_seconds = 1
  return timer_at(delay_seconds, on_timer, self)
end

--- Publish the fixed test message from a zero-delay timer.
-- NOTE: this is the call reported to abort with a "bad request" runtime error
-- raised by the socket `send`: the callback runs in a different timer than
-- the one in which the client's connection was opened.
function _M:publish()
  local function send_hello()
    self.cli:publish({
      topic = "luamqtt/simpletest",
      payload = "hello",
      qos = 0
    })
  end

  timer_at(0, send_hello)
end

return _M

When MQTT is connected, the publish inside the connect callback function works. But I want to publish a message when I receive an HTTP request, so I call the wrapper's publish function, and it fails:

2020/04/24 16:54:17 [error] 2002#2002: *3424 lua entry thread aborted: runtime error: /mnt/d/Workspace/Demo/Openresty/test//lib/mqtt/client.lua:1147: bad request
stack traceback:
coroutine 0:
        [C]: in function 'send'
        /mnt/d/Workspace/Demo/Openresty/test//lib/mqtt/client.lua:1147: in function '_send_packet'
        /mnt/d/Workspace/Demo/Openresty/test//lib/mqtt/client.lua:455: in function 'publish'
        /mnt/d/Workspace/Demo/Openresty/test//lua/core/mqtt.lua:82: in function </mnt/d/Workspace/Demo/Openresty/test//lua/core/mqtt.lua:81>, context: ngx.timer, client: 127.0.0.1, server: 0.0.0.0:7500
2020/04/24 16:54:46 [info] 2004#2004: *3423 client closed connection while waiting for request, client: 127.0.0.1, server: 0.0.0.0:7500
xHasKx commented 4 years ago

HI @miaovictor,

It looks like you have run into an issue related to this: https://github.com/openresty/lua-nginx-module/issues/706#issuecomment-195564706

Please try to follow recommendations there

miaovictor commented 4 years ago

So if I want to publish a message when I receive a request, must I create an mqtt.client, publish the message in the connect callback function, and then close the mqtt.client every time?

xHasKx commented 4 years ago

No. As I understand it, you have to transfer the value you want to publish (it should be a plain Lua value) from one context (the request) to another context (the timer) and publish your value in that timer context. I assume OpenResty provides some functions to transfer values between contexts.

miaovictor commented 4 years ago

I'm sorry, my English is not good. I want to publish a message to MQTT when I receive an HTTP request, but OpenResty does not allow sharing a cosocket at the nginx worker level. Because I created the mqtt.client in timer A but publish the message from another timer B, it throws an exception:

2020/04/24 16:54:17 [error] 2002#2002: *3424 lua entry thread aborted: runtime error: /mnt/d/Workspace/Demo/Openresty/test//lib/mqtt/client.lua:1147: bad request
stack traceback:
coroutine 0:
        [C]: in function 'send'
        /mnt/d/Workspace/Demo/Openresty/test//lib/mqtt/client.lua:1147: in function '_send_packet'
        /mnt/d/Workspace/Demo/Openresty/test//lib/mqtt/client.lua:455: in function 'publish'
        /mnt/d/Workspace/Demo/Openresty/test//lua/core/mqtt.lua:82: in function </mnt/d/Workspace/Demo/Openresty/test//lua/core/mqtt.lua:81>, context: ngx.timer, client: 127.0.0.1, server: 0.0.0.0:7500
2020/04/24 16:54:46 [info] 2004#2004: *3423 client closed connection while waiting for request, client: 127.0.0.1, server: 0.0.0.0:7500

Libraries like lua_resty_mysql call sock:setkeepalive to reuse the cosocket, but luamqtt does not.

miaovictor commented 4 years ago

I wrap like this, it worked fine!

local require = require

local mqtt = require("mqtt")
local ioloop = require("mqtt.ioloop")
local semaphore = require("ngx.semaphore")

local log = require("core.log")
local json = require("core.json")

local setmetatable = setmetatable
local timer_at = ngx.timer.at
local spawn = ngx.thread.spawn

local str_format = string.format

local mqtt_cache = ngx.shared.mqtt_cache

local sema = semaphore.new()

--- Light-thread body that drains the shared 'publish' queue.
-- Waits on the module semaphore (5 second timeout per wait), pops one
-- JSON-encoded entry from the `mqtt_cache` list and publishes it through
-- `ctx.cli`. The loop never returns on its own.
-- @param ctx table holding the mqtt client in its `cli` field
local function publish_handler(ctx)
  while true do
    local ok, err = sema:wait(5)
    if ok then
      local val = mqtt_cache:lpop('publish')
      if val then
        -- A malformed entry must not raise here: an error would end this
        -- thread and nothing would drain the queue afterwards. pcall covers
        -- decoders that throw; the type check covers decoders that return nil.
        local decoded, pub = pcall(json.decode, val)
        if decoded and type(pub) == "table" then
          local sent, send_err = ctx.cli:publish({
            topic = pub.topic,
            payload = pub.payload,
            qos = pub.qos
          })
          if not sent then
            log.error("publish message failed!", send_err)
          end
        else
          log.error("decode publish message failed!", val)
        end
      else
        log.warn("pop publish message from cache failed!")
      end
    elseif err ~= "timeout" then
      -- timeouts are the normal idle case and are not logged
      log.warn("semaphore wait error: ", err)
    end
  end
end

--- Tell whether a publisher light thread can still run.
-- @param thread value returned by ngx.thread.spawn, or nil
-- @return boolean
local function publisher_alive(thread)
  if not thread then
    return false
  end
  local status = coroutine.status(thread)
  -- a finished light thread that nobody waited on is reported as "zombie"
  return status ~= "dead" and status ~= "zombie"
end

--- Timer callback that owns the MQTT connection for this worker.
-- Runs the client inside an ioloop until the loop returns, stops the
-- publisher thread that belonged to this run, and re-arms itself after
-- 1 second.
-- @param premature flag supplied by ngx.timer.at; when truthy the callback
--                  only logs a warning and returns
-- @param ctx       table holding the mqtt client in its `cli` field; the
--                  private fields `_handlers_registered` and `_publisher`
--                  are stored on it
local function on_timer(premature, ctx)
  if premature then
    log.warn("exit mqtt timer!")
    return
  end

  log.info("starting mqtt client...")

  -- This callback re-arms itself with the same ctx, so attach the handlers
  -- only on the first run; otherwise every restart would register another
  -- copy of each handler on the same client.
  if not ctx._handlers_registered then
    ctx._handlers_registered = true

    ctx.cli:on({
      connect = function(connack)
        if connack.rc ~= 0 then
          log.error("connect mqtt broker failed!")
          return
        end

        log.info("mqtt client connected!")

        -- The connect handler may be called several times (reconnect logic);
        -- keep at most one live publisher thread so two threads never compete
        -- for the same queue and connection.
        if not publisher_alive(ctx._publisher) then
          ctx._publisher = spawn(publish_handler, ctx)
        end
      end,
      message = function(msg)
        ctx.cli:acknowledge(msg)
        log.info("receive mqtt message! ", msg)
      end,
      error = function(err)
        log.error("mqtt client error: ", err)
      end,
      close = function(conn)
        log.warn("mqtt conn closed: ", conn.close_reason)
      end
    })
  end

  local loop = ioloop.create( {
    timeout = ctx.cli.args.keep_alive,
    sleep_function = ngx.sleep,
  })
  loop:add(ctx.cli)
  ctx.cli:start_connecting()
  loop:run_until_clients()

  -- The loop has returned, so this run's connection is finished. Stop the
  -- publisher that was spawned in this timer: the next connection will be
  -- opened from a different timer, and a leftover thread from this one must
  -- not try to publish through it.
  -- NOTE(review): ngx.thread.kill only works from the thread's parent
  -- coroutine; this assumes the connect handler runs directly in the timer's
  -- entry thread - confirm against the luamqtt ioloop implementation.
  local publisher = ctx._publisher
  ctx._publisher = nil
  if publisher_alive(publisher) then
    local killed, kill_err = ngx.thread.kill(publisher)
    if not killed then
      log.warn("kill mqtt publish thread failed: ", kill_err)
    end
  end

  local ok, err = timer_at(1, on_timer, ctx)
  if not ok then
    -- without a new timer the client is never restarted; make that visible
    log.error("restart mqtt timer failed: ", err)
  end
end

local _M = {}
local mt = { __index = _M }

--- Create a wrapper object around a new mqtt client.
-- Only the client is built here; no event handlers are attached in this
-- function.
-- @param conf table providing `broker`, `username` and `password`
-- @return table with fields `conf` and `cli`, using `_M` as its method table
function _M:new(conf)
  local options = {
    -- one client id per nginx worker
    id = str_format("server/center/%d", ngx.worker.id()),
    uri = conf.broker,
    username = conf.username,
    password = conf.password,
    clean = true,
    connector = require("mqtt.ngxsocket")
  }

  local instance = {
    conf = conf,
    cli = mqtt.client(options)
  }
  return setmetatable(instance, mt)
end

--- Schedule the MQTT timer callback for this wrapper.
-- @return whatever ngx.timer.at returns for the scheduling request
function _M:start()
  local delay_seconds = 1
  return timer_at(delay_seconds, on_timer, self)
end

--- Queue a message for publishing by the MQTT publisher thread.
-- The message is JSON-encoded and appended to the 'publish' list in
-- `mqtt_cache`; on success the module semaphore is posted once.
-- @param topic   topic to publish to
-- @param qos     QoS level to publish with
-- @param payload message payload
-- @return true when the message was queued, or nil plus the error reported
--         by the cache when it could not be queued
function _M:publish(topic, qos, payload)
  local message = json.encode({
    topic = topic,
    qos = qos,
    payload = payload
  })

  local len, err = mqtt_cache:rpush('publish', message)
  if not len then
    log.error("push publish message to cache failed!", err)
    -- report the failure so the caller can react instead of assuming success
    return nil, err
  end

  sema:post()
  return true
end

return _M
xHasKx commented 4 years ago

@miaovictor , yes, that's exactly what I meant. When you've received an HTTP request - you transfer data you need to publish to the timer context using serializable data (json).

Also please note you are starting an ngx thread (spawn) in an MQTT client connect handler, which may be called several times (reconnect logic). So after reconnect, you will spawn another thread that may conflict with the first one.

miaovictor commented 4 years ago

thanks very much