xHasKx / luamqtt

luamqtt - Pure-lua MQTT v3.1.1 and v5.0 client
https://xhaskx.github.io/luamqtt/
MIT License
160 stars 42 forks source link

when called mqtt.ioloop is stuck,finally trace at sync_recv_func(...) stucked in function client_mt:_apply_network_timeout() #38

Closed pxyove closed 1 year ago

pxyove commented 2 years ago

gcc——toolchain-mipsel_24kec+dsp_gcc-4.8-linaro_uClibc-0.9.33.2 lua:5.1.5 skynet:1.5.0 mqtt:3.4.2

My code is as follows:

local io = require("ioloop")
local ioloop = require("mqtt.ioloop")

local mq_loop = ioloop.create {
        sleep = 1,
        timeout = 1,
        sleep_function = skynet.sleep
    }

local client = mqtt.client(conn_params)
    client:on{
        connect = function(connack)
            assert(not mqtt_client)

            if not (connack and connack.rc == 0) then
                logger:error("connection to mqtt fail: %s %s", connack and connack:reason_string() or "", connack)
                return
            end

            logger:info("MQTT client connected server success!!!") -- successful connection
            state_table.connect_ok = true
            local response_count = 0
            for _, item in ipairs(topics) do
                local module, topic = item.module, item.topic
                client:subscribe{
                    topic = topic,
                    qos = 0,
                    callback = function(suback, args)
                        response_count = response_count + 1
                        if suback.rc[1] ~= 0 then
                            client:disconnect(0)
                            logger:error("subscribe [%s %s] fail %s", module, topic, suback.rc[1])
                            return
                        end

                        logger:info("subscribe from cloud [%s, %s] ok", module, topic)
                        if response_count >= #topics then
                            mqtt_client = client
                            logger:info("All topic subscribes OK, MQTT client is ready!!!")
                            --Not support, upload software version with 5mins timer.
                            --[[
                            local mqtt_dispatcher = skynet.queryservice("mqtt_dispatcher")
                            skynet.send(mqtt_dispatcher, "lua", "post")
                            ]]
                        end
                    end
                }
            end
        end,
        message = function(msg)
            local err = client:acknowledge(msg)
            if not err then
                logger:error("received a error msg!")
                return
            end
            table.insert(work_queue, {
                msg.topic,
                msg.payload
            })

            logger:info("Recv from cloud, topic: %s, payload: %s", msg.topic, msg.payload)
            skynet.wakeup(work_loop_co)
            skynet.yield()
        end,
        error = function(err, args, connack)
            logger:error("mqtt client err: %s, args: %s, connack: %s", err, args, connack)
            if not connack then
                logger:error("MQTT client error maybe unable to connect to network")
                return
            end

            if connack.rc == 4 then
                state_table.connect_ok = false
                client.args.reconnect = client.args.reconnect + 5
            end
        end,
        close = function()
            state_table.connect_ok = false
            mqtt_client = nil -- set to nil mqtt_client = nil -- set to nil
            client = nil
            logger:error("mqtt client is close, set to not ready!!!")
        end
    }

    mq_loop:add(client)
    client:start_connecting()
    while true do
       if not client then
            break
        end
        mq_loop:iteration()   --------when run here is stuck
        skynet.yield()
    end

The mqTT_client is ready when the client is connected to the broker and three topics are subscribed. The mqTT_client is then stuck at mq_loop: Iteration ()

What is the cause of the jam? Appreciate receiving

pxyove commented 2 years ago
local function ioloop_recv(self)
    return coroutine_resume(self.connection.coro)
end

-- Perform one input/output iteration, called by ioloop
function client_mt:_ioloop_iteration()
    -- working according state
    local loop = self.ioloop
    local args = self.args

    local conn = self.connection
    if conn then
        -- network connection opened
        -- perform packet receiving using ioloop receive function
        local ok, err
        if loop then
            ok, err = self:_io_iteration(ioloop_recv) -----------enter this branch
        else
            ok, err = self:_sync_iteration()
        end
........
end

function client_mt:_io_iteration(recv)
    local conn = self.connection

    -- first - try to receive packet
    local ok, packet, err = recv(self) -------finally stuck in here
    -- print("received packet", ok, packet, err)
...
end
xHasKx commented 2 years ago

Please write in English in this repo

xHasKx commented 2 years ago

What connector are you using? Without it, the blocking luasocket functions will be used

pxyove commented 2 years ago

What connector are you using? Without it, the blocking luasocket functions will be used

I use default connector,but I have two devices, same code, one communicating normally and the other having this problem,

xHasKx commented 2 years ago

Actually, if you're using a default connector, this is expected behavior to block the whole OS thread on recv() calls. This means no other code (C or Lua) will run in this OS thread until the next MQTT packet is received from the server.

As I can see from your code (by the way, please fix formatting by triple `-char), you're trying to call skyned.yield() after MQTT ioloop iteration. I suspect this will not work at all with the default connector.

I'm not familiar with the skynet module. Do you have some async networking (TCP connections) support in it? It might be wise to write a connector based on it, just like in ngxsocket.lua

pxyove commented 2 years ago

Actually, if you're using a default connector, this is expected behavior to block the whole OS thread on recv() calls. This means no other code (C or Lua) will run in this OS thread until the next MQTT packet is received from the server.

As I can see from your code (by the way, please fix formatting by triple `-char), you're trying to call skyned.yield() after MQTT ioloop iteration. I suspect this will not work at all with the default connector.

