eclipse-paho / paho.mqtt.c

An Eclipse Paho C client library for MQTT for Windows, Linux and MacOS. API documentation: https://eclipse.github.io/paho.mqtt.c/
https://eclipse.org/paho

async_client: unblock delivery_token::wait_for() during shutdown #1057

Open gambels opened 3 years ago

gambels commented 3 years ago

Hello,

I'm using the async_client from paho-cpp in my application, which publishes messages from several threads. To check whether a message has been delivered, I use the wait_for() function.

Now I want to shut down the application immediately, without waiting for any timeouts.

Is it possible to unblock delivery_token::wait_for(long timeout) before it times out, or to shut down the client in an orderly way that immediately calls all the registered delivery token handlers?

I would be very pleased to get any hints on whether this is possible.

Best Regards, Sven

icraggs commented 3 years ago

Why is it taking a long time? Is there still activity going on? Have you tried disconnecting or is it disconnected already?

fpagliughi commented 3 years ago

My reading is that there are still messages in-flight, but there is a desire to shut down the client before those operations complete.

The function delivery_token::wait_for(timeout) blocks on an internal condition variable in the token which gets signaled when on_success() or on_failure() is called.

So my assumption is that there are one or more threads blocked waiting for delivery tokens to complete (i.e. PUBACKs to be received) and that @gambels wants them all to be signaled simultaneously when another thread decides to shut down the client. I'm assuming that they would all complete via on_failure() with a return code of MQTTASYNC_DISCONNECTED or similar.
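For illustration, the blocking mechanism described above can be sketched with the standard library alone. This is a minimal model, not the paho-cpp source: the class name `sketch_token` and the accessor `return_code()` are hypothetical, and the failure code passed to `on_failure()` is whatever the caller supplies.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for a delivery token; NOT the paho-cpp class.
class sketch_token {
    std::mutex mtx_;
    std::condition_variable cond_;
    bool complete_ = false;
    int rc_ = 0;

    // Record the result and wake every thread blocked in wait_for().
    void finish(int rc) {
        {
            std::lock_guard<std::mutex> lk(mtx_);
            complete_ = true;
            rc_ = rc;
        }
        cond_.notify_all();
    }

public:
    // Called from the library's callback thread in this model.
    void on_success() { finish(0); }
    void on_failure(int rc) { finish(rc); }

    // Returns true if the token completed within the timeout, false otherwise.
    bool wait_for(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lk(mtx_);
        return cond_.wait_for(lk, timeout, [this] { return complete_; });
    }

    int return_code() {
        std::lock_guard<std::mutex> lk(mtx_);
        return rc_;
    }
};
```

In this model a waiter is released only when one of the two callbacks runs, so if the library never invokes a callback for an in-flight message, the waiter blocks until its own timeout expires.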

What happens when a thread calls MQTTAsync_disconnect() using a timeout in the MQTTAsync_disconnectOptions struct and the timeout expires? Does the library call on_failure() for all the pending tokens?

I can't recall if a zero (0) timeout has special meaning (wait forever?), or if it literally means no timeout.

gambels commented 3 years ago

Yes @fpagliughi is totally right.

My application waits for the delivery of a message by calling the blocking function delivery_token::wait_for(timeout). Currently the timeout is set to 300 s.

Now there is a use case where the application must shut down immediately and cleanly. Unfortunately, the application has to wait until the 300 s timeout has expired.

I already tried to disconnect using a timeout, but delivery_token::wait_for(timeout) still blocks.

So it would be nice to notify all registered delivery tokens (that is, their condition variables), as @fpagliughi already mentioned, with a return code of MQTTASYNC_DISCONNECTED.
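As long as the pending tokens are not signaled on shutdown, one possible application-side workaround is to split the long wait into short slices and check a shutdown flag between them. The sketch below is not part of any Paho library: `wait_or_shutdown`, `wait_result`, and the flag are hypothetical names, and it assumes only that the token type offers a `wait_for()` that accepts a `std::chrono::milliseconds` value and returns `true` on completion.

```cpp
#include <algorithm>
#include <atomic>
#include <chrono>

enum class wait_result { completed, timed_out, shutdown };

// Tok is assumed to provide `bool wait_for(std::chrono::milliseconds)`
// that returns true once the operation has completed.
template <typename Tok>
wait_result wait_or_shutdown(Tok& tok,
                             std::chrono::milliseconds total,
                             std::chrono::milliseconds slice,
                             const std::atomic<bool>& shutting_down)
{
    using clock = std::chrono::steady_clock;
    const auto deadline = clock::now() + total;

    while (!shutting_down.load()) {
        const auto now = clock::now();
        if (now >= deadline)
            return wait_result::timed_out;

        // Never wait past the overall deadline.
        const auto remaining =
            std::chrono::duration_cast<std::chrono::milliseconds>(deadline - now);
        if (tok.wait_for(std::min(slice, remaining)))
            return wait_result::completed;
    }
    return wait_result::shutdown;
}
```

With a 300 s total and, say, a 100 ms slice, a publishing thread would notice the shutdown flag within roughly one slice instead of waiting out the full timeout. The cost is a periodic wakeup per waiting thread.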

icraggs commented 3 years ago

waitForCompletion does check that the client is still connected, so a disconnect should work.

Taking a client library trace, as described in the readme (at minimum level), might help.

fpagliughi commented 3 years ago

Unfortunately, the C++ and Rust libraries do not use waitForCompletion() to block on the operations. They expect the callback onSuccess or onFailure.

What I'm suggesting is that the C lib call onFailure for each of the in-flight operations when disconnecting (i.e. when the disconnect timeout expires). From the application standpoint I think that makes sense (even for pure C apps using the library).

But I don't want it to have additional, unintended, consequences like affecting persistence or anything like that. Just call the failure callback(s) if registered.

icraggs commented 3 years ago

Yep, that's what it does, or is supposed to do. Except if you connect cleansession=false, then the onFailure callback isn't called until destroy() because you might reconnect and complete the exchanges.

So a trace will help figure out what's going on.

(You mean you got me to add waitForCompletion, the only blocking call in the library, and you didn't use it! :-)

fpagliughi commented 3 years ago

In the Spring of 2014, while laid up with a shattered ankle from a snowboarding accident, I wrote a complex microservices and RPC messaging system for use inside an IoT gateway... using Paho C directly. As I mentioned, I've never been able to use my own C++ library for a large project!

Having waitForCompletion() for that project was essential, since the services were single-threaded. I thank you for it!

And I still maintain that it makes the API more complete and directly useful especially for those who want asynchronous behavior without managing thread safety and callbacks on their own.

icraggs commented 3 years ago

It still seems odd to me for an asynchronous API, but I'm glad it was useful!

gambels commented 3 years ago

Maybe I left out a detail about my test environment.

To test whether a blocked delivery_token::wait_for(timeout) can be unblocked immediately, I put my broker to sleep before shutting down my application.

This is why my MQTT client's disconnect timed out and wait_for() kept blocking, presumably because the MQTT state was still connected. The same would happen if I just unplugged the Ethernet cable.

Nevertheless, even under these circumstances (disconnect timed out), it would be desirable from my point of view to complete any registered delivery tokens with some kind of error code.

I'm not sure whether calling destroy() would be a proper way to handle this situation. destroy() is already called automatically in the paho-cpp async client destructor, so I wanted to avoid calling it twice.

I'm going to take a client library trace and will make this available here...

But regardless of that thank you very much for your feedback so far...

icraggs commented 3 years ago

If you have connected with cleansession=false, then a disconnect is merely an interruption in the TCP connection. There is no way of telling whether you want to reconnect now, or later after recreating the application. Destroy() is currently that indication.

icraggs commented 3 years ago

@fpagliughi we might need a close function here (or a similar name) to allow the client object to be cleaned up including callbacks for any outstanding messages before the destroy is called.

fpagliughi commented 3 years ago

Oh, apologies, I never saw the last few messages here.

That's interesting about Destroy(). I don't think I ever realized the distinction. @icraggs So does destroy() fire off the callbacks for any in-flight messages? And, if so, is it a synchronous call? Meaning, does it complete all the callbacks before returning? If not, there's probably a race condition at the end of a lot of programs!

The C++ and Rust wrappers automatically call destroy(), or more precisely MQTTAsync_destroy(), when going out of scope. But even if that completed the C++ tokens, it still creates a chicken-and-egg problem... you want to spin up the service threads after creating the client, but shut them down cleanly before destroying the client, so I don't think the client object destruction is the best way to signal threads that are using that client!
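The ordering problem can be made concrete with a small standalone sketch. `fake_client` is a hypothetical stand-in that only logs its construction and destruction, and `run_service` is an invented helper. The point is that the stop signal and the joins have to happen explicitly, before the client's destructor runs.

```cpp
#include <atomic>
#include <chrono>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for the real client; it only records its lifetime.
struct fake_client {
    std::vector<std::string>& log;
    explicit fake_client(std::vector<std::string>& l) : log(l) {
        log.push_back("client created");
    }
    ~fake_client() { log.push_back("client destroyed"); }
};

std::vector<std::string> run_service(int n_workers) {
    std::vector<std::string> log;
    {
        fake_client client(log);          // 1. create the client first
        std::atomic<bool> stop{false};
        std::vector<std::thread> workers; // 2. then start the service threads
        for (int i = 0; i < n_workers; ++i) {
            workers.emplace_back([&stop] {
                // A real worker would publish and wait on tokens here.
                while (!stop.load())
                    std::this_thread::sleep_for(std::chrono::milliseconds(1));
            });
        }
        log.push_back("workers started");

        stop.store(true);                 // 3. signal shutdown explicitly
        log.push_back("stop signalled");
        for (auto& w : workers) w.join(); // 4. wait for the threads to finish
        log.push_back("workers joined");
    }                                     // 5. only now is the client destroyed
    return log;
}
```

If a worker were blocked in a long token wait at step 3, the join in step 4 would stall until that wait returned, which is why a signal that does not depend on destroying the client is needed.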

Would a new flag in the disconnect options be worth considering? Like "shutdown" or "close", or similar? I think I'd prefer that over adding another function.

But either way, when the client is shutting down and/or being destroyed, when a callback is called (I assume onFailure), what return code is used? MQTTASYNC_DISCONNECTED? Or should we use a new one, like MQTTASYNC_SHUTDOWN?

icraggs commented 3 years ago

Yes, destroy does that if the session hasn't been closed before. If you connected with cleansession=true, then all the close session processing will be done on disconnect. But you can't do that for cleansession=false.

So I suggest we need a closesession function/method. Having an associated return code of MQTTASYNC_CLOSESESSION would seem like a good idea.
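The semantics discussed in this thread can be modeled in a few lines. This is a toy model under stated assumptions, not the Paho C implementation: `sketch_client` and all of its members are hypothetical, `close_session()` stands for the proposed function, and both numeric codes are placeholders (the thread does not assign a value to MQTTASYNC_CLOSESESSION, nor does it say which code destroy() reports).

```cpp
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

constexpr int SKETCH_DISCONNECTED = -3;   // placeholder value for this model
constexpr int SKETCH_CLOSESESSION = -100; // hypothetical, proposed code only

// Toy model of the callback behavior described in the thread.
class sketch_client {
    bool clean_session_;
    // onFailure callbacks of the messages still in flight.
    std::vector<std::function<void(int)>> pending_;

    void fail_all(int rc) {
        auto callbacks = std::move(pending_);
        pending_.clear();
        for (auto& cb : callbacks)
            if (cb) cb(rc);
    }

public:
    explicit sketch_client(bool clean_session) : clean_session_(clean_session) {}

    // Models destroy(): anything still pending is failed here at the latest.
    ~sketch_client() { fail_all(SKETCH_DISCONNECTED); }

    void publish(std::function<void(int)> on_failure) {
        pending_.push_back(std::move(on_failure));
    }

    // With a clean session the exchanges are abandoned on disconnect.
    // Otherwise they are kept, because a later reconnect could complete them.
    void disconnect() {
        if (clean_session_) fail_all(SKETCH_DISCONNECTED);
    }

    // Proposed addition: end the session explicitly before destruction.
    void close_session() { fail_all(SKETCH_CLOSESESSION); }

    std::size_t in_flight() const { return pending_.size(); }
};
```

In this model a client connected with cleansession=false keeps its pending callbacks across `disconnect()`, and only `close_session()` or destruction releases the waiters, which matches the behavior reported above.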