xHasKx / luamqtt

luamqtt - Pure-lua MQTT v3.1.1 and v5.0 client
https://xhaskx.github.io/luamqtt/
MIT License

Large messages get corrupted or never arrive #48

Open seeseemelk opened 1 month ago

seeseemelk commented 1 month ago

When large messages are received (I'm testing with a payload of 2625000 bytes), the library starts to misbehave.

Sometimes it hangs forever waiting for the actual message data to come in. Other times it'll think that the payload is part of the topic name. Sometimes it will not read the entire message, and think that a part of the payload is the header for the next message.

I noticed that increasing the socket timeout seems to largely resolve/workaround the issue.

seeseemelk commented 1 month ago

I managed to get a more stable fix by doing chunked reads. Instead of reading the entire 2.5 megabytes in one go, I changed the timeout version of conn.recv_func to only do reads of up to 16k. If more data is requested, multiple reads are done.

This allows the function to still yield periodically, while also being able to read a large packet.
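
For illustration, a minimal sketch of the chunked-read idea. This is not the actual patch: recv_chunked, CHUNK_SIZE, and the yield callback are hypothetical names, and recv is assumed to return the requested bytes or nil plus an error.

```lua
-- Hypothetical helper, not part of luamqtt: read `size` bytes in
-- chunks of at most CHUNK_SIZE so the caller can yield in between.
local CHUNK_SIZE = 16 * 1024

-- recv(n): assumed to return up to n bytes, or nil plus an error message
-- yield(): optional callback invoked between chunks
local function recv_chunked(recv, size, yield)
    local parts, remaining = {}, size
    while remaining > 0 do
        local want = math.min(remaining, CHUNK_SIZE)
        local chunk, err = recv(want)
        if not chunk then
            return nil, err
        end
        if #chunk == 0 then
            -- guard against looping forever on an empty read
            return nil, "empty read"
        end
        parts[#parts + 1] = chunk
        remaining = remaining - #chunk
        if yield and remaining > 0 then
            yield()
        end
    end
    return table.concat(parts)
end
```

A large packet is then read as a series of small receives, each of which can complete within a short socket timeout.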

I noticed that large sends also have issues. For this I simply disable the luasocket timeout before the send, and enable it again after. Not ideal, but functional for my use case.

Tieske commented 1 month ago

Can you provide a test that shows the behaviour? or a minimal reproducing example?

oktoberfest6 commented 1 month ago

Hello,

I also observed the same phenomenon. Using the script below I have three different behaviors:

local mqtt = require("mqtt")

local client = mqtt.client{ uri = os.getenv("MQTTBROKER"), clean = true }

local function onsubscribe()
    local payload = string.rep('a', 200000)
    assert(client:publish{ topic = "luamqtt", payload = payload })
end

local function onconnect()
    assert(client:subscribe{ topic="luamqtt", qos=1, callback = onsubscribe })
end

local function onmessage(msg)
    assert(client:acknowledge(msg))
    print("received message #msg =", #msg.payload)
    client:disconnect()
end

client:on{ connect = onconnect, message = onmessage }
mqtt.run_ioloop(client)

Tieske commented 1 month ago

@oktoberfest6 thx for the test code. I tried to take this into account when I rewrote the connectors in this branch: https://github.com/xHasKx/luamqtt/pull/31 But your test code equally failed. I have now fixed that (last 2 commits I just added).

So if you need this, use that branch.

@xHasKx any chance we can merge that branch? I'd love to drop my fork in favour of this original library, but it needs those fixes.

Tieske commented 1 month ago

@seeseemelk since you also reported the same issue when sending, would you mind giving #31 a try? It should fix that as well, since it caters for partial sends.
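
As a rough sketch of what handling partial sends can look like (this is not the code from #31; send_all is a hypothetical name): LuaSocket's send returns the index of the last byte sent on success, and nil, an error message, and the index of the last byte sent on failure, so a caller can resume from that index.

```lua
-- Hypothetical helper: keep sending until all of `data` is written.
-- `sock` is assumed to be a LuaSocket TCP client object with a timeout set.
local function send_all(sock, data)
    local i = 1
    while i <= #data do
        local last, err, partial = sock:send(data, i)
        if last then
            -- success: `last` is the index of the last byte sent
            i = last + 1
        elseif err == "timeout" then
            -- timed out: resume after the last byte that did go out
            -- (a real ioloop would yield here instead of retrying at once)
            i = (partial or (i - 1)) + 1
        else
            return nil, err
        end
    end
    return true
end
```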

seeseemelk commented 1 month ago

Just tried it, but it's still broken in interesting ways.

I've uploaded the code I'm using to test it here: https://github.com/seeseemelk/luamqtt-failure-example

test-image.lua sends a payload of the ASCII character 'A' 537726 times, get-image.lua receives it and prints out the length.

To test, start get-image.lua, then run test-image.lua repeatedly. Sometimes test-image.lua will hang without anything ever arriving at get-image.lua, other times get-image.lua will print out with some error message. Varying the length of the payload changes the error message sometimes.

$ lua get-image.lua 
Connected
Subscribed
Wrote image 1 of length 525310
MQTT client error:  failed to receive next packet: PUBREL: unexpected flags value: 1
Connected
Subscribed

seeseemelk commented 1 month ago

I'm using Mosquitto, configured to allow anonymous access, as the broker.

seeseemelk commented 1 month ago

Just did a quick test: when a receive times out, LuaSocket will sometimes return the data it managed to read within the timeout as a third return value. The next time one calls the receive function, that partially read data is not returned again.

For example, take the following bit of code:

local socket = require("socket")

local conn = socket.connect("localhost", 1234)
print("connected")

conn:settimeout(3)

local data, err, part = conn:receive(3)
print("Data:", data)
print("Err:", err)
print("Part:", part)

local data, err, part = conn:receive(3)
print("Data:", data)
print("Err:", err)
print("Part:", part)

conn:close()

This can in some circumstances return the following result (tested by running netcat in a separate terminal and pressing a key followed by enter twice):

connected
Data:   nil
Err:    timeout
Part:   e

Data:   nil
Err:    timeout
Part:   f

The byte e that was returned in the first call was not returned in the second call. In other words, when a timeout occurs and some data was received, this data gets dropped.

The receive function does take a second argument, a data prefix. If you pass the partial data of the last call to this function, it will behave as expected.

local socket = require("socket")

local conn = socket.connect("localhost", 1234)
print("connected")

conn:settimeout(3)

local data, err, part = conn:receive(3)
print("Data:", data)
print("Err:", err)
print("Part:", part)

local data, err, part2 = conn:receive(3, part)
print("Data:", data)
print("Err:", err)
print("Part:", part2)

conn:close()

Will result in

connected
Data:   nil
Err:    timeout
Part:   e

Data:   e
f
Err:    nil
Part:   nil

Reading the luamqtt code, it does not seem that this behaviour is being taken into account.

seeseemelk commented 1 month ago

And indeed, changing the luasocket:plain_receive in @Tieske's branch from

local data, err, partial = sock:receive(size)
if data then
    return data
end

to

local data, err, partial = sock:receive(size, self.sockPartialData)
self.sockPartialData = partial
if data then
    return data
end

seems to fix the issue for me.
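
A self-contained sketch of the same idea, for anyone who wants to try it outside luamqtt. make_receiver is a hypothetical name; sock is assumed to be a LuaSocket TCP object, and the caller is assumed to retry with the same size after a timeout.

```lua
-- Hypothetical wrapper: remember partial data from a timed-out
-- receive and pass it back as the prefix on the next call.
local function make_receiver(sock)
    local pending -- partial data left over from a failed receive
    return function(size)
        local data, err, partial = sock:receive(size, pending)
        if data then
            pending = nil
            return data
        end
        -- keep whatever was read so the next call resumes from it
        if partial ~= nil and partial ~= "" then
            pending = partial
        else
            pending = nil
        end
        return nil, err
    end
end
```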

seeseemelk commented 1 month ago

I have not checked the behaviour of the send function, but reading the documentation of it and comparing it to the receive function I think @Tieske's send function is sound.

Tieske commented 1 month ago

@seeseemelk

"Just did a quick test, LuaSocket will sometimes return both a timeout and some data as part of a third return value containing the data that could be read within the time."

The fix I created only lives in my keepalive branch, which is in PR #31 . If you used the code from my fork, then ensure you use the keepalive branch and neither main nor master in that case.

Here's the fix I added: https://github.com/xHasKx/luamqtt/pull/31/files#diff-d0c02746691eb0a97d87327874dc1658dfb7946448270d9e298e419b12f0897dR121-R126 which is in line with your reported fix (but as mentioned that is NOT in my main and master branches)

Tieske commented 1 month ago

fyi; I've merged the fix into my main branch and released version t1.0.1 of my fork, available on LuaRocks.

Install via luarocks install luamqttt (note the extra t; only one of luamqttt and luamqtt can be installed at a time, since they use the same filenames).

seeseemelk commented 1 month ago

Indeed, I was using the main branch instead of the keepalive branch. The latter branch does indeed work, as does t1.0.1. Dankuwel!

oktoberfest6 commented 1 month ago

@Tieske: thanks for the fix!

I also noticed that you implemented (since 1.0.0) signal_idle and signal_code, which are a nice way to detect timeouts when using the module with external connectors.

I hope all these great improvements will be merged into the original library.