I'm not familiar with the skynet module. Do you have some async networking (TCP connections) support in it? It might be wise to write a connector based on it, just like in ngxsocket.lua

Thank you very much. I will try other connectors first to see if I can solve this problem

pxyove commented 2 years ago

Actually, if you're using a default connector, this is expected behavior to block the whole OS thread on recv() calls. This means no other code (C or Lua) will run in this OS thread until the next MQTT packet is received from the server. As I can see from your code (by the way, please fix formatting by triple `-char), you're trying to call skyned.yield() after MQTT ioloop iteration. I suspect this will not work at all with the default connector. I'm not familiar with the skynet module. Do you have some async networking (TCP connections) support in it? It might be wise to write a connector based on it, just like in ngxsocket.lua

Thank you very much. I will try other connectors first to see if I can solve this problem

May I ask you? I tracked the code further and found that it ended up here

function client_mt:_apply_network_timeout()
....

    local sync_recv_func = conn.recv_func
    conn.recv_func = function(...)
        while true do
              local data, err = sync_recv_func(...) 
              if not data and (err == "timeout" or err == "wantread") then)
                  loop.timeouted = true
                  coroutine_yield(err)
                  skynetco.yield(err)
              else
                  return data, err
              end
          end
    end

....
self.connection.recv_func = connector.receive() ------finally, block in here, is this correct? 
I used default connector, so block in function luasocket.receive()
....

sorry, I didn't get the format right

xHasKx commented 2 years ago

Yes, the luasocket.receive() call blocks the OS thread.

sorry, I didn't get the format right

I mean please enclose your code samples into `-char repeated 3 times to make block of code

pxyove commented 2 years ago

But I set the ioloop timeout to one second and it didn't seem to work, so is block,why? Two devices same code, one working, one not working. I'll keep track. Thank you very much.

xHasKx commented 2 years ago
  1. Try to uncomment print() debug code in luasocket.send() and luasocket.receive() functions to track MQTT traffic, maybe you can find a difference for the two devices
  2. Try to set timeout to 0.1
pxyove commented 2 years ago

I tried to do that, but two devices luasocket.send one and luasocket.receive three data, they are same. Thank you very much. I'll keep track.

pxyove commented 2 years ago
  1. Try to uncomment print() debug code in luasocket.send() and luasocket.receive() functions to track MQTT traffic, maybe you can find a difference for the two devices
  2. Try to set timeout to 0.1

Do you know the luasocket.settimeout of timeout where take effect and check. Because I'm suspicious of timeout no work

function luasocket.settimeout(conn, timeout)
    conn.timeout = timeout
    conn.sock:settimeout(timeout, "t")
end
xHasKx commented 2 years ago

I mean

local mq_loop = ioloop.create {
        timeout = 0.1,
...
}
pxyove commented 2 years ago

I mean

local mq_loop = ioloop.create {
        timeout = 0.1,
...
}

I tried to do that, but two devices luasocket.send one and luasocket.receive three data, they are same.

pxyove commented 2 years ago

Sorry, accidentally hit Closed ioloop itself is non-blocking,but now it is blocking,I want to know where to check timeout(default 0.005) and take effect, do you know where ?

xHasKx commented 2 years ago

Sorry, accidentally hit Closed ioloop itself is non-blocking,but now it is blocking,I want to know where to check timeout(default 0.005) and take effect, do you know where ?

Sorry, I cannot understand you

Did you try to uncomment network packet print()-traces as I advised?

pxyove commented 2 years ago

Sorry, accidentally hit Closed ioloop itself is non-blocking,but now it is blocking,I want to know where to check timeout(default 0.005) and take effect, do you know where ?

Sorry, I cannot understand you

Did you try to uncomment network packet print()-traces as I advised?

Yes, I've tried that,two devices luasocket.send and luasocket.receive data is same

I mean, ioloop itself is non-blocking,but now it is blocking, maybe timeout no effective, Do you know where it effect?

local mq_loop = ioloop.create {
        timeout = 0.1,
}
xHasKx commented 2 years ago

Timeout is applied to the luasocket's socket at https://github.com/xHasKx/luamqtt/blob/master/mqtt/luasocket.lua#L46 Can you please describe in more detail, what do you mean by "but I have two devices, same code, one communicating normally and the other having this problem". Does your code is blocked (i.e. code other than luamqtt does not run at all) or do you get disconnected from the broker on the second device?

pxyove commented 2 years ago

I'm sorry. I've been on vacation. After I traced Luasocket deeply, I found that timeout in Luasocket did not take effect. I will reply to you when I solve this problem. Thank you very much.

pxyove commented 2 years ago

Thank you for everyone, I have found out the cause of the problem(system-called connect/recv ret=-1,errno=0), It's caused by compile options error for LINUXTHREADS_OLD=y in uClibc-0.9.33.2.so .config, should select UCLIBC_HAS_THREADS_NATIVE=y.

for https://github.com/cloudwu/skynet/issues/1522 for https://github.com/lunarmodules/luasocket/issues/349 for https://github.com/xHasKx/luamqtt/issues/38