eclipse / paho.mqtt.python


Publishing multiple messages simultaneously fail #748

Closed lu-maca closed 1 week ago

lu-maca commented 10 months ago

Hi,

I'm trying to run a simple publisher-subscriber example but a really weird behaviour happens:

# publisher.py
------------------------------------------
import paho.mqtt.client as mqtt

client = mqtt.Client()
client.connect("localhost",port=1883)

client.publish("mtq40_1", b"\x01")
client.publish("mtq40_2", b"\x01")
client.publish("mtq40_3", b"\x01")
client.publish("mtq40_4", b"\x01")
client.publish("rw250_1", b"\x01")
client.publish("rw250_2", b"\x01")
client.publish("rw250_3", b"\x01") 
client.publish("rw250_4", b"\x01")  

# subscriber.py
------------------------------------------
import paho.mqtt.client as mqtt

client = mqtt.Client()

def _initialize(connection_timeout=100):
    def on_connect(client, userdata, flags, rc):
        # check if the connection receives a CONNACK response from the server
        if rc != 0:
            raise ConnectionRefusedError("mqtt connection refused.")

        # subscribe to downlink
        client.subscribe("mtq40_1")
        client.subscribe("mtq40_2")
        client.subscribe("mtq40_3")
        client.subscribe("mtq40_4")
        client.subscribe("rw250_1")
        client.subscribe("rw250_2")
        client.subscribe("rw250_3")
        client.subscribe("rw250_4")

    def on_message(client, userdata, msg): 
        # add the sniffed message to the correct queue 
        print(msg.topic, msg.payload)

    client.on_connect = on_connect
    client.on_message = on_message

    client.connect(host="localhost", port=1883)

_initialize()
client.loop_forever()

When I run publisher.py, the weird stuff happens:

# first run
mtq40_1 b'\x01'
mtq40_2 b'\x01'
mtq40_3 b'\x01'
mtq40_4 b'\x01'

# second run
mtq40_1 b'\x01'
mtq40_2 b'\x01'

# third run
mtq40_1 b'\x01'
mtq40_2 b'\x01'
mtq40_3 b'\x01'
mtq40_4 b'\x01'

It seems that the publisher is not sending messages correctly. When a time.sleep(0.05) is added between the publish operations, it works fine:

mtq40_1 b'\x01'
mtq40_2 b'\x01'
mtq40_3 b'\x01'
mtq40_4 b'\x01'
rw250_1 b'\x01'
rw250_2 b'\x01'
rw250_3 b'\x01'
rw250_4 b'\x01'

mtq40_1 b'\x01'
mtq40_2 b'\x01'
mtq40_3 b'\x01'
mtq40_4 b'\x01'
rw250_1 b'\x01'
rw250_2 b'\x01'
rw250_3 b'\x01'
rw250_4 b'\x01'

Why does this happen? Am I doing something wrong?

Thanks! Luca

EngrealSun commented 8 months ago

http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718099 The MQTT protocol allows messages with QoS 0 to be discarded. You can switch to a different MQTT broker or specify the QoS.
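Specifying the QoS on the subscriber side could look roughly like the sketch below. It assumes the paho-mqtt 1.x callback signature used in subscriber.py; the subscriptions helper and the choice of QoS 1 are illustrative, not something from the issue. Note that the QoS a message is delivered with is the lower of the publish QoS and the subscription QoS, so the publisher would also need to pass qos=1 to publish().

```python
# Topics taken from the subscriber.py in the issue.
TOPICS = [
    "mtq40_1", "mtq40_2", "mtq40_3", "mtq40_4",
    "rw250_1", "rw250_2", "rw250_3", "rw250_4",
]


def subscriptions(topics, qos=1):
    """Build the (topic, qos) list that Client.subscribe() accepts.

    Hypothetical helper; QoS 1 is an assumption, pick what your setup needs.
    """
    return [(topic, qos) for topic in topics]


def on_connect(client, userdata, flags, rc):
    # paho-mqtt 1.x callback signature, as in the issue.
    if rc == 0:
        # One SUBSCRIBE packet for all topics instead of eight separate calls.
        client.subscribe(subscriptions(TOPICS))
```

In subscriber.py this on_connect would replace the nested one, assigned with client.on_connect = on_connect as before.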

PierreF commented 6 months ago

The publisher client is not running: you never called loop(), loop_start() or loop_forever(). It also seems you never wait for the publisher to finish publishing (no disconnect, not even an arbitrary sleep). A call to publish() isn't synchronous and doesn't wait for the published packet to be received by the broker (which isn't possible with QoS=0 anyway).

publish() will only submit the packet to the TCP socket, and the OS might not send it on the network immediately. I'm not 100% sure, but I think the OS is allowed to discard not-yet-sent data on socket close (which happens when you terminate the program). That's why adding some sleep (a sleep at the end of publisher.py is enough) "solves" your problem. A fix would be to start the client (loop_forever) and disconnect + wait for disconnection before exiting.
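A minimal sketch of that fix, assuming paho-mqtt 1.x (where mqtt.Client() takes no required arguments; on 2.x the constructor needs mqtt.CallbackAPIVersion.VERSION2 as its first argument). It uses loop_start() rather than loop_forever() so the main thread stays free to publish, and QoS 1 is an assumption so that wait_for_publish() has an acknowledgement to wait for. The publish_all name is hypothetical.

```python
# Topics taken from the publisher.py in the issue.
TOPICS = [
    "mtq40_1", "mtq40_2", "mtq40_3", "mtq40_4",
    "rw250_1", "rw250_2", "rw250_3", "rw250_4",
]


def publish_all(host="localhost", port=1883, payload=b"\x01", qos=1):
    """Publish one payload to every topic and wait until all are sent."""
    # Imported inside the function so that defining it has no side effects.
    import paho.mqtt.client as mqtt

    client = mqtt.Client()  # paho-mqtt 1.x style constructor (assumption)
    client.connect(host, port=port)
    client.loop_start()  # run the network loop in a background thread
    try:
        infos = [client.publish(topic, payload, qos=qos) for topic in TOPICS]
        for info in infos:
            info.wait_for_publish()  # block until this message has been sent
    finally:
        client.disconnect()  # queue a clean DISCONNECT
        client.loop_stop()   # wait for the network thread to finish
    return len(infos)
```

Calling publish_all() against a broker on localhost:1883 should then deliver all eight messages without any sleep in the publisher.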

A better solution is paho.mqtt.publish (https://github.com/eclipse/paho.mqtt.python#id3), which takes care of everything needed to send a message.
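With that helper module, the publisher from the issue could be reduced to something like the following sketch. publish.multiple() connects, sends every message in the list, and disconnects cleanly. The build_messages and publish_with_helper names are hypothetical, and QoS 1 is an assumption.

```python
# Topics taken from the publisher.py in the issue.
TOPICS = [
    "mtq40_1", "mtq40_2", "mtq40_3", "mtq40_4",
    "rw250_1", "rw250_2", "rw250_3", "rw250_4",
]


def build_messages(topics, payload=b"\x01", qos=1):
    """Build the list of message dicts that publish.multiple() expects."""
    return [
        {"topic": topic, "payload": payload, "qos": qos, "retain": False}
        for topic in topics
    ]


def publish_with_helper(hostname="localhost", port=1883):
    # Imported inside the function so that defining it has no side effects.
    import paho.mqtt.publish as publish

    # Connects, publishes all messages, then disconnects.
    publish.multiple(build_messages(TOPICS), hostname=hostname, port=port)
```

For a single message, publish.single(topic, payload, hostname=...) from the same module works the same way.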

MattBrittan commented 1 week ago

Closing due to inactivity (and it looks like an answer has been provided).