fizista / micropython-umqtt.robust2

MIT License

Constant disconnects, and publish example #12

Open jt274 opened 1 year ago

jt274 commented 1 year ago

Describe the bug

I recently switched to this library from umqtt.simple. The client does not subscribe to anything; it only publishes data every 30 seconds. With the previous library, it would work consistently for 8-10 hours and then crash (I assume from a network error). After switching to this library, it publishes data 2-3 times, then reports a connection error and reconnects. It does this constantly.

The error it shows is (MQTTException(7,), 9)

Relevant code:

from umqtt.robust2 import MQTTClient

def mqtt_connect():
    mqtt_client = MQTTClient(mqtt_client_id, broker, 1883, mqtt_user, mqtt_pass, keepalive=60)
    mqtt_client.set_last_will(last_will_topic, 'Disconnected', retain=True)
    mqtt_client.connect()
    mqtt_client.publish(last_will_topic, 'Connected', retain=True)
    return mqtt_client

def mqtt_reconnect():
    print('Could not connect to MQTT broker, reconnecting: ' + str(mqtt_client.conn_issue))
    mqtt_client.reconnect()
    mqtt_client.publish(last_will_topic, 'Connected', retain=True)

try:
    mqtt_client = mqtt_connect()
    print('MQTT Connected')
except OSError as e:
    print('Error connecting to MQTT, retrying...')
    mqtt_reconnect()

while True:
    if mqtt_client.is_conn_issue():
            while mqtt_client.is_conn_issue():
                mqtt_reconnect()
    else:
        mqtt_client.publish(topic, data, qos=1)
        mqtt_client.check_msg()
        mqtt_client.send_queue()
        sleep(30)

I'd also request an example for a publishing (not subscribing) client. There doesn't seem to be much documentation on that.

fizista commented 1 year ago

In the mqtt_reconnect function, the variable mqtt_client has the value null. Change it to:

def mqtt_reconnect(mqtt_client):
    ....

Also add resubscribe, see below:

        while c.is_conn_issue():
            c.reconnect()
        else:
            c.resubscribe()

I also uploaded a small code fix for this library, but the problem is more likely in your code. Let us know if it works.

jt274 commented 1 year ago

Hello!

I do not believe the mqtt_client variable is null since it is defined by mqtt_client = mqtt_connect(). At least the mqtt_reconnect() function appears to work correctly and not throw any errors, and the client reconnects.

For the resubscribe() function, is that necessary if the client is not subscribed to any topics, but only publishing messages?

fizista commented 1 year ago

I looked into it. The problem is keepalive. You are not sending any packets to keep the connection alive, so the connection is closed. Below is an example of how to do it. This is one of many solutions.

from umqtt.robust2 import MQTTClient

def mqtt_connect():
    mqtt_client = MQTTClient(mqtt_client_id, broker, 1883, mqtt_user, mqtt_pass, keepalive=60)
    mqtt_client.set_last_will(last_will_topic, 'Disconnected', retain=True)
    mqtt_client.connect()
    mqtt_client.publish(last_will_topic, 'Connected', retain=True)
    return mqtt_client

def mqtt_reconnect():
    print('Could not connect to MQTT broker, reconnecting: ' + str(mqtt_client.conn_issue))
    mqtt_client.reconnect()
    mqtt_client.publish(last_will_topic, 'Connected', retain=True)

try:
    mqtt_client = mqtt_connect()
    print('MQTT Connected')
except OSError as e:
    print('Error connecting to MQTT, retrying...')
    mqtt_reconnect()

while True:
    if mqtt_client.is_conn_issue():
            while mqtt_client.is_conn_issue():
                mqtt_reconnect()
    else:
        mqtt_client.publish(topic, data, qos=1)
        mqtt_client.check_msg()
        mqtt_client.send_queue()
        # While waiting for the next action, you send a "ping" to keep the connection.
        count = 0
        while count < 30:
            count += 1
            sleep(1)
            if count % 5 == 0:  # For example, every 5 seconds
                mqtt_client.ping()

fizista commented 1 year ago

I'm just looking, and I see that the errors continue to appear. So this is not the solution I wrote above.

fizista commented 1 year ago

This code works

from umqtt.robust2 import MQTTClient

def mqtt_connect():
    mqtt_client = MQTTClient(mqtt_client_id, broker, 1883, mqtt_user, mqtt_pass, keepalive=60)
    mqtt_client.set_last_will(last_will_topic, 'Disconnected', retain=True)
    mqtt_client.connect()
    mqtt_client.publish(last_will_topic, 'Connected', retain=True)
    return mqtt_client

def mqtt_reconnect():
    print('Could not connect to MQTT broker, reconnecting: ' + str(mqtt_client.conn_issue))
    mqtt_client.reconnect()
    mqtt_client.publish(last_will_topic, 'Connected', retain=True)

try:
    mqtt_client = mqtt_connect()
    print('MQTT Connected')
except OSError as e:
    print('Error connecting to MQTT, retrying...')
    mqtt_reconnect()

while True:
    if mqtt_client.is_conn_issue():
            while mqtt_client.is_conn_issue():
                mqtt_reconnect()
    else:
        mqtt_client.publish(topic, data, qos=1)

        # While waiting for the next action, you send a "ping" to keep the connection.
        count = 0
        while count < 30:
            mqtt_client.check_msg()
            mqtt_client.send_queue()
            count += 1
            sleep(1)
            if count % 5 == 0:  # For example, every 5 seconds
                mqtt_client.ping()

jt274 commented 1 year ago

Why is it necessary to ping every 5 seconds, if the keepalive is set to 60 seconds with this line? Isn't that the point of keepalive?

mqtt_client = MQTTClient(mqtt_client_id, broker, 1883, mqtt_user, mqtt_pass, keepalive=60)

I think in devices with batteries a ping every 5 seconds would drain it more quickly.

jt274 commented 1 year ago

The MQTT specification here says: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html

The Keep Alive is a time interval measured in seconds. Expressed as a 16-bit word, it is the maximum time interval that is permitted to elapse between the point at which the Client finishes transmitting one Control Packet and the point it starts sending the next. It is the responsibility of the Client to ensure that the interval between Control Packets being sent does not exceed the Keep Alive value. In the absence of sending any other Control Packets, the Client MUST send a PINGREQ Packet [MQTT-3.1.2-23].

The Client can send PINGREQ at any time, irrespective of the Keep Alive value, and use the PINGRESP to determine that the network and the Server are working.

If the Keep Alive value is non-zero and the Server does not receive a Control Packet from the Client within one and a half times the Keep Alive time period, it MUST disconnect the Network Connection to the Client as if the network had failed [MQTT-3.1.2-24].

If a Client does not receive a PINGRESP Packet within a reasonable amount of time after it has sent a PINGREQ, it SHOULD close the Network Connection to the Server.

A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism. This means that, in this case, the Server is not required to disconnect the Client on the grounds of inactivity.
Note that a Server is permitted to disconnect a Client that it determines to be inactive or non-responsive at any time, regardless of the Keep Alive value provided by that Client.

Shouldn't this mean that with a keepalive time of 60 seconds, sending a publish command every 30 seconds should be sufficient to keep the connection open, without manually sending a ping?
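
As a rough check of the arithmetic in the quoted specification text, here is a minimal sketch. It assumes only the Keep Alive rules quoted above; the function names are illustrative and are not part of umqtt.robust2 or any other library.

```python
def server_deadline(keepalive):
    """Seconds of client silence after which the server must disconnect.

    Based on [MQTT-3.1.2-24]: one and a half times the Keep Alive value.
    Returns None when keepalive is 0, which turns the mechanism off.
    """
    if keepalive == 0:
        return None
    return keepalive * 1.5


def interval_is_sufficient(send_interval, keepalive):
    """True if sending a Control Packet every send_interval seconds
    stays within the Keep Alive value (or if keep alive is disabled)."""
    return keepalive == 0 or send_interval <= keepalive


print(server_deadline(60))             # 90.0
print(interval_is_sufficient(30, 60))  # True
```

By this reading, a publish every 30 seconds sits well inside a 60 second Keep Alive, which suggests the disconnects come from something other than the nominal send interval.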

fizista commented 1 year ago

In theory, you're right. In practice, the sent packet has 15 seconds to reach the server, and then we have 15 seconds to receive a response from the server.

In your code, there was an additional delay associated with processing the received data. You processed the received data every 30 seconds (check_msg()), so more than 60 seconds passed between sending and processing the data.

You also need to keep in mind that the packet sent by the client may not reach the server for some reason. A packet sent to the client from the server may also not arrive in the expected time. Therefore, you need to call the send_queue method, which tries to resend packets that did not reach the server.
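
To illustrate the general idea of resending unacknowledged packets, here is a library-independent sketch. It is not how umqtt.robust2 implements send_queue; the class and method names are hypothetical, and time values are passed in explicitly so the logic is easy to follow.

```python
class PendingQueue:
    """Tracks QoS 1 messages that were sent but not yet acknowledged."""

    def __init__(self, timeout):
        self.timeout = timeout  # seconds to wait before a resend is due
        self.pending = {}       # packet id -> (payload, time last sent)

    def sent(self, pid, payload, now):
        """Record that a message was (re)sent at time `now`."""
        self.pending[pid] = (payload, now)

    def acked(self, pid):
        """Forget a message once the broker has acknowledged it."""
        self.pending.pop(pid, None)

    def due_for_resend(self, now):
        """Return the ids of messages still unacknowledged after the timeout."""
        return sorted(
            pid for pid, (_, sent_at) in self.pending.items()
            if now - sent_at >= self.timeout
        )


queue = PendingQueue(timeout=10)
queue.sent(1, b"a", now=0)
queue.sent(2, b"b", now=5)
queue.acked(1)                   # the broker confirmed message 1
print(queue.due_for_resend(12))  # []  (message 2 has waited only 7 s)
print(queue.due_for_resend(15))  # [2]
```

The point of the sketch is that resends only happen if something checks the queue regularly, so a long sleep between checks delays both acknowledgement handling and resends.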

jt274 commented 1 year ago

I moved some things around and it appears to be working now.

I set keepalive to 120. I call check_msg and send_queue but now sleep for 60 seconds between transmissions. And I combined a few publish commands into a single publish of JSON data. No manual pinging. With this, no more constant disconnects.
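
For anyone wanting to do the same, combining several readings into one JSON payload could look roughly like this. The field names and values are made up for illustration; on older MicroPython builds the module may need to be imported as ujson.

```python
import json  # may be `ujson` on older MicroPython builds


def build_payload(temperature, humidity, battery):
    """Pack several readings into a single JSON string for one publish."""
    return json.dumps({
        "temperature": temperature,
        "humidity": humidity,
        "battery": battery,
    })


payload = build_payload(21.5, 40, 3.7)
print(payload)
```

The resulting string can then be sent with a single mqtt_client.publish(topic, payload, qos=1) call instead of one publish per value.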

For what it's worth, using the umqtt.simple library I had no issues. Watching the broker and client, messages were sent and received almost instantly, so I'm not sure how a 30 second interval could have timed out a 60 second keepalive (since it didn't with the old library). Using the simple library I wasn't using check_msg or send_queue though.

fizista commented 1 year ago

Interestingly, I can no longer reproduce the problem.

I have keepalive set to 11 seconds and check_msg() every 10 seconds. And everything is working.

jt274 commented 1 year ago

I had the device running for about 12 hours with no issues. Then I updated the MQTT broker software, which caused a disconnect of the clients. There was some error on the device, so I've set it to reboot when there is an exception. (If it was just an MQTT disconnect it should have reconnected). The device successfully rebooted and reconnected to the MQTT broker. But now it is having the constant disconnects and reconnects every couple minutes. No change in the code.

fizista commented 1 year ago

The solution to this problem may be this implementation (micropython-umqtt.robust2==2.2.0):

import utime
from umqtt.robust2 import MQTTClient

def mqtt_connect():
    mqtt_client = MQTTClient(mqtt_client_id, broker, 1883, mqtt_user, mqtt_pass, keepalive=60)
    mqtt_client.set_last_will(last_will_topic, 'Disconnected', retain=True)
    mqtt_client.connect()
    mqtt_client.publish(last_will_topic, 'Connected', retain=True)
    return mqtt_client

def mqtt_reconnect():
    print('Could not connect to MQTT broker, reconnecting: ' + str(mqtt_client.conn_issue))
    mqtt_client.reconnect()
    mqtt_client.publish(last_will_topic, 'Connected', retain=True)

try:
    mqtt_client = mqtt_connect()
    print('MQTT Connected')
except OSError as e:
    print('Error connecting to MQTT, retrying...')
    mqtt_reconnect()

while True:
    if mqtt_client.is_conn_issue():
            while mqtt_client.is_conn_issue():
                mqtt_reconnect()
    else:
        mqtt_client.publish(topic, data, qos=1)

        #########################################
        # 500ms wait for response. This depends on the delays that may occur in the network.
        for _ in range(500):
            mqtt_client.check_msg()
            mqtt_client.send_queue()
            if not mqtt_client.things_to_do():
                break
            utime.sleep_ms(1)
        #########################################

        utime.sleep(30)
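
The wait loop can also be wrapped in a small helper that reports whether the client went idle in time. This is only a sketch: FakeClient is a hypothetical stand-in so the example runs without a broker, and wait_until_idle is an illustrative name. Only check_msg, send_queue and things_to_do are taken from the code in this thread.

```python
import time


class FakeClient:
    """Stand-in for the MQTT client; pretends the broker's
    acknowledgement arrives after a fixed number of polls."""

    def __init__(self, polls_needed):
        self.polls_left = polls_needed

    def check_msg(self):
        if self.polls_left > 0:
            self.polls_left -= 1

    def send_queue(self):
        pass

    def things_to_do(self):
        return self.polls_left > 0


def wait_until_idle(client, max_polls=500, poll_delay_s=0.001):
    """Poll the client until nothing is outstanding.

    Returns True if the client went idle within max_polls, else False.
    """
    for _ in range(max_polls):
        client.check_msg()
        client.send_queue()
        if not client.things_to_do():
            return True
        time.sleep(poll_delay_s)
    return False


print(wait_until_idle(FakeClient(3)))                # True
print(wait_until_idle(FakeClient(10), max_polls=5))  # False
```

Returning a flag makes it possible to log the cases where the acknowledgement did not arrive before the long sleep, which may help narrow down when the disconnects start.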

jt274 commented 1 year ago

I have updated MicroPython to the latest 1.20 release and umqtt.robust2 to 2.2.0, and tried your code above. I am now getting MQTT disconnects/reconnects every 3-15 minutes.

fizista commented 1 year ago

There is another variable that can affect the triggering of restarts (indirectly).

Set "message_timeout=2*keepalive".

jt274 commented 1 year ago

> There is another variable that can affect the triggering of restarts (indirectly).
>
> Set "message_timeout=2*keepalive".

I set that variable at the top of the program, but I am still getting periodic disconnects. The strange thing to me is that the umqtt.simple library did not have this disconnect issue.

fizista commented 1 year ago

umqtt.simple does not report a lack of response from the server for a certain period of time (sometimes it can just hang).

The error you reported with umqtt.robust2 only occurs for me when I restart the MQTT server.

For the code below, in my case the connection is stable for hours. And most importantly messages for qos=1 always reach the server. If, in fact, the library is doing occasional connection restarts, it means that something with the connection is wrong.

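import machine
import utime
from binascii import hexlify  # ubinascii on older MicroPython builds
from time import sleep, time
from umqtt.robust2 import MQTTClient

# Note: last_cp(), MQTT_BROKER_IP and last_will_topic are used below
# but their definitions were not included in this post.

keepalive = 60
message_timeout = keepalive * 2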

def mqtt_connect():
    mqtt_client = MQTTClient(hexlify(machine.unique_id()).decode('ascii'), MQTT_BROKER_IP, 1883, keepalive=keepalive,
                             message_timeout=message_timeout)
    mqtt_client.DEBUG = True
    mqtt_client.set_last_will(last_will_topic, 'Disconnected', retain=True)
    mqtt_client.connect()
    mqtt_client.publish(last_will_topic, 'Connected', retain=True)
    return mqtt_client

start = time()

def from_start():
    ct = time()
    fs = ct - start
    # print(ct, start, fs, str(fs))
    return str(fs)

def mqtt_reconnect():
    print('Could not connect to MQTT broker, reconnecting: ' + str(mqtt_client.conn_issue))
    mqtt_client.reconnect()
    mqtt_client.publish(last_will_topic,
                        'Connected from start: %s from cpackiet: %s' % (from_start(), last_cp(mqtt_client)),
                        retain=True)
    # mqtt_client.check_msg()
    # mqtt_client.send_queue()

pk = 0

try:
    mqtt_client = mqtt_connect()
    print('MQTT Connected')
except OSError as e:
    print('Error connecting to MQTT, retrying...')
    mqtt_reconnect()

while True:
    if mqtt_client.is_conn_issue():
        while mqtt_client.is_conn_issue():
            mqtt_reconnect()
    else:
        pk += 1
        for i in range(20):
            mqtt_client.publish('test/pub',
                                'OK [%s%s] from start: %s from cpackiet: %s' % (
                                    pk, '.' * i, from_start(), last_cp(mqtt_client)),
                                qos=1)

        # 500ms wait for response. This depends on the delays that may occur in the network.
        for _ in range(500):
            mqtt_client.check_msg()
            mqtt_client.send_queue()
            if not mqtt_client.things_to_do():
                break
            utime.sleep_ms(1)

        sleep(keepalive / 2)  # wait half the keepalive interval
        print('.', end='')

If the connection restarts in your case are not occasional, then try to find the cause. I can't replicate this problem.