sbtinstruments / aiomqtt

The idiomatic asyncio MQTT client, wrapped around paho-mqtt
https://sbtinstruments.github.io/aiomqtt
BSD 3-Clause "New" or "Revised" License
392 stars 71 forks source link

Implement automatic reconnection #287

Open empicano opened 3 months ago

empicano commented 3 months ago

Hi there @frederikaalund @JonathanPlasse 😊

Frederik drafted how reconnection could look like a while ago already (thank you again, master of asyncio πŸ™πŸ˜„). I finally had some time to hack around with this! Some thoughts:

For now, I've implemented the reconnection background task and adapted the publish method to wait until the connection returns and the message could actually be published (both are probably still full of bugs). You can play around with it by shutting a local MQTT broker on and off (e.g. with ./scripts/develop) and by running:

import aiomqtt
import asyncio

async def main():
    client = aiomqtt.Client("localhost", 1883, reconnect=True, timeout=2)
    async with client:
        while True:
            await asyncio.sleep(1)
            print("Publishing message...")
            await client.publish("test", "Hello, World!")
            print("Message published.")

asyncio.run(main())

I'm still thinking about how to deal with existing subscriptions and last wills. We probably have to resend them when clean_session=True, otherwise they will cease to exist without notice to the user after a reconnection in the background.

Happy to hear what your thoughts are on this (or anyone else's who wants to chip in) 😊

frederikaalund commented 3 months ago

Thank you for looking into this! πŸ‘ Automatic reconnect is definitely on the wish list of many of our users. :)

I do like the idea to have "everything in one client" as opposed to my approach with a separate low-level and high-level client. At least the single-client approach is easier to grok for our users. πŸ‘

I looked through the code, and it looks good. I'm (as always) a little bit concerned about adding extra internal state to our client (the new reconnect task). Increases the maintenance burden. In any case, for this feature I don't think we can avoid it if we want to use a single client. πŸ˜„

Aside: There is a lot of "reset state" going on (re-creating the futures). That's not your fault (or the fault of this PR) but my own fault (using futures in the first place). πŸ˜… I strongly suggest that we look at alternatives for all the internal futures and background tasks. I'm thinking anyio (to no ones surprise I guess). :)

I'm still thinking about how to deal with existing subscriptions and last wills. We probably have to resend them when clean_session=True, otherwise they will cease to exist without notice to the user after a reconnection in the background.

That is a concern of mine as well! Indeed, we would have to save all the subscriptions inside the client and then "resubscribe" when the connection is back online. These subscriptions are even more state to manage. :)


Here is a suggestion that is a slight variation of the current approach:

This way, we get the separation of high-level and low-level and the easier maintenance that follows. It also allows allows to replace the _InternalClient with a new implementation in the future (maybe written in rust, using anyio, or something else).

Consequently, this means that all publish/subscribe/etc. calls have to be asynchronous, contrary to Frederik's original design. Instead of queuing the calls up, I'm thinking of blocking them and returning only when the connection returns and the call goes through.

I see the benefits of this approach! πŸ‘ I suggest that we do both πŸ˜„ That is, to have both async def publish() and def publish_nowait(). This mimicks how both asyncio and anyio handles the "do you want to wait or not" question. In turn, it let's the user get to decide whether they want to wait for the connection or not. :)

Again, thank you for looking into this issue and very well done on the draft implementation. πŸ‘ Let me know what you think of my comments above and do say if you have any questions. πŸ˜„

empicano commented 3 months ago

Thanks for your thoughts on this! Your reviews are one of the main reasons this project is so fun for me πŸ˜‰

You're right about the internal state getting slightly out of control. While implementing this draft I already got bitten resetting futures while they were awaited elsewhere (which raises CancelledError) πŸ˜…

Use streams and broadcast mechanisms in the new Client to implement the reconnect logic.

