peterhinch / micropython-mqtt

A 'resilient' asynchronous MQTT driver. Recovers from WiFi and broker outages.
MIT License
549 stars 116 forks source link

async `subs_cb`? #48

Closed jdtsmith closed 3 years ago

jdtsmith commented 3 years ago

Presumably it's very common to publish an MQTT message in response to receiving one (after changing some internal state, for example). Since publish is async, I was surprised that subs_cb is not async, so that you cannot await anything that publishes.

I presume a workaround is something like:

def cb(client):
    uasyncio.create_task(some_async_function())

but then you can't wait, catch exceptions, etc.

kevinkk525 commented 3 years ago

Async callbacks "subs_cb" have been discussed already: https://github.com/peterhinch/micropython-mqtt/issues/8

so that you cannot await anything that publishes. I presume a workaround is something like:

That is the correct way to handle it. Either create a new async task for each incoming message or put them on a queue and start a consumer task.

but then you can't wait, catch exceptions, etc.

Not sure where you can't wait but you can definitely catch exceptions because that's what your "some_async_function()" can do. It can catch and process exceptions, just as you could in "subs_cb".

jdtsmith commented 3 years ago

Thanks. I should have said you can't conveniently await an async function directly in the callback. For example, suppose you need to publish two messages, one after the other, in your callback.

My solution using create_task is of the "fire and forget" flavor. I guess the right approach for full functionality would be something like:

def cb(client):
    args=...
    queue.put(lambda: some_async_function(*args))
    queue.put(lambda: some_other_async_function(*args))

async def run():
    while True:
        perform_sync_checks()
        ...
        await asyncio.sleep_ms(loopDelay) #say 50ms

async def consume():
    while True:
        result = await queue.get()  # Blocks until data is ready
        await result()

def start():
    global queue = Queue()
    asyncio.create_task(consume())
    asyncio.run(run())
    ...
kevinkk525 commented 3 years ago

Why would you need to publish 2 messages in your callback? Doing anything in the callback is a bad idea because it blocks the processing of further mqtt messages. If you are interested in a complex example for processing mqtt messages, have a look at my mqtt handler: https://github.com/kevinkk525/pysmartnode/blob/a0998ad6582a28fe5a0529fb15dd4f61e254d25f/pysmartnode/networking/mqtt.py#L400

I don't understand what you are trying, so I'm kind of fishing blind.. (But since this isn't my repo, maybe @peterhinch understands your intentions)

jdtsmith commented 3 years ago

I can think of many scenarios where callbacks would result in multiple published messages or other asynchronous actions you’d like to wait on. But I do now understand the harm in doing that waiting in the callback itself. If you can sketch a concrete and simple suggested workaround, that may be useful for anyone in this situation who finds themselves reading here.

peterhinch commented 3 years ago

Another approach would be to have the callback set an Event. The publish task would wait on the Event.

jdtsmith commented 3 years ago

Good idea thanks. Is there an example implementing this approach anywhere?