micropython / micropython

MicroPython - a lean and efficient Python implementation for microcontrollers and constrained systems
https://micropython.org

esp8266: Implement MQTT client support #2055

Closed pfalcon closed 2 years ago

pfalcon commented 8 years ago

The basic plan is: start with implementing in Python, if places requiring optimization are spotted, implement them in C.

The reference server/broker: Mosquitto 1.4.8 http://mosquitto.org/download/ (implemented in C). Built from source without changes. Run as ./mosquitto -v. Default builtin configuration has on-disk persistence disabled. It needs to be enabled for client stored sessions (the basis of efficient robust (re)connection handling) to work as expected. So, have a file mosquitto.conf:

persistence true

And run as: ./mosquitto -c mosquitto.conf -v. The database file will be mosquitto.db in the same dir as executable. Expected output on start followed by SIGTERM:

1464389170: mosquitto version 1.4.8 (build date 2016-05-08 21:09:19+0300) starting
1464389170: Config loaded from mosquitto.conf.
1464389170: Opening ipv4 listen socket on port 1883.
1464389170: Opening ipv6 listen socket on port 1883.
^C1464389177: mosquitto version 1.4.8 terminating
1464389177: Saving in-memory database to mosquitto.db.
pfalcon commented 8 years ago

Finding a decent Python MQTT client to reference the API against is a challenge. Google doesn't know about anything but xxx-ported-from-javascript Paho. Searching directly on PyPI gives more hits of course, but finding something decent is still a problem. Here's at least an asyncio-based impl: http://hbmqtt.readthedocs.io/en/latest/references/mqttclient.html .

The question which interests me though is simply whether to use a nice short .pub() or the full .publish().

pfalcon commented 8 years ago

Initial implementation pushed as https://github.com/micropython/micropython-lib/tree/mqtt/umqtt (branch rebased). Things to decide are, as usual, division of functionality, structure, naming. I'd like to add auto-reconnect support, and that may be too bloated for a simple client, so it would be a separate module with an inheriting class => need to switch to package structure. Maybe also publish-only and subscribe-only clients, but that seems like spreading too thin (publish-only is ~5 lines of socket.read/write).

Tested on linux. Reliable working on esp8266 would depend on https://github.com/micropython/micropython/issues/2056

mkarliner commented 8 years ago

Where is a good place to put my 2 cents worth on MQTT?

slzatz commented 8 years ago

A few quick comments:

In the example, the order for server and id is reversed from the mqtt module where client id comes first.

Also, at least under python 3 (can't test under micropython right now) - the various strings needed to be encoded into bytes so for example:

self.client_id = client_id.encode('utf-8')

and later:

self.send_str(topic.encode('utf-8'))
self.sock.send(msg.encode('utf-8'))
mkarliner commented 8 years ago

Working version of example client:

from uqmtt import MQTTClient
import time

c = MQTTClient("testclient", "192.168.10.159")
c.connect()
c.publish("foo", "hello")
c.subscribe("foo")
while 1:
    print(c.wait_msg())
    print(c.check_msg())
    time.sleep(1)

c.disconnect()

Note that it still has the wrong module name (IMHO): uqmtt

pfalcon commented 8 years ago

In the example, the order for server and id is reversed from the mqtt module where client id comes first.

Tell that there were last-minute changes ;-). Fixed.

Also, at least under python 3 (can't test under micropython right now) - the various strings needed to be encoded into bytes

Nope, instead bytes should be used right away ;-). But MicroPython has a relaxed stance on bytes vs str interoperability, so one can be passed where the other is expected (as strings are guaranteed to be utf-8). That's of course not CPython compatible, and I discourage everyone else from doing that :-P.

pfalcon commented 8 years ago

I don't think that publish or subscribe only clients are that much use.

On the contrary, I don't need my button to subscribe to anything or my temperature display to publish anything. esp8266 devices in particular would largely be used these ways (there's simply not enough memory to bloat them to do both).

I regard auto reconnect as basic, otherwise I just land up implementing it myself every time.

On the contrary, the normal way to use MQTT is to connect-publish-disconnect. Subscribers are order(s) of magnitude less numerous, and usually belong to more powerful devices (for example, the temperature display mentioned above is likely a web page [open on an android device nailed to the wall]). How they handle reconnect is outside the topic of this ticket.

mkarliner commented 8 years ago

I certainly don't want a flame war, but I have to disagree about the 'normal' use of MQTT. If connect-publish-disconnect were the norm, we wouldn't have keep alive times or last will and testament. MQTT is connection based and (I believe) meant to be used as such. MQTT-SN is the variant specifically designed to use connectionless UDP.

slzatz commented 8 years ago

Because the last 4 bits of the initial byte of a message from a subscribed topic can be set or not (relating to things like retain and QoS), you may want to generalize the assertion in wait_msg to

assert res[0]&240 == 48
slzatz commented 8 years ago

Or probably clearer:

assert res[0] >> 4 == 3

Since the documentation will say that Publish (sent or received) has a control value of 3.
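To make the bit layout behind those two assertions concrete, here is a minimal sketch of splitting the first byte of the fixed header into its fields. The function name is made up for illustration, and the flag meanings shown (dup, QoS, retain) apply to PUBLISH packets only.

```python
def parse_fixed_header_byte(first_byte):
    """Split the first byte of an MQTT fixed header into its fields."""
    packet_type = first_byte >> 4      # upper 4 bits: control packet type
    dup = (first_byte >> 3) & 0x01     # bit 3: duplicate delivery flag
    qos = (first_byte >> 1) & 0x03     # bits 2-1: QoS level
    retain = first_byte & 0x01         # bit 0: retain flag
    return packet_type, dup, qos, retain


PUBLISH = 3  # control packet type of PUBLISH

# All of these first bytes are PUBLISH, with different flag combinations.
for b in (0x30, 0x31, 0x32, 0x3B):
    ptype, dup, qos, retain = parse_fixed_header_byte(b)
    assert ptype == PUBLISH
    print(hex(b), "dup", dup, "qos", qos, "retain", retain)
```

Checking `res[0] >> 4 == 3` accepts every one of these bytes, whereas comparing against b"\x30" alone rejects anything with a flag set.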

slzatz commented 8 years ago

Some suggestions for wait_msg. If we don't want to deal with QoS > 0 then I believe that you still want to pick up messages that come with the retain flag set, which modifies the assertion below. This version also handles messages longer than 127 bytes by looking for a continuation (i.e., first byte is greater than 127), reading the next byte if it is there, and then using it in the calculation of the remaining length (var sz).

def wait_msg(self):
    res = self.sock.read(1)
    if res is None:
        return None
    self.sock.setblocking(True)
    # below catches a message with a retain flag
    assert res == b"\x30" or res == b"\x31"
    # remaining length: one byte, or two if the continuation bit is set
    sz = self.sock.recv(1)[0]
    if sz > 127:
        sz1 = self.sock.recv(1)[0]
        sz = sz1*128 + sz - 128
    s = self.sock.recv(sz)
    # topic length is the first two bytes, big-endian
    topic_len = s[:2]
    topic_len = (topic_len[0] << 8) | topic_len[1]
    topic = s[2:topic_len+2]
    msg = s[topic_len+2:]
    return (topic, msg)
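The two-byte case above generalizes: the remaining length field can run to four bytes, each carrying 7 bits of the value with the top bit meaning "more bytes follow". A minimal sketch of a general decoder, assuming a hypothetical `read_byte` callable (on a socket it could wrap `sock.recv(1)[0]`); this is an illustration, not the code in the branch:

```python
import io


def read_remaining_length(read_byte):
    """Decode MQTT's variable-length 'remaining length' field.

    read_byte is any zero-argument callable returning the next byte
    as an int (hypothetical helper, named here for illustration).
    """
    value = 0
    shift = 0
    for _ in range(4):  # the field is at most 4 bytes long
        b = read_byte()
        value |= (b & 0x7F) << shift   # low 7 bits carry data
        if not b & 0x80:               # top bit clear: last byte
            return value
        shift += 7
    raise ValueError("malformed remaining length")


def decode(data):
    """Convenience wrapper that decodes from a bytes object."""
    stream = io.BytesIO(data)
    return read_remaining_length(lambda: stream.read(1)[0])


print(decode(b"\x7f"), decode(b"\x80\x01"), decode(b"\xc1\x02"))
```

For a two-byte field this gives the same result as `sz1*128 + sz - 128` in the function above.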
dpgeorge commented 8 years ago

Things to decide are, as usual, division of functionality, structure, naming. I'd like to add auto-reconnect support, and that may be too bloated for a simple client, so it would be a separate module with an inheriting class => need to switch to package structure. Maybe also publish-only and subscribe-only clients, but that seems like spreading too thin (publish-only is ~5 lines of socket.read/write).

I'm +1 on calling the methods .publish() and .subscribe(). I'd say keep both these methods in the basic client class. A separate derived class with auto-reconnect might be a good idea if such functionality requires many lines of code.

mkarliner commented 8 years ago

On a more general note, given that MQTT was the top rated goal, it would be nice to have a statement of intent about the level of coverage for MQTT. Personally, I'd accept that it may not be possible to implement the entire 3.1 client spec, but it would be good to have a list of intended implemented features (or rather non-implemented features). For instance, I see that there is another discussion about QoS > 0.

pfalcon commented 8 years ago

@slzatz: Thanks, all those are of course things to work on, with things like https://github.com/micropython/micropython/issues/2056 and https://github.com/micropython/micropython/issues/1814 being the next steps however.

pfalcon commented 8 years ago

@mkarliner : Feel free to drive that discussion, and not be surprised (or discouraged) if there's lack of critical mass to keep it up - some time may be needed to gather it.

slzatz commented 8 years ago

@pfalcon -- makes sense that those issues need to be addressed. Nice that even the initial client module works and is usable for basic stuff. @mkarliner - my vote would be to keep the implementation pretty lean but would definitely add a ping method but let the user worry about when to call it.

pfalcon commented 8 years ago

See the updated starting message for important information about running mosquitto with persistent session store enabled.

pfalcon commented 8 years ago

There otherwise were (and probably will be) updates to the branch mentioned above. The basis for auto-retries will be server-side persistent sessions (clean_session=False).

pfalcon commented 8 years ago

Ok, per the plans above, "umqtt" is a package now, with 2 submodules: umqtt.simple and umqtt.robust. There are individual commits so far to let interested parties track step-by-step progress, but everything will be squashed together soon.

mkarliner commented 8 years ago

@pfalcon Just to report a 'non-issue'. My first smoke test on the publish side has now sent 2.7M messages today at full tilt without any crashes. Good stuff!

Ignorant question: Can I use asyncio safely with mqtt, ie: publish and subscribe on different tasks?

pfalcon commented 8 years ago

My first smoke test on the publish side has now sent 2.7M messages today at full tilt without any crashes.

On what hardware?

Can I use asyncio safely with mqtt, ie: publish and subscribe on different tasks?

Do you mean CPython's asyncio or MicroPython uasyncio? Well, the "umqtt" module discussed here isn't compatible with CPython (that's by intent, as esp8266 port for which it's intended is rather low on memory, so any compatibility tricks are expensive). And this module is intended for standalone usage, it won't work with uasyncio. (Any good I mean, it uses blocking I/O, so while it waits for any I/O completion, no other task can run.)

mkarliner commented 8 years ago

As I say, it was just a smoke test, but the test rig was as follows. nodemcu 0.9, mosquitto 1.3 on raspberry pi 3 (jessi). Client on the other end was just mosquitto_sub.

micropython code as follows.

import time
from simple import MQTTClient

# Test reception e.g. with:
# mosquitto_sub -t foo_topic

count = 0
mstimer = 0

c = MQTTClient("umqtt_client", "192.168.10.159")
c.connect()
#time.sleep_ms(100)

while True:
    for x in range(1, 1000):
        c.publish(b"foo_topic", "hello {}, {}".format(count, x))
        time.sleep_ms(mstimer)
        # c.disconnect()
    count +=1
    if(mstimer > 1):
        mstimer -= 1
pfalcon commented 8 years ago

Further design discussion:

Thinking of a good way to support publishing and subscribing at the same time, I come to conclusion that the "easiest" way to deal with subscribed message is to feed them to a user callback after all. That's because subscribed message can arrive "out of band" at any time. For example, with PUBLISH qos>0, a .publish() function may wait for PUBACK message, and instead get subscribed message. The only alternative to passing it to callback would be queuing it.

The simplicity of the callback approach is illusory of course. For example, if a callback decides to publish with qos>0, then the callback may easily be called recursively and must be reentrant. And if there's any hiccup, the recursion depth may grow, leading to stack overflow and application abort. Compare that with a queuing approach, where queue depth can be controlled much more easily.

Writing all this, I think about the possibility mentioned above - move all the complexity to the server, where it belongs when we talk about simple resource-constrained endpoint devices. And that's supporting either publishing or subscribing clients. If an application needs both, well, it's welcome to instantiate both. It will need 2 MQTT client_id's and there will be 2 connections open to the server. But that saves the MQTT client from the need to demultiplex incoming messages - and such demultiplexing costs memory, as you need to store "unexpected" messages. With separate connections, all the flow control is handled by TCP.

Comments welcome, but I love this last solution, so it has high chance of being adopted ;-).
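For reference, the queuing alternative weighed above could look roughly like the sketch below. The class and its names are hypothetical (nothing like it exists in the branch), and it is written against CPython's collections.deque; the point is only that the depth limit is explicit, which is what makes it easier to control than recursion depth.

```python
from collections import deque


class BoundedInbox:
    """Hypothetical holding area for messages that arrive out of band."""

    def __init__(self, max_depth):
        self.max_depth = max_depth
        self.items = deque()
        self.dropped = 0

    def put(self, topic, msg):
        # Refuse new messages once full, so memory use stays bounded.
        if len(self.items) >= self.max_depth:
            self.dropped += 1
            return False
        self.items.append((topic, msg))
        return True

    def get(self):
        # Return the oldest queued message, or None when empty.
        return self.items.popleft() if self.items else None


inbox = BoundedInbox(max_depth=2)
for n in range(3):
    inbox.put(b"foo", b"msg %d" % n)
print(inbox.get(), inbox.dropped)
```

Even a bounded queue still costs max_depth messages' worth of RAM, which is the memory argument made above for separate connections.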

pfalcon commented 8 years ago

As I say, it was just a smoke test, but the test rig was as follows. nodemcu 0.9, mosquitto 1.3 on raspberry pi 3 (jessi).

I'm still not sure - do you run test on ESP8266?

To make program, etc. code readable, you need to enclose it in 3 tildes up and down (other people use 3 single quotes):

~~~
<stuff here>
~~~
pfalcon commented 8 years ago

I'm still not sure - do you run test on ESP8266?

And if yes, how much real time it took to send those 2.7M messages?

mkarliner commented 8 years ago

@pfalcon , Addressing your comments in order:

The idea of an MQTT server with either publish or subscribe clients seems a very reasonable one, and I accept that it may be a good solution, however.... taken in the context of the larger MQTT ecosystem, I do see problems. Many of the IoT platforms require specific client ids, which have to be pre-registered, which means twice the admin on the platform end, and with IBM Bluemix, Amazon etc, that can be a pain. However, that is just a nuisance and does not invalidate the solution. Of more concern are the platforms that have a one-to-one mapping between a client_id and a 'device', e.g. https://www.opensensors.io. In this case two client ids may cause a real management issue. Of course, this has nothing to do with the actual MQTT spec, but it does have to do with the real world, so I think it's valid to at least bring it up.

I'm probably being a bit of a pain in the ass, but IMHO I think that micropython is a serious platform and, as far as possible, should be suitable for real, industrial use, which includes taking the major platforms' idiosyncrasies into consideration.

The code I posted was what I ran on the ESP. I saw your release while I was on holiday and spent the flight back starting to plan a series of tests. In this case, I started with your publish example, put it in a loop with some delays, and then tried decrementing the delay, expecting some sort of buffering problem, which I'm impressed to report didn't happen. I wasn't doing any formal timing, especially as the current module prints debug messages to the repl on publish, which I assume slows it down a fair amount. I think the run took around 5 hours.

slzatz commented 8 years ago

What is working for me is to expand check_msg a bit so it handles more than receiving a message on a subscribed topic. So for example below, since I am sending a periodic ping, it's also looking for a pingresp -- if it gets something that isn't a published msg or a ping response it just returns the raw response to the calling program that runs a loop that calls check_msg and does whatever it needs to based on what is returned. I realize this probably isn't a generalizable solution but it has been very stable and at the least accounts for a ping response, which is essential if you are subscribing to topic that is low volume and you want to maintain the connection.

def check_msg(self):
    self.sock.setblocking(False)
    res = self.sock.read(1)
    if res is None:
        return None

    #if res[0] >> 4 !=3: #more general but not handling QoS > 0 right now
    if res[0] in (48,49):
        self.sock.setblocking(True)
        sz = self.sock.recv(1)[0]
        if sz > 127:
            sz1 = self.sock.recv(1)[0]
            sz+= (sz1<<7) - 128

        z = self.sock.recv(sz)
        # topic length is first two bytes of variable header
        topic_len = z[:2]
        topic_len = (topic_len[0] << 8) | topic_len[1]
        topic = z[2:topic_len+2]
        msg = z[topic_len+2:]
        return (topic, msg)

    elif res[0]>>4==13:
        self.sock.setblocking(True)
        s = self.sock.recv(1) # second byte of pingresp should be 0
        return 13

    else:
        return res

def ping(self):
    pkt = bytearray([0b11000000,0x0])
    self.sock.send(pkt)
mkarliner commented 8 years ago

If I understand you, check_msg now gives me all incoming messages, and it's up to the client to do a filter. That seems acceptable as a way of avoiding implementing internal queuing and not using two client IDs. How about a higher level method over the top of check_msg that does what it used to do (ie: just return incoming messages) and/or implement queuing as an option, which will work for the majority of low bandwidth, but bi-directional apps?

We make an MQTT client GUI design app called ThingStudio, http://www.thingstud.io.

I don't know if it will help the design process, but here are a couple of our real use cases for reference.

StateSliderSwitch: This is a control typically used for say a light switch. On rendering the control issues a reqStatus message to ask the remote light switch to report its status (on/off). When it does so the slider goes to either the on or off position. When clicked on, the slider issues a toggle message, which makes the remote switch change its current value and report it. Thus multiple clients (that is to say GUI's) will remain in sync and show the correct value of the switch which also resolves race conditions.

ContinuousSlider: This is a slider control that emits changeValue commands continuously as the control is being moved. It's typically used when you are say, positioning a mechanical device, maybe a window or robot arm. It might emit up to 10 messages a second, but they would be fairly small, if messages are dropped by the arm, it is not critical, so long as the final message is kept.

Hope that helps.

cheers Mike


slzatz commented 8 years ago

If I understand you, check_msg now gives me all incoming messages, and it's up to the client to do a filter.

That's the idea. The main program runs a while loop and calls check_msg as frequently as it needs to and deals with whatever response it gets. check_msg can do a little processing on some key types of messages but if it's not a message that it recognizes, it just returns whatever it received and the main loop can either try to determine what the message was or ignore it. This may be too much of a hack for an "official" micropython program but just wanted to point out that it's working for my purposes.
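As a rough sketch of that main-loop side, assuming the check_msg variant posted earlier (None when idle, a (topic, msg) tuple for a publish, 13 for a ping response, raw bytes otherwise). ScriptedClient is a stand-in that replays canned results so the dispatch can be tried without a broker; all names here are made up for illustration.

```python
PINGRESP = 13  # value the check_msg variant above returns for a ping response


def handle(result, on_message, stats):
    """Classify one check_msg() result the way the main loop might."""
    if result is None:
        return "idle"
    if isinstance(result, tuple):
        topic, msg = result
        on_message(topic, msg)
        return "message"
    if result == PINGRESP:
        stats["pongs"] += 1
        return "pingresp"
    stats["unknown"] += 1      # unrecognized packet: count it and move on
    return "unknown"


class ScriptedClient:
    """Stand-in for a real client: replays canned check_msg() results."""

    def __init__(self, results):
        self._results = list(results)

    def check_msg(self):
        return self._results.pop(0) if self._results else None


received = []
stats = {"pongs": 0, "unknown": 0}


def on_message(topic, msg):
    received.append((topic, msg))


client = ScriptedClient([None, (b"foo", b"hello"), 13, b"\x90"])
kinds = []
for _ in range(4):
    kinds.append(handle(client.check_msg(), on_message, stats))
print(kinds)
```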

pfalcon commented 8 years ago

I'm probably being a bit of a pain in the ass, but IMHO I think that micropython is a serious platform and, as far as possible, should be suitable for real, industrial use, which includes taking the major platforms' idiosyncrasies into consideration.

No, and I very much appreciate both your attention to MicroPython in general and comments on this topic in particular. But as you note yourself, issues you raise are at best tangent to MicroPython. And we have core MicroPython issues. The meta issue is that we're doing "impossible" thing of trying to run very high-level language on resource-constrained devices. It's "impossible" because the way VHLLs are used on "big" machines isn't productive or even enabling to their usage on constrained devices. Typical anti-patterns (for constrained devices again) are over-abstraction ("it's already high-level, so let's make it even higher") and creeping featuritis ("resources are free and unlimited, so let's stuff more features - because we can").

MQTT support is currently developed for ESP8266, which is very resource-limited system (as for system which does networking). Saying "I want a library to provide more features" is equivalent to saying "I want other people not to be able to use it in real applications". Because the equation is simple: A = R - S, where A is user app resources, S are resources taken by system (including libraries), R is overall resources, a very small number.

It's not speculation, it's real. Here's an example for a much bigger (than esp8266) system: https://github.com/micropython/micropython/issues/2057 . ESP8266: http://forum.micropython.org/viewtopic.php?f=16&t=1876 , https://www.kickstarter.com/projects/214379695/micropython-on-the-esp8266-beautifully-easy-iot/posts/1568461 (there were more, this is what I could easily find now). You yourself would hardly be happy to find out that esp8266 can't run any real MQTT app beyond your smoke test.

So, people ask to optimize memory usage, and that's what we will optimize for, not for ease of management with IBM Bluemix. One thing is however top-level requirement - try to do as much as possible in Python, because Python code is easy to tweak and extend. The current topic of work is however to provide the most minimal implementation which still can do useful things. A wireless button and a wireless indicator (publish-only and subscribe-only) are pretty useful, and again, we shoot for being able to do "a wireless sensor" (not just a button) and "a wireless display" (not just a LED). Once that aim is achieved, further aims can be set (likely, by the community, because MQTT is just one task of many, so we need to deliver MQTT support v1.0 and move on to other tasks).

Btw, you can read more of MicroPython development philosophy in https://github.com/micropython/micropython/wiki/ContributorGuidelines

pfalcon commented 8 years ago

This may be too much of a hack for an "official" micropython program but just wanted to point out that it's working for my purposes.

Indeed, it is. There's not much of an "MQTT client" then, your app is MQTT client. Indeed, that's possible - it's well-known that MQTT is rather simple protocol (well, it has many standalone-useful simple subsets). But that's against the idea of a module which abstracts away MQTT protocol details and provides a generic PubSub API to a user.

pfalcon commented 8 years ago

Ok, I guess it moves to finalization. I didn't go for splitting pub/sub after all, instead switched subscription processing to a callback-based design. Also, wait_msg() actually started to return the type of message it couldn't handle itself, but that's an internal implementation detail. QOS 1 is implemented and works in my testing. After looking at QOS 2, I "remembered" that its use case is "exactly once delivery"; lately I had somehow been thinking that it's to provide detailed reporting to a client of "message received" vs "message actually processed". The MQTT spec explicitly says that MQTT signalling alone is not enough to convey "message actually processed" (that would be done by a separate application-level message). So, it explains why many projects ignore QOS 2, and I'm going to do the same, at least for now, purely because of resource limits, as the MQTT client is already 5K of Python source.

It's still not rebased so far, to ease review, but will be soon. Comments are welcome.
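For readers wondering what QOS 1 adds on the wire: the PUBLISH packet carries a 2-byte packet identifier after the topic, and the sender then waits for a 4-byte PUBACK echoing it. A minimal sketch of the packet layout follows; the helper names are made up here and this is not the code in the branch.

```python
def encode_remaining_length(n):
    """Encode n as MQTT's variable-length integer (7 bits per byte)."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def build_publish(topic, payload, qos=0, retain=False, pid=0):
    """Build a PUBLISH packet; pid is only included when qos > 0."""
    if qos not in (0, 1):
        raise ValueError("only QoS 0 and 1 are sketched here")
    if qos and not 1 <= pid <= 0xFFFF:
        raise ValueError("QoS 1 needs a non-zero 16-bit packet id")
    body = len(topic).to_bytes(2, "big") + topic
    if qos:
        body += pid.to_bytes(2, "big")
    body += payload
    first = 0x30 | (qos << 1) | int(retain)
    return bytes([first]) + encode_remaining_length(len(body)) + body


def is_puback_for(packet, pid):
    """True if packet is the 4-byte PUBACK acknowledging pid."""
    return packet == b"\x40\x02" + pid.to_bytes(2, "big")


print(build_publish(b"foo", b"hello", qos=1, pid=1))
```

Anything else that arrives while waiting for the PUBACK (such as a subscribed message) is what the callback design described above has to deal with.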

pfalcon commented 8 years ago

The branch was squashed, with parts moving to their intended locations.

pfalcon commented 8 years ago

I performed testing on ESP8266, everything works as expected so far, so I'm merging implementation to micropython-lib master.

koniarik commented 8 years ago

Hi folks, how about putting "ussl" in? (micropython module for TLS). While esp8266 for example still can't check certificates, it's still nice to have this prepared for use.

I suppose it's just about wrapping entire thing into socket during "connect" part and adding connect option?

swkim01 commented 8 years ago

With the officially released firmware I tested the MQTT module on nodemcu 1.0 and ran into a problem. Calling publish one at a time works, and the message is received by the mosquitto broker on my raspberrypi. However, whenever I call publish repeatedly in a while loop, the following error occurs after the message has been sent normally several times.

>>> while True:
...   c.publish(b"home/temperature", str(count))
...   count+=1
...   time.sleep(2)
...
Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "umqtt/simple.py", line 67, in publish
OSError: [Errno 104] ECONNRESET

Then I had to call connect by c.connect() in order to publish again.

peterhinch commented 8 years ago

Is it possible that you have WiFi connection issues? This (very hacky) test publishes every 5 seconds and is currently on 1444 repetitions. Try trapping OSError and re-establishing the connection:

import time
import ubinascii
from simple import MQTTClient
from machine import Pin, unique_id
SERVER = "192.168.0.23"
CLIENT_ID = ubinascii.hexlify(unique_id())
TOPIC = b"huzzah"

def main(server=SERVER):
    c = MQTTClient(CLIENT_ID, server)
    c.connect()
    print("Connected to %s, waiting for timer" % server)
    fail = False
    count = 0
    while True:
        count += 1
        time.sleep_ms(5000)
        print("Time to publish")
        try:
            if fail:
                print('Attempt to reconnect')
                c.connect()
                print('Reconnected to Huzzah')
        except OSError:
            print('Reconnect fail')
        try:
            c.publish(TOPIC, ('count '+str(count)).encode('UTF8'))
            print('Publish')
            fail = False
        except OSError:
            fail = True

    c.disconnect()
swkim01 commented 8 years ago

@peterhinch

Is it possible that you have WiFi connection issues? This (very hacky) test publishes every 5 seconds and is currently on 1444 repetitions. Try trapping OSError and re-establishing the connection:

Great! It works! Thank you so much. Now I can publish messages from dht sensor to my home-assistant home server.

peterhinch commented 8 years ago

As this issue is likely to crop up again I've put a tidier version of the code in the forum for general information or comment.

pfalcon commented 8 years ago

@peterhinch : If you didn't read the discussion in this ticket, robust connections are provided by umqtt.robust module.
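For anyone landing here from a search, the general pattern under discussion (trap OSError, reconnect, retry) can be sketched as below. This is an illustration of the idea only, not the actual umqtt.robust code; FlakyClient and publish_with_retry are hypothetical names, and the fake client exists just so the retry path can be exercised without a broker.

```python
class FlakyClient:
    """Stand-in for a simple MQTT client whose connection drops once."""

    def __init__(self, failures=1):
        self.connected = False
        self.sent = []
        self.failures_left = failures

    def connect(self):
        self.connected = True

    def publish(self, topic, msg):
        if self.failures_left:
            self.failures_left -= 1
            self.connected = False
            raise OSError(104)  # ECONNRESET, as in the traceback above
        self.sent.append((topic, msg))


def publish_with_retry(client, topic, msg, attempts=3):
    """Publish, reconnecting and retrying when an OSError is raised."""
    for _ in range(attempts):
        try:
            return client.publish(topic, msg)
        except OSError:
            try:
                client.connect()
            except OSError:
                pass  # still down; the next attempt will fail and retry
    raise OSError("publish failed after %d attempts" % attempts)


c = FlakyClient()
c.connect()
publish_with_retry(c, b"home/temperature", b"21")
print(c.sent)
```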

rudydevolder commented 8 years ago

Somebody who can help me? I just downloaded and tried the examples: When I do c.publish(b"foo_topic", b"hello") I got the error-message: TypeError: function takes 2 positional arguments but 3 were given HUH?!?

Everything else like subscribing works.

danicampora commented 8 years ago

I'm not familiar with the lib, but the error indicates that the method is static, and you are calling it on an instance of the class, which is passing 'self' implicitly.


peterhinch commented 8 years ago

It is a method of the MQTTClient class so the code posted by @rudydevolder should work. It does here with the same calling pattern.

dpgeorge commented 2 years ago

The umqtt library has been around for a while now, and this issue is no longer active. So closing.