I'm not sure I understand this point. Do you mean using streams and broadcast for the _nowait variants like you did in your example, or actually a different way to implement the reconnection? Apart from that, I like the idea of bringing more structure into it with the low-level client class πŸ™‚

I'm on board with the publish and publish_nowait design as well πŸ‘ I can imagine how these can be implemented in the reconnection case, but I wonder how publish_nowait could work when the client shouldn't reconnect. (How) Do we fail in the case the client is disconnected? Given that we probably work through the queued messages in the background without awaiting the task anywhere, throwing an exception won't show to the user, or can we propagate it somehow?

frederikaalund commented 3 months ago

Thanks for your thoughts on this! Your reviews are one of the main reasons this project is so fun for me πŸ˜‰

Thank you for saying that. πŸ˜„ I don't have that much time these days but I do try to find it anyhow to at least do these reviews. :) It's a bit easier here during Easter.

I'm not sure I understand this point. Do you mean using streams and broadcast for the _nowait variants like you did in your example, or actually a different way to implement the reconnection? Apart from that, I like the idea of bringing more structure into it with the low-level client class πŸ™‚

I'm was leaning towards doing "streams and broadcasts" to do the reconnection itself (like in my sample code). E.g., keep the current Client class more or less as is. Mostly, I'm worried about whether we (not just you and the code in this PR but all of us maintainers) can maintain the solution going forward. It's difficult enough as it is with all the internal futures and "partial reset state". Specifically, I'm concerned whether some task may await a future (e.g., _connected or _disconnected) while another task accidentally resets it (assigns a new value to _connected leaving the task to hang forever). I have similar worries for our _lock that we manually acquire and release. Do you get me? πŸ˜„


In any case, let's be pragmatic and review our options here:

  1. Continue with the current implementation in this PR (single client with reconnect=True flag, futures, background tasks, state resets).
    • Pro: We already have most of the implementation (thanks to this PR! πŸ™)
    • Con: Client is more difficult to maintain now that the logic is more involved.
  2. Address the maintenance concerns first as another PR and build on that. E.g., split the client into a low-level and high-level implementation, and/or, use anyio instead of all the futures/asyncio.Tasks.
    • Pro: It's easier to maintain this project going forward and it's easier to add new features without accidentally breaking stuff.
    • Con: It takes a lot of time to write this. It'll delay the reconnection feature.

So in the larger perspective (time and resources being essential) I do actually lean towards option (1). Option 1 provides value here and now at the cost of future maintenance. πŸ˜„

If you agree, I think the next steps is to write out some test cases for this (to mitigate the maintenance cost). With tests in place, the pro-con calculation becomes easy since the maintenance cost goes towards zero. πŸ˜‰

Again, thank you for all the time that you put into this PR (and the aiomqtt project in general). πŸ‘ Let me know if you have any comments or questions. :)

empicano commented 3 months ago

Thanks for elaborating, I think I understand what you mean now 😊 I'll play around with streams and broadcasts to understand them a little better and see how much work the second option would bring! You convinced me that that's the better option πŸ˜„

I'll report back once I have more, could take a bit, though, as I'm busy the next few days πŸ˜‹

spacemanspiff2007 commented 2 months ago

I just stumbled across this so sorry if I am late to the party. From my experience it might be better to split the client into a low level and high level because as mentioned it makes maintenance much easier, however I have no strong opinion on that.

While looking through the PR I am not sure I understood everything correctly: It seems like publish will never return until the client is connected again possibly locking up the program. This is imho very unexpected. Here I have a small program that only publishes and I just discard all messages on disconnect. I even made publish non-blocking and even non async through the use of a Queue. On the other hand _messages still raises an error when the connection is lost. I think it would be good if the behavior of these two methods is aligned. In case of a disconnect there will be a retry every two seconds. It would be nice if there is the possibility to provide something (e.g. generator) that backs up gracefully (e.g. 2, 4, 8 ... 300) secs delay. Additionally it would be nice to have the possibility to specify an additional message that gets sent on graceful disconnect on the last will topic because last will is only used on abnormal disconnect.

