eclipse / paho.mqtt.python

MQTTv5 and QoS 2 does not work #696

Closed: thatsdone closed this issue 7 months ago

thatsdone commented 1 year ago

Hi,

I noticed that MQTTv5 with QoS 2 currently does not work. I found a similar issue from the MQTTv3 days (6 years ago!), https://github.com/eclipse/paho.mqtt.python/issues/103; is there a similar problem in MQTTv5?

Distro: Ubuntu 22.04 (amd64)
Python: 3.10.6
Paho: paho-mqtt 1.6.1
MQTT broker: EMQX Community Edition Docker image (emqx/emqx:4.4.11)

$ python3 server.py 2
qos = 2
on_log(): server : 16 Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b'' properties=None
on_log(): server : 16 Sending SUBSCRIBE (d0, m1) [(b'topic1', {QoS=2, noLocal=False, retainAsPublished=False, retainHandling=0})]
on_log(): server : 16 Received CONNACK (0, Success) properties=[AssignedClientIdentifier : MzA4NDA4NjYwMzY0NDMzNTU1MTkzMzQ3NTUwNzU0ODk3OTC, TopicAliasMaximum : 65535, RetainAvailable : 1, MaximumPacketSize : 1048576, WildcardSubscriptionAvailable : 1, SubscriptionIdentifierAvailable : 1, SharedSubscriptionAvailable : 1]
on_log(): server : 16 Received SUBACK
on_log(): server : 16 Received PUBLISH (d0, q2, r0, m1), 'topic1', properties=[], ...  (18 bytes)
on_log(): server : 16 Sending PUBREC (Mid: 1)
Traceback (most recent call last):
  File "server.py", line 27, in <module>
    mqttc.loop_forever()
  File "/usr/local/lib/python3.10/dist-packages/paho/mqtt/client.py", line 1756, in loop_forever
    rc = self._loop(timeout)
  File "/usr/local/lib/python3.10/dist-packages/paho/mqtt/client.py", line 1164, in _loop
    rc = self.loop_read()
  File "/usr/local/lib/python3.10/dist-packages/paho/mqtt/client.py", line 1556, in loop_read
    rc = self._packet_read()
  File "/usr/local/lib/python3.10/dist-packages/paho/mqtt/client.py", line 2439, in _packet_read
    rc = self._packet_handle()
  File "/usr/local/lib/python3.10/dist-packages/paho/mqtt/client.py", line 3037, in _packet_handle
    return self._handle_pubrel()
  File "/usr/local/lib/python3.10/dist-packages/paho/mqtt/client.py", line 3348, in _handle_pubrel
    mid, = struct.unpack("!H", self._in_packet['packet'])
struct.error: unpack requires a buffer of 2 bytes
$

Here are my quick test programs.

If I pass '1' as the argument (asking for QoS 1), it works, so I suspect something is wrong in the QoS 2 code path.

server side

$ cat server.py
import sys
import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
    print('on_message(): %s : %s %s %s %s / %s' % (userdata, msg.topic, msg.mid, msg.timestamp, msg.retain, msg.payload.decode()))

def on_log(mqttc, userdata, level, string):
    print('on_log(): %s : %s %s' % (userdata, level, string))

def message(client, userdata, msg):
    print('message(): %s : %s %s %s %s / %s' % (userdata, msg.topic, msg.mid, msg.timestamp, msg.retain, msg.payload.decode()))

mqttc = mqtt.Client(protocol=mqtt.MQTTv5, userdata='server')
mqttc.on_message = on_message
mqttc.on_log = on_log
host = '192.168.241.12'
port = 31883
topic = 'topic1'
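qos = 2  # default QoS when no argument is given
if len(sys.argv) > 1:  # sys.argv[0] is the script name, so >= 1 is always true
    qos = int(sys.argv[1])
print(f'qos = {qos}')
mqttc.connect(host, port, 60)
mqttc.message_callback_add(topic, message)

mqttc.subscribe(topic, qos=qos)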

mqttc.loop_forever()

client side

$ cat client.py
import sys
import paho.mqtt.client as mqtt

def on_log(mqttc, userdata, level, string):
    print('on_log(): %s : %s %s' % (userdata, level, string))

def on_message(client, userdata, msg):
    print('on_message(): %s : %s %s %s %s / %s' % (userdata, msg.topic, msg.mid, msg.timestamp, msg.retain, msg.payload.decode()))

def message(client, userdata, msg):
    print('message(): %s : %s %s %s %s / %s' % (userdata, msg.topic, msg.mid, msg.timestamp, msg.retain, msg.payload.decode()))

host = '192.168.241.12'
port = 31883
topic = 'topic1'
interval = 60
client_id = 'client'
mqttc = mqtt.Client(client_id=client_id, protocol=mqtt.MQTTv5, userdata='client')

mqttc.connect(host=host, port=port, keepalive=interval, clean_start=False)
mqttc.message_callback_add(topic, message)

qos = 2  # default QoS when no argument is given
if len(sys.argv) > 1:  # sys.argv[0] is the script name, so >= 1 is always true
    qos = int(sys.argv[1])
print(f'qos = {qos}')
msg = 'Hello from client!'
print('publish(): publishing a message to: %s payload: %s' % (topic, msg))
mqttc.publish(topic, msg.encode('utf-8'), qos=qos)

mqttc.loop_forever()

naknz commented 1 year ago

This looks like a bug in the handling of PUBREL packets: in MQTTv5, the packet identifier may be followed by a reason code and a property length. The unpack error occurs because of these new trailing bytes after the identifier.

A quick workaround is to pass only the first 2 bytes to unpack, at client.py line 3348:

mid, = struct.unpack("!H", self._in_packet['packet'][:2])
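To see why the original statement fails and the workaround does not, here is a small standalone sketch. The PUBREL bytes are an illustrative assumption (packet identifier 1, reason code 0x00, property length 0), not a capture from the broker in this report.

```python
import struct

# Illustrative MQTTv5 PUBREL variable header (assumed bytes, not a capture):
# packet identifier 0x0001, reason code 0x00 (Success), property length 0.
pubrel = b"\x00\x01\x00\x00"

error_message = None
try:
    # Unpacking the whole packet, as client.py 1.6.1 does, fails because
    # "!H" expects exactly 2 bytes and this packet has 4.
    struct.unpack("!H", pubrel)
except struct.error as exc:
    error_message = str(exc)
print("whole packet:", error_message)

# Workaround from this thread: unpack only the 2-byte packet identifier.
(mid,) = struct.unpack("!H", pubrel[:2])
print("packet identifier:", mid)
```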

A better long-term fix would be to parse the PUBREL reason code, since it can carry an error value.
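The reason-code parsing suggested above could look roughly like the sketch below. It follows the MQTT v5 PUBREL layout: a 2-byte packet identifier, an optional 1-byte reason code (Success, 0x00, is implied when it is omitted), and an optional property length encoded as a Variable Byte Integer, followed by the properties. `parse_pubrel` is a hypothetical helper, not Paho's actual implementation, and it returns the raw property bytes without decoding them.

```python
import struct


def parse_pubrel(payload: bytes, mqtt_v5: bool = True):
    """Hypothetical helper: decode a PUBREL variable header.

    Returns (packet_identifier, reason_code, raw_property_bytes).
    """
    if len(payload) < 2:
        raise ValueError("PUBREL too short for a packet identifier")
    # Only the first 2 bytes are the packet identifier.
    (mid,) = struct.unpack("!H", payload[:2])
    reason_code = 0x00  # Success is implied when the reason code is omitted
    properties = b""
    # MQTT 3.1.1 PUBREL carries only the identifier, so extra bytes are ignored.
    if mqtt_v5 and len(payload) > 2:
        reason_code = payload[2]
        if len(payload) > 3:
            # Property length is a Variable Byte Integer (1 to 4 bytes).
            prop_len, offset, multiplier = 0, 3, 1
            while True:
                if offset >= len(payload):
                    raise ValueError("truncated property length")
                byte = payload[offset]
                prop_len += (byte & 0x7F) * multiplier
                offset += 1
                if not byte & 0x80:  # continuation bit clear: last byte
                    break
                multiplier *= 128
                if multiplier > 128 ** 3:
                    raise ValueError("property length longer than 4 bytes")
            if offset + prop_len > len(payload):
                raise ValueError("truncated properties")
            properties = payload[offset:offset + prop_len]
    return mid, reason_code, properties
```

For example, a 3-byte PUBREL `b"\x00\x01\x92"` would decode to packet identifier 1 with reason code 0x92 (Packet Identifier not found), which a client could then report instead of silently completing the flow.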

Lenormju commented 1 year ago

I have the same problem when publishing my messages with QoS=2, while QoS=1 works fine. The solution provided by @naknz works. I am still using 1.5.1, and to check whether my installation has the problem I use:

python -c 'line_number=3238; import paho.mqtt.client as target; from pathlib import Path; print(target.__file__); line_text = Path(target.__file__).read_text().split("\n")[line_number-1]; expected = "        mid, = struct.unpack(\"!H\", self._in_packet['\''packet'\''])"; print(line_text); print(expected); print(line_text == expected)'

You may have to adjust the line_number for other versions.
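As an alternative to matching a fixed line number, the check could inspect the source of `Client._handle_pubrel`, the method named in the traceback above. This is a minimal sketch that assumes the installed version still has a method with that name; `has_unpatched_pubrel` and `UNPATCHED` are hypothetical names introduced here.

```python
import inspect

# The exact unpatched statement quoted in this issue (client.py, _handle_pubrel).
UNPATCHED = "struct.unpack(\"!H\", self._in_packet['packet'])"


def has_unpatched_pubrel(source: str) -> bool:
    """Return True if the source still unpacks the whole PUBREL payload."""
    return UNPATCHED in source


if __name__ == "__main__":
    try:
        import paho.mqtt.client as mqtt_client
    except ImportError:
        print("paho-mqtt is not installed")
    else:
        # Assumption: the handler is still called _handle_pubrel in this version.
        handler = getattr(mqtt_client.Client, "_handle_pubrel", None)
        if handler is None:
            print("_handle_pubrel not found; this check assumes the 1.x layout")
        else:
            source = inspect.getsource(handler)
            if has_unpatched_pubrel(source):
                print("affected: PUBREL payload is unpacked as a whole")
            else:
                print("not affected by this pattern")
```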

ralight commented 1 year ago

Thank you, this is now fixed and will be in the next release.

matheuscandido commented 1 year ago

Hey @ralight, when will the next release happen? I really need QoS 2 for a product launch.

NSchrading commented 1 year ago

I also hit this issue. @ralight when can we expect a release to occur with this fix? It looks like there hasn't been a release in quite some time. Is this project still actively maintained?

drsantos89 commented 1 year ago

Same here. Happy to help with anything needed to close this.

KonssnoK commented 1 year ago

@ralight we faced the same issue as soon as we switched to mqtt5.

When is a new release expected?

drsantos89 commented 12 months ago

FYI, installing from branch 1.6.x fixed the issues for me.

matiasAS commented 7 months ago

@drsantos89, how do I install the module from the 1.6.x branch? I don't understand which version that is. (With 1.6.1 and the EMQX 5.4 broker, I get the error when I receive a message on a topic I am subscribed to with qos=2.)

Do you speak Spanish? I'm from Chile!

hostmasterpontuax commented 7 months ago

I ran an update in my project and noticed that the 1.6.x branch was removed. What is the solution now?

KonssnoK commented 7 months ago

I guess it was removed because it was merged (commit ab5e7da5118841ab744965294db68f579b335f05).

hostmasterpontuax commented 7 months ago

Great. Can't wait to see 2.0 becoming stable release. Till then, I managed to fork an already forked repo (with 1.6.x branch, of course).

PierreF commented 7 months ago

You can also use the 2.0.0rc2 available on PyPI: https://pypi.org/project/paho-mqtt/2.0.0rc2/

If no regressions are found in this release candidate, 2.0 will be released in February.

matiasAS commented 7 months ago

@PierreF, what day in February? The 1st?

matiasAS commented 7 months ago

@PierreF, is 2.0.0rc2 stable?

Erickrk commented 4 months ago

Hi,

I've been testing both versions 1.3 and 2.0 for a project, and it looks like the issue persists. I am currently running Mosquitto 2.0.18 in a Docker container and have configured publishers using both Paho 1.3 and Paho 2.0.

Paho 2.0 publisher:

import time
import random

import paho.mqtt.client as mqtt

# Based on: https://eclipse.dev/paho/files/paho.mqtt.python/html/migrations.html
# This should be static
broker_address = "192.168.122.48" 
topic = "sensor/data"

# create new client instance
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="P1")
client.connect(broker_address) 

# Publish sensor data with QoS 2 and retain flag
counter = 100
MAX_SIZE = 60 * 1024  # 60 KB
message = "A" * MAX_SIZE # This generates an interesting behavior in the broker, exchanging a lot of ACKs

for i in range(counter):
    # sensor_data = random.randint(0, 999)
    client.publish(topic, message, qos=2, retain=True)
    print(f"Message {i} published to {topic}")
    time.sleep(0.01)  # Wait before next publish; increasing this didn't make it work

I noticed that when publishing with QoS 2, the current code still doesn't answer with PUBREL, as shown below:

Version 1.3:

[screenshot: pubrec-till-20]

Version 2.0:

[screenshot: pubrec-till-20-v2]

Is my client somehow wrong for QoS 2?
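One thing worth checking, offered as an assumption rather than a confirmed diagnosis: the publisher above never runs Paho's network loop (there is no `loop_start()` or `loop_forever()` call), so nothing reads the broker's PUBREC packets and no PUBREL goes back. Paho limits in-flight QoS > 0 messages to 20 by default (`max_inflight_messages_set()`), which would be consistent with the screenshot names. The sketch below shows a publishing helper that waits for each QoS 2 handshake via `MQTTMessageInfo.wait_for_publish()`; `publish_all` is a hypothetical name, and it expects the caller to have started the loop.

```python
def publish_all(client, topic, payload, count, qos=2, retain=True, timeout=10.0):
    """Hypothetical helper: publish `count` messages, waiting for each handshake.

    Assumes `client` is a connected paho-mqtt Client whose network loop is
    already running (for example via client.loop_start()). Without a running
    loop, nothing reads the broker's PUBREC, so no PUBREL goes back and the
    messages stay in flight. Returns how many messages were confirmed.
    """
    confirmed = 0
    for i in range(count):
        info = client.publish(topic, payload, qos=qos, retain=retain)
        # Blocks until the QoS 2 flow finishes (PUBCOMP) or the timeout expires.
        info.wait_for_publish(timeout=timeout)
        if info.is_published():
            confirmed += 1
            print(f"Message {i} published to {topic}")
        else:
            print(f"Message {i} not confirmed within {timeout} s")
    return confirmed
```

Usage with the script above: create the client as before, call `client.connect(broker_address)` and then `client.loop_start()`, run `publish_all(client, topic, message, counter)`, and finish with `client.disconnect()` followed by `client.loop_stop()`.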