xHasKx / luamqtt

luamqtt - Pure-lua MQTT v3.1.1 and v5.0 client
https://xhaskx.github.io/luamqtt/
MIT License

Design or code example of using custom loop iterator #21

Closed dmitryuv closed 4 years ago

dmitryuv commented 4 years ago

There's a placeholder argument for ioloop:

-- @tparam[opt] function args.sleep_function custom sleep function to call after each iteration

but the arguments are not used anywhere and there's no direct access to this setting.

My use case is to bridge two streams of operations for back-and-forth messaging (between the KNX bus and MQTT), so I need to read messages from both the local bus and MQTT simultaneously. My first assumption was that the sleep function might be the best place to put my bus loop.

Here's an example of the bus read loop:

function groupcallback(event)
    if event.dst == '1/1/1' then
        local value = knxdatatype.decode(event.datahex, dt.uint16)
        submit_to_mqtt(value)
    end
end

lb = require('localbus').new(0.5) -- timeout is 0.5 seconds
lb:sethandler('groupwrite', groupcallback)

while true do
    lb:step()
    do_some_other_stuff()
end

What is the best way to implement this type of messaging loop?

dmitryuv commented 4 years ago

I actually used the code of the run_ioloop() function directly to achieve the desired result, but it would be good to see a better design for manually "stepping" through the iterations, thanks!

xHasKx commented 4 years ago

@dmitryuv, if I understand you correctly, you intend to use the ioloop built into luamqtt. In this case, you may try the following. There is a https://github.com/xHasKx/luamqtt/blob/master/mqtt/ioloop.lua module (require("mqtt.ioloop")). Its .get() method returns an instance of ioloop_mt, which has the methods :add(client) and :run_until_clients().

So you may create an object with the methods :set_ioloop(self) and :_ioloop_iteration() (through a metatable or directly on the object). Then pass it to the ioloop's :add(it) method. The ioloop will then call it:_ioloop_iteration() over and over in its :run_until_clients() method, the same way it processes all MQTT clients in luamqtt.

Please try this, and if it helps, I'll think about how to export functions to simplify the described workflow.
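
The approach described above can be sketched as follows. This is a minimal illustration, not luamqtt code: `StubLoop` is a hypothetical stand-in that only mimics the `:add()` and `:run_until_clients()` behaviour described in the comment (its `:remove()` method is an assumption added so the demo can terminate), and `BusAdapter`, `fake_bus` and the step limit are invented for the example.

```lua
-- Hypothetical stand-in for the ioloop; NOT luamqtt's implementation.
local StubLoop = {}
StubLoop.__index = StubLoop

function StubLoop.new()
  return setmetatable({ clients = {} }, StubLoop)
end

function StubLoop:add(client)
  table.insert(self.clients, client)
  client:set_ioloop(self)   -- the loop hands itself to every added object
end

-- Assumed helper so the demo can stop; not confirmed by the thread.
function StubLoop:remove(client)
  for i, c in ipairs(self.clients) do
    if c == client then
      table.remove(self.clients, i)
      return
    end
  end
end

function StubLoop:run_until_clients()
  while next(self.clients) do
    -- walk a copy so an object may remove itself during its own iteration
    local snapshot = {}
    for i, c in ipairs(self.clients) do snapshot[i] = c end
    for _, c in ipairs(snapshot) do c:_ioloop_iteration() end
  end
end

-- Duck-typed adapter: it only needs the two methods the loop calls.
local BusAdapter = {}
BusAdapter.__index = BusAdapter

function BusAdapter.new(bus, max_steps)
  return setmetatable({ bus = bus, left = max_steps }, BusAdapter)
end

function BusAdapter:set_ioloop(loop)
  self.ioloop = loop
end

function BusAdapter:_ioloop_iteration()
  self.bus:step()
  self.left = self.left - 1
  if self.left == 0 then
    self.ioloop:remove(self)   -- demo-only exit condition
  end
end

-- Fake bus that just counts how often it was stepped.
local fake_bus = { steps = 0 }
function fake_bus:step() self.steps = self.steps + 1 end

local loop = StubLoop.new()
loop:add(BusAdapter.new(fake_bus, 3))
loop:run_until_clients()
print(fake_bus.steps)
```

With the real library, the adapter would presumably be passed to the `:add()` method of the ioloop returned by `require("mqtt.ioloop").get()` instead of to `StubLoop`.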

dmitryuv commented 4 years ago

Thanks for getting back on this,

If I understand you correctly, you propose creating a "fake" MQTT client with my own loop implementation. That might work as a solution, but it dilutes the meaning of the existing class (the MQTT client), unless there's an additional abstraction on top of it. The other option, which I've implemented for now, is plugging my own step into the loop, but I had to extract a portion of the code from the loop function:

-- part of the setup
local mqtt = require('mqtt')
local lb = require('localbus').new(0.005)
lb:sethandler('groupwrite', knx_cb)  -- knx_cb is defined elsewhere

-- main loop
function run_loop(client, bus)
  print("running ioloop for it")

  local loop = mqtt.get_ioloop()

  loop:add(client)
  loop.running = true   -- not sure why i need this? part of the original loop
  while next(loop.clients) do
    loop:iteration()
    bus:step()  -- KNX bus iteration
  end
  loop.running = false
  print("done, ioloop is stopped")
end

My concern is that I had to extract part of the library's code instead of plugging in my custom step function. My first thought was that the easiest way to address this would be a user-supplied callback called from within the loop, but that's less flexible than your proposal.

xHasKx commented 4 years ago

Lua has no interfaces or classes in its core, unlike Java or C#, so masquerading some object as an MQTT client is not "forbidden black magic". It's more like the duck-typing concept in Python.

But inserting a function into the ioloop's list of MQTT clients might be a good idea. A function is a first-class value in Lua, so we can easily distinguish it from regular MQTT clients during an ioloop iteration.

Consider this example:

local bus = ...

local loop = mqtt.get_ioloop()
loop:add(function() bus:step()  --[[ KNX bus iteration ]] end)
-- ...

loop:run_until_clients()

What do you think?
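
One way a loop could tell the two kinds of entries apart is a `type()` check on each entry. The sketch below is an assumption about how such a dispatch might look, not the code that shipped in luamqtt; `run_iteration`, `fake_client` and `calls` are hypothetical names used only for the demonstration.

```lua
-- Hypothetical dispatch step: entries are either plain functions or
-- client-like objects exposing :_ioloop_iteration().
local function run_iteration(entries)
  for _, entry in ipairs(entries) do
    if type(entry) == "function" then
      entry()                      -- user-supplied step, e.g. bus:step()
    else
      entry:_ioloop_iteration()    -- regular client iteration
    end
  end
end

-- Record the order in which the entries are invoked.
local calls = {}
local fake_client = {}
function fake_client:_ioloop_iteration() table.insert(calls, "client") end

run_iteration({
  fake_client,
  function() table.insert(calls, "function") end,
})
print(table.concat(calls, ","))
```

Because the check happens per entry, functions and client objects can be mixed freely in the same list and are invoked in insertion order.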

dmitryuv commented 4 years ago

Thanks, that sounds like a good plan. One more thought: since you have mqtt.run_ioloop(..) as an "official" entry point, could it be simplified to something like mqtt.run_ioloop(client1, function() --[[ custom loop ]] end)?

Otherwise there will be duplication of library parts. Also, I'm not sure how important the additional steps in the pre-step are; I mean the line cl:start_connecting() in the run_ioloop() code.

xHasKx commented 4 years ago

@dmitryuv, check the new version v3.2.0 and the example in the test https://github.com/xHasKx/luamqtt/blob/master/tests/spec/ioloop.lua

dmitryuv commented 4 years ago

@xHasKx looks great to me, thanks!

dvv commented 4 years ago

@xHasKx please clarify how to use this with the standard copas loop?

I attempted twice:

  copas.addthread(function()
    while true do
      client:_ioloop_iteration() -- choke
    end
  end)

./mqtt/client.lua:898: bad argument #1 to 'coroutine_resume' (thread expected, got nil)

  client:start_connecting()
  copas.addthread(function()
    while true do
      client:_io_iteration(client.connection.recv_func) -- choke
    end
  end)

./mqtt/luasocket.lua:34: bad argument #1 to 'receive' (string expected, got table)

TIA

dvv commented 4 years ago

  local ioloop = require("mqtt.ioloop").get(true, { timeout = 0.001 })
  copas.addthread(function()
    client.ioloop = ioloop
    while true do
      client:_ioloop_iteration()
      copas.sleep(ioloop.args.timeout)
    end
  end)

This works, but shows a CPU busy loop.

xHasKx commented 4 years ago

@dvv, I've added an example: https://github.com/xHasKx/luamqtt/blob/master/examples/copas-example.lua

The main thing is to use a new copas thread for each ioloop and MQTT client you plan to use, and to pass the proper arguments when creating the ioloop: { sleep = 0.001, sleep_function = copas.sleep }, where the sleep value is configurable.
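
The role of the two parameters can be illustrated with a stand-in loop. This is a sketch under the assumption that the loop calls `sleep_function(sleep)` after each iteration, as the doc comment quoted at the top of the thread says; `idle_loop`, `recording_sleep`, `recorded` and `passes` are hypothetical names, and a recording function takes the place of `copas.sleep` so the example runs without copas installed.

```lua
-- Stand-in loop, NOT luamqtt's ioloop: runs `iterations` passes and calls
-- args.sleep_function(args.sleep) after each one.
local function idle_loop(args, step, iterations)
  for _ = 1, iterations do
    step()
    if args.sleep and args.sleep_function then
      args.sleep_function(args.sleep)
    end
  end
end

-- Recording sleep function used here in place of copas.sleep.
local recorded = {}
local function recording_sleep(seconds)
  recorded[#recorded + 1] = seconds
end

local passes = 0
idle_loop({ sleep = 0.01, sleep_function = recording_sleep },
          function() passes = passes + 1 end, 5)

print(passes, #recorded, recorded[1])
```

Under copas, passing `sleep_function = copas.sleep` in the same position would yield to other copas threads instead of blocking the process. A larger `sleep` value lowers CPU usage at the cost of added latency per iteration.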

dvv commented 4 years ago

@xHasKx Great, thanks!

However, it still eats 6-10% of CPU in the timeout busy loop.

xHasKx commented 4 years ago

@dvv, try playing with the ioloop creation parameters: set sleep=0.01 and maybe add timeout=0.1

xHasKx commented 4 years ago

Actually, luamqtt's internal ioloop could be designed better (for example, with socket.select). I'm sure there is a way to improve luamqtt's behavior when it runs inside a copas loop.

dvv commented 4 years ago

@xHasKx I sent a pull request with the idea. Please validate and consider applying. Running under copas allows for many benefits, imho.

xHasKx commented 4 years ago

Thanks, I'll check