I think he usage of the client is hard because there are multiple places where the disconnect error can occur. Typically I have a publish and a messages worker task and both can be the cause for a reconnect which I have to sync back to the task where I create the client and do the connect. Do you know by chance a good solution for that? I've come up with a quite complex solution, a stateful connection manager which connects and then creates the tasks. If one task throws an MqttError I cancel both tasks and try to trigger a reconnect. Example from one application: connection handler publish task messages task

Edit: I think one of the reasons why it's currently hard is that the client object can not be reused. On reconnect it's a new client and the method for processing messages has also be entered again.

empicano commented 1 month ago

Thanks for chipping in and sorry for the late reply! πŸ™‚

It seems like publish will never return until the client is connected again possibly locking up the program. This is imho very unexpected. [...] On the other hand _messages still raises an error when the connection is lost. I think it would be good if the behavior of these two methods is aligned.

I agree that publish and _messages should behave the same. If I understand you right, you're suggesting that the client throws an exception in both cases until the client has successfully reconnected in the background? Of the top of my head, I can't think of a way to know when to retry calling publish or _messages in that case without accessing internals (which I'd like to avoid). Waiting a fixed amount of time seems crude, and with the other option we could always use asyncio.timeout if we want to set an upper limit for how long we want to wait.

What benefits do you see from failing right away?

In case of a disconnect there will be a retry every two seconds. It would be nice if there is the possibility to provide something (e.g. generator) that backs up gracefully (e.g. 2, 4, 8 ... 300) secs delay.

Good idea πŸ‘

Additionally it would be nice to have the possibility to specify an additional message that gets sent on graceful disconnect on the last will topic because last will is only used on abnormal disconnect.

I noticed that, too. Currently we can only set the last will on client initialization. Maybe we can add a function to set the last will dynamically in the future.

spacemanspiff2007 commented 1 month ago

I agree that publish and _messages should behave the same. If I understand you right, you're suggesting that the client throws an exception in both cases until the client has successfully reconnected in the background? Of the top of my head, I can't think of a way to know when to retry calling publish or _messages in that case without accessing internals (which I'd like to avoid). Waiting a fixed amount of time seems crude, and with the other option we could always use asyncio.timeout if we want to set an upper limit for how long we want to wait.

Maybe expose some client status through a property - e.g. client.is_connected?

I am suggesting that publish should not block when the client is disconnected because it's very unexpected. If I have a small program likes this

while True:
    await read_sensors()
    try:
        await client.publish(...)
    except Exception:
        pass
    await write_sensor_to_local_db()

it will stop reading sensors when the client is disconnected and it's not clear at all that it will behave like that.


I think it depends on the program how publish should behave and currently I can think of three different desired behaviors

The third one is the most generic one because with it it's easy to implement the first two behaviors and I would expect a high level client to return something like that.

Maybe we can add a function to set the last will dynamically in the future.

I currently don't understand how a function will provide a message when gracefully disconnecting. Here I have a dedicated publish task and when I cancel the task (graceful disconnect) I catch that and publish the message.

empicano commented 4 weeks ago

Maybe expose some client status through a property - e.g. client.is_connected?

I'd like to avoid that, if possible. The whole internal connected / disconnected state is already all over the place, so I'm concerned that exposing this would lead to problems down the line.

I am suggesting that publish should not block when the client is disconnected because it's very unexpected.

I think I see why now. When reconnect=False, publish blocks until the message is published or fails when the connection is lost. With the initial idea and reconnect=True, publish blocks until the message is published, but doesn't fail when the connection is lost. The most intuitive design would be that publish behaves exactly the same in both cases, meaning also in the case of disconnection, right?

If we think about how an asynchronous publish and a synchronous publish_nowait with Frederik's suggestion could work in the two reconnect=True/False cases, with the above the most logical would thus be:

  1. reconnect=True/False & publish: block until the message is sent, fail on disconnection
  2. reconnect=True/False & publish_nowait: queue the message and return immediately, no failure on disconnection

However, publish_nowait doesn't make too much sense in case of reconnect=False because if that's all we use, we'll append and append messages to the queue without ever noticing that the client has disconnected. And if we don't block publish until reconnection in the reconnect=True case, publish_nowait doesn't really have a benefit any more, either.

I think you convinced me that a better design would be to implement only an asynchronous publish, but also make it fail on disconnection in the reconnect=True case. What I miss is a good way to know when to retry a failed publish in the case of reconnect=True.

I think it depends on the program how publish should behave and currently I can think of three different desired behaviors

Returning something (third bullet point) wouldn't work very well in the case of the _messages generator, so I'd focus on the second point and think about how we can achieve the first one on top.

One option would be to do the reconnect manually, so we could do await client.reconnect() to achieve something similar to client.is_connected. That would eliminate the whole discussion about publish in the reconnect=False and reconnect=True cases, because there won't be a reconnect client argument anymore. We're basically pulling the reconnection from the background into a manual function call. This seems pretty flexible.

I was in the past, and am still very against manual connect and disconnect methods, I'm not yet sure about how I feel about a reconnect method with regards to that.

This turned out to be a long rambling, I hope it's understandable πŸ˜„ Do say if something's not clear, and let me know what you think! Again, thanks for chipping in, it helps quite a lot to have other perspectives on this. You make some good points, not only here, but also in the other issues and PRs that you're involved with πŸ‘


Here I have a dedicated publish task and when I cancel the task (graceful disconnect) I catch that and publish the message.

Ah, I think I understand what you mean now πŸ‘ #28 is related, and a very interesting discussion. There already has been a solution proposed, I'm interested to hear what you think about it. Maybe you are up to do a PR? πŸ™‚

spacemanspiff2007 commented 3 days ago

The most intuitive design would be that publish behaves exactly the same in both cases, meaning also in the case of disconnection, right?

This is not very nice but not my main issue. With reconnect=True the application will be stuck in await client.publish() until the client is connected again. In my example above the program also writes the sensor values to a database. If the broker goes down all values will be lost until the broker is up again and there will be a gap in the db values. This behavior is very unexpected and it's not clear that this will happen from looking at the code snippet.

However, publish_nowait doesn't make too much sense in case of reconnect=False because if that's all we use, we'll append and append messages to the queue without ever noticing that the client has disconnected.

I would have expected publish_nowait to discard messages when the client is not connected. On disconnect the queue should be emptied. In the sml2mqtt program I implemented it an same manner: publish only works when the client is connected and on disconnect the queue is discarded. A behavior like this would make small programs that continuously collect data very easy to implement.

Returning something (third bullet point) wouldn't work very well in the case of the _messages generator, so I'd focus on the second point and think about how we can achieve the first one on top.

Could you please elaborate why this would be an issue? Currently I fail to see how the publish call interferes with messages which is used to process incoming messages. I still think from a usability perspective the is the best option because it covers all use cases and makes it very clear from the calling code what is happening:

# create obj
msg = client.publish(...)

msg.cancel_publish()

# status or flags
msg.status
msg.is_published

# wait for publish
await msg.published()
await msg.published(reconnect=True)  # Could be on a per message basis

One option would be to do the reconnect manually, so we could do await client.reconnect() to achieve something similar to client.is_connected. That would eliminate the whole discussion about publish in the reconnect=False and reconnect=True cases, because there won't be a reconnect client argument anymore. We're basically pulling the reconnection from the background into a manual function call. This seems pretty flexible.

What would your goal be with that and can you make a pseudo example? Do I as a caller still have to catch an error and manually call reconnect? Do I have to do it both in publish and messages?

Again, thanks for chipping in, it helps quite a lot to have other perspectives on this. You make some good points, not only here, but also in the other issues and PRs that you're involved with πŸ‘

Thanks - that means a